Skip to content

Coordinator API Reference

This page combines API introspection with a practical overview of coordinator adapters and task variables.

Task Variables & Adapters (overview)

Task variables live under tasks.coordinator.vars. Adapters are exposed as coordinator functions and can also be imported and called directly in Python.

Quick reference

Adapter Signature (simplified) Semantics
vars.set vars_set(task_id, **kv) Set multiple dotted/nested keys atomically ($set). Optional block_sensitive=True.
vars.merge vars_merge(task_id, **kv) Deep-merge dicts into coordinator.vars; non-dict leaves overwrite. Optional block_sensitive=True.
vars.incr vars_incr(task_id, *, key, by=1) Atomic numeric increment on coordinator.vars.<key> ($inc). Creates the leaf if missing.
vars.unset vars_unset(task_id, *paths, keys=None) Remove one or more keys; prunes empty subtrees afterwards.
merge.generic merge_generic(task_id, from_nodes, target) Merge metadata across nodes, mark target artifact complete.
metrics.aggregate metrics_aggregate(task_id, node_id, mode="sum") Aggregate raw metrics, write to node stats.
noop noop(task_id, **_) No-op helper useful for tests.

Limits & validation

  • Max 256 keys per operation
  • Max depth ≈ 16 dot segments across the full path (including coordinator.vars.)
  • Max path length 512 chars; max segment length 128 chars
  • Max string/bytes value size: 64 KiB (UTF‑8 for strings)
  • Key segments cannot start with $, contain . or NUL (\x00)

Sensitive value detection

Adapters can block or flag sensitive-looking values (tokens, PEM/JWT, high-entropy blobs). Pass block_sensitive=True to make the adapter fail fast; otherwise the write succeeds and the adapter logs sensitive_hits > 0.


Coordinator Class

flowkit.coordinator.runner.Coordinator

Coordinator(*, db, cfg: CoordinatorConfig | None = None, worker_types: list[str] | None = None, clock: Clock | None = None, adapters: dict[str, Any] | None = None, sensitive_detector: Any | None = None)

Orchestrates DAG execution across workers via Kafka topics. db is injected (e.g., Motor client). clock is injectable for tests.

Source code in src/flowkit/coordinator/runner.py
def __init__(
    self,
    *,
    db,
    cfg: CoordinatorConfig | None = None,
    worker_types: list[str] | None = None,
    clock: Clock | None = None,
    adapters: dict[str, Any] | None = None,
    sensitive_detector: Any | None = None,
) -> None:
    """Assemble the coordinator's collaborators; nothing is started here.

    Args:
        db: Injected database handle; stored as-is and shared with the
            outbox dispatcher and the default adapters.
        cfg: Optional configuration. A deep copy is kept so that later
            changes (such as the ``worker_types`` override) do not touch the
            caller's object. When omitted, ``CoordinatorConfig.load()`` is used.
        worker_types: When non-empty, replaces ``cfg.worker_types``.
        clock: Time source; falls back to ``SystemClock()``.
        adapters: Adapter mapping. A falsy value (``None`` or an empty
            mapping) selects the default adapters.
        sensitive_detector: Passed to ``default_adapters`` as ``detector``.
    """
    self.db = db

    if cfg is None:
        self.cfg = CoordinatorConfig.load()
    else:
        self.cfg = copy.deepcopy(cfg)
    if worker_types:
        self.cfg.worker_types = [*worker_types]

    self.clock: Clock = clock or SystemClock()
    self.bus = KafkaBus(self.cfg)
    self.outbox = OutboxDispatcher(db=db, bus=self.bus, cfg=self.cfg, clock=self.clock)
    if adapters:
        self.adapters = adapters
    else:
        self.adapters = dict(default_adapters(db=db, clock=self.clock, detector=sensitive_detector))

    # Runtime bookkeeping: spawned background tasks and the run flag.
    self._tasks: set[asyncio.Task] = set()
    self._running = False

    # Consumer handles start out empty; they are filled in elsewhere.
    self._announce_consumer: AIOKafkaConsumer | None = None
    self._status_consumers: dict[str, AIOKafkaConsumer] = {}
    self._query_reply_consumer: AIOKafkaConsumer | None = None

    # Short random suffix identifying this coordinator instance.
    self._gid = "coord." + uuid.uuid4().hex[:6]

    # logging
    self.log = get_logger("coordinator")
    bind_context(role="coordinator", gid=self._gid)
    self.log.debug("coordinator.init", event="coord.init")

