Trace → Dataset Materializer

Overview

This document covers stages 1-2 of the Continuous Training Pipeline: Trace Collection → Reward Labeling. Production inference traces stored in Langfuse are exported to S3 as Parquet, and each trace's quality is scored on a 0-1 scale by a Ragas + LLM Judge fleet to construct GRPO/DPO training datasets.

Langfuse OTel → S3 Parquet

Langfuse collects inference traces via the OpenTelemetry protocol. Exporting them to S3 in Parquet format enables large-scale batch analysis.

Langfuse Trace Schema

-- Langfuse traces table structure (PostgreSQL)
CREATE TABLE traces (
    id UUID PRIMARY KEY,
    timestamp TIMESTAMP,
    user_id TEXT,
    session_id TEXT,
    input TEXT,
    output TEXT,
    model TEXT,
    latency_ms INT,
    token_count INT,
    metadata JSONB,
    user_consent BOOLEAN  -- GDPR consent status
);

-- Example data
{
    "id": "trace-12345",
    "timestamp": "2026-04-18T03:15:00Z",
    "user_id": "user-abc",
    "input": "What's the difference between EKS Auto Mode and Karpenter?",
    "output": "EKS Auto Mode is an AWS fully-managed node group...",
    "model": "glm-5-32b",
    "latency_ms": 850,
    "token_count": 512,
    "metadata": {
        "domain": "eks-documentation",
        "feedback_score": 4.5
    },
    "user_consent": true
}

S3 Partitioning Strategy

s3://training-data-lake/
└── langfuse-traces/
    ├── date=2026-04-18/
    │   ├── model=glm-5-32b/
    │   │   ├── consent=true/
    │   │   │   └── traces-000001.parquet  (10k rows)
    │   │   └── consent=false/
    │   │       └── traces-000002.parquet
    │   └── model=qwen3-coder/
    │       └── consent=true/
    │           └── traces-000003.parquet
    └── date=2026-04-19/
        └── ...

Partitioning Reasons:

  • Date: Optimize time-range queries (e.g., last 7 days of data)
  • Model: Per-model performance tracking, A/B test separation
  • Consent: GDPR/CCPA compliance, exclude non-consented data from training
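
These partition keys pay off directly at read time. As a minimal sketch, pyarrow can prune partitions before downloading any data (bucket name and dates follow the examples above; partition values are compared as strings here, since hive-style discovery does not infer date or boolean types by default):

# Partition-pruned read: only directories matching the
# date/model/consent predicates are listed and fetched.
import pyarrow.dataset as ds

dataset = ds.dataset(
    's3://training-data-lake/langfuse-traces/',
    format='parquet',
    partitioning='hive',  # date=.../model=.../consent=... layout
)
table = dataset.to_table(
    filter=(
        (ds.field('date') >= '2026-04-12')    # last 7 days
        & (ds.field('model') == 'glm-5-32b')
        & (ds.field('consent') == 'true')     # consented traces only
    )
)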

Apache Iceberg vs Hudi

| Feature | Apache Iceberg | Apache Hudi |
|---|---|---|
| Snapshot Isolation | Full ACID transactions | Timeline-based consistency |
| Schema Evolution | Automatic column add/delete | Manual migration required |
| Query Performance | Partition pruning optimization | COW/MOR mode selection |
| AWS Integration | Native Glue Catalog | EMR optimized |
| Recommended Use | Large-scale analytical queries | Real-time upsert focus |

Iceberg Recommended

Continuous training is a read-heavy, batch-oriented workload, so Iceberg is recommended. Schema changes (new metadata fields) are frequent, which makes Iceberg's automatic schema evolution a real advantage.
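
The schema-evolution point is concrete: an Iceberg table registered in Glue can take a new column through a single Athena DDL statement, with no rewrite of existing Parquet files. A hedged sketch (the judge_model column and the query-results location are illustrative, not part of the pipeline above):

import boto3

athena = boto3.client('athena')

# Iceberg schema evolution in place: old rows simply read the
# new column as NULL; no data files are rewritten.
athena.start_query_execution(
    QueryString="""
        ALTER TABLE training_data.langfuse_traces
        ADD COLUMNS (judge_model string)
    """,
    ResultConfiguration={
        'OutputLocation': 's3://training-data-lake/athena-query-results/'
    },
)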

Airflow DAG Example

# dags/langfuse_to_s3.py
from datetime import datetime, timedelta
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def export_langfuse_traces(**context):
    """Langfuse Postgres → S3 Parquet conversion"""

    # Connect to the Langfuse DB
    pg_hook = PostgresHook(postgres_conn_id='langfuse_db')

    # Extract yesterday's data (user_consent = true only)
    yesterday = context['ds']
    query = f"""
        SELECT
            id, timestamp, user_id, session_id,
            input, output, model, latency_ms, token_count,
            metadata
        FROM traces
        WHERE DATE(timestamp) = '{yesterday}'
          AND user_consent = true
          AND output IS NOT NULL
        ORDER BY timestamp
    """
    df = pg_hook.get_pandas_df(query)

    # Group by model and write one Parquet file per partition
    s3_hook = S3Hook(aws_conn_id='aws_default')
    for i, (model, group) in enumerate(df.groupby('model')):
        table = pa.Table.from_pandas(group)

        # S3 path: s3://bucket/date=2026-04-18/model=glm-5-32b/consent=true/
        s3_key = (
            f"langfuse-traces/date={yesterday}/model={model}/"
            f"consent=true/traces-{i:06d}.parquet"
        )

        # Serialize to an in-memory buffer, then upload
        buffer = BytesIO()
        pq.write_table(table, buffer, compression='snappy')
        s3_hook.load_bytes(
            buffer.getvalue(),
            key=s3_key,
            bucket_name='training-data-lake',
            replace=True,
        )

    return len(df)

with DAG(
    dag_id='langfuse_to_s3_daily',
    schedule_interval='0 6 * * *',  # daily at 06:00
    start_date=datetime(2026, 4, 1),
    catchup=False,
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:
    export_task = PythonOperator(
        task_id='export_traces',
        python_callable=export_langfuse_traces,
    )
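
For a quick check before deploying, the DAG can be exercised in-process with dag.test() (assuming Airflow 2.5+ and that the langfuse_db and aws_default connections are configured locally):

# Run one logical date end-to-end without a scheduler (Airflow 2.5+).
from datetime import datetime

from dags.langfuse_to_s3 import dag

dag.test(execution_date=datetime(2026, 4, 18))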

AWS Glue Catalog Registration

# glue_iceberg_table.py
import boto3

glue = boto3.client('glue')

# Define the Iceberg table
glue.create_table(
    DatabaseName='training_data',
    TableInput={
        'Name': 'langfuse_traces',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'id', 'Type': 'string'},
                {'Name': 'timestamp', 'Type': 'timestamp'},
                {'Name': 'user_id', 'Type': 'string'},
                {'Name': 'input', 'Type': 'string'},
                {'Name': 'output', 'Type': 'string'},
                {'Name': 'model', 'Type': 'string'},
                {'Name': 'latency_ms', 'Type': 'int'},
                {'Name': 'metadata', 'Type': 'struct<feedback_score:double,domain:string>'},
            ],
            'Location': 's3://training-data-lake/langfuse-traces/',
            'InputFormat': 'org.apache.iceberg.mr.hive.HiveIcebergInputFormat',
            'OutputFormat': 'org.apache.iceberg.mr.hive.HiveIcebergOutputFormat',
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
            },
        },
        'PartitionKeys': [
            {'Name': 'date', 'Type': 'date'},
            {'Name': 'model', 'Type': 'string'},
            {'Name': 'consent', 'Type': 'boolean'},
        ],
        'Parameters': {
            'table_type': 'ICEBERG',
            'format': 'parquet',
            'write.parquet.compression-codec': 'snappy',
        },
    },
)
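
Once registered, the table is queryable from Athena with partition pruning on date, model, and consent. A sketch using awswrangler, assuming that library is available (column and database names follow the Glue definition above):

import awswrangler as wr

# Athena pushes the partition predicates down, so only the
# matching date/model/consent directories are scanned.
df = wr.athena.read_sql_query(
    sql="""
        SELECT id, input, output, metadata.feedback_score
        FROM langfuse_traces
        WHERE "date" >= DATE '2026-04-11'
          AND model = 'glm-5-32b'
          AND consent = true
    """,
    database='training_data',
)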

Reward Labeler Fleet

Reward Labeling Concept

Reward Labeling is the process of evaluating each trace's quality with a 0-1 score. This score is used as a preference signal in GRPO/DPO training.

  • High-score traces (0.8-1.0) → preferred examples (higher weight in training)
  • Low-score traces (0.0-0.3) → non-preferred examples (lower weight in training)
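
These thresholds translate directly into preference pairs. A minimal sketch of one way to build DPO pairs from the labeled dataset produced later in this document (pairing high- and low-reward answers to the same input is an assumption, not a step the pipeline above prescribes):

import pandas as pd

def build_dpo_pairs(df: pd.DataFrame) -> pd.DataFrame:
    """Pair a preferred (reward >= 0.8) and a non-preferred
    (reward <= 0.3) answer for every input that has both."""
    chosen = df[df['final_reward'] >= 0.8]
    rejected = df[df['final_reward'] <= 0.3]
    pairs = chosen.merge(rejected, on='input', suffixes=('_chosen', '_rejected'))
    return pairs.rename(columns={
        'input': 'prompt',
        'output_chosen': 'chosen',
        'output_rejected': 'rejected',
    })[['prompt', 'chosen', 'rejected']]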

Evaluation Metrics Combination

Ragas Metrics

The Ragas evaluation framework objectively measures RAG system quality.

from ragas.metrics import faithfulness, answer_relevancy, context_precision

# Ragas batch evaluation (example scores)
scores = {
    'faithfulness': 0.92,       # Is the answer faithful to the context?
    'answer_relevancy': 0.88,   # Is the answer relevant to the question?
    'context_precision': 0.85,  # Is the retrieved context accurate?
}

# Calculate the final reward as a weighted average
reward = (
    0.5 * scores['faithfulness'] +
    0.3 * scores['answer_relevancy'] +
    0.2 * scores['context_precision']
)
# → reward = 0.894

LLM-as-a-Judge

Use a small model (Qwen3-4B) as a judge to evaluate answer quality.

# LLM Judge prompt
JUDGE_PROMPT = """
Evaluate the following question and answer.

**Question**: {question}
**Answer**: {answer}

**Evaluation Criteria**:
1. Accuracy: Are there technical errors?
2. Completeness: Does it cover all aspects of the question?
3. Clarity: Is it easy to understand?

Output a score between 0.0 and 1.0. Respond only in JSON format:
{{"score": 0.85, "reasoning": "..."}}
"""

# Evaluate with Qwen3-4B (vLLM batch inference)
judge_response = vllm_client.chat.completions.create(
    model="qwen3-judge",
    messages=[{"role": "user", "content": JUDGE_PROMPT.format(question=q, answer=a)}],
    temperature=0.1,
    max_tokens=200,
)

judge_score = json.loads(judge_response.choices[0].message.content)['score']
# → judge_score = 0.85
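
Small judge models do not always emit clean JSON, so in practice the parsing step deserves a guard. A hedged sketch (dropping unparseable responses, rather than retrying, is an assumption):

import json
import re

def parse_judge_score(raw: str):
    """Extract {"score": ...} from the judge output; return None
    when no parseable JSON object is present."""
    match = re.search(r'\{.*\}', raw, re.DOTALL)
    if not match:
        return None
    try:
        score = float(json.loads(match.group(0))['score'])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        return None
    return min(max(score, 0.0), 1.0)  # clamp to the 0-1 scale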

Final Reward Aggregation

# Ragas + LLM Judge combination
final_reward = (
    0.6 * ragas_reward +  # Ragas weight: 60%
    0.4 * judge_score     # Judge weight: 40%
)

KServe InferenceService Deployment

Deploy the Qwen3-4B judge model with KServe to build a high-availability fleet.

# reward-labeler-inference.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: reward-labeler-qwen3
  namespace: training-pipeline
spec:
  predictor:
    minReplicas: 3
    maxReplicas: 10
    containers:
      - name: kserve-container
        image: vllm/vllm-openai:v0.18.2
        args:
          - --model=Qwen/Qwen3-Coder-4B-Instruct
          - --served-model-name=qwen3-judge
          - --tensor-parallel-size=1
          - --max-model-len=8192
          - --gpu-memory-utilization=0.9
        resources:
          requests:
            nvidia.com/gpu: 1
            memory: 16Gi
          limits:
            nvidia.com/gpu: 1
            memory: 24Gi
        env:
          - name: SERVED_MODEL_NAME
            value: "qwen3-judge"
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: reward-labeler-scaler
  namespace: training-pipeline
spec:
  scaleTargetRef:
    name: reward-labeler-qwen3
  minReplicaCount: 3
  maxReplicaCount: 10
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: vllm_requests_running
        threshold: "10"
        query: |
          avg(vllm_requests_running{model="qwen3-judge"})

Autoscaling Strategy:

  • Minimum 3 replicas: Guarantee baseline throughput
  • Maximum 10 replicas: Handle spikes during batch evaluation
  • Trigger: Scale out when the average number of running vLLM requests per replica exceeds 10

Batch Evaluation Job

# batch_reward_labeling.py
import json
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from datasets import Dataset
from openai import OpenAI
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision

# Load last 7 days of traces from S3 (partition pruning via filters)
df = pd.read_parquet(
    's3://training-data-lake/langfuse-traces/',
    filters=[
        ('date', '>=', '2026-04-11'),
        ('date', '<=', '2026-04-18'),
        ('model', '=', 'glm-5-32b'),
        ('consent', '=', True),
    ],
)

# Ragas evaluation (assumes the frame carries the question/answer/contexts
# fields Ragas expects; per-row scores come back via to_pandas())
ragas_results = evaluate(
    Dataset.from_pandas(df),
    metrics=[faithfulness, answer_relevancy, context_precision],
).to_pandas()

# LLM Judge evaluation (parallel requests against the KServe endpoint)
judge_client = OpenAI(
    base_url="http://reward-labeler-qwen3.training-pipeline.svc.cluster.local:8000/v1",
    api_key="EMPTY",  # vLLM ignores the key, but the client requires one
)

def judge_single_trace(row):
    # JUDGE_PROMPT is the judge prompt defined earlier in this document
    response = judge_client.chat.completions.create(
        model="qwen3-judge",
        messages=[{
            "role": "user",
            "content": JUDGE_PROMPT.format(
                question=row['input'],
                answer=row['output'],
            ),
        }],
        temperature=0.1,
        max_tokens=200,
    )
    return json.loads(response.choices[0].message.content)['score']

with ThreadPoolExecutor(max_workers=50) as executor:
    judge_scores = list(executor.map(judge_single_trace, df.to_dict('records')))

# Calculate the final reward
df['ragas_reward'] = (
    0.5 * ragas_results['faithfulness'] +
    0.3 * ragas_results['answer_relevancy'] +
    0.2 * ragas_results['context_precision']
)
df['judge_score'] = judge_scores
df['final_reward'] = 0.6 * df['ragas_reward'] + 0.4 * df['judge_score']

# Save the labeled dataset to S3
df.to_parquet('s3://training-data-lake/labeled-dataset/2026-04-18.parquet')

Cost Example

| Resource | Spec | Hourly Cost | Daily Cost (10 hours) |
|---|---|---|---|
| Qwen3-4B Judge Fleet | g6.xlarge × 3 | $0.93 | $9.30 |
| Ragas Evaluation (Bedrock Claude) | - | Per API call | $5-10 (10K traces) |
| Airflow/Kubernetes | Existing infrastructure | - | - |
| Total | - | - | $15-20/day |

At $15-20/day, the annual cost is roughly $5,500-7,300, a savings of about 95% compared to manual labeling (~$10K/month).
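
A quick sanity check of that annual figure:

# $15-20/day over a full year, versus ~$10K/month for manual labeling
low, high = 15 * 365, 20 * 365   # $5,475 - $7,300 per year
manual = 10_000 * 12             # $120,000 per year
print(1 - high / manual)         # ≈ 0.94 → roughly 95% savings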
