Nodes¶
Nodes are Python functions that execute functional workflow steps. Every step in a workflow has a kind — this page covers all available step kinds, their UI colours, and how to write custom functional nodes.
Step Kind Reference¶
tuvl has 8 built-in step kinds. The table below maps each kind to its icon and colour in the workflow canvas UI (sourced from ui/src/components/StepNode.tsx), plus its purpose and the signals it can emit.
| Kind | UI colour | Icon | Purpose | Emits |
|---|---|---|---|---|
| functional | Blue | | Run a custom Python @node() function | Any string / tuple |
| agent | Purple | | Call an LLM via LiteLLM or an llms/ preset | default · error · timeout · parse_error · custom from signal_from |
| router | Amber | | Evaluate a condition on context, branch true/false | "true" · "false" · "error" |
| api_call | Teal | | Make an outbound HTTP request | default · error |
| mcp | Pink | | Call a tool on an MCP server (SSE or stdio) | default · error |
| model-op | Emerald | | CRUD operation on a registered model (no Python needed) | default · error |
| response | Red | | Shape the HTTP response body | default · error |
| HumanInTheLoop | Orange | | Pause execution for a human reviewer | (suspends; never routes) |
functional¶
UI colour: Blue — border-blue-600 bg-blue-950
Run any custom Python async function registered with @node("name").
- id: "normalize"
  kind: "functional"
  runner: "normalize_email"  # must exist in NODE_REGISTRY
  routes:
    default: "save"
    error: "handle_error"

from tuvl.core.nodes.base import node

@node("normalize_email")
async def normalize_email(ctx: dict) -> dict:
    ctx["email"] = ctx.get("email", "").lower().strip()
    return ctx
Signals: return a str for a named signal, a dict for "default", or a tuple[dict, str] for both.
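To make the three return shapes concrete, here is a minimal sketch of how an engine could normalize a node's return value into a (context, signal) pair. The helper name normalize_result is hypothetical and is not part of tuvl; the actual engine may treat edge cases differently.

```python
from typing import Any


def normalize_result(result: Any, ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Turn a node's return value into a (context, signal) pair.

    Hypothetical helper for illustration only; not tuvl's implementation.
    """
    if isinstance(result, str):
        # A bare string is a named signal; the context is passed through as-is.
        return ctx, result
    if isinstance(result, dict):
        # A dict is the updated context and implies the "default" signal.
        return result, "default"
    if isinstance(result, tuple) and len(result) == 2:
        new_ctx, signal = result
        if isinstance(new_ctx, dict) and isinstance(signal, str):
            # A (dict, str) tuple carries both the context and the signal.
            return new_ctx, signal
    raise TypeError(f"unsupported node return value: {result!r}")
```

For example, normalize_result("invalid", ctx) leaves ctx untouched and routes on "invalid", while normalize_result(ctx, ctx) routes on "default".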
agent¶
UI colour: Purple — border-purple-600 bg-purple-950
Call an LLM. Supports any LiteLLM model string or a named preset from llms/<name>.yaml.
- id: "evaluate"
kind: "agent"
agent:
model: "default" # llms/default.yaml — or "gpt-4o-mini", "ollama/llama3" etc.
system: "You are an HR evaluator."
prompt: |
Evaluate {{ name }}'s application.
Experience: {{ experience_years }} years.
Return JSON: {"score": <0-100>, "recommendation": "<hire|reject|maybe>"}
output:
format: json
map:
score: score
recommendation: recommendation
signal_from: recommendation # route by LLM output
retry:
attempts: 3
on: [parse_error, timeout]
timeout: 30
routes:
hire: "send_offer"
reject: "send_rejection"
maybe: "manual_review"
error: "handle_error"
Signals: default, error, timeout, parse_error, or any value from signal_from.
router¶
UI colour: Amber — border-amber-600 bg-amber-950
Evaluate a condition on the context dictionary and branch "true" / "false". Chain routers for multi-way branching.
- id: "check_score"
  kind: "router"
  condition:
    field: "score"     # context key (dot-path supported: "candidate.score")
    operator: "gte"    # eq | neq | gt | gte | lt | lte | in | contains | is_empty | is_not_empty
    value: 70
  routes:
    "true": "send_offer"
    "false": "send_rejection"
Signals: "true", "false", "error".
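The condition semantics can be sketched in plain Python. This is an illustrative evaluator, not tuvl's code: the function names are hypothetical, and the choice to emit "error" for a missing field, an unknown operator, or an incomparable value is an assumption.

```python
from typing import Any

_MISSING = object()  # sentinel for "key not present in context"


def resolve_path(ctx: dict[str, Any], path: str) -> Any:
    """Follow a dot-path such as "candidate.score" through nested dicts."""
    current: Any = ctx
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def is_empty(value: Any) -> bool:
    return value is _MISSING or value in (None, "", [], {})


COMPARATORS = {
    "eq": lambda a, b: a == b,
    "neq": lambda a, b: a != b,
    "gt": lambda a, b: a > b,
    "gte": lambda a, b: a >= b,
    "lt": lambda a, b: a < b,
    "lte": lambda a, b: a <= b,
    "in": lambda a, b: a in b,        # context value is one of the listed values
    "contains": lambda a, b: b in a,  # context value contains the given item
}


def evaluate_condition(ctx: dict[str, Any], field: str, operator: str, value: Any = None) -> str:
    """Return the router signal: "true", "false", or "error"."""
    try:
        actual = resolve_path(ctx, field)
        if operator == "is_empty":
            return "true" if is_empty(actual) else "false"
        if operator == "is_not_empty":
            return "false" if is_empty(actual) else "true"
        if actual is _MISSING:
            return "error"  # assumption: a missing field is an error
        return "true" if COMPARATORS[operator](actual, value) else "false"
    except (KeyError, TypeError):
        return "error"  # unknown operator or incomparable types
```

With ctx = {"score": 82}, the check_score step above would evaluate to "true" under this sketch.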
api_call¶
UI colour: Teal — border-teal-600 bg-teal-950
Make an outbound HTTP request. Supports {{ context }} templating in URL, headers, and body.
- id: "enrich_company"
kind: "api_call"
http:
url: "https://api.clearbit.com/v2/companies/find?domain={{ email_domain }}"
method: "GET"
headers:
Authorization: "Bearer ${CLEARBIT_API_KEY}"
timeout: 15
response:
output_key: "company_data" # full response body stored here
extract:
- path: "name"
as: "company_name"
- path: "metrics.employees"
as: "company_size"
routes:
default: "next_step"
error: "fallback"
On HTTP errors: sets _last_error and _api_status_code in context, emits "error".
Signals: default, error.
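The response mapping in the example (output_key plus extract rules) can be pictured with the following sketch. The helper apply_extract is hypothetical, and storing None for a path that does not resolve is an assumption rather than documented behaviour.

```python
from typing import Any


def apply_extract(
    ctx: dict[str, Any],
    body: dict[str, Any],
    output_key: str,
    extract: list[dict[str, str]],
) -> dict[str, Any]:
    """Store the full body under output_key, then copy dot-path values into context."""
    ctx[output_key] = body
    for rule in extract:
        value: Any = body
        for part in rule["path"].split("."):
            # Walk one level deeper; give up with None if the path breaks.
            value = value.get(part) if isinstance(value, dict) else None
        ctx[rule["as"]] = value
    return ctx


body = {"name": "Acme", "metrics": {"employees": 120}}
rules = [
    {"path": "name", "as": "company_name"},
    {"path": "metrics.employees", "as": "company_size"},
]
example_ctx = apply_extract({}, body, "company_data", rules)
```

After this call, example_ctx holds the full body under "company_data" alongside "company_name" and "company_size".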
mcp¶
UI colour: Pink — border-pink-600 bg-pink-950
Call a tool on any Model Context Protocol server. Supports SSE and stdio transports.
Signals: default, error.
model-op¶
UI colour: Emerald — border-emerald-600 bg-emerald-950
Direct CRUD on any registered model — no Python node required. The model must be declared in the workflow's context: field.
- id: "save_candidate"
  kind: "model-op"
  model: "Candidate"
  operation: "create"             # create | read | list | update | delete
  payload: "{{ candidate }}"      # dict or {{template}}; used by create / update
  output: "saved_candidate"       # context key for the result

- id: "fetch_with_relations"
  kind: "model-op"
  model: "Application"
  operation: "read"
  record_id: "{{ application_id }}"
  include: "candidate,education"  # comma-separated relation names
  output: "application"

- id: "list_pending"
  kind: "model-op"
  model: "Application"
  operation: "list"
  filters:
    status: "pending"
  limit: 50
  output: "pending_list"
Signals: default, error.
response¶
UI colour: Red — border-red-500 bg-red-950
Shape the HTTP response body. Usually placed as the last step. The payload is stored in context["_response"] and returned verbatim to the API caller.
Signals: default, error.
HumanInTheLoop¶
UI colour: Orange — border-orange-500 bg-orange-950
Suspend the workflow and hand off to a human reviewer. The engine persists a SystemWorkflowInstance snapshot and returns HTTP 202 Accepted with a hitl_request payload. Execution resumes when the reviewer submits their response.
- id: "approve_application"
kind: "HumanInTheLoop"
ui:
title: "Review Application"
instruction: "Approve or reject {{ name }}'s application for {{ role }}."
display_context: # allowlist of context keys shown to reviewer
- name
- role
- cv_summary
- score
human_feedback:
- name: approved
type: boolean
required: true
label: "Approve?"
- name: notes
type: string
label: "Reviewer notes"
output_key: "approval_result" # merged into context under this key on resume
auth:
required_group: "hr_manager"
assignee_user: "{{ assigned_reviewer }}"
Resume endpoint:
POST /hitl/{instance_id}/respond
Content-Type: application/json
{ "approved": true, "notes": "Strong candidate." }
Signals: (suspends — does not pass through routes)
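As a rough picture of the resume step, the sketch below checks a reviewer's payload against the human_feedback field definitions and stores it under output_key. The function name, the type mapping, and the choice to raise ValueError on invalid input are all assumptions; the real engine's validation rules are not described on this page.

```python
from typing import Any

# Assumed mapping from the YAML "type" values used above to Python types.
TYPE_MAP = {"boolean": bool, "string": str}


def apply_feedback(
    ctx: dict[str, Any],
    fields: list[dict[str, Any]],
    payload: dict[str, Any],
    output_key: str,
) -> dict[str, Any]:
    """Validate the reviewer payload and merge it into context under output_key."""
    result: dict[str, Any] = {}
    for field in fields:
        name = field["name"]
        if name not in payload:
            if field.get("required", False):
                raise ValueError(f"missing required field: {name}")
            continue  # optional field left out by the reviewer
        if not isinstance(payload[name], TYPE_MAP[field["type"]]):
            raise ValueError(f"field {name} must be of type {field['type']}")
        result[name] = payload[name]
    ctx[output_key] = result
    return ctx


fields = [
    {"name": "approved", "type": "boolean", "required": True},
    {"name": "notes", "type": "string"},
]
resumed = apply_feedback(
    {"name": "Ada"},
    fields,
    {"approved": True, "notes": "Strong candidate."},
    "approval_result",
)
```

Under these assumptions, later steps could read resumed["approval_result"]["approved"] to decide what happens next.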
Writing a functional Node¶
The @node Decorator¶
The decorator registers your function in the global NODE_REGISTRY:
from tuvl.core.nodes.base import node, NODE_REGISTRY

@node("my_node")
async def my_node(ctx):
    return ctx

# The function is now accessible as:
# NODE_REGISTRY["my_node"]
Unique Names
Node names must be unique. Registering a duplicate name raises a ValueError.
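The registration and uniqueness rules can be illustrated with a small stand-alone decorator. This is a sketch of the pattern only: sketch_node and SKETCH_REGISTRY are hypothetical stand-ins, not tuvl's node and NODE_REGISTRY, whose internals may differ.

```python
from typing import Any, Awaitable, Callable

NodeFn = Callable[[dict], Awaitable[Any]]

# Stand-in for a global registry keyed by node name.
SKETCH_REGISTRY: dict[str, NodeFn] = {}


def sketch_node(name: str) -> Callable[[NodeFn], NodeFn]:
    """Register the decorated coroutine function under `name`."""

    def decorator(fn: NodeFn) -> NodeFn:
        if name in SKETCH_REGISTRY:
            # Mirrors the documented behaviour: duplicate names are rejected.
            raise ValueError(f"node {name!r} is already registered")
        SKETCH_REGISTRY[name] = fn
        return fn  # the function itself is returned unchanged

    return decorator


@sketch_node("my_node")
async def my_node(ctx: dict) -> dict:
    return ctx
```

Because the decorator returns the original function, a registered node can still be imported and called directly, which is what makes unit testing straightforward.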
Return Values¶
Nodes can return values in three formats:
1. Return Context Dict¶
Most common — return the modified context:
from datetime import datetime
from typing import Any

from tuvl.core.nodes.base import node

@node("enrich_data")
async def enrich_data(ctx: dict[str, Any]) -> dict[str, Any]:
    ctx["enriched"] = True
    ctx["timestamp"] = datetime.now().isoformat()
    return ctx

The workflow continues with the "default" signal.
2. Return Signal String¶
Return a routing signal to control flow:
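@node("validate")
async def validate(ctx: dict[str, Any]) -> str:
    if not ctx.get("email"):
        return "invalid"
    if "@" not in ctx["email"]:
        return "invalid"
    return "valid"

Map each returned signal (here "valid" and "invalid") to a next step under the step's routes: key, in the same way default and error are mapped in the functional example above.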
3. Return Tuple (Context, Signal)¶
Update context AND specify routing:
# create_order and InsufficientStock are application-defined, not part of tuvl.
@node("process_order")
async def process_order(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    try:
        ctx["order_id"] = create_order(ctx)
        ctx["status"] = "created"
        return ctx, "success"
    except InsufficientStock:
        ctx["error"] = "Out of stock"
        return ctx, "out_of_stock"
Accessing the Database¶
The engine injects two database helpers into the context:
- ctx["_db"]: a WorkflowUoW that gates access to the models declared in the workflow's context: field (recommended).
- ctx["_session"]: the raw AsyncSession for advanced use cases (pass it to get_repository if you need access to a model outside the workflow context).
Using _db (recommended)¶
from tuvl.core.nodes.base import node

@node("save_contact")
async def save_contact(ctx: dict) -> dict:
    repo = ctx["_db"]["Contact"]  # WorkflowUoW; model must be in the workflow's context:
    contact = await repo.add({
        "email": ctx["email"],
        "name": ctx["name"],
    })
    ctx["id"] = str(contact.id)
    return ctx
Using get_repository directly¶
Use this when you need to access a model that is not declared in the workflow's context: field:
from tuvl.core.nodes.base import node
from tuvl.core.repositories.registry import get_repository

@node("save_contact")
async def save_contact(ctx: dict) -> dict:
    session = ctx["_session"]
    repo = get_repository("Contact", session)
    contact = await repo.add({
        "email": ctx["email"],
        "name": ctx["name"],
    })
    ctx["id"] = str(contact.id)
    return ctx
Repository Methods¶
| Method | Description |
|---|---|
| await repo.add(data) | Create a new record |
| await repo.get(id) | Fetch by primary key |
| await repo.list(criteria, limit, offset) | Query with filters |
| await repo.update(id, data) | Partial update |
| await repo.remove(id) | Delete a record |
Querying Data¶
@node("find_duplicates")
async def find_duplicates(ctx: dict) -> dict:
    repo = ctx["_db"]["Contact"]
    existing = await repo.list(
        criteria={"email": ctx["email"]},
        limit=1,
    )
    ctx["is_duplicate"] = len(existing) > 0
    return ctx
Error Handling¶
Graceful Error Handling¶
Handle expected errors and return signals:
import httpx
from typing import Any

@node("external_api")
async def external_api(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    try:
        # httpx.get is synchronous and cannot be awaited; use AsyncClient instead.
        async with httpx.AsyncClient() as client:
            response = await client.get(f"https://api.example.com/{ctx['id']}")
        response.raise_for_status()
        ctx["api_data"] = response.json()
        return ctx, "success"
    except httpx.HTTPStatusError as e:
        ctx["_last_error"] = f"API error: {e.response.status_code}"
        return ctx, "error"
    except httpx.RequestError as e:
        ctx["_last_error"] = f"Network error: {e}"
        return ctx, "retry"
Raising Exceptions¶
Unhandled exceptions stop the workflow and trigger rollback:
@node("critical_step")
async def critical_step(ctx: dict[str, Any]) -> dict[str, Any]:
    if not ctx.get("required_field"):
        raise ValueError("required_field is missing")
    return ctx
The workflow engine will:
- Log the error
- Set _last_error in context
- Check for an error route
- Roll back the database transaction if there is no error route
Async Operations¶
Nodes are async by default. Use await for I/O operations:
import httpx

@node("fetch_user_data")
async def fetch_user_data(ctx: dict[str, Any]) -> dict[str, Any]:
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.example.com/users/{ctx['user_id']}"
        )
        ctx["user_profile"] = response.json()
    return ctx
Parallel Operations¶
Use asyncio.gather for concurrent tasks:
import asyncio

# fetch_company_info, fetch_social_profiles and fetch_credit_score are
# application-defined coroutines, not part of tuvl.
@node("enrich_all")
async def enrich_all(ctx: dict[str, Any]) -> dict[str, Any]:
    tasks = [
        fetch_company_info(ctx["company"]),
        fetch_social_profiles(ctx["email"]),
        fetch_credit_score(ctx["ssn"]),
    ]
    # return_exceptions=True returns failures as values instead of raising.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    ctx["company_info"] = results[0] if not isinstance(results[0], Exception) else None
    ctx["social"] = results[1] if not isinstance(results[1], Exception) else None
    ctx["credit"] = results[2] if not isinstance(results[2], Exception) else None
    return ctx
Node Organization¶
File Structure¶
Organize nodes by domain:
nodes/
├── contacts.py # Contact-related nodes
├── orders.py # Order processing nodes
├── notifications.py # Email, SMS, push notifications
└── integrations.py # External API integrations
Auto-Discovery¶
tuvl automatically imports all .py files from the project's nodes/ directory at startup — no __init__.py required. Every @node()-decorated function in those files is registered in NODE_REGISTRY and becomes available to workflow steps.
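For intuition, file-based discovery of this kind can be built on importlib. The sketch below is an assumption about the general technique, not tuvl's loader: the function name, the module-name prefix, and the non-recursive glob are all illustrative choices.

```python
import importlib.util
import sys
from pathlib import Path


def import_node_files(nodes_dir: str) -> list[str]:
    """Import every top-level .py file in nodes_dir and return the module names.

    Importing a file runs its module body, which is what triggers any
    registration decorators it contains.
    """
    imported: list[str] = []
    for path in sorted(Path(nodes_dir).glob("*.py")):
        module_name = f"_sketch_nodes_{path.stem}"  # hypothetical naming scheme
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module  # register before executing the body
        spec.loader.exec_module(module)
        imported.append(module_name)
    return imported
```

Loading files by path rather than by package name is one way a loader can work without an __init__.py in the directory.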
Utility Functions¶
Keep node functions focused. Extract utilities:
# nodes/utils/email.py
async def send_email(to: str, subject: str, body: str) -> bool:
    ...

# nodes/notifications.py
from typing import Any

from tuvl.core.nodes.base import node

from .utils.email import send_email

@node("send_welcome_email")
async def send_welcome_email(ctx: dict[str, Any]) -> dict[str, Any]:
    success = await send_email(
        to=ctx["email"],
        subject="Welcome!",
        body=f"Hello {ctx['name']}, welcome aboard!",
    )
    ctx["email_sent"] = success
    return ctx
Testing Nodes¶
Nodes are simple async functions — easy to test:
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from nodes.contacts import save_contact

@pytest.mark.asyncio
async def test_save_contact():
    # Mock the session and repository
    mock_session = AsyncMock()
    mock_repo = AsyncMock()
    mock_repo.add.return_value = MagicMock(id="test-uuid")

    # Targets the get_repository variant of save_contact shown earlier
    with patch("nodes.contacts.get_repository", return_value=mock_repo):
        ctx = {
            "_session": mock_session,
            "email": "test@example.com",
            "name": "Test User",
        }
        result = await save_contact(ctx)

    assert result["id"] == "test-uuid"
    mock_repo.add.assert_called_once()
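Nodes that use ctx["_db"] can be tested without patching, by passing a plain dict that maps model names to a fake repository. The sketch below assumes the repository only needs the add and list methods from the table above; FakeRepo is a hypothetical test double, and save_contact is repeated here without @node so the example runs without tuvl installed.

```python
import asyncio
import uuid
from types import SimpleNamespace
from typing import Any


class FakeRepo:
    """In-memory stand-in exposing only the add/list methods used below."""

    def __init__(self) -> None:
        self.rows = []

    async def add(self, data: dict[str, Any]) -> SimpleNamespace:
        row = SimpleNamespace(id=uuid.uuid4(), **data)
        self.rows.append(row)
        return row

    async def list(self, criteria=None, limit=None, offset=0):
        criteria = criteria or {}
        matches = [
            row for row in self.rows
            if all(getattr(row, key, None) == value for key, value in criteria.items())
        ]
        end = None if limit is None else offset + limit
        return matches[offset:end]


async def save_contact(ctx: dict) -> dict:
    # Same body as the _db example above.
    repo = ctx["_db"]["Contact"]
    contact = await repo.add({"email": ctx["email"], "name": ctx["name"]})
    ctx["id"] = str(contact.id)
    return ctx


def run_example() -> tuple[dict, FakeRepo]:
    repo = FakeRepo()
    ctx = {"_db": {"Contact": repo}, "email": "test@example.com", "name": "Test User"}
    return asyncio.run(save_contact(ctx)), repo
```

A fake like this keeps state between calls, so one test can exercise a save followed by a duplicate check against the same data.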
Common Patterns¶
Validation Node¶
@node("validate_input")
async def validate_input(ctx: dict[str, Any]) -> str:
    errors = []
    if not ctx.get("email"):
        errors.append("Email is required")
    elif "@" not in ctx["email"]:
        errors.append("Invalid email format")
    if not ctx.get("name"):
        errors.append("Name is required")
    if errors:
        ctx["validation_errors"] = errors
        return "invalid"
    return "valid"
Transformation Node¶
import re

@node("normalize_data")
async def normalize_data(ctx: dict[str, Any]) -> dict[str, Any]:
    ctx["email"] = ctx.get("email", "").lower().strip()
    ctx["name"] = ctx.get("name", "").title().strip()
    # Keep digits and "+" only
    ctx["phone"] = re.sub(r"[^\d+]", "", ctx.get("phone", ""))
    return ctx
Conditional Logic Node¶
@node("route_by_amount")
async def route_by_amount(ctx: dict[str, Any]) -> str:
    amount = ctx.get("amount", 0)
    if amount > 10000:
        return "high_value"
    elif amount > 1000:
        return "medium_value"
    else:
        return "low_value"
Notification Node¶
# send_email and send_sms are application-defined helpers, not part of tuvl.
@node("send_notification")
async def send_notification(ctx: dict[str, Any]) -> dict[str, Any]:
    notifications_sent = []
    if ctx.get("email"):
        await send_email(ctx["email"], "Update", ctx["message"])
        notifications_sent.append("email")
    if ctx.get("phone"):
        await send_sms(ctx["phone"], ctx["message"])
        notifications_sent.append("sms")
    ctx["notifications_sent"] = notifications_sent
    return ctx
Next Steps¶
- Repositories — Working with data
- Workflows — Using nodes in workflows
- Examples — More node examples