Functions

start async

start() -> None
Source code in src/flowkit/coordinator/runner.py
async def start(self) -> None:
    """Bring the coordinator online and launch its background loops.

    Order: log the effective config, ensure indexes, start the bus, start
    the consumers, start the outbox, then flip ``_running`` and spawn the
    scheduler, heartbeat monitor, finalizer and in-flight resume tasks.
    """
    # Pick the first available dump method: pydantic v2, then pydantic v1,
    # then fall back to the raw attribute dict (or the string form).
    snapshot: Any
    for dumper_name in ("model_dump", "dict"):
        if hasattr(self.cfg, dumper_name):
            snapshot = getattr(self.cfg, dumper_name)()
            break
    else:
        snapshot = getattr(self.cfg, "__dict__", str(self.cfg))
    self.log.debug("coordinator.start", event="coord.start", cfg=snapshot)

    await self._ensure_indexes()
    await self.bus.start()
    await self._start_consumers()
    await self.outbox.start()

    self._running = True
    background = (
        (self._scheduler_loop, "scheduler"),
        (self._heartbeat_monitor, "hb-monitor"),
        (self._finalizer_loop, "finalizer"),
        (self._resume_inflight, "resume-inflight"),
    )
    for make_coro, label in background:
        self._spawn(make_coro(), name=label)
    self.log.debug("coordinator.started", event="coord.started", gid=self._gid)

stop async

stop() -> None
Source code in src/flowkit/coordinator/runner.py
async def stop(self) -> None:
    """Shut the coordinator down: cancel background tasks, then stop outbox and bus.

    Background tasks are cancelled and then awaited, so their cancellation
    handlers finish before the outbox and the bus they may still be using
    are torn down. Results and exceptions of the cancelled tasks are
    discarded. Failures while stopping the outbox or the bus are handed to
    ``swallow`` (NOTE(review): presumably logs at ERROR and suppresses —
    confirm against the helper).
    """
    self._running = False

    pending = list(self._tasks)
    for task in pending:
        task.cancel()
    self._tasks.clear()

    # Cancelling only *requests* cancellation; await the tasks so they have
    # actually unwound. Skip the current task so that calling stop() from
    # inside one of the spawned tasks cannot wait on itself.
    current = asyncio.current_task()
    to_await = [task for task in pending if task is not current]
    if to_await:
        await asyncio.gather(*to_await, return_exceptions=True)

    with swallow(
        logger=self.log, code="outbox.stop", msg="outbox stop failed", level=logging.ERROR, expected=False
    ):
        await self.outbox.stop()
    with swallow(logger=self.log, code="bus.stop", msg="bus stop failed", level=logging.ERROR, expected=False):
        await self.bus.stop()
    self.log.debug("coordinator.stopped", event="coord.stopped")

create_task async

create_task(*, params: dict[str, Any], graph: dict[str, Any]) -> str
Source code in src/flowkit/coordinator/runner.py
async def create_task(self, *, params: dict[str, Any], graph: dict[str, Any]) -> str:
    """Persist a new task in the queued state and return its id.

    Args:
        params: Task parameters, stored on the task document as given.
        graph: Graph definition. Missing ``nodes``, ``edges`` and
            ``edges_ex`` entries default to empty lists in the stored
            document; the caller's dict is not modified.

    Returns:
        The generated task id (a UUID4 string), also used as ``pipeline_id``.
    """
    task_id = str(uuid.uuid4())

    # Fill defaults on a shallow copy so the caller's graph dict is left untouched.
    graph_doc = dict(graph)
    graph_doc.setdefault("nodes", [])
    graph_doc.setdefault("edges", [])
    graph_doc.setdefault("edges_ex", [])

    doc = TaskDoc(
        id=task_id,
        pipeline_id=task_id,
        status=RunState.queued,
        params=params,
        graph=graph_doc,
        status_history=[{"from": None, "to": RunState.queued, "at": self.clock.now_dt()}],
        started_at=self.clock.now_dt().isoformat(),
        last_event_recv_ms=self.clock.now_ms(),
    ).model_dump(mode="json")
    await self.db.tasks.insert_one(doc)
    self.log.debug("task.created", event="coord.task.created", task_id=task_id)
    return task_id

