Complex Workflows¶
This page shows composable patterns for larger pipelines. Below is a compact example that uses task variables to set SLA knobs, stream processing early, and collect metrics — without touching worker code.
Vars-driven SLA with streaming fan-in¶
{
"nodes": [
{"node_id": "indexer", "type": "indexer"},
{"node_id": "proc", "type": "processor", "depends_on": ["indexer"],
"io": {"start_when": "first_batch"}},
{"node_id": "update_sla", "type": "coordinator_fn", "depends_on": ["indexer"],
"io": {"fn": "vars.merge", "fn_args": {"data": {"sla": {"max_delay": 500}, "flags": {"ab": true}}}}},
{"node_id": "agg_metrics", "type": "coordinator_fn", "depends_on": ["proc"],
"io": {"fn": "metrics.aggregate", "fn_args": {"node_id": "proc", "mode": "sum"}}}
],
"edges": [
["indexer", "proc"],
["indexer", "update_sla"],
["proc", "agg_metrics"]
],
"edges_ex": [
{"from": "indexer", "to": "proc", "mode": "async", "trigger": "on_batch"}
]
}
What happens
update_slaseedscoordinator.vars.sla.max_delayand a feature flagproccan start as soon asindexerproduces the first batchagg_metricsaggregates raw metrics fromprocintograph.nodes[].stats
Inspecting coordinator.vars¶
A coordinator function (or an external diagnostic tool) can read the task document to act on vars:
task = await db.tasks.find_one({"id": task_id}, {"coordinator": 1})
vars = ((task or {}).get("coordinator") or {}).get("vars") or {}
assert isinstance(vars, dict)
Cleaning up vars¶
You can remove keys and prune empty objects with vars.unset:
{
"node_id": "cleanup_vars",
"type": "coordinator_fn",
"io": {
"fn": "vars.unset",
"fn_args": {"keys": ["flags.ab", "sla.max_delay"]}
}
}
Empty parent objects are removed in a second pass by the adapter.