Skip to content

Worker API Reference

Worker Class

flowkit.worker.runner.Worker

Worker(*, db, cfg: WorkerConfig | None = None, clock: Clock | None = None, roles: list[str] | None = None, handlers: dict[str, RoleHandler] | None = None)

Stream-aware worker with cooperative cancellation and resilient batching.

- Kafka I/O (producers/consumers per role)
- Discovery (TASK_DISCOVER → TASK_SNAPSHOT)
- Control-plane CANCEL via signals topic
- DB-backed state for resume/takeover

Source code in src/flowkit/worker/runner.py
def __init__(
    self,
    *,
    db,
    cfg: WorkerConfig | None = None,
    clock: Clock | None = None,
    roles: list[str] | None = None,
    handlers: dict[str, RoleHandler] | None = None,
) -> None:
    """
    Initialize a stream-aware worker.

    Args:
        db: Database handle used for resumable run state.
        cfg: Worker configuration; deep-copied so local tweaks (e.g. the
            ``roles`` override below) never leak back to the caller.
            Defaults to ``WorkerConfig.load()``.
        clock: Time source; defaults to ``SystemClock()``.
        roles: Optional override for ``cfg.roles``.
        handlers: Role-name → handler mapping; copied defensively.
    """
    self.db = db
    self.cfg = copy.deepcopy(cfg) if cfg is not None else WorkerConfig.load()
    if roles:
        self.cfg.roles = list(roles)
    self.clock: Clock = clock or SystemClock()

    # identity: stable id from config, otherwise a fresh random one
    self.worker_id = self.cfg.worker_id or f"w-{uuid.uuid4().hex[:8]}"
    self.worker_version = self.cfg.worker_version

    # adapters registry
    self.input_adapters = build_input_adapters(db=db, clock=self.clock, cfg=self.cfg)

    # handlers registry (user injects custom handlers; Echo kept as example).
    # Bug fix: copy instead of aliasing — the previous `handlers or {}` kept a
    # reference to the caller's dict, so the setdefault below mutated it.
    self.handlers: dict[str, RoleHandler] = dict(handlers) if handlers else {}
    if "echo" in (self.cfg.roles or []):
        self.handlers.setdefault("echo", EchoHandler())

    # Kafka endpoints (created lazily in start())
    self._producer: AIOKafkaProducer | None = None
    self._cmd_consumers: dict[str, AIOKafkaConsumer] = {}
    self._query_consumer: AIOKafkaConsumer | None = None
    self._signals_consumer: AIOKafkaConsumer | None = None

    # run-state: one task at a time, cooperative cancellation
    self._busy = False
    self._busy_lock = asyncio.Lock()
    self._cancel_flag = asyncio.Event()
    self._cancel_meta: dict[str, Any] = {"reason": None, "deadline_ts_ms": None}
    self._stopping = False

    # DB-backed state for crash-resume / takeover
    self.state = LocalStateManager(db=self.db, clock=self.clock, worker_id=self.worker_id)
    self.active: ActiveRun | None = None

    # dedup of command envelopes (ordered for LRU-style eviction)
    self._dedup: OrderedDict[str, int] = OrderedDict()
    self._dedup_lock = asyncio.Lock()

    self._main_tasks: set[asyncio.Task] = set()

    # logging
    self.log = get_logger("worker")
    bind_context(role="worker", worker_id=self.worker_id, version=self.worker_version)
    self.log.debug("worker.init", event="worker.init", roles=self.cfg.roles, version=self.worker_version)

Functions

start async

start() -> None
Source code in src/flowkit/worker/runner.py
async def start(self) -> None:
    """
    Bring the worker online.

    Ordering is deliberate:
    1. ensure DB indexes, then start the idempotent Kafka producer;
    2. refresh persisted state and drop any active run whose step type is
       no longer in our configured roles;
    3. announce WORKER_ONLINE (including resume info, if any);
    4. start consumer loops: per-role commands, discovery queries, and
       control-plane signals (signals use a per-worker group so every
       worker sees every signal);
    5. start the periodic announce task.
    """
    # Helpful config print (silent by default unless tests enable stdout)
    cfg_dump: Any
    if hasattr(self.cfg, "model_dump"):  # pydantic v2
        cfg_dump = self.cfg.model_dump()
    elif hasattr(self.cfg, "dict"):  # pydantic v1
        cfg_dump = self.cfg.dict()
    else:
        cfg_dump = getattr(self.cfg, "__dict__", str(self.cfg))
    self.log.debug("worker.start", event="worker.start", cfg=cfg_dump)

    await self._ensure_indexes()
    self._producer = AIOKafkaProducer(
        bootstrap_servers=self.cfg.kafka_bootstrap, value_serializer=dumps, enable_idempotence=True
    )
    await self._producer.start()

    # Resume bookkeeping: discard a persisted run we can no longer serve.
    await self.state.refresh()
    self.active = self.state.read_active()
    if self.active and self.active.step_type not in self.cfg.roles:
        self.active = None
        await self.state.write_active(None)

    await self._send_announce(
        EventKind.WORKER_ONLINE,
        extra={
            "worker_id": self.worker_id,
            "type": ",".join(self.cfg.roles),
            "capabilities": {"roles": self.cfg.roles},
            "version": self.worker_version,
            "capacity": {"tasks": 1},
            "resume": self.active.__dict__ if self.active else None,
        },
    )

    # command consumers per role (shared group per role → work is balanced)
    for role in self.cfg.roles:
        topic = self.cfg.topic_cmd(role)
        c = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.cfg.kafka_bootstrap,
            value_deserializer=loads,
            enable_auto_commit=False,
            auto_offset_reset="latest",
            group_id=f"workers.{role}.v1",
        )
        await c.start()
        self._cmd_consumers[role] = c
        self._spawn(self._cmd_loop(role, c))

    # query consumer (discovery)
    self._query_consumer = AIOKafkaConsumer(
        self.cfg.topic_query,
        bootstrap_servers=self.cfg.kafka_bootstrap,
        value_deserializer=loads,
        enable_auto_commit=False,
        auto_offset_reset="latest",
        group_id="workers.query.v1",
    )
    await self._query_consumer.start()
    self._spawn(self._query_loop(self._query_consumer))

    # signals consumer (control plane; unique group per worker)
    self._signals_consumer = AIOKafkaConsumer(
        self.cfg.topic_signals,
        bootstrap_servers=self.cfg.kafka_bootstrap,
        value_deserializer=loads,
        enable_auto_commit=False,
        auto_offset_reset="latest",
        group_id=f"workers.signals.{self.worker_id}",
    )
    await self._signals_consumer.start()
    self._spawn(self._signals_loop(self._signals_consumer))

    # periodic announce
    self._spawn(self._periodic_announce())

    if self.active:
        self.log.debug(
            "worker.recovery_present",
            event="worker.recovery_present",
            task_id=self.active.task_id,
            node_id=self.active.node_id,
        )

    self.log.debug("worker.started", event="worker.started", worker_id=self.worker_id)

stop async

stop() -> None
Source code in src/flowkit/worker/runner.py
async def stop(self) -> None:
    """
    Gracefully shut down the worker.

    Order matters: background loops are cancelled first, then consumers
    are stopped, and the producer is kept alive until last so the
    WORKER_OFFLINE announce can still be sent. Each step is best-effort:
    failures are logged via ``swallow`` so one broken component cannot
    block the rest of the shutdown.
    """
    self._stopping = True
    # cancel background loops before stopping the consumers they read from
    for t in list(self._main_tasks):
        t.cancel()
    self._main_tasks.clear()

    if self._query_consumer:
        with swallow(
            logger=self.log,
            code="worker.query_consumer.stop",
            msg="query consumer stop failed",
            level=logging.WARNING,
        ):
            await self._query_consumer.stop()
    if self._signals_consumer:
        with swallow(
            logger=self.log,
            code="worker.signals_consumer.stop",
            msg="signals consumer stop failed",
            level=logging.WARNING,
        ):
            await self._signals_consumer.stop()
    for c in self._cmd_consumers.values():
        with swallow(
            logger=self.log, code="worker.cmd_consumer.stop", msg="cmd consumer stop failed", level=logging.WARNING
        ):
            await c.stop()
    self._cmd_consumers.clear()

    # announce offline while the producer is still running, then stop it
    if self._producer:
        with swallow(
            logger=self.log, code="worker.announce.offline", msg="announce offline failed", level=logging.WARNING
        ):
            await self._send_announce(EventKind.WORKER_OFFLINE, extra={"worker_id": self.worker_id})
        with swallow(
            logger=self.log, code="worker.producer.stop", msg="producer stop failed", level=logging.WARNING
        ):
            await self._producer.stop()
    self._producer = None

Handlers

flowkit.worker.handlers.base.RoleHandler

Functions

load_input async

load_input(input_ref: dict[str, Any] | None, input_inline: dict[str, Any] | None) -> Any

Return any structure needed later by iter_batches.

Important: if the Coordinator specifies input_inline.input_adapter, the worker selects and runs that adapter. Data returned from load_input must not override the explicitly requested adapter or its arguments.

Source code in src/flowkit/worker/handlers/base.py
async def load_input(self, input_ref: dict[str, Any] | None, input_inline: dict[str, Any] | None) -> Any:
    """
    Return any structure needed later by `iter_batches`.

    Important: if the Coordinator specifies `input_inline.input_adapter`,
    the worker selects and runs that adapter. Data returned from
    `load_input` must not override the explicitly requested adapter or
    its arguments.
    """
    return {"input_ref": input_ref or {}, "input_inline": input_inline or {}}