Adapters (API via mkdocstrings)

flowkit.coordinator.adapters

Classes

CoordinatorAdapters

CoordinatorAdapters(*, db, clock: Clock | None = None, detector: SensitiveDetectorProto | Callable[[str, Any], bool] | str | None = None)

Coordinator-facing helpers that operate via the injected db. Side effects should be idempotent.

Source code in src/flowkit/coordinator/adapters.py
def __init__(
    self,
    *,
    db,
    clock: Clock | None = None,
    detector: SensitiveDetectorProto | Callable[[str, Any], bool] | str | None = None,
) -> None:
    """Store the injected collaborators and set up limits and the sensitive-value detector.

    Args:
        db: Injected database handle used by every adapter.
        clock: Time source; falls back to ``SystemClock()``.
        detector: Sensitive-value detector. A string is resolved through
            ``_import_from_string``; any non-``None`` value is wrapped in
            ``_DetectorShim``; ``None`` selects ``_SensitiveDetector()``.
    """
    self.db = db
    self.clock = clock or SystemClock()
    self.log = get_logger("coord.adapters")

    # Validation limits consulted by the vars.* adapters.
    self._limits = dict(
        max_paths_per_op=256,
        max_key_depth=16,  # max number of dots in the stored path
        max_path_len=512,
        max_value_bytes=64 * 1024,
        max_seg_len=128,
    )

    resolved = _import_from_string(detector) if isinstance(detector, str) else detector
    if resolved is None:
        self._detector = _SensitiveDetector()
    else:
        self._detector = _DetectorShim(resolved)
Functions
vars_set async
vars_set(task_id: str, **kv: Any) -> dict[str, Any]

Set multiple keys atomically under coordinator.vars. Accepts dotted keys via {"kv": {"sla.max_delay": 500, "qps": 7}} or nested dict via {"kv": {"sla": {"max_delay": 500}}}. Direct kwargs are accepted for identifier-safe keys.

Source code in src/flowkit/coordinator/adapters.py
async def vars_set(self, task_id: str, **kv: Any) -> dict[str, Any]:
    """
    Set multiple keys atomically under coordinator.vars.

    Accepts dotted keys via {"kv": {"sla.max_delay": 500, "qps": 7}}
    or nested dict via {"kv": {"sla": {"max_delay": 500}}}.
    Direct kwargs are accepted for identifier-safe keys.

    ``block_sensitive`` may be given as a kwarg or inside the ``kv`` mapping;
    it is a control flag and is never stored. When it is truthy and the
    detector flags any value, the call raises ``AdapterError`` before writing.

    Every segment of every top-level key is validated, including keys whose
    value is a dict that gets flattened.

    Returns ``{"ok": True, "touched": <number of stored paths>}``.
    """
    block_sensitive = bool(kv.pop("block_sensitive", False))

    payload = kv.get("kv")
    if isinstance(payload, dict):
        data = dict(payload)  # copy to avoid caller mutation
        block_sensitive = bool(data.pop("block_sensitive", False)) or block_sensitive
    else:
        # block_sensitive was popped above, so the remaining kwargs are all data.
        data = dict(kv)

    set_doc: dict[str, Any] = {}
    for k, v in data.items():
        # Validate the key itself in both branches; previously a dict-valued
        # key without dots reached the flatten step unchecked.
        for seg in k.split("."):
            self._validate_key_segment(seg)
        if isinstance(v, dict) and "." not in k:
            set_doc.update(self._flatten_for_set(v, prefix=f"coordinator.vars.{k}"))
        else:
            set_doc[f"coordinator.vars.{k}"] = v

    if not set_doc:
        return {"ok": True, "touched": 0}

    self._validate_paths_and_sizes(set_doc)

    suspicious = [p for p, v in set_doc.items() if self._detector.is_sensitive(p, v)]
    if suspicious and block_sensitive:
        raise AdapterError(f"vars.set blocked: {len(suspicious)} sensitive-looking values")

    await self.db.tasks.update_one(
        {"id": task_id},
        {"$set": set_doc, "$currentDate": {"updated_at": True}},
    )

    keys = sorted(set_doc.keys())
    self.log.info(
        "vars.set",
        event="coord.vars.set",
        task_id=task_id,
        keys=keys,
        n=len(keys),
        sensitive_hits=len(suspicious),
    )
    return {"ok": True, "touched": len(keys)}
vars_merge async
vars_merge(task_id: str, **kv: Any) -> dict[str, Any]

Deep-merge dict(s) into coordinator.vars. Non-dict leaves overwrite. Accepts {"data": {...}} or {"kv": {...}}.

Source code in src/flowkit/coordinator/adapters.py
async def vars_merge(self, task_id: str, **kv: Any) -> dict[str, Any]:
    """
    Deep-merge dict(s) into coordinator.vars. Non-dict leaves overwrite.

    Accepts {"data": {...}} or {"kv": {...}}; without either, the remaining
    keyword arguments themselves are merged.

    ``block_sensitive`` is a control flag: it is removed from the kwargs
    before the payload is resolved, so it is never persisted as a variable.
    When it is truthy and the detector flags any value, the call raises
    ``AdapterError`` before writing.

    Returns ``{"ok": True, "touched": <number of stored paths>}``.
    """
    # Pop the flag first: when the payload falls back to the kwargs mapping
    # itself, leaving it in would write coordinator.vars.block_sensitive.
    block_sensitive = bool(kv.pop("block_sensitive", False))

    src = kv.get("data", kv.get("kv", kv))
    if not isinstance(src, dict):
        raise AdapterError("vars.merge expects a mapping under 'data' or 'kv'")

    set_doc = self._flatten_for_set(src, prefix="coordinator.vars")
    if not set_doc:
        return {"ok": True, "touched": 0}

    self._validate_paths_and_sizes(set_doc)
    suspicious = [p for p, v in set_doc.items() if self._detector.is_sensitive(p, v)]
    if block_sensitive and suspicious:
        raise AdapterError(f"vars.merge blocked: {len(suspicious)} sensitive-looking values")

    # Read the current state only to decide which log event to emit below.
    existing = await self.db.tasks.find_one({"id": task_id}, {"coordinator": 1})
    has_existing_vars = bool(((existing or {}).get("coordinator") or {}).get("vars"))
    num_paths = len(set_doc)

    await self.db.tasks.update_one(
        {"id": task_id},
        {"$set": set_doc, "$currentDate": {"updated_at": True}},
    )

    keys_sorted = sorted(set_doc.keys())
    if not has_existing_vars and num_paths > 1:
        # Initial population with multiple paths → align logs with vars.set
        self.log.info(
            "vars.set",
            event="coord.vars.set",
            task_id=task_id,
            keys=keys_sorted,
            n=num_paths,
            sensitive_hits=len(suspicious),
        )
    else:
        self.log.info(
            "vars.merge",
            event="coord.vars.merge",
            task_id=task_id,
            keys=keys_sorted,
            n=num_paths,
            sensitive_hits=len(suspicious),
        )
    return {"ok": True, "touched": len(set_doc)}
vars_incr async
vars_incr(task_id: str, *, key: str, by: int | float = 1) -> dict[str, Any]

Atomically increment a numeric leaf under coordinator.vars.<key>.

Source code in src/flowkit/coordinator/adapters.py
async def vars_incr(self, task_id: str, *, key: str, by: int | float = 1) -> dict[str, Any]:
    """
    Atomically increment a numeric leaf under coordinator.vars.<key>.
    """
    if not isinstance(by, int | float):
        raise AdapterError("'by' must be int or float")
    if not math.isfinite(float(by)):
        raise AdapterError("'by' must be finite")
    if not isinstance(key, str) or not key:
        raise AdapterError("'key' must be a non-empty string")
    for seg in key.split("."):
        self._validate_key_segment(seg)

    path = f"coordinator.vars.{key}"

    # Pre-check the existing type to surface a clean AdapterError instead of
    # relying on storage-layer $inc behavior.
    try:
        existing = await self.db.tasks.find_one({"id": task_id}, {"coordinator": 1})
    except Exception:
        existing = None
    if isinstance(existing, dict):
        cur_val = self._get_path_value(existing, path)
        if cur_val is not None and not isinstance(cur_val, int | float):
            raise AdapterError("vars.incr expects a numeric leaf at target key")

    await self.db.tasks.update_one(
        {"id": task_id},
        {"$inc": {path: by}, "$currentDate": {"updated_at": True}},
    )
    self.log.info("vars.incr", event="coord.vars.incr", task_id=task_id, key=path, by=by)
    return {"ok": True, "key": key, "by": by}
vars_unset async
vars_unset(task_id: str, *paths: str, keys: Iterable[str] | None = None) -> dict[str, Any]

Remove keys under coordinator.vars (dot-notation). No-op for missing keys.

Supports

await vars_unset(tid, "a.b", "c.d")

and: await vars_unset(tid, keys=["a.b", "c.d"])

Source code in src/flowkit/coordinator/adapters.py
async def vars_unset(
    self,
    task_id: str,
    *paths: str,
    keys: Iterable[str] | None = None,
) -> dict[str, Any]:
    r"""
    Remove keys under ``coordinator.vars`` (dot-notation). No-op for missing keys.

    Supports:
        await vars_unset(tid, "a.b", "c.d")
    and:
        await vars_unset(tid, keys=["a.b", "c.d"])
    """
    all_keys: list[str] = list(paths)
    if keys is not None:
        if isinstance(keys, str | bytes):
            raise AdapterError("'keys' must be an iterable of strings")
        all_keys.extend([str(k) for k in keys])

    if not all_keys:
        return {"ok": True, "touched": 0}

    unset_doc: dict[str, Any] = {}
    for k in all_keys:
        if not isinstance(k, str) or k == "":
            raise AdapterError("unset key must be a non-empty string")

        for seg in k.split("."):
            self._validate_key_segment(seg)

        path = f"coordinator.vars.{k}"
        if len(path) > self._limits["max_path_len"]:
            raise AdapterError(f"path too long: {path!r}")
        if path.count(".") >= self._limits["max_key_depth"]:
            raise AdapterError(f"path too deep: {path!r}")

        unset_doc[path] = True  # value is ignored by Mongo

    await self.db.tasks.update_one(
        {"id": task_id},
        {"$unset": unset_doc, "$currentDate": {"updated_at": True}},
    )
    self.log.info(
        "vars.unset",
        event="coord.vars.unset",
        task_id=task_id,
        keys=sorted(unset_doc.keys()),
        n=len(unset_doc),
    )

    # Second pass: prune empty parent objects that may remain after the unset.
    try:
        doc_after = await self.db.tasks.find_one({"id": task_id}, {"coordinator": 1})
    except Exception:
        doc_after = None

    vars_tree = (((doc_after or {}).get("coordinator") or {}).get("vars")) or {}
    if isinstance(vars_tree, dict):
        prune_paths, _ = self._collect_empty_paths(vars_tree, prefix="coordinator.vars")
        if prune_paths:
            # Issue a single $unset for all empty subtrees.
            await self.db.tasks.update_one(
                {"id": task_id},
                {"$unset": {p: True for p in prune_paths}, "$currentDate": {"updated_at": True}},
            )
            self.log.info(
                "vars.unset.prune",
                event="coord.vars.unset.prune",
                task_id=task_id,
                keys=sorted(prune_paths),
                n=len(prune_paths),
            )
    return {"ok": True, "touched": len(unset_doc)}
merge_generic async
merge_generic(task_id: str, from_nodes: list[str], target: dict[str, Any]) -> dict[str, Any]

Merge metadata from multiple nodes and mark a target artifact as complete.

Source code in src/flowkit/coordinator/adapters.py
async def merge_generic(self, task_id: str, from_nodes: list[str], target: dict[str, Any]) -> dict[str, Any]:
    """
    Summarise the artifacts of ``from_nodes`` and upsert a complete target artifact.

    Artifacts of the source nodes are scanned once: nodes with a
    ``complete`` artifact are collected, ``partial`` artifacts are counted,
    and their distinct non-empty ``batch_uid`` values are tallied. The
    summary is stored as ``meta`` on the artifact of ``target["node_id"]``
    (``"coordinator"`` when absent or falsy), which is marked ``complete``
    and created if missing.

    Returns ``{"ok": True, "meta": meta}``. Raises ``AdapterError`` when
    ``target`` is empty or not a dict.
    """
    if not (target and isinstance(target, dict)):
        raise AdapterError("merge_generic: 'target' must be a dict with at least node_id")
    dest_node = target.get("node_id") or "coordinator"

    finished_nodes = set()
    seen_batch_uids = set()
    partial_count = 0

    cursor = self.db.artifacts.find({"task_id": task_id, "node_id": {"$in": from_nodes}})
    async for artifact in cursor:
        status = artifact.get("status")
        if status == "partial":
            partial_count += 1
            uid = artifact.get("batch_uid")
            if uid:
                seen_batch_uids.add(uid)
        elif status == "complete":
            finished_nodes.add(artifact.get("node_id"))

    meta = {
        "merged_from": from_nodes,
        "complete_nodes": sorted(finished_nodes),
        "partial_batches": partial_count,
        "distinct_batch_uids": len(seen_batch_uids),
        "merged_at": self.clock.now_dt().isoformat(),
    }

    selector = {"task_id": task_id, "node_id": dest_node}
    changes = {"status": "complete", "meta": meta, "updated_at": self.clock.now_dt()}
    # Fields written only when the upsert has to create the artifact.
    on_insert = {
        "task_id": task_id,
        "node_id": dest_node,
        "attempt_epoch": 0,
        "created_at": self.clock.now_dt(),
    }
    await self.db.artifacts.update_one(
        selector,
        {"$set": changes, "$setOnInsert": on_insert},
        upsert=True,
    )
    return {"ok": True, "meta": meta}
metrics_aggregate async
metrics_aggregate(task_id: str, node_id: str, *, mode: str = 'sum') -> dict[str, Any]

Aggregate metrics from raw entries and store the result on the node.

Source code in src/flowkit/coordinator/adapters.py
async def metrics_aggregate(self, task_id: str, node_id: str, *, mode: str = "sum") -> dict[str, Any]:
    """
    Fold the raw metric entries of one node into per-metric stats and cache them on the node.

    Entries flagged ``failed: True`` are excluded by the query. Values that
    cannot be converted to ``float`` are skipped. ``mode="mean"`` yields the
    per-metric average; any other mode yields the per-metric sum.

    Returns ``{"ok": True, "mode": mode, "stats": <metric -> float>}``.
    """
    query = {"task_id": task_id, "node_id": node_id, "failed": {"$ne": True}}
    cursor = self.db.metrics_raw.find(query)

    totals: dict[str, float] = {}
    counts: dict[str, int] = {}
    async for entry in cursor:
        samples = entry.get("metrics") or {}
        for name, raw in samples.items():
            try:
                value = float(raw)
            except Exception:
                continue
            totals[name] = totals.get(name, 0.0) + value
            counts[name] = counts.get(name, 0) + 1

    if mode == "mean":
        stats = {name: totals[name] / max(1, counts[name]) for name in totals}
    else:
        stats = dict(totals)

    # The positional "$" targets the graph node matched by the filter.
    node_filter = {"id": task_id, "graph.nodes.node_id": node_id}
    node_update = {"graph.nodes.$.stats": stats, "graph.nodes.$.stats_cached_at": self.clock.now_dt()}
    await self.db.tasks.update_one(node_filter, {"$set": node_update})
    return {"ok": True, "mode": mode, "stats": stats}

Functions