Worker API Reference¶
Worker Class¶
flowkit.worker.runner.Worker ¶
Worker(*, db, cfg: WorkerConfig | None = None, clock: Clock | None = None, roles: list[str] | None = None, handlers: dict[str, RoleHandler] | None = None)
Stream-aware worker with cooperative cancellation and resilient batching.
- Kafka I/O (producers/consumers per role)
- Discovery (TASK_DISCOVER → TASK_SNAPSHOT)
- Control-plane CANCEL via signals topic
- DB-backed state for resume/takeover
Source code in src/flowkit/worker/runner.py
Functions¶
start async ¶
Source code in src/flowkit/worker/runner.py
stop async ¶
Source code in src/flowkit/worker/runner.py
Handlers¶
flowkit.worker.handlers.base.RoleHandler ¶
Functions¶
load_input async ¶
Return any structure needed later by iter_batches.
Important: if the Coordinator specifies input_inline.input_adapter,
the worker selects and runs that adapter. Data returned from
load_input must not override the explicitly requested adapter or
its arguments.
Source code in src/flowkit/worker/handlers/base.py
iter_batches async ¶
Yield batches from the input when no adapter is selected.
If an adapter is specified by the Coordinator, the worker will not call
iter_batches and will stream via the selected pull adapter instead.
Source code in src/flowkit/worker/handlers/base.py
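The split between load_input and iter_batches can be illustrated with a minimal sketch. It does not import flowkit: SketchHandler is a hypothetical stand-in for a RoleHandler subclass, and the parameter lists, the "items" key, and the batch size are assumptions made for illustration, not the library's actual signatures.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


class SketchHandler:
    """Hypothetical stand-in for a RoleHandler subclass (not the flowkit class)."""

    async def load_input(self, input_ref: Optional[dict]) -> dict:
        # Return whatever structure iter_batches will need later.
        items = list((input_ref or {}).get("items", []))
        return {"items": items, "batch_size": 2}

    async def iter_batches(self, loaded: dict) -> AsyncIterator[list]:
        # Reached only when no adapter was selected for this task.
        items, size = loaded["items"], loaded["batch_size"]
        for start in range(0, len(items), size):
            yield items[start:start + size]


async def collect(handler: SketchHandler, input_ref: Optional[dict]) -> list:
    # Mimics the no-adapter path: load once, then drain the batch iterator.
    loaded = await handler.load_input(input_ref)
    return [batch async for batch in handler.iter_batches(loaded)]


batches = asyncio.run(collect(SketchHandler(), {"items": [1, 2, 3, 4, 5]}))
```

In this sketch load_input only prepares data. It does not choose how the data is streamed, which matches the rule that its return value must not override an adapter requested by the Coordinator.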
flowkit.worker.handlers.base.Batch ¶
Bases: BaseModel
flowkit.worker.handlers.base.BatchResult ¶
Bases: BaseModel
flowkit.worker.handlers.base.FinalizeResult ¶
Bases: BaseModel
Worker Context¶
flowkit.worker.context ¶
Classes¶
RunContext ¶
RunContext(*, cancel_flag: Event, cancel_meta: dict[str, Any], artifacts_writer, clock: Clock, task_id: str, node_id: str, attempt_epoch: int, worker_id: str)
Cancellation-aware runtime utilities for handlers:
- shared cancel flag + meta (reason, deadline ts)
- cancellable awaits
- subprocess group termination with escalation
- resource cleanup registry
Source code in src/flowkit/worker/context.py
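As a rough illustration of what a "cancellable await" built on a shared cancel flag might look like, the sketch below races an awaitable against an asyncio.Event. The helper name cancellable, the RunCancelled exception, and the choice of asyncio.Event are assumptions. The RunContext methods themselves are not shown on this page.

```python
import asyncio


class RunCancelled(Exception):
    """Hypothetical error raised when the cancel flag wins the race."""


async def cancellable(awaitable, cancel_flag: asyncio.Event):
    # Race the real work against the cancel flag.
    work = asyncio.ensure_future(awaitable)
    watcher = asyncio.ensure_future(cancel_flag.wait())
    try:
        done, _ = await asyncio.wait(
            {work, watcher}, return_when=asyncio.FIRST_COMPLETED
        )
        if work in done:
            return work.result()
        raise RunCancelled("cancel flag set before the awaitable finished")
    finally:
        # Whichever side lost the race is cancelled so nothing leaks.
        for task in (work, watcher):
            if not task.done():
                task.cancel()


async def demo():
    flag = asyncio.Event()
    fast = await cancellable(asyncio.sleep(0, result="done"), flag)
    # Set the flag shortly after the slow await starts.
    asyncio.get_running_loop().call_later(0.01, flag.set)
    try:
        await cancellable(asyncio.sleep(10), flag)
        slow = "finished"
    except RunCancelled:
        slow = "cancelled"
    return fast, slow


result = asyncio.run(demo())
```

A handler written this way returns promptly after a CANCEL signal instead of waiting out a long sleep or I/O call.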
Worker State¶
flowkit.worker.state ¶
Classes¶
LocalStateManager ¶
DB-backed state manager for crash-resume and cross-host takeover.
Source code in src/flowkit/worker/state.py
Functions¶
refresh async ¶
read_active ¶
write_active async ¶
Atomically persist active run to DB and update cache.
Source code in src/flowkit/worker/state.py
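The interplay of refresh, read_active, and write_active can be sketched with an in-memory stand-in for the database. FakeCollection, SketchStateManager, the per-worker document key, and the method parameters are all hypothetical. The real LocalStateManager's storage layout and signatures are not documented on this page.

```python
import asyncio
import copy


class FakeCollection:
    """Hypothetical in-memory stand-in for a DB collection."""

    def __init__(self):
        self.docs = {}

    async def replace_one(self, key, doc):
        # Whole-document swap: readers see the old doc or the new one.
        self.docs[key] = copy.deepcopy(doc)

    async def find_one(self, key):
        doc = self.docs.get(key)
        return copy.deepcopy(doc) if doc is not None else None


class SketchStateManager:
    def __init__(self, coll, worker_id):
        self._coll = coll
        self._worker_id = worker_id
        self._cache = None
        self._lock = asyncio.Lock()

    async def refresh(self):
        # Reload the cache from the DB (e.g. after a restart or takeover).
        self._cache = await self._coll.find_one(self._worker_id)

    def read_active(self):
        # Synchronous read served from the cache only.
        return copy.deepcopy(self._cache)

    async def write_active(self, active):
        # Persist first, then update the cache, under one lock.
        async with self._lock:
            await self._coll.replace_one(self._worker_id, active)
            self._cache = copy.deepcopy(active)


async def demo():
    coll = FakeCollection()
    writer = SketchStateManager(coll, "w1")
    await writer.write_active({"task_id": "t1", "attempt_epoch": 3})
    successor = SketchStateManager(coll, "w1")  # simulates a restarted worker
    before = successor.read_active()
    await successor.refresh()
    return before, successor.read_active()


before, after = asyncio.run(demo())
```

Writing to the database before touching the cache means a failed write in this sketch leaves the cache describing the last state that was actually persisted.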
Artifacts¶
flowkit.worker.artifacts ¶
Behavior contract (summary)¶
- Adapter precedence: cmd.input_inline.input_adapter (Coordinator) → handler suggestion → iter_batches fallback.
- Validation: unknown adapter → bad_input_adapter (permanent); bad args → bad_input_args (permanent).
- Aliases: from_node is accepted and normalized to from_nodes=[...].
- Rechunk: with meta_list_key (list), chunk that list; otherwise each artifact meta is a single logical item.
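The contract above can be sketched as plain functions. This is one illustrative reading, not the worker's actual code. The adapter registry, the "input_args" key, the PermanentError class, the non-dict check standing in for "bad args", the handling of a command that carries both from_node and from_nodes, and the chunk size are all assumptions.

```python
KNOWN_ADAPTERS = {"pull.from_artifacts"}  # hypothetical adapter registry


class PermanentError(Exception):
    """Hypothetical non-retryable error carrying a reason code."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


def normalize_args(args):
    # Alias: from_node="a" becomes from_nodes=["a"].
    out = dict(args)
    if "from_node" in out:
        node = out.pop("from_node")
        out.setdefault("from_nodes", [node])
    return out


def resolve_adapter(cmd_inline, handler_suggestion):
    # Precedence: Coordinator command, then handler suggestion, then fallback.
    for source in (cmd_inline, handler_suggestion):
        if source and source.get("input_adapter"):
            name = source["input_adapter"]
            if name not in KNOWN_ADAPTERS:
                raise PermanentError("bad_input_adapter")
            args = source.get("input_args", {})
            if not isinstance(args, dict):
                raise PermanentError("bad_input_args")
            return name, normalize_args(args)
    return None, {}  # no adapter selected: the caller uses iter_batches


def rechunk(metas, meta_list_key=None, size=2):
    # With a list key, flatten those lists; otherwise each meta is one item.
    if meta_list_key is None:
        items = list(metas)
    else:
        items = [x for meta in metas for x in meta[meta_list_key]]
    return [items[i:i + size] for i in range(0, len(items), size)]


chosen = resolve_adapter(
    {"input_adapter": "pull.from_artifacts", "input_args": {"from_node": "a"}},
    {"input_adapter": "something.else"},
)
```

Because the Coordinator's entry is checked first, the handler's suggestion in this example is never inspected, even though it names an adapter the sketch would reject.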