Coordinator API Reference¶
This page combines API introspection with a practical overview of coordinator adapters and task variables.
Task Variables & Adapters (overview)¶
Task variables live under tasks.coordinator.vars. Adapters are exposed as coordinator functions and can also be imported and called directly in Python.
Quick reference¶
| Adapter | Signature (simplified) | Semantics |
|---|---|---|
vars.set |
vars_set(task_id, **kv) |
Set multiple dotted/nested keys atomically ($set). Optional block_sensitive=True. |
vars.merge |
vars_merge(task_id, **kv) |
Deep-merge dicts into coordinator.vars; non-dict leaves overwrite. Optional block_sensitive=True. |
vars.incr |
vars_incr(task_id, *, key, by=1) |
Atomic numeric increment on coordinator.vars.<key> ($inc). Creates the leaf if missing. |
vars.unset |
vars_unset(task_id, *paths, keys=None) |
Remove one or more keys; prunes empty subtrees afterwards. |
merge.generic |
merge_generic(task_id, from_nodes, target) |
Merge metadata across nodes, mark target artifact complete. |
metrics.aggregate |
metrics_aggregate(task_id, node_id, mode="sum") |
Aggregate raw metrics, write to node stats. |
noop |
noop(task_id, **_) |
No-op helper useful for tests. |
Limits & validation¶
- Max 256 keys per operation
- Max depth ≈ 16 dot segments across the full path (including
coordinator.vars.) - Max path length 512 chars; max segment length 128 chars
- Max string/bytes value size: 64 KiB (UTF‑8 for strings)
- Key segments cannot start with
$, contain.or NUL (\x00)
Sensitive value detection¶
Adapters can block or flag sensitive-looking values (tokens, PEM/JWT, high-entropy blobs). Pass block_sensitive=True to make the adapter fail fast, otherwise the write succeeds and logs sensitive_hits > 0.
Coordinator Class¶
flowkit.coordinator.runner.Coordinator ¶
Coordinator(*, db, cfg: CoordinatorConfig | None = None, worker_types: list[str] | None = None, clock: Clock | None = None, adapters: dict[str, Any] | None = None, sensitive_detector: Any | None = None)
Orchestrates DAG execution across workers via Kafka topics.
db is injected (e.g., Motor client). clock is injectable for tests.
Source code in src/flowkit/coordinator/runner.py
Functions¶
start
async
¶
Source code in src/flowkit/coordinator/runner.py
stop
async
¶
Source code in src/flowkit/coordinator/runner.py
create_task
async
¶
Source code in src/flowkit/coordinator/runner.py
Adapters (API via mkdocstrings)¶
flowkit.coordinator.adapters ¶
Classes¶
CoordinatorAdapters ¶
CoordinatorAdapters(*, db, clock: Clock | None = None, detector: SensitiveDetectorProto | Callable[[str, Any], bool] | str | None = None)
Coordinator-facing helpers that operate via the injected db.
Side effects should be idempotent.
Source code in src/flowkit/coordinator/adapters.py
Functions¶
vars_set
async
¶
Set multiple keys atomically under coordinator.vars. Accepts dotted keys via {"kv": {"sla.max_delay": 500, "qps": 7}} or nested dict via {"kv": {"sla": {"max_delay": 500}}}. Direct kwargs are accepted for identifier-safe keys.
Source code in src/flowkit/coordinator/adapters.py
vars_merge
async
¶
Deep-merge dict(s) into coordinator.vars. Non-dict leaves overwrite. Accepts {"data": {...}} or {"kv": {...}}.
Source code in src/flowkit/coordinator/adapters.py
vars_incr
async
¶
Atomically increment a numeric leaf under coordinator.vars.
Source code in src/flowkit/coordinator/adapters.py
vars_unset
async
¶
Remove keys under coordinator.vars (dot-notation). No-op for missing keys.
Supports
await vars_unset(tid, "a.b", "c.d")
and: await vars_unset(tid, keys=["a.b", "c.d"])
Source code in src/flowkit/coordinator/adapters.py
merge_generic
async
¶
Merge metadata from multiple nodes and mark a target artifact as complete.
Source code in src/flowkit/coordinator/adapters.py
metrics_aggregate
async
¶
Aggregate metrics from raw entries and store the result on the node.