Basic Concepts¶
Understanding FlowKit's core concepts will help you build robust, scalable workflows.
Core Components¶
Coordinator¶
The Coordinator is the brain of FlowKit. It:
- Schedules tasks based on DAG dependencies
- Monitors worker health and task progress
- Handles task retries and failure recovery
- Manages task state transitions
- Coordinates multiple workers across different types
from flowkit import Coordinator, CoordinatorConfig

coordinator = Coordinator(
    db=mongodb_instance,  # placeholder for your already-connected MongoDB handle
    cfg=CoordinatorConfig(worker_types=["indexer", "processor"])
)
Worker¶
A Worker executes individual tasks. Workers:
- Pull tasks from Kafka topics
- Execute custom handler logic
- Report progress and results back to coordinators
- Handle task cancellation and cleanup
- Support multiple roles/task types
from flowkit.worker import Worker, WorkerConfig

worker = Worker(
    db=mongodb_instance,
    cfg=WorkerConfig(roles=["processor"]),
    handlers={"processor": MyProcessorHandler()}  # one handler per role
)
Task Graph (DAG)¶
Tasks are organized into Directed Acyclic Graphs (DAGs) that define:
- Nodes: Individual processing steps
- Edges: Dependencies between nodes
- Fan-in/Fan-out: How data flows between stages
graph = {
    "nodes": [
        {"node_id": "extract", "type": "extractor", "depends_on": []},
        {"node_id": "transform", "type": "transformer", "depends_on": ["extract"]},
        {"node_id": "load", "type": "loader", "depends_on": ["transform"]}
    ],
    "edges": [["extract", "transform"], ["transform", "load"]]
}
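Because the graph must be acyclic, it can be worth validating before submission. A minimal sketch in plain Python (not a FlowKit API), checking the depends_on fields with Kahn's algorithm:

from collections import deque

def assert_acyclic(graph: dict) -> None:
    # Raise ValueError if the node dependencies contain a cycle.
    deps = {n["node_id"]: set(n["depends_on"]) for n in graph["nodes"]}
    # Nodes with no unmet dependencies could run immediately.
    ready = deque(nid for nid, d in deps.items() if not d)
    done = set()
    while ready:
        nid = ready.popleft()
        done.add(nid)
        for other, d in deps.items():
            if nid in d:
                d.discard(nid)
                if not d and other not in done:
                    ready.append(other)
    if len(done) != len(deps):
        raise ValueError(f"cycle detected among: {set(deps) - done}")

assert_acyclic(graph)  # passes for the extract -> transform -> load graph above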
Key Concepts¶
Orchestrator-first contract¶
For input routing, the Coordinator is the single source of truth. If cmd.input_inline.input_adapter is present, the worker must use it. Otherwise, it may use a handler-suggested adapter or fall back to iter_batches.
This eliminates ambiguity and makes runs reproducible.
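A minimal sketch of that precedence (illustrative pseudocode only; resolve_input_source and suggested_adapter are assumptions, not FlowKit's actual names):

def resolve_input_source(cmd, handler):
    # 1. A coordinator-specified adapter always wins.
    if getattr(cmd, "input_inline", None) and cmd.input_inline.get("input_adapter"):
        return cmd.input_inline["input_adapter"]
    # 2. The handler may suggest an adapter when the coordinator is silent.
    if getattr(handler, "suggested_adapter", None):
        return handler.suggested_adapter
    # 3. Otherwise, fall back to the handler's own iter_batches().
    return "iter_batches"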
Handlers¶
Handlers define the actual processing logic for each task type:
from collections.abc import AsyncIterator

from flowkit.worker.handlers.base import Batch, BatchResult, FinalizeResult, RoleHandler, RunContext

class MyHandler(RoleHandler):
    async def iter_batches(self, input_data) -> AsyncIterator[Batch]:
        # Generate batches from the input
        for item in input_data:
            yield Batch(batch_uid=item.id, payload=item.data)

    async def process_batch(self, batch: Batch, ctx: RunContext) -> BatchResult:
        # Process a single batch
        result = process_data(batch.payload)
        return BatchResult(success=True, artifacts_ref=result)

    async def finalize(self, ctx: RunContext) -> FinalizeResult:
        # Clean up after all batches have been processed
        return FinalizeResult(metrics={"total_processed": ctx.processed_count})
Artifacts¶
Artifacts are the data outputs from each processing stage:
- Stored in MongoDB with metadata
- Can be partial (streaming) or complete
- Referenced by downstream stages
- Include processing metrics and status
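As an illustration only (the exact schema is FlowKit's; these field names are assumptions), an artifact record conceptually bundles the producing node, status, payload metadata, and metrics:

artifact = {
    "node_id": "transform",            # hypothetical field names throughout
    "status": "complete",              # or partial while streaming
    "meta": {"rows": 1250},            # metadata read by downstream input adapters
    "metrics": {"duration_sec": 4.2},  # processing metrics for this stage
}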
Input Adapters¶
Input Adapters define how data flows between stages:
{
    "input_inline": {
        "input_adapter": "pull.from_artifacts",
        "input_args": {"from_nodes": ["upstream_node"]}
    }
}
The rechunk variant re-slices upstream output into fixed-size batches:
{
    "input_inline": {
        "input_adapter": "pull.from_artifacts.rechunk:size",
        "input_args": {"from_nodes": ["upstream"], "size": 10}
    }
}
Deterministic rechunking
- If meta_list_key is provided and meta[meta_list_key] is a list → chunk that list to size.
- Otherwise, each artifact meta is treated as one logical item (items=[meta]).
- No heuristics over domain keys (no skus|items|… guessing).
Aliases
from_node (single) is accepted and normalized to from_nodes=[...].
Empty upstream
Valid routes with no data complete normally (count=0).
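A quick sketch of the rechunking rule in plain Python (illustrative, not FlowKit's internals), showing how two artifact metas re-slice with size=2:

def rechunk(metas, size, meta_list_key=None):
    # Flatten artifact metas into logical items per the deterministic rule.
    items = []
    for meta in metas:
        if meta_list_key and isinstance(meta.get(meta_list_key), list):
            items.extend(meta[meta_list_key])  # chunkable list found
        else:
            items.append(meta)                 # whole meta is one logical item
    # Re-slice the flattened items into fixed-size batches.
    return [items[i:i + size] for i in range(0, len(items), size)]

metas = [{"ids": [1, 2, 3]}, {"ids": [4, 5]}]
print(rechunk(metas, size=2, meta_list_key="ids"))
# [[1, 2], [3, 4], [5]]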
Task States¶
Tasks progress through several states:
- queued: Waiting to be scheduled
- running: Currently being processed
- deferred: Temporarily paused (e.g., for retry)
- finished: Completed successfully
- failed: Terminated with error
- cancelling: Being cancelled
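A conceptual summary of the lifecycle, inferred only from the descriptions above (FlowKit may define additional transitions):

# Hypothetical transition map, not FlowKit's actual state machine.
TRANSITIONS = {
    "queued":   {"running"},
    "running":  {"finished", "failed", "deferred", "cancelling"},
    "deferred": {"queued"},  # re-queued once the retry backoff elapses
}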
Flow Control Patterns¶
Fan-in Strategies¶
Control when a node starts based on dependencies:
{
    "node_id": "combiner",
    "depends_on": ["node1", "node2", "node3"],
    "fan_in": "all"         # start only when every dependency finishes
    # "fan_in": "any"       (start when any one dependency finishes)
    # "fan_in": "count:2"   (start once any two dependencies finish)
}
Streaming vs Batch Processing¶
# Start processing as soon as the first batch is available
{
    "node_id": "processor",
    "io": {"start_when": "first_batch"}
}

# Wait for upstream to completely finish
{
    "node_id": "processor",
    "io": {"start_when": "ready"}  # default
}
Coordinator Functions¶
Execute logic directly in the coordinator (no worker needed):
{
    "node_id": "merger",
    "type": "coordinator_fn",
    "io": {
        "fn": "merge.generic",
        "fn_args": {
            "from_nodes": ["node1", "node2"],
            "target": {"key": "merged_result"}
        }
    }
}
Error Handling¶
Retry Policies¶
{
    "node_id": "flaky_task",
    "retry_policy": {
        "max": 3,            # retry at most 3 times
        "backoff_sec": 300,  # wait 5 minutes between attempts
        "permanent_on": ["bad_input", "schema_mismatch"]  # never retry these
    }
}
Error Classification¶
Handlers can classify errors as permanent or transient:
def classify_error(self, error: Exception) -> tuple[str, bool]:
    if isinstance(error, ValidationError):
        return "validation_failed", True  # Permanent - don't retry
    return "temporary_failure", False     # Transient - retry
Worker core also normalizes common configuration errors:
- unknown adapter → bad_input_adapter (permanent)
- bad or missing args → bad_input_args (permanent)
Message Flow¶
- Coordinator publishes task commands to Kafka topics
- Workers consume from role-specific topics (e.g., cmd.processor.v1)
- Workers publish status updates to status topics
- Coordinator consumes status updates and updates task state
- Artifacts are stored in MongoDB for inter-stage communication
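Assuming the cmd.<role>.v1 naming shown above holds for every role (an extrapolation from the single example), the command topics for a deployment follow directly from its worker roles:

roles = ["indexer", "processor"]
cmd_topics = [f"cmd.{role}.v1" for role in roles]
# ['cmd.indexer.v1', 'cmd.processor.v1']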
Scalability Patterns¶
Horizontal Scaling¶
- Run multiple coordinators (they coordinate through MongoDB)
- Run multiple workers of the same type for parallel processing
- Workers auto-discover tasks and distribute load
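For example, two interchangeable workers for the same role can be started side by side, reusing the configuration from the Worker example above; load is then split between them automatically (typically via Kafka's consumer-group semantics):

# Two workers for the "processor" role; both pull from the same role topic.
workers = [
    Worker(
        db=mongodb_instance,
        cfg=WorkerConfig(roles=["processor"]),
        handlers={"processor": MyProcessorHandler()},
    )
    for _ in range(2)
]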
Partitioning¶
- Use Kafka partitioning for parallel processing
- Workers process partitions independently
- Results are merged by downstream stages
Resource Management¶
- Configure worker capacity limits
- Set concurrency limits per worker type
- Use backpressure mechanisms for flow control
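FlowKit's own capacity knobs live in its configs; as a generic illustration of the bounded-concurrency idea (plain asyncio, not a FlowKit API), a semaphore caps how many batches are in flight at once:

import asyncio

MAX_IN_FLIGHT = 4  # illustrative concurrency cap

sem = asyncio.Semaphore(MAX_IN_FLIGHT)

async def process_with_backpressure(batch):
    # Acquiring the semaphore blocks when MAX_IN_FLIGHT batches are already
    # running, so upstream production naturally slows to match throughput.
    async with sem:
        await handle(batch)  # handle() is a placeholder for real batch work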