Basic Concepts

Understanding FlowKit's core concepts will help you build robust, scalable workflows.

Core Components

Coordinator

The Coordinator is the brain of FlowKit. It:

  • Schedules tasks based on DAG dependencies
  • Monitors worker health and task progress
  • Handles task retries and failure recovery
  • Manages task state transitions
  • Coordinates multiple workers across different types

from flowkit import Coordinator, CoordinatorConfig

coordinator = Coordinator(
    db=mongodb_instance,
    cfg=CoordinatorConfig(worker_types=["indexer", "processor"])
)

Worker

A Worker executes individual tasks. Workers:

  • Pull tasks from Kafka topics
  • Execute custom handler logic
  • Report progress and results back to coordinators
  • Handle task cancellation and cleanup
  • Support multiple roles/task types

from flowkit.worker import Worker, WorkerConfig

worker = Worker(
    db=mongodb_instance,
    cfg=WorkerConfig(roles=["processor"]),
    handlers={"processor": MyProcessorHandler()}
)

Task Graph (DAG)

Tasks are organized into Directed Acyclic Graphs (DAGs) that define:

  • Nodes: Individual processing steps
  • Edges: Dependencies between nodes
  • Fan-in/Fan-out: How data flows between stages

graph = {
    "nodes": [
        {"node_id": "extract", "type": "extractor", "depends_on": []},
        {"node_id": "transform", "type": "transformer", "depends_on": ["extract"]},
        {"node_id": "load", "type": "loader", "depends_on": ["transform"]}
    ],
    "edges": [["extract", "transform"], ["transform", "load"]]
}

Key Concepts

Orchestrator-first contract

For input routing, the Coordinator is the single source of truth. If cmd.input_inline.input_adapter is present, the worker must use it. Otherwise, it may use a handler-suggested adapter or fall back to iter_batches.

This eliminates ambiguity and makes runs reproducible.
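
A minimal sketch of that precedence, assuming a hypothetical resolve_input_adapter helper; only cmd.input_inline.input_adapter and iter_batches come from the contract above, everything else is illustrative:

def resolve_input_adapter(cmd, handler_suggestion=None):
    # 1. The coordinator's choice always wins.
    inline = getattr(cmd, "input_inline", None) or {}
    if inline.get("input_adapter"):
        return inline["input_adapter"]
    # 2. Otherwise a handler-suggested adapter may be used.
    if handler_suggestion:
        return handler_suggestion
    # 3. Final fallback: the handler's own iter_batches.
    return "iter_batches"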

Handlers

Handlers define the actual processing logic for each task type:

from typing import AsyncIterator

# RunContext is the per-run context passed to process_batch/finalize; import it
# from your flowkit version's worker package alongside these base classes.
from flowkit.worker.handlers.base import RoleHandler, Batch, BatchResult, FinalizeResult

class MyHandler(RoleHandler):
    async def iter_batches(self, input_data) -> AsyncIterator[Batch]:
        # Generate batches from input
        for item in input_data:
            yield Batch(batch_uid=item.id, payload=item.data)

    async def process_batch(self, batch: Batch, ctx: RunContext) -> BatchResult:
        # Process single batch
        result = process_data(batch.payload)
        return BatchResult(success=True, artifacts_ref=result)

    async def finalize(self, ctx: RunContext) -> FinalizeResult:
        # Clean up after all batches processed
        return FinalizeResult(metrics={"total_processed": ctx.processed_count})

Artifacts

Artifacts are the data outputs from each processing stage:

  • Stored in MongoDB with metadata
  • Can be partial (streaming) or complete
  • Referenced by downstream stages
  • Include processing metrics and status
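
For illustration only, an artifact record might look roughly like this; the field names are assumptions, not FlowKit's actual MongoDB schema:

# Hypothetical artifact document (field names are illustrative).
artifact_doc = {
    "task_id": "run-123",              # which run produced it
    "node_id": "transform",            # which stage produced it
    "status": "partial",               # partial (streaming) or complete
    "meta": {"rows": 5000},            # metadata downstream stages can pull from
    "metrics": {"duration_sec": 4.2},  # processing metrics
}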

Input Adapters

Input Adapters define how data flows between stages:

Pull all artifacts produced by one or more upstream nodes:

{
    "input_inline": {
        "input_adapter": "pull.from_artifacts",
        "input_args": {"from_nodes": ["upstream_node"]}
    }
}

Pull upstream artifacts and rechunk them into fixed-size batches:

{
    "input_inline": {
        "input_adapter": "pull.from_artifacts.rechunk:size",
        "input_args": {"from_nodes": ["upstream"], "size": 10}
    }
}

Deterministic rechunking

  • If meta_list_key is provided and meta[meta_list_key] is a list → chunk that list to size.
  • Otherwise, each artifact meta is treated as one logical item (items=[meta]).
  • No heuristics over domain keys (no skus|items|… guessing).
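
The rule is simple enough to sketch in a few lines of Python; this is a rough equivalent, not FlowKit's implementation:

# Rough sketch of deterministic rechunking (illustrative only).
def rechunk(artifact_metas, size, meta_list_key=None):
    items = []
    for meta in artifact_metas:
        if meta_list_key and isinstance(meta.get(meta_list_key), list):
            items.extend(meta[meta_list_key])  # chunk the named list
        else:
            items.append(meta)                 # one logical item per artifact meta
    for i in range(0, len(items), size):
        yield items[i:i + size]                # fixed-size chunks, no key guessing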

Aliases

  • from_node (single) is accepted and normalized to from_nodes=[...].
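
For example, these two input_args forms are equivalent after normalization:

{"input_args": {"from_node": "upstream"}}     # single-node alias
{"input_args": {"from_nodes": ["upstream"]}}  # normalized form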

Empty upstream

Valid routes with no data complete normally (count=0).

Task States

Tasks progress through several states:

  • queued: Waiting to be scheduled
  • running: Currently being processed
  • deferred: Temporarily paused (e.g., for retry)
  • finished: Completed successfully
  • failed: Terminated with error
  • cancelling: Being cancelled
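
If you need to reference these states from your own code, a small enum mirroring the names above is a convenient sketch; FlowKit may expose its own constants:

from enum import Enum

class TaskState(str, Enum):  # illustrative; FlowKit may ship its own constants
    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLING = "cancelling"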

Flow Control Patterns

Fan-in Strategies

Control when a node starts based on dependencies:

{
    "node_id": "combiner",
    "depends_on": ["node1", "node2", "node3"],
    "fan_in": "all"        # start only after all dependencies finish
    # "fan_in": "any"      # start as soon as any dependency finishes
    # "fan_in": "count:2"  # start after 2 of the dependencies finish
}

Streaming vs Batch Processing

# Start processing as soon as first batch is available
{
    "node_id": "processor",
    "io": {"start_when": "first_batch"}
}

# Wait for upstream to completely finish
{
    "node_id": "processor",
    "io": {"start_when": "ready"}  # default
}

Coordinator Functions

Execute logic directly in the coordinator (no worker needed):

{
    "node_id": "merger",
    "type": "coordinator_fn",
    "io": {
        "fn": "merge.generic",
        "fn_args": {
            "from_nodes": ["node1", "node2"],
            "target": {"key": "merged_result"}
        }
    }
}

Error Handling

Retry Policies

{
    "node_id": "flaky_task",
    "retry_policy": {
        "max": 3,            # maximum number of retries
        "backoff_sec": 300,  # delay in seconds before retrying
        "permanent_on": ["bad_input", "schema_mismatch"]  # error codes never retried
    }
}

Error Classification

Handlers can classify errors as permanent or transient:

def classify_error(self, error: Exception) -> tuple[str, bool]:
    if isinstance(error, ValidationError):
        return "validation_failed", True  # Permanent - don't retry
    else:
        return "temporary_failure", False  # Transient - retry

The worker core also normalizes common configuration errors:

  • Unknown adapter → bad_input_adapter (permanent)
  • Bad or missing adapter args → bad_input_args (permanent)

Message Flow

  1. Coordinator publishes task commands to Kafka topics
  2. Workers consume from role-specific topics (e.g., cmd.processor.v1)
  3. Workers publish status updates to status topics
  4. Coordinator consumes status updates and updates task state
  5. Artifacts are stored in MongoDB for inter-stage communication
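
Topic names follow a per-role pattern. Only cmd.processor.v1 appears above, so the status topic name in this sketch is an assumption:

role = "processor"
cmd_topic = f"cmd.{role}.v1"        # coordinator -> workers: task commands
status_topic = f"status.{role}.v1"  # workers -> coordinator: progress updates (assumed name)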

Scalability Patterns

Horizontal Scaling

  • Run multiple coordinators (they coordinate through MongoDB)
  • Run multiple workers of the same type for parallel processing
  • Workers auto-discover tasks and distribute load

Partitioning

  • Use Kafka partitioning for parallel processing
  • Workers process partitions independently
  • Results are merged by downstream stages

Resource Management

  • Configure worker capacity limits
  • Set concurrency limits per worker type
  • Use backpressure mechanisms for flow control

Next Steps