Skip to content

Example: Custom Nodes

A collection of common node patterns and implementations.

Basic Patterns

Validation Node

from typing import Any
from tuvl_engine.nodes.base import node

@node("validate_email")
async def validate_email(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Validate email format and uniqueness.

    Reads ``ctx["email"]``. When the address is well formed, fetches the
    ``"User"`` repository for the session stored in ``ctx["_session"]`` and
    asks it for one record with the same email.

    Returns:
        The same ``ctx`` object and a routing signal:

        * ``"invalid"`` - the email is missing, is not a string, or does not
          match the expected pattern.
        * ``"duplicate"`` - the repository returned an existing record.
        * ``"valid"`` - the format is fine and no record was returned.

        For ``"invalid"`` and ``"duplicate"`` the reason is stored in
        ``ctx["validation_error"]``.

    Raises:
        KeyError: If the format check passes but ``ctx`` has no
            ``"_session"`` entry.
    """
    import re

    email = ctx.get("email", "")

    # Format validation
    if not email:
        ctx["validation_error"] = "Email is required"
        return ctx, "invalid"

    # fullmatch() rather than match() with "$": "$" also matches right before
    # a trailing newline, which would accept "user@example.com\n".
    # Non-string values are reported as invalid instead of raising TypeError.
    if not isinstance(email, str) or not re.fullmatch(
        r"[\w\.-]+@[\w\.-]+\.\w+", email
    ):
        ctx["validation_error"] = "Invalid email format"
        return ctx, "invalid"

    # Uniqueness check
    session = ctx["_session"]
    from tuvl_engine.repositories.registry import get_repository
    repo = get_repository("User", session)

    # limit=1: only existence matters, not how many records match.
    existing = await repo.list(criteria={"email": email}, limit=1)
    if existing:
        ctx["validation_error"] = "Email already registered"
        return ctx, "duplicate"

    return ctx, "valid"

Use it in a workflow definition:

# Workflow step that runs the node registered as "validate_email".
- id: "validate"
  kind: "functional"
  runner: "validate_email"
  # Route keys are the signal strings validate_email returns
  # ("valid" / "invalid" / "duplicate"); the values are presumably the ids
  # of the steps to run next - confirm against the workflow schema.
  routes:
    valid: "create_user"
    invalid: "reject"
    duplicate: "merge_accounts"

Data Transformation Node

@node("normalize_contact")
async def normalize_contact(ctx: dict[str, Any]) -> dict[str, Any]:
    """Clean up the contact fields that are present in ``ctx``.

    Only keys that exist are touched:

    * ``email`` - lower-cased, then stripped of surrounding whitespace.
    * ``name`` - title-cased, then stripped of surrounding whitespace.
    * ``phone`` - reduced to digits and ``+``; the result is also passed to
      ``format_phone`` and stored under ``phone_formatted``.
    * ``address`` - passed to ``parse_address`` and stored under
      ``address_parts``.

    ``format_phone`` and ``parse_address`` are not defined in this snippet
    and must be provided by the surrounding module.

    Returns:
        The same ``ctx`` object, updated in place.
    """
    if "email" in ctx:
        lowered = ctx["email"].lower()
        ctx["email"] = lowered.strip()

    if "name" in ctx:
        titled = ctx["name"].title()
        ctx["name"] = titled.strip()

    if "phone" in ctx:
        import re

        # Drop every character that is neither a digit nor "+".
        digits_only = re.sub(r"[^\d+]", "", ctx["phone"])
        ctx["phone"] = digits_only
        ctx["phone_formatted"] = format_phone(digits_only)

    if "address" in ctx:
        raw_address = ctx["address"]
        ctx["address_parts"] = parse_address(raw_address)

    return ctx

Conditional Logic Node

@node("route_by_value")
async def route_by_value(ctx: dict[str, Any]) -> str:
    """Choose a route name from the order total and the customer type.

    Checks are applied in this order, first match wins:

    1. ``total_amount`` above 10000 -> ``"high_value"``
    2. ``customer_type`` equal to ``"vip"`` -> ``"vip_processing"``
    3. ``total_amount`` above 1000 -> ``"standard"``
    4. anything else -> ``"express"``

    ``total_amount`` defaults to 0 and ``customer_type`` to ``"regular"``
    when absent from ``ctx``.
    """
    order_total = ctx.get("total_amount", 0)

    if order_total > 10000:
        return "high_value"

    if ctx.get("customer_type", "regular") == "vip":
        return "vip_processing"

    return "standard" if order_total > 1000 else "express"

Database Operations

Bulk Insert Node

@node("bulk_import")
async def bulk_import(ctx: dict[str, Any]) -> dict[str, Any]:
    """Add every entry of ``ctx["items"]`` to the ``"Contact"`` repository.

    Items are added one at a time. When adding an item raises, the item and
    the error text are recorded and the loop continues with the next item.

    Writes these keys into ``ctx`` and returns it:

    * ``imported_count`` / ``imported_ids`` - ids (as strings) of the
      records that were added.
    * ``error_count`` / ``errors`` - one ``{"item", "error"}`` dict per
      failed item.

    Raises:
        KeyError: If ``ctx`` has no ``"_session"`` entry.
    """
    db_session = ctx["_session"]
    from tuvl_engine.repositories.registry import get_repository
    contacts = get_repository("Contact", db_session)

    created_ids = []
    failures = []

    for entry in ctx.get("items", []):
        try:
            saved = await contacts.add(entry)
            created_ids.append(str(saved.id))
        except Exception as exc:
            # Best effort: remember the failure and keep importing.
            failures.append({"item": entry, "error": str(exc)})

    ctx.update(
        {
            "imported_count": len(created_ids),
            "imported_ids": created_ids,
            "error_count": len(failures),
            "errors": failures,
        }
    )
    return ctx

Search Node

@node("search_records")
async def search_records(ctx: dict[str, Any]) -> dict[str, Any]:
    """Search contacts whose name, email or company contains ``ctx["query"]``.

    Builds one case-insensitive ``ILIKE '%query%'`` condition per column,
    combines them with OR, and limits the result to ``ctx["limit"]`` rows
    (20 when the key is absent).

    Writes ``results`` (one ``model_dump()`` dict per match) and
    ``result_count`` into ``ctx`` and returns it.

    Raises:
        KeyError: If ``ctx`` has no ``"_session"`` entry.
    """
    session = ctx["_session"]
    # The original also imported and_, which was never used.
    from sqlmodel import select, or_
    from tuvl_engine.models.loader import MODEL_REGISTRY

    Contact = MODEL_REGISTRY["Contact"]
    query = ctx.get("query", "")

    # NOTE(review): query is embedded in the pattern as-is, so "%" and "_"
    # typed by the caller act as LIKE wildcards, and an empty query matches
    # every non-null value. Escape them if query comes from untrusted input.
    pattern = f"%{query}%"

    # Build search statement
    statement = select(Contact).where(
        or_(
            Contact.name.ilike(pattern),
            Contact.email.ilike(pattern),
            Contact.company.ilike(pattern),
        )
    ).limit(ctx.get("limit", 20))

    result = await session.exec(statement)
    contacts = result.all()

    ctx["results"] = [c.model_dump() for c in contacts]
    ctx["result_count"] = len(contacts)

    return ctx

Transaction Node

@node("transfer_funds")
async def transfer_funds(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Transfer funds between accounts.

    Reads ``from_account_id``, ``to_account_id`` and ``amount`` from ``ctx``,
    loads both accounts through the ``"Account"`` repository bound to
    ``ctx["_session"]`` and writes the new balances back.

    Returns:
        The same ``ctx`` object and a routing signal:

        * ``"error"`` - the amount is not positive, both ids are the same,
          or an account was not found; ``ctx["error"]`` holds the reason.
        * ``"insufficient_funds"`` - the source balance is below ``amount``;
          ``ctx["error"]`` holds the reason.
        * ``"success"`` - both balances were updated; ``ctx`` gains
          ``new_source_balance``, ``new_dest_balance`` and ``transfer_id``.

    Raises:
        KeyError: If ``amount``, ``from_account_id``, ``to_account_id`` or
            (once the inputs are valid) ``_session`` is missing from ``ctx``.
    """
    amount = ctx["amount"]
    from_id = ctx["from_account_id"]
    to_id = ctx["to_account_id"]

    # A negative amount would pass the balance check below and move money
    # in the opposite direction, so reject non-positive amounts up front.
    if amount <= 0:
        ctx["error"] = "Transfer amount must be positive"
        return ctx, "error"

    # With identical accounts the second update would overwrite the first
    # with a stale "balance + amount", crediting money from nowhere.
    if from_id == to_id:
        ctx["error"] = "Source and destination accounts must differ"
        return ctx, "error"

    session = ctx["_session"]
    from tuvl_engine.repositories.registry import get_repository

    account_repo = get_repository("Account", session)

    source = await account_repo.get(from_id)
    dest = await account_repo.get(to_id)

    if not source or not dest:
        ctx["error"] = "Account not found"
        return ctx, "error"

    if source.balance < amount:
        ctx["error"] = "Insufficient funds"
        return ctx, "insufficient_funds"

    # Compute both balances before writing. If the repository hands back
    # session-tracked objects, update() may change source/dest in place, and
    # re-reading .balance afterwards would apply the amount twice.
    new_source_balance = source.balance - amount
    new_dest_balance = dest.balance + amount

    # Perform transfer (both updates in same transaction)
    # NOTE(review): no row lock is visible here; concurrent transfers on the
    # same account could interleave between get() and update() - confirm the
    # repository/session provides the needed isolation.
    await account_repo.update(source.id, {"balance": new_source_balance})
    await account_repo.update(dest.id, {"balance": new_dest_balance})

    ctx["new_source_balance"] = new_source_balance
    ctx["new_dest_balance"] = new_dest_balance
    # generate_transfer_id is not defined in this snippet; it must be
    # provided by the surrounding module.
    ctx["transfer_id"] = generate_transfer_id()

    return ctx, "success"

External Services

HTTP Request Node

import httpx

@node("call_external_api")
async def call_external_api(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Make authenticated API request.

    POSTs ``{"data": ctx["payload"]}`` to the processing endpoint with a
    bearer token taken from ``ctx["api_key"]`` (empty string when absent).

    Returns:
        The same ``ctx`` object and a routing signal:

        * ``"success"`` - the decoded JSON body is in ``ctx["api_response"]``.
        * ``"api_error"`` - the server answered with an error status or a
          body that is not valid JSON; ``ctx["api_error"]`` holds the reason.
        * ``"network_error"`` - the request itself failed;
          ``ctx["api_error"]`` holds the exception text.

    Raises:
        KeyError: If ``ctx`` has no ``"payload"`` entry.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                "https://api.example.com/process",
                json={
                    "data": ctx["payload"]
                },
                headers={
                    "Authorization": f"Bearer {ctx.get('api_key', '')}",
                    "Content-Type": "application/json"
                },
                timeout=30
            )
            response.raise_for_status()

        except httpx.HTTPStatusError as e:
            ctx["api_error"] = f"HTTP {e.response.status_code}"
            return ctx, "api_error"

        except httpx.RequestError as e:
            ctx["api_error"] = str(e)
            return ctx, "network_error"

        try:
            ctx["api_response"] = response.json()
        except ValueError:
            # A 2xx answer with a non-JSON body (json.JSONDecodeError is a
            # ValueError) used to escape both handlers above and crash the
            # node; report it through the existing "api_error" route.
            ctx["api_error"] = "Invalid JSON in response body"
            return ctx, "api_error"

        return ctx, "success"

Email Node

import aiosmtplib
from email.message import EmailMessage

@node("send_email")
async def send_email(ctx: dict[str, Any]) -> dict[str, Any]:
    """Build a plain-text message from ``ctx`` and send it over SMTP.

    Required keys: ``to_email``, ``subject``, ``body``. Optional keys:
    ``from_email`` (default ``"noreply@example.com"``), ``smtp_host``
    (default ``"localhost"``), ``smtp_port`` (default 587), ``smtp_user``
    and ``smtp_password``.

    Sets ``ctx["email_sent"]`` to ``True`` on success. Any exception raised
    while sending is swallowed: ``email_sent`` becomes ``False`` and the
    exception text is stored in ``ctx["email_error"]``.

    Raises:
        KeyError: If ``to_email``, ``subject`` or ``body`` is missing.
    """
    mail = EmailMessage()
    mail["From"] = ctx.get("from_email", "noreply@example.com")
    mail["To"] = ctx["to_email"]
    mail["Subject"] = ctx["subject"]
    mail.set_content(ctx["body"])

    smtp_options = {
        "hostname": ctx.get("smtp_host", "localhost"),
        "port": ctx.get("smtp_port", 587),
        "start_tls": True,
        "username": ctx.get("smtp_user"),
        "password": ctx.get("smtp_password"),
    }

    try:
        await aiosmtplib.send(mail, **smtp_options)
    except Exception as exc:
        # Delivery is best effort; the failure is reported through ctx.
        ctx["email_sent"] = False
        ctx["email_error"] = str(exc)
    else:
        ctx["email_sent"] = True

    return ctx

Webhook Node

@node("send_webhook")
async def send_webhook(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Send webhook notification.

    Builds a JSON body from ``ctx["event_type"]``, ``ctx["event_data"]`` and
    the current UTC time, signs it with HMAC-SHA256 using
    ``ctx["webhook_secret"]`` (empty string when absent) and POSTs it to
    ``ctx["webhook_url"]`` with the hex digest in the ``X-Signature`` header.

    Returns:
        The same ``ctx`` object and a routing signal:

        * ``"success"`` - response status below 400.
        * ``"failed"`` - response status 400 or above.
        * ``"error"`` - sending raised; ``ctx["webhook_error"]`` holds the
          exception text.

        On ``"success"`` and ``"failed"``, ``ctx["webhook_status"]`` and
        ``ctx["webhook_sent"]`` are set.

    Raises:
        KeyError: If ``event_type`` or ``event_data`` is missing.
    """
    import hashlib
    import hmac
    import json
    # datetime was used without ever being imported, which raised NameError
    # on every call.
    from datetime import datetime, timezone

    import httpx

    payload = {
        "event": ctx["event_type"],
        "data": ctx["event_data"],
        # Timezone-aware UTC so the receiver does not have to guess the
        # sender's local zone.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    # Serialize once and send exactly these bytes. Passing json=payload lets
    # httpx encode the body itself, which may differ byte-for-byte from
    # json.dumps(payload); a receiver that checks the HMAC over the raw
    # request body would then reject the signature.
    body = json.dumps(payload).encode()

    # Sign payload
    secret = ctx.get("webhook_secret", "")
    signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                ctx["webhook_url"],
                content=body,
                headers={
                    "X-Signature": signature,
                    "Content-Type": "application/json"
                },
                timeout=10
            )

            ctx["webhook_status"] = response.status_code
            ctx["webhook_sent"] = response.status_code < 400

            return ctx, "success" if ctx["webhook_sent"] else "failed"

        except Exception as e:
            # Best effort: any failure (including a missing webhook_url) is
            # reported through the "error" route instead of raising.
            ctx["webhook_error"] = str(e)
            return ctx, "error"

File Operations

File Upload Handler

import aiofiles
from pathlib import Path

@node("save_uploaded_file")
async def save_uploaded_file(ctx: dict[str, Any]) -> dict[str, Any]:
    """Save uploaded file and extract metadata.

    Reads ``ctx["file_content"]`` (bytes, or a Base64 string that is decoded
    first) and ``ctx["filename"]``, writes the content to
    ``uploads/<hash>_<name>`` and records ``file_path``, ``file_size`` (in
    bytes) and ``file_hash`` (first 8 hex digits of the MD5 digest) in
    ``ctx``.

    Returns:
        The same ``ctx`` object, updated in place.

    Raises:
        KeyError: If ``file_content`` or ``filename`` is missing.
        binascii.Error: If a string ``file_content`` is not valid Base64.
    """
    import base64
    import hashlib

    file_data = ctx["file_content"]  # Base64 or bytes
    filename = ctx["filename"]

    # hashlib and the binary file handle below need bytes; a str is taken to
    # be the Base64 form mentioned above.
    if isinstance(file_data, str):
        file_data = base64.b64decode(file_data)

    # Keep only the last path component so a client-supplied name such as
    # "../../etc/passwd" cannot point outside the upload directory.
    # Backslashes are normalised first to cover Windows-style names.
    base_name = Path(filename.replace("\\", "/")).name

    # Generate unique filename (MD5 is used for naming only, not security)
    file_hash = hashlib.md5(file_data).hexdigest()[:8]
    safe_name = f"{file_hash}_{base_name}"

    upload_dir = Path("uploads")
    upload_dir.mkdir(exist_ok=True)
    file_path = upload_dir / safe_name

    async with aiofiles.open(file_path, "wb") as f:
        await f.write(file_data)

    ctx["file_path"] = str(file_path)
    ctx["file_size"] = len(file_data)
    ctx["file_hash"] = file_hash

    return ctx

Utilities

Logging Node

import logging

logger = logging.getLogger(__name__)

@node("log_context")
async def log_context(ctx: dict[str, Any]) -> dict[str, Any]:
    """Log context for debugging.

    Logs every ``ctx`` entry at INFO level except keys that start with an
    underscore and keys that look like credentials. ``ctx`` itself is
    returned unchanged.
    """
    # Substring, case-insensitive match: the exact-name filter let keys such
    # as "api_key", "smtp_password" and "webhook_secret" (all used by other
    # nodes) through to the log.
    sensitive_markers = ("password", "token", "secret", "api_key")

    def is_sensitive(key: str) -> bool:
        lowered = key.lower()
        return any(marker in lowered for marker in sensitive_markers)

    # Filter sensitive keys
    safe_ctx = {
        k: v for k, v in ctx.items()
        if not k.startswith("_") and not is_sensitive(k)
    }

    # Lazy %-formatting: the message is only built when INFO is enabled.
    logger.info("Context at %s: %s", ctx.get("_step", "unknown"), safe_ctx)

    return ctx

Timing Node

import time

@node("measure_time")
async def measure_time(ctx: dict[str, Any]) -> dict[str, Any]:
    """Start or stop a stopwatch stored in ``ctx``.

    The first call records the current ``time.time()`` under
    ``_start_time``. The next call stores the elapsed seconds, rounded to
    three decimals, under ``execution_time_seconds`` and removes
    ``_start_time`` so the node can be reused.

    Returns:
        The same ``ctx`` object, updated in place.
    """
    if "_start_time" in ctx:
        # Second visit: close the measurement.
        started_at = ctx["_start_time"]
        ctx["execution_time_seconds"] = round(time.time() - started_at, 3)
        del ctx["_start_time"]
        return ctx

    # First visit: open the measurement.
    ctx["_start_time"] = time.time()
    return ctx

Rate Limiter Node

import asyncio
from collections import defaultdict

# Simple in-memory rate limiter (use Redis in production)
# Maps a user id to the time.time() timestamps of that user's recent
# requests; rate_limit() drops timestamps older than its window on each call.
_rate_limits = defaultdict(list)

@node("rate_limit")
async def rate_limit(ctx: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Apply rate limiting per user.

    Counts the calls made for ``ctx["user_id"]`` (``"anonymous"`` when
    absent) during the last 60 seconds and compares the count with
    ``ctx["rate_limit"]`` (10 when absent).

    Returns:
        The same ``ctx`` object and a routing signal:

        * ``"allowed"`` - the call is recorded and ``ctx["rate_limited"]``
          is ``False``.
        * ``"rate_limited"`` - ``ctx["rate_limited"]`` is ``True`` and
          ``ctx["retry_after"]`` holds the seconds until the oldest recorded
          call leaves the window.
    """
    user_id = ctx.get("user_id", "anonymous")
    limit = ctx.get("rate_limit", 10)  # requests per minute
    window = 60  # seconds

    now = time.time()

    # Clean old entries
    recent = [t for t in _rate_limits[user_id] if now - t < window]
    _rate_limits[user_id] = recent

    if len(recent) >= limit:
        ctx["rate_limited"] = True
        # With limit <= 0 there may be no recorded call at all; indexing
        # recent[0] would raise IndexError, so report the full window.
        if recent:
            ctx["retry_after"] = window - (now - recent[0])
        else:
            ctx["retry_after"] = float(window)
        return ctx, "rate_limited"

    recent.append(now)
    ctx["rate_limited"] = False

    return ctx, "allowed"

Testing Nodes

import pytest
from unittest.mock import AsyncMock, MagicMock, patch

@pytest.mark.asyncio
async def test_validate_email_valid():
    """A well-formed email with no existing user yields the "valid" signal."""
    ctx = {
        "_session": AsyncMock(),
        "email": "test@example.com"
    }

    # Repository stub whose list() finds no existing user.
    mock_repo = AsyncMock()
    mock_repo.list.return_value = []

    # validate_email imports get_repository inside the function body, so the
    # name is looked up on the registry module at call time. Patching it on
    # the module that defines the node would not intercept that lookup.
    with patch(
        "tuvl_engine.repositories.registry.get_repository",
        return_value=mock_repo,
    ):
        result, signal = await validate_email(ctx)

    assert signal == "valid"
    assert "validation_error" not in result


@pytest.mark.asyncio
async def test_validate_email_invalid_format():
    """A malformed address yields "invalid" and records a validation error."""
    outcome = await validate_email(
        {"_session": AsyncMock(), "email": "not-an-email"}
    )
    result, signal = outcome

    assert "validation_error" in result
    assert signal == "invalid"


@pytest.mark.asyncio
async def test_validate_email_duplicate():
    """An email the repository already knows yields the "duplicate" signal."""
    ctx = {
        "_session": AsyncMock(),
        "email": "existing@example.com"
    }

    mock_repo = AsyncMock()
    mock_repo.list.return_value = [MagicMock()]  # Existing user

    # validate_email imports get_repository inside the function body, so the
    # name is looked up on the registry module at call time. Patching it on
    # the module that defines the node would not intercept that lookup.
    with patch(
        "tuvl_engine.repositories.registry.get_repository",
        return_value=mock_repo,
    ):
        result, signal = await validate_email(ctx)

    assert signal == "duplicate"

Next Steps