Context Object¶
The context is a mutable dictionary that flows through every step of a workflow.
Overview¶
When a workflow is triggered, the request body becomes the initial context. Each step can read from and write to this context, passing data to subsequent steps.
graph LR
A[HTTP Request] --> B[Initial Context]
B --> C[Step 1]
C --> D[Step 2]
D --> E[Step 3]
E --> F[Final Context]
F --> G[HTTP Response]
Initial Context¶
The request body is merged with engine-injected values:
# From HTTP POST body
{
"email": "jane@example.com",
"name": "Jane Doe",
"company": "Acme Inc"
}
# After engine injection
{
"email": "jane@example.com",
"name": "Jane Doe",
"company": "Acme Inc",
"_session": AsyncSession, # Database session
}
Modifying Context¶
Nodes receive and return the context:
@node("enrich")
async def enrich(ctx: dict[str, Any]) -> dict[str, Any]:
# Read from context
email = ctx["email"]
# Write to context
ctx["domain"] = email.split("@")[1]
ctx["status"] = "enriched"
return ctx
Reserved Keys¶
Keys starting with underscore (_) are reserved for internal use and are stripped from HTTP responses:
| Key | Set by | Description |
|---|---|---|
_session |
Engine | AsyncSession for the current request — passed to repository calls |
_db |
Engine | Unit-of-work repository accessor: ctx["_db"]["ModelName"].add(...) |
_user_id |
Auth middleware | Authenticated user ID from the Biscuit token |
_last_error |
Engine | String description of the last step error |
_last_error_type |
Engine | Error category for the last agent failure: "error", "timeout", or "parse_error" |
_api_status_code |
api_call step |
HTTP status code of the last outbound HTTP call |
_response |
response step |
Shaped payload that will be returned as the HTTP response body |
_context_model_versions |
Engine | {ModelName: schema_version} dict tracking which model version each model-aware step should use; populated from the workflow context.models[].version declarations |
Private Keys
Never store user data in keys starting with _. They are excluded from API responses and may be overwritten by the engine.
Context in Agent Steps¶
Agent prompts access context via Jinja2 templates:
- id: "classify"
kind: "agent"
agent:
prompt: |
Customer: {{ name }}
Company: {{ company }}
Email domain: {{ domain }}
Classify this lead's potential.
output:
map:
potential: lead_potential # Write back to context
Context Flow Example¶
# Workflow: lead_processing
steps:
- id: "save"
runner: "save_lead"
# Input: {email, name, company}
# Output: {email, name, company, id, created_at}
- id: "classify"
kind: "agent"
# Input: {email, name, company, id, created_at, _session}
# Output: {email, name, company, id, created_at, lead_score, _session}
- id: "route"
kind: "router"
# Input: {email, name, company, id, created_at, lead_score, _session}
# Returns signal based on lead_score
Accessing Context in Code¶
In Functional Nodes¶
@node("process")
async def process(ctx: dict[str, Any]) -> dict[str, Any]:
# Required field
email = ctx["email"]
# Optional field with default
priority = ctx.get("priority", "normal")
# Check existence
if "phone" in ctx:
send_sms(ctx["phone"])
return ctx
In Router Conditions¶
- id: "check_priority"
kind: "router"
router:
conditions:
- if: "priority == 'urgent'"
signal: "escalate"
- if: "lead_score > 80"
signal: "high_value"
- else: true
signal: "normal"
Context Scoping¶
Request Scope¶
Each workflow invocation has its own context:
Contexts are isolated — changes in one request don't affect others.
Step Scope¶
Context persists across steps within a single workflow:
# Step 1
ctx["computed_value"] = expensive_calculation()
# Step 2 - can access Step 1's output
result = ctx["computed_value"] # Available
Best Practices¶
1. Use Meaningful Keys¶
# Good
ctx["customer_id"] = customer.id
ctx["order_total"] = calculate_total(items)
ctx["shipping_address"] = address
# Avoid
ctx["x"] = customer.id
ctx["val"] = calculate_total(items)
2. Prefix by Source¶
Group related keys with prefixes:
# AI-generated values
ctx["ai_classification"] = response["category"]
ctx["ai_confidence"] = response["confidence"]
# External API data
ctx["api_weather_temp"] = weather["temperature"]
ctx["api_weather_conditions"] = weather["conditions"]
3. Don't Overwrite Input¶
Preserve original values:
# Good - new key
ctx["normalized_email"] = ctx["email"].lower()
# Risky - overwrites input
ctx["email"] = ctx["email"].lower()
4. Handle Missing Keys¶
# Good - with fallback
name = ctx.get("name", "Unknown")
# Good - explicit check
if "phone" not in ctx:
return ctx, "missing_phone"
# Risky - might raise KeyError
name = ctx["name"]
5. Clean Up Temporary Data¶
Remove intermediate values before returning:
@node("finalize")
async def finalize(ctx: dict[str, Any]) -> dict[str, Any]:
# Remove temporary keys
ctx.pop("_temp_calculation", None)
ctx.pop("_intermediate_result", None)
return ctx
Response Filtering¶
The workflow manager automatically filters the response:
# Context at end of workflow
{
"email": "jane@example.com",
"name": "Jane Doe",
"id": "uuid...",
"_session": AsyncSession, # ← Removed
"_last_error": None, # ← Removed
}
# API response
{
"success": true,
"data": {
"email": "jane@example.com",
"name": "Jane Doe",
"id": "uuid..."
}
}
Debugging Context¶
Log context at each step:
import logging
logger = logging.getLogger(__name__)
@node("debug_step")
async def debug_step(ctx: dict[str, Any]) -> dict[str, Any]:
# Log non-private keys
public_ctx = {k: v for k, v in ctx.items() if not k.startswith("_")}
logger.debug(f"Context: {public_ctx}")
return ctx
Type Hints¶
For better IDE support, use TypedDict (optional):
from typing import TypedDict, Any
class LeadContext(TypedDict, total=False):
email: str
name: str
company: str
id: str
lead_score: int
_session: Any
@node("process_lead")
async def process_lead(ctx: LeadContext) -> LeadContext:
# IDE knows ctx has email, name, etc.
ctx["lead_score"] = calculate_score(ctx["email"])
return ctx
Next Steps¶
- Nodes — Working with context in nodes
- Workflows — Context flow between steps
- Repositories — Persisting context data