Enterprise teams running LLM features in production face a concrete problem: their billing dashboards show a total monthly number, but no one can answer which feature, team, or user drove the spike. Estimated token counts from request-side heuristics drift from reality. What you need is actual token usage pulled from each provider response, tagged with business context, and aggregated somewhere queryable.
Why this matters
The Langfuse v4 Python SDK (v4.2.0) is built on the OpenTelemetry tracing model, giving every generation span a structured place to record usage_details and cost_details pulled directly from the provider response object [2]. The server side (v3.174.0) adds configurable field groups for blob export, meaning you can pull raw generation data out via the REST API without clicking through the UI [1]. Together, these two releases make it practical to build a cost-attribution pipeline that is accurate (actual tokens, not estimates), granular (per generation, not per day), and portable (CSV export you can load into any BI tool).
Without this wiring, teams typically discover cost overruns a week after the fact, with no way to attribute them below the project level. A single prompt regression in a summarization feature can double spend before anyone notices.
Prerequisites
- Python 3.11 or 3.12
- A Langfuse account: cloud free tier at cloud.langfuse.com, or self-hosted via Docker (see Langfuse self-hosting docs)
- A Langfuse project with a public key and secret key (Settings > API Keys)
- An OpenAI API key, an Anthropic API key, or both
- Basic familiarity with the OpenAI or Anthropic Python SDKs
Setup
Install the Langfuse v4 SDK and the provider SDKs you plan to use:
uv pip install "langfuse>=4.2.0" openai anthropic
Export your credentials. The Langfuse SDK reads these automatically:
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."
export LANGFUSE_HOST="https://cloud.langfuse.com"
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
Step 1: Understand the Langfuse v4 tracing model
Langfuse v4 uses OpenTelemetry spans under the hood. The SDK exposes three main primitives:
- langfuse.trace(...): a root span grouping all work for one logical request
- trace.generation(...): a child span representing one LLM call, with first-class fields for model, usage, and cost
- generation.end(...): closes the span and records the actual output plus usage numbers
The key insight is that usage accepts a dict with input, output, and total token counts taken directly from the provider response. Langfuse then applies its model price table to compute cost, or you can pass cost explicitly if you have custom pricing.
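As a minimal sketch of that mapping, the helpers below turn each provider's usage object into the dict shape described above and compute an explicit cost from custom pricing. The helper names, the stand-in usage objects, and the per-million-token prices are all assumptions made for illustration; they are not part of the Langfuse SDK or real provider rates.

```python
from types import SimpleNamespace


def usage_from_openai(usage) -> dict:
    """Map an OpenAI chat-completion usage object to the usage dict shape."""
    return {
        "input": usage.prompt_tokens,
        "output": usage.completion_tokens,
        "total": usage.total_tokens,
        "unit": "TOKENS",
    }


def usage_from_anthropic(usage) -> dict:
    """Map an Anthropic usage object; it reports no total, so sum the parts."""
    return {
        "input": usage.input_tokens,
        "output": usage.output_tokens,
        "total": usage.input_tokens + usage.output_tokens,
        "unit": "TOKENS",
    }


def explicit_cost(usage: dict, input_per_1m: float, output_per_1m: float) -> float:
    """Compute cost in USD from hypothetical per-million-token prices."""
    return (usage["input"] * input_per_1m + usage["output"] * output_per_1m) / 1_000_000


# Stand-ins that mimic the attribute names on real provider responses.
openai_usage = SimpleNamespace(prompt_tokens=120, completion_tokens=80, total_tokens=200)
anthropic_usage = SimpleNamespace(input_tokens=50, output_tokens=120)

print(usage_from_openai(openai_usage))
print(usage_from_anthropic(anthropic_usage))
# Placeholder prices, not real provider rates.
print(explicit_cost(usage_from_openai(openai_usage), input_per_1m=1.0, output_per_1m=2.0))
```

Keeping the mapping in small pure functions makes it easy to unit-test without calling either provider.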
Step 2: Build the cost-aware generation wrapper
This module wraps both OpenAI and Anthropic calls. It extracts actual token usage from each provider’s response object and records it on the Langfuse generation span.
# filename: llm_tracker.py
import os
from typing import Optional

from langfuse import Langfuse

# Initialise once; the SDK reads LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY,
# and LANGFUSE_HOST from the environment.
langfuse = Langfuse()


def chat_openai(
    messages: list[dict],
    model: str = "gpt-4o-mini",
    trace_id: Optional[str] = None,
    feature: str = "unknown",
    team: str = "unknown",
    user_id: Optional[str] = None,
) -> str:
    """Call OpenAI and record actual token usage on a Langfuse generation span."""
    from openai import OpenAI

    client = OpenAI()

    # Retrieve or create the parent trace.
    trace = langfuse.trace(
        id=trace_id,
        name=f"{feature}-openai",
        user_id=user_id,
        metadata={"feature": feature, "team": team},
        tags=[feature, team],
    )
    generation = trace.generation(
        name="chat-completion",
        model=model,
        input=messages,
        metadata={"feature": feature, "team": team},
    )

    response = client.chat.completions.create(
        model=model,
        messages=messages,
    )

    usage = response.usage  # actual counts from the provider
    generation.end(
        output=response.choices[0].message.content,
        usage={
            "input": usage.prompt_tokens,
            "output": usage.completion_tokens,
            "total": usage.total_tokens,
            "unit": "TOKENS",
        },
        model=response.model,  # record the resolved model name (e.g. gpt-4o-mini-2024-07-18)
    )
    return response.choices[0].message.content


def chat_anthropic(
    messages: list[dict],
    system: str = "",
    model: str = "claude-3-haiku-20240307",
    trace_id: Optional[str] = None,
    feature: str = "unknown",
    team: str = "unknown",
    user_id: Optional[str] = None,
) -> str:
    """Call Anthropic and record actual token usage on a Langfuse generation span."""
    import anthropic

    client = anthropic.Anthropic()

    trace = langfuse.trace(
        id=trace_id,
        name=f"{feature}-anthropic",
        user_id=user_id,
        metadata={"feature": feature, "team": team},
        tags=[feature, team],
    )
    generation = trace.generation(
        name="messages",
        model=model,
        input=messages,
        metadata={"feature": feature, "team": team, "system": system},
    )

    kwargs = {"model": model, "max_tokens": 1024, "messages": messages}
    if system:
        kwargs["system"] = system
    response = client.messages.create(**kwargs)

    usage = response.usage  # actual counts from the provider
    generation.end(
        output=response.content[0].text,
        usage={
            "input": usage.input_tokens,
            "output": usage.output_tokens,
            "total": usage.input_tokens + usage.output_tokens,
            "unit": "TOKENS",
        },
        model=response.model,
    )
    return response.content[0].text


def flush():
    """Flush all pending spans to Langfuse before the process exits."""
    langfuse.flush()
Step 3: Simulate multi-feature, multi-team traffic
This script fires several LLM calls tagged with different features and teams, simulating the kind of mixed workload a real application produces. Run it after setting your API keys.
# filename: simulate_traffic.py
import uuid

from llm_tracker import chat_openai, chat_anthropic, flush

CALLS = [
    {
        "provider": "openai",
        "feature": "summarization",
        "team": "content",
        "user_id": "user-001",
        "messages": [
            {"role": "user", "content": "Summarise the history of the Roman Empire in three sentences."}
        ],
    },
    {
        "provider": "openai",
        "feature": "search-assist",
        "team": "search",
        "user_id": "user-002",
        "messages": [
            {"role": "user", "content": "What are the top five Python web frameworks in 2024?"}
        ],
    },
    {
        "provider": "openai",
        "feature": "summarization",
        "team": "content",
        "user_id": "user-003",
        "messages": [
            {"role": "user", "content": "Summarise the causes of World War I in two sentences."}
        ],
    },
    {
        "provider": "anthropic",
        "feature": "code-review",
        "team": "engineering",
        "user_id": "user-004",
        "messages": [
            {"role": "user", "content": "Review this Python function: def add(a, b): return a + b"}
        ],
        "system": "You are a senior software engineer. Be concise.",
    },
    {
        "provider": "anthropic",
        "feature": "search-assist",
        "team": "search",
        "user_id": "user-005",
        "messages": [
            {"role": "user", "content": "Explain vector databases in one paragraph."}
        ],
    },
]


def run():
    for call in CALLS:
        trace_id = str(uuid.uuid4())
        provider = call["provider"]
        print(f"[{provider}] feature={call['feature']} team={call['team']} trace={trace_id}")
        if provider == "openai":
            result = chat_openai(
                messages=call["messages"],
                trace_id=trace_id,
                feature=call["feature"],
                team=call["team"],
                user_id=call["user_id"],
            )
        else:
            result = chat_anthropic(
                messages=call["messages"],
                system=call.get("system", ""),
                trace_id=trace_id,
                feature=call["feature"],
                team=call["team"],
                user_id=call["user_id"],
            )
        print(f"  -> {result[:80]}...")
    flush()
    print("All spans flushed to Langfuse.")


if __name__ == "__main__":
    run()
Step 4: Export per-feature spend to CSV via the Langfuse REST API
Once the spans are in Langfuse, you can pull them back out via the /api/public/generations endpoint and aggregate by feature and team. The Langfuse server returns usage and calculatedTotalCost on each generation object [1].
# filename: export_costs.py
import os
import csv
import json
from collections import defaultdict
from base64 import b64encode
from urllib.request import urlopen, Request
from urllib.parse import urlencode


def fetch_generations(host: str, public_key: str, secret_key: str, limit: int = 100) -> list:
    """Fetch recent generations from the Langfuse REST API."""
    token = b64encode(f"{public_key}:{secret_key}".encode()).decode()
    params = urlencode({"limit": limit})
    url = f"{host}/api/public/generations?{params}"
    req = Request(url, headers={"Authorization": f"Basic {token}"})
    with urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())["data"]


def aggregate_by_feature_team(generations: list) -> dict:
    """Sum token counts and costs grouped by (feature, team) from generation metadata."""
    totals = defaultdict(
        lambda: {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0, "cost_usd": 0.0, "calls": 0}
    )
    for gen in generations:
        meta = gen.get("metadata") or {}
        feature = meta.get("feature", "unknown")
        team = meta.get("team", "unknown")
        key = (feature, team)
        usage = gen.get("usage") or {}
        totals[key]["input_tokens"] += usage.get("input", 0) or 0
        totals[key]["output_tokens"] += usage.get("output", 0) or 0
        totals[key]["total_tokens"] += usage.get("total", 0) or 0
        cost = gen.get("calculatedTotalCost")
        if cost is not None:
            totals[key]["cost_usd"] += float(cost)
        totals[key]["calls"] += 1
    return totals


def write_csv(totals: dict, path: str = "/workspace/spend_breakdown.csv"):
    rows = [
        {
            "feature": feature,
            "team": team,
            "calls": v["calls"],
            "input_tokens": v["input_tokens"],
            "output_tokens": v["output_tokens"],
            "total_tokens": v["total_tokens"],
            "cost_usd": round(v["cost_usd"], 6),
        }
        for (feature, team), v in sorted(totals.items())
    ]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["feature", "team", "calls", "input_tokens", "output_tokens", "total_tokens", "cost_usd"],
        )
        writer.writeheader()
        writer.writerows(rows)
    print(f"Wrote {len(rows)} rows to {path}")
    return rows


def run():
    host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
    public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
    secret_key = os.environ["LANGFUSE_SECRET_KEY"]
    print("Fetching generations from Langfuse...")
    generations = fetch_generations(host, public_key, secret_key)
    print(f"Fetched {len(generations)} generations.")
    totals = aggregate_by_feature_team(generations)
    rows = write_csv(totals)
    print("\nSpend breakdown:")
    print(f"{'Feature':<20} {'Team':<15} {'Calls':>6} {'Total tokens':>14} {'Cost USD':>12}")
    print("-" * 72)
    for row in rows:
        print(f"{row['feature']:<20} {row['team']:<15} {row['calls']:>6} {row['total_tokens']:>14} {row['cost_usd']:>12.6f}")


if __name__ == "__main__":
    run()
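The fetch_generations function above reads a single page of at most 100 items, so a busy project would be undercounted. The sketch below shows one way to walk every page. It assumes the endpoint accepts a page query parameter and returns a meta.totalPages field alongside data; both should be checked against the API reference for your server version. The fetch_page callable is a hypothetical hook so the loop can be exercised without network access.

```python
from typing import Callable, Iterator


def iter_generations(fetch_page: Callable[[int], dict], max_pages: int = 1000) -> Iterator[dict]:
    """Yield generations page by page until the API reports no more pages."""
    page = 1
    while page <= max_pages:
        body = fetch_page(page)
        items = body.get("data") or []
        yield from items
        total_pages = (body.get("meta") or {}).get("totalPages")
        # Stop on an empty page, or once the reported last page has been read.
        if not items or (total_pages is not None and page >= total_pages):
            break
        page += 1


# Canned responses standing in for the REST API (shape is an assumption).
PAGES = {
    1: {"data": [{"id": "a"}, {"id": "b"}], "meta": {"totalPages": 2}},
    2: {"data": [{"id": "c"}], "meta": {"totalPages": 2}},
}


def fake_fetch(page: int) -> dict:
    return PAGES.get(page, {"data": [], "meta": {}})


all_ids = [g["id"] for g in iter_generations(fake_fetch)]
print(all_ids)
```

In a real export, fetch_page would wrap the same authenticated request as fetch_generations with the page number added to the query string, and its output would feed aggregate_by_feature_team unchanged.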
Step 5: Test the aggregation logic without API keys
Before running against live Langfuse, verify the aggregation and CSV logic with synthetic data:
import sys

sys.path.insert(0, "/workspace")
from export_costs import aggregate_by_feature_team, write_csv

fake_generations = [
    {"metadata": {"feature": "summarization", "team": "content"}, "usage": {"input": 120, "output": 80, "total": 200}, "calculatedTotalCost": 0.000030},
    {"metadata": {"feature": "summarization", "team": "content"}, "usage": {"input": 95, "output": 60, "total": 155}, "calculatedTotalCost": 0.000023},
    {"metadata": {"feature": "search-assist", "team": "search"}, "usage": {"input": 200, "output": 150, "total": 350}, "calculatedTotalCost": 0.000053},
    {"metadata": {"feature": "code-review", "team": "engineering"}, "usage": {"input": 50, "output": 120, "total": 170}, "calculatedTotalCost": 0.000025},
    # Missing metadata and cost: grouped under ("unknown", "unknown") with zero cost.
    {"metadata": None, "usage": {"input": 10, "output": 5, "total": 15}, "calculatedTotalCost": None},
]

totals = aggregate_by_feature_team(fake_generations)
rows = write_csv(totals, "/workspace/test_breakdown.csv")

# Three tagged groups plus the ("unknown", "unknown") fallback group.
assert len(rows) == 4, f"Expected 4 feature/team groups, got {len(rows)}"
summarization_row = next(r for r in rows if r["feature"] == "summarization")
assert summarization_row["calls"] == 2
assert summarization_row["total_tokens"] == 355
assert abs(summarization_row["cost_usd"] - 0.000053) < 1e-9
print("All assertions passed.")
print(f"Rows written: {len(rows)}")
for r in rows:
    print(r)
Verify it works
First confirm that the test CSV from Step 5 was written correctly:
# Verify the test CSV was written correctly
with open("/workspace/test_breakdown.csv") as f:
    content = f.read()
print("CSV header present:", content.startswith("feature,team"))
print("Summarization row present:", "summarization" in content)
print("Engineering row present:", "engineering" in content)
print("CSV content:")
print(content)
To run the live pipeline (requires API keys):
# Run traffic simulation (requires OPENAI_API_KEY / ANTHROPIC_API_KEY)
python /workspace/simulate_traffic.py
# Wait a few seconds for Langfuse to ingest, then export
python /workspace/export_costs.py
Troubleshooting
langfuse.Langfuse() raises AuthenticationError or 401. Check that LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set correctly. The public key starts with pk-lf- and the secret key with sk-lf-. If you are self-hosting, also verify LANGFUSE_HOST points to your instance (no trailing slash).
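A quick pre-flight check can catch these configuration mistakes before any request is sent. The sketch below is a hypothetical helper, not part of the Langfuse SDK; it only applies the prefix and trailing-slash rules described above to a dict of environment values.

```python
def check_langfuse_env(env: dict) -> list[str]:
    """Return a list of human-readable problems found in the Langfuse settings."""
    problems = []
    public_key = env.get("LANGFUSE_PUBLIC_KEY", "")
    secret_key = env.get("LANGFUSE_SECRET_KEY", "")
    host = env.get("LANGFUSE_HOST", "")

    if not public_key.startswith("pk-lf-"):
        problems.append("LANGFUSE_PUBLIC_KEY is missing or does not start with pk-lf-")
    if not secret_key.startswith("sk-lf-"):
        problems.append("LANGFUSE_SECRET_KEY is missing or does not start with sk-lf-")
    if host.endswith("/"):
        problems.append("LANGFUSE_HOST has a trailing slash")
    if host and not host.startswith(("http://", "https://")):
        problems.append("LANGFUSE_HOST should start with http:// or https://")
    return problems


# Example values only; in a real script pass dict(os.environ).
sample_env = {
    "LANGFUSE_PUBLIC_KEY": "pk-lf-example",
    "LANGFUSE_SECRET_KEY": "pk-lf-swapped-by-mistake",
    "LANGFUSE_HOST": "https://langfuse.example.internal/",
}
for problem in check_langfuse_env(sample_env):
    print("config problem:", problem)
```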
calculatedTotalCost is None for all generations. Langfuse computes cost from its internal model price table. If you are using a model name that Langfuse does not recognise (a fine-tuned model, a custom deployment name), cost will be null. Pass the canonical provider model name (e.g. gpt-4o-mini, not my-ft-model) in the model field of generation.end(...), or configure a custom model price in Langfuse Settings > Models.
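If configuring a model price on the server is not an option, the export step can fall back to a local price table when the cost is null. This is a sketch under stated assumptions: the CUSTOM_PRICES values are placeholders rather than real rates, and it assumes each exported generation carries a model field next to usage.

```python
from typing import Optional

# Hypothetical per-million-token prices in USD; replace with your own rates.
CUSTOM_PRICES = {
    "my-ft-model": {"input": 3.0, "output": 6.0},
}


def cost_or_fallback(gen: dict, prices: dict = CUSTOM_PRICES) -> Optional[float]:
    """Prefer the server-computed cost; otherwise price the usage locally."""
    cost = gen.get("calculatedTotalCost")
    if cost is not None:
        return float(cost)
    price = prices.get(gen.get("model"))
    if price is None:
        return None  # unknown model: leave the cost unset rather than guess
    usage = gen.get("usage") or {}
    input_tokens = usage.get("input") or 0
    output_tokens = usage.get("output") or 0
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


unpriced = {"model": "my-ft-model", "usage": {"input": 1000, "output": 500}, "calculatedTotalCost": None}
print(cost_or_fallback(unpriced))
```

Returning None for unknown models keeps missing prices visible in the report instead of silently counting them as zero.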
Generations appear in Langfuse but metadata is empty. The metadata dict must be passed to both trace(...) and generation(...). If you only pass it to the trace, the generation-level export will not carry the feature/team keys. The export script reads gen["metadata"], not the parent trace metadata.
flush() hangs or the process exits before spans are sent. The Langfuse v4 SDK batches spans asynchronously. Always call langfuse.flush() before your process exits, especially in scripts and Lambda functions. In long-running servers, the SDK flushes automatically on a background thread.
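One way to make the flush hard to forget in short-lived processes is to wrap the job in try/finally. The sketch below uses a stand-in client class so it runs on its own; run_and_flush and FakeClient are illustrative names, and the only thing assumed about the real client is that it exposes the flush() method used elsewhere in this guide.

```python
def run_and_flush(job, client):
    """Run job() and always flush the client, even if the job raises."""
    try:
        return job()
    finally:
        client.flush()


class FakeClient:
    """Stand-in for the Langfuse client; counts how often flush() is called."""

    def __init__(self):
        self.flush_calls = 0

    def flush(self):
        self.flush_calls += 1


client = FakeClient()
result = run_and_flush(lambda: "done", client)
print(result, client.flush_calls)
```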
fetch_generations returns an empty list after running simulate_traffic.py. Langfuse ingestion is near-real-time but not instantaneous. Wait 5-10 seconds after flushing before calling the export endpoint. You can also add a time.sleep(8) between the two scripts.
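Instead of a fixed sleep, the export can poll until the expected number of generations is visible. The helper below is a minimal sketch; wait_for_generations is a hypothetical name, and the fetch and sleep callables are injected so the loop can be tested without the network or real delays.

```python
import time
from typing import Callable


def wait_for_generations(
    fetch: Callable[[], list],
    expected: int,
    attempts: int = 5,
    delay_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> list:
    """Call fetch() until it returns at least `expected` items or attempts run out."""
    generations: list = []
    for attempt in range(attempts):
        generations = fetch()
        if len(generations) >= expected:
            break
        if attempt < attempts - 1:
            sleep(delay_s)  # give ingestion time to catch up before retrying
    return generations


# Simulated API: empty twice, then five generations.
responses = iter([[], [], [{"id": i} for i in range(5)]])
slept = []
found = wait_for_generations(lambda: next(responses), expected=5, sleep=slept.append)
print(len(found), slept)
```

In export_costs.py, fetch could be a lambda around fetch_generations(host, public_key, secret_key), with expected set to the number of calls made by the traffic simulation.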
Token counts in Langfuse differ from your provider invoice. Providers bill on request-level aggregates and may round or apply minimum charges. The token counts recorded here are the exact values from response.usage, which match what the provider uses for billing. Discrepancies larger than a few percent usually indicate that some calls are not instrumented (e.g. streaming calls where usage is only available if stream_options={"include_usage": True} is set).
Next steps
- Add streaming support. For OpenAI streaming, pass stream_options={"include_usage": True} and read chunk.usage from the final chunk. Record the same usage dict on generation.end().
- Set cost alerts. Use the Langfuse REST API to poll calculatedTotalCost aggregated by tag, and trigger a Slack webhook when a feature exceeds a daily budget threshold.
- Attach session context. Pass session_id to langfuse.trace(...) to group all turns in a multi-turn conversation, then compare per-session cost distributions across features.
- Export to a data warehouse. Replace the CSV writer with a call to pandas.DataFrame(rows).to_gbq(...) or a SQLAlchemy insert to push the breakdown into BigQuery or Postgres for BI tooling.
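The cost-alert idea can be prototyped on the rows produced by write_csv before any webhook is wired up. The sketch below only decides which features are over budget; the budget figures and the over_budget helper are assumptions for illustration, and posting the result to Slack is left out.

```python
def over_budget(rows: list[dict], daily_budgets_usd: dict[str, float]) -> list[dict]:
    """Return one alert per feature whose summed cost exceeds its daily budget."""
    spend: dict[str, float] = {}
    for row in rows:
        spend[row["feature"]] = spend.get(row["feature"], 0.0) + row["cost_usd"]
    return [
        {"feature": feature, "spend_usd": total, "budget_usd": daily_budgets_usd[feature]}
        for feature, total in sorted(spend.items())
        if feature in daily_budgets_usd and total > daily_budgets_usd[feature]
    ]


# Example rows in the same shape write_csv returns (values are made up).
sample_rows = [
    {"feature": "summarization", "team": "content", "cost_usd": 0.8},
    {"feature": "search-assist", "team": "search", "cost_usd": 0.2},
    {"feature": "search-assist", "team": "support", "cost_usd": 0.5},
]
budgets = {"summarization": 0.5, "search-assist": 1.0}

alerts = over_budget(sample_rows, budgets)
for alert in alerts:
    print(f"{alert['feature']} spent {alert['spend_usd']:.2f} USD, budget {alert['budget_usd']:.2f} USD")
```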
Frequently Asked Questions
How does Langfuse v4 capture actual token usage instead of estimates?
The Langfuse v4 SDK accepts a usage dict on generation.end() populated directly from the provider response object (e.g., response.usage.prompt_tokens from OpenAI or response.usage.input_tokens from Anthropic), ensuring token counts match what the provider bills.
What metadata fields are required to aggregate costs by feature and team?
The metadata dict must be passed to both trace() and generation() calls with keys like feature and team. The export script reads gen["metadata"] to group generations, so omitting metadata at the generation level will result in empty feature/team values.
Why does calculatedTotalCost return null for some generations?
Langfuse computes cost using its internal model price table. If the model name is not recognized (e.g., a fine-tuned or custom deployment name), cost will be null. Use the canonical provider model name (e.g., gpt-4o-mini) or configure a custom model price in Langfuse Settings.
How long does it take for generations to appear in Langfuse after calling flush()?
Langfuse ingestion is near-real-time but not instantaneous. Wait 5-10 seconds after flushing before querying the /api/public/generations endpoint to ensure spans have been ingested.
How can this approach be extended to streaming LLM calls?
For OpenAI streaming, pass stream_options={"include_usage": True} and read chunk.usage from the final chunk, then record the same usage dict on generation.end().
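A minimal sketch of that streaming pattern follows. It consumes an iterable of chunks, joins the text deltas, and keeps the usage reported on the last chunk. The chunks here are stand-in objects that mimic the attribute names on OpenAI streaming chunks, and collect_stream is a hypothetical helper name; with a real client the iterable would be the stream returned when stream=True and stream_options={"include_usage": True} are passed.

```python
from types import SimpleNamespace


def collect_stream(chunks):
    """Join streamed text and return it with a usage dict (or None if absent)."""
    parts = []
    usage = None
    for chunk in chunks:
        # The usage-bearing final chunk has an empty choices list.
        if chunk.choices:
            delta = chunk.choices[0].delta.content
            if delta:
                parts.append(delta)
        if getattr(chunk, "usage", None) is not None:
            usage = chunk.usage
    usage_dict = None
    if usage is not None:
        usage_dict = {
            "input": usage.prompt_tokens,
            "output": usage.completion_tokens,
            "total": usage.total_tokens,
            "unit": "TOKENS",
        }
    return "".join(parts), usage_dict


def text_chunk(text):
    """Build a stand-in chunk carrying one text delta and no usage."""
    choice = SimpleNamespace(delta=SimpleNamespace(content=text))
    return SimpleNamespace(choices=[choice], usage=None)


final_usage = SimpleNamespace(prompt_tokens=12, completion_tokens=3, total_tokens=15)
fake_stream = [text_chunk("Hello"), text_chunk(", world"), SimpleNamespace(choices=[], usage=final_usage)]

text, usage = collect_stream(fake_stream)
print(text, usage)
```

The returned text and usage dict can then be passed as output and usage when closing the generation span, in the same way as the non-streaming wrappers.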