Thin wrapper around AIOKafka with a minimal reply correlator (by corr_id).
Source code in src/flowkit/bus/kafka.py
def __init__(self, cfg: CoordinatorConfig) -> None:
    """Store the coordinator config and set up empty bus state.

    Only attributes are assigned here; the producer slot starts as
    ``None`` and no consumers are registered yet.

    Args:
        cfg: Coordinator configuration. Its ``kafka_bootstrap`` value is
            copied onto ``self.bootstrap``.
    """
    # Configuration-derived attributes.
    self.cfg = cfg
    self.bootstrap = cfg.kafka_bootstrap  # presumably the broker address(es) -- confirm

    # Kafka client handles: nothing is created at construction time.
    self._producer: AIOKafkaProducer | None = None
    self._consumers: list[AIOKafkaConsumer] = []

    # Reply-correlation state, both keyed by a string
    # (presumably the corr_id -- verify against the code that fills them).
    self._reply_events: dict[str, asyncio.Event] = {}
    self._replies: dict[str, list[Envelope]] = {}
|