Bus API Reference

Kafka Bus

flowkit.bus.kafka

Classes

KafkaBus

KafkaBus(cfg: CoordinatorConfig)

Thin wrapper around aiokafka's AIOKafkaProducer and AIOKafkaConsumer that adds a minimal reply correlator keyed by corr_id.

Source code in src/flowkit/bus/kafka.py
def __init__(self, cfg: CoordinatorConfig) -> None:
    self.cfg = cfg
    self._producer: AIOKafkaProducer | None = None
    self._consumers: list[AIOKafkaConsumer] = []
    self._replies: dict[str, list[Envelope]] = {}
    self._reply_events: dict[str, asyncio.Event] = {}
    self.bootstrap = cfg.kafka_bootstrap
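
The constructor keeps two dictionaries keyed by corr_id: one for collected replies and one for asyncio.Event objects. The rest of the class is not shown here, so the following is only a sketch of how such a pair is commonly used to correlate replies, not the actual KafkaBus implementation. The ReplyCorrelator class, its register/push/wait methods, and the simplified Envelope dataclass are hypothetical names introduced for illustration, and no Kafka connection is involved.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Envelope:
    # Hypothetical stand-in for flowkit's Envelope; only corr_id matters here.
    corr_id: str
    payload: dict


class ReplyCorrelator:
    """Illustrative correlator: buffers replies per corr_id and wakes waiters."""

    def __init__(self) -> None:
        self._replies: dict[str, list[Envelope]] = {}
        self._reply_events: dict[str, asyncio.Event] = {}

    def register(self, corr_id: str) -> asyncio.Event:
        # Create the reply buffer and the wake-up event for this corr_id once.
        self._replies.setdefault(corr_id, [])
        return self._reply_events.setdefault(corr_id, asyncio.Event())

    def push(self, env: Envelope) -> None:
        # Called by whatever consumes the reply topic.
        # Replies for unregistered corr_ids are dropped in this sketch.
        if env.corr_id in self._reply_events:
            self._replies[env.corr_id].append(env)
            self._reply_events[env.corr_id].set()

    async def wait(self, corr_id: str, timeout: float) -> list[Envelope]:
        # Block until at least one reply arrives or the timeout expires.
        event = self.register(corr_id)
        try:
            await asyncio.wait_for(event.wait(), timeout)
        except asyncio.TimeoutError:
            pass  # Fall through and return whatever was collected (possibly nothing).
        finally:
            self._reply_events.pop(corr_id, None)
        return self._replies.pop(corr_id, [])


async def demo() -> list[Envelope]:
    correlator = ReplyCorrelator()
    correlator.register("req-1")
    # Simulate a reply arriving shortly after the request was sent.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, correlator.push, Envelope("req-1", {"ok": True}))
    return await correlator.wait("req-1", timeout=1.0)
```

Running `asyncio.run(demo())` returns a list containing the single simulated reply. Registering the corr_id before the request is sent avoids losing a reply that arrives before the caller starts waiting.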