Workers¶
Workers are the execution engine of FlowKit. They execute handler code, stream batches, and report status back to the Coordinator.
This page documents the orchestrator-first contract, input adapter precedence, deterministic rechunking, and error visibility that make worker behavior predictable and easy to operate.
Overview¶
A Worker:
- Executes tasks via user-provided
RoleHandlers. - Streams input either through input adapters (pulling from upstream artifacts) or via the handler’s own
iter_batchesfallback. - Reports progress to Kafka status topics.
- Handles cancellation quickly and cooperatively.
- Persists artifacts and emits metrics.
Quick start¶
from flowkit.worker.runner import Worker, WorkerConfig
from flowkit.worker.handlers.base import RoleHandler, Batch, BatchResult
class MyHandler(RoleHandler):
role = "processor"
async def iter_batches(self, loaded):
yield Batch(batch_uid="b1", payload={"items": [1, 2, 3]})
async def process_batch(self, batch, ctx):
return BatchResult(success=True, metrics={"count": len(batch.payload.get("items", []))})
worker = Worker(db=my_db, cfg=WorkerConfig(roles=["processor"]), handlers={"processor": MyHandler()})
await worker.start()
Input routing: the orchestrator-first contract¶
There is one source of truth for the input route: the Coordinator.
Adapter selection precedence
- If the Coordinator provides
cmd.input_inline.input_adapter→ the worker must use it. - Else, the worker may use a handler-proposed adapter from
load_input()if it is known. - Else, the worker falls back to
handler.iter_batches().
Additional rules:
- Unknown adapter in cmd → permanent
TASK_FAILEDwith reasonbad_input_adapter. - Missing/invalid adapter args → permanent
TASK_FAILEDwith reasonbad_input_args(no hiddenTypeError). - We accept the alias
from_nodeand normalize it tofrom_nodes=[...]. - The core is domain-agnostic: it never guesses schema keys like
skusoritems.
Example: Coordinator wins over handler¶
{
"io": {
"start_when": "first_batch",
"input_inline": {
"input_adapter": "pull.from_artifacts.rechunk:size",
"input_args": {"from_nodes": ["u"], "size": 2, "meta_list_key": "skus"}
}
}
}
load_input(), it is ignored and the route above is used.
Adapters¶
pull.from_artifacts¶
Pull partial upstream artifacts as they are produced.
{
"io": {
"input_inline": {
"input_adapter": "pull.from_artifacts",
"input_args": {"from_nodes": ["u"], "poll_ms": 50}
}
}
}
pull.from_artifacts.rechunk:size¶
Deterministically reshape upstream artifact meta to fixed-size slices.
Key semantics (deterministic, schema-agnostic)¶
meta_list_keyis optional.- If provided and
meta[meta_list_key]is a list → chunk that list. - If not provided (or key is not a list) → treat each artifact meta as a single logical item (
items=[meta]). - No heuristics over domain keys. No guessing
skus|items|…. - In strict installations, you may require
meta_list_keyvia a product policy flag (outside of worker core).
{
"io": {
"input_inline": {
"input_adapter": "pull.from_artifacts.rechunk:size",
"input_args": {
"from_node": "u",
"size": 3,
"poll_ms": 25,
"meta_list_key": "skus"
}
}
}
}
Empty upstream is not an error. If the route/args are valid but there is no data, the node finishes normally (
count=0).
Handler interface¶
from typing import AsyncIterator
from flowkit.worker.handlers.base import RoleHandler, Batch, BatchResult, FinalizeResult
class MyRole(RoleHandler):
role = "probe"
async def init(self, run_info: dict): # optional
pass
async def load_input(self, input_ref, input_inline):
# May return a *suggestion* for adapter when cmd doesn't provide one
return {"input_ref": input_ref or {}, "input_inline": input_inline or {}}
async def iter_batches(self, loaded) -> AsyncIterator[Batch]:
# Used only when no adapter is selected
yield Batch(batch_uid="b1", payload={"items": [1, 2]})
async def process_batch(self, batch: Batch, ctx) -> BatchResult:
return BatchResult(success=True, metrics={"count": len(batch.payload.get("items", []))})
async def finalize(self, ctx) -> FinalizeResult | None:
return FinalizeResult(metrics={})
def classify_error(self, exc: BaseException) -> tuple[str, bool]:
# Default behavior in core:
# TypeError/ValueError/KeyError -> permanent ("bad_input")
# otherwise -> transient ("unexpected_error")
if isinstance(exc, (TypeError, ValueError, KeyError)):
return ("bad_input", True)
return ("unexpected_error", False)
Validation & observability¶
The worker validates adapter names and arguments before streaming:
- Unknown adapter →
TASK_FAILED(bad_input_adapter, permanent). - Bad/missing args →
TASK_FAILED(bad_input_args, permanent). - Structured logs include:
adapter.selected(name, args, source=cmd|handler)handler_conflict_ignoredwhen cmd overrides handler suggestion
Cancellation¶
Workers check ctx.cancel_flag between batches and within long operations and transition to cancelling quickly on:
- Coordinator CANCEL signal
- DB cancel flags
- Lease/heartbeat policy
- Graceful shutdown
Metrics & artifacts¶
Handlers return metrics with each BatchResult. The worker upserts partial artifact rows and marks completion at the end, attaching final metrics.
Configuration (essentials)¶
from flowkit.worker.runner import WorkerConfig
cfg = WorkerConfig(
kafka_bootstrap="kafka:9092",
roles=["indexer", "probe"],
lease_ttl_sec=60,
hb_interval_sec=20,
pull_poll_ms_default=300,
pull_empty_backoff_ms_max=4000,
)
Load with overrides:
Best practices¶
- Keep handlers stateless; put state/offsets in artifacts or DB.
- Classify errors carefully; return permanent for configuration mistakes.
- Don’t encode domain semantics in core—use
meta_list_keywhere needed. - Prefer
start_when=first_batchfor true streaming, otherwise wait for upstream completion.