iter_batches async

iter_batches(loaded: Any) -> AsyncIterator[Batch]

Yield batches from the input when no adapter is selected.

If an adapter is specified by the Coordinator, the worker will not call iter_batches and will stream via the selected pull adapter instead.

Source code in src/flowkit/worker/handlers/base.py
async def iter_batches(self, loaded: Any) -> AsyncIterator[Batch]:
    """
    Fallback batch stream used when no pull adapter was selected.

    When the Coordinator does specify an adapter, the worker streams via
    that adapter instead and never calls this method.
    """
    payload = loaded if loaded else {}
    yield Batch(batch_uid=None, payload=payload)

flowkit.worker.handlers.base.Batch

Bases: BaseModel

flowkit.worker.handlers.base.BatchResult

Bases: BaseModel

flowkit.worker.handlers.base.FinalizeResult

Bases: BaseModel

Worker Context

flowkit.worker.context

Classes

RunContext

RunContext(*, cancel_flag: Event, cancel_meta: dict[str, Any], artifacts_writer, clock: Clock, task_id: str, node_id: str, attempt_epoch: int, worker_id: str)

Cancellation-aware runtime utilities for handlers:

- shared cancel flag + meta (reason, deadline ts)
- cancellable awaits
- subprocess group termination with escalation
- resource cleanup registry

Source code in src/flowkit/worker/context.py
def __init__(
    self,
    *,
    cancel_flag: asyncio.Event,
    cancel_meta: dict[str, Any],
    artifacts_writer,
    clock: Clock,
    task_id: str,
    node_id: str,
    attempt_epoch: int,
    worker_id: str,
):
    """Wire up a per-run context shared between the worker and its handler."""
    # identity of the run this context belongs to
    self.task_id = task_id
    self.node_id = node_id
    self.attempt_epoch = attempt_epoch
    self.worker_id = worker_id

    # shared cancellation primitives (flag plus reason/deadline metadata)
    self._cancel_flag = cancel_flag
    self._cancel_meta = cancel_meta

    # collaborators
    self.artifacts = artifacts_writer
    self.clock = clock

    # handler scratch space and resources registered for cleanup
    self.kv: dict[str, Any] = {}
    self._cleanup_callbacks: list[Callable[[], Any]] = []
    self._subprocesses: list[Any] = []
    self._temp_paths: list[str] = []

Worker State

flowkit.worker.state

Classes

LocalStateManager

LocalStateManager(*, db, clock: Clock, worker_id: str)

DB-backed state manager for crash-resume and cross-host takeover.

Source code in src/flowkit/worker/state.py
def __init__(self, *, db, clock: Clock, worker_id: str) -> None:
    """Bind DB handle, clock, and worker identity; start with an empty cache."""
    self.db = db
    self.clock = clock
    self.worker_id = worker_id
    # serializes concurrent writes to the worker_state document
    self._lock = asyncio.Lock()
    # last state document pulled from DB (populated by refresh)
    self._cache: dict[str, Any] = {}
Functions
refresh async
refresh() -> None

Pull the latest state from DB into in-memory cache.

Source code in src/flowkit/worker/state.py
async def refresh(self) -> None:
    """Reload this worker's persisted state document into the in-memory cache."""
    query = {"_id": self.worker_id}
    fetched = await self.db.worker_state.find_one(query)
    self._cache = fetched or {}
read_active
read_active() -> ActiveRun | None

Read active run from cache.

Source code in src/flowkit/worker/state.py
def read_active(self) -> ActiveRun | None:
    """Return the cached active run, or None when none is recorded."""
    raw = (self._cache or {}).get("active_run")
    if not raw:
        return None
    return ActiveRun(**raw)
write_active async
write_active(ar: ActiveRun | None) -> None

Atomically persist active run to DB and update cache.

Source code in src/flowkit/worker/state.py
async def write_active(self, ar: ActiveRun | None) -> None:
    """Atomically persist the active run (or clear it) in DB and mirror the cache."""
    async with self._lock:
        serialized = asdict(ar) if ar else None
        payload = {
            "active_run": serialized,
            # datetime (not epoch millis) is required by the TTL index
            "updated_at": self.clock.now_dt(),
        }
        await self.db.worker_state.update_one(
            {"_id": self.worker_id},
            {"$set": payload},
            upsert=True,
        )
        self._cache.update(payload)

Artifacts

flowkit.worker.artifacts


Behavior contract (summary)

  • Adapter precedence: cmd.input_inline.input_adapter (Coordinator) → handler suggestion → iter_batches fallback.
  • Validation: unknown adapter → bad_input_adapter (permanent); bad args → bad_input_args (permanent).
  • Aliases: from_node is accepted and normalized to from_nodes=[...].
  • Rechunk: with meta_list_key (list) chunk that list; otherwise each artifact meta is a single logical item.