Streaming & IO
Use blazerules_io for Kafka, Debezium CDC, binary decoders, local files, and s3:// reads.
blazerules_io adds streaming sources/sinks and binary decoders. The release wheel and default source build include it, while custom lean builds can still disable it.
Check capabilities at runtime
Full builds enable IO, Kafka, Avro, and Protobuf with these build flags:
BLAZERULES_IO=ON BLAZERULES_IO_KAFKA=ON BLAZERULES_IO_AVRO=ON BLAZERULES_IO_PROTOBUF=ON
Custom lean builds can turn them off, so check availability before wiring production code. See Installation and Configuration Reference.
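One way to follow the advice to check availability before wiring production code is to fail fast at startup. The sketch below is a minimal, hypothetical helper. `load_io` and `require_capabilities` are not part of blazerules_io. It reads only the `has_kafka`, `has_avro`, and `has_protobuf` flags documented on this page. It also assumes that a build without IO has no importable `blazerules_io` module.

```python
from __future__ import annotations

import importlib
from types import ModuleType
from typing import Any, Optional

# has_* attributes documented for blazerules_io, keyed by the feature they gate.
FLAGS = {"kafka": "has_kafka", "avro": "has_avro", "protobuf": "has_protobuf"}


def load_io(module_name: str = "blazerules_io") -> Optional[ModuleType]:
    """Import the IO module, returning None if it is not installed."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


def require_capabilities(io_module: Any, *features: str) -> None:
    """Fail fast with a readable error if a needed IO feature is missing.

    Hypothetical helper: a missing has_* attribute is treated as False.
    """
    if io_module is None:
        raise RuntimeError("blazerules_io is not available in this build")
    missing = [f for f in features if not getattr(io_module, FLAGS[f], False)]
    if missing:
        raise RuntimeError("build lacks required IO features: " + ", ".join(missing))


# At startup, before creating consumers or decoders:
# require_capabilities(load_io(), "kafka", "avro")
```

If you check once at process start, a lean build fails with a clear message instead of failing later in the middle of a stream.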
What it provides
- Kafka source/sink through librdkafka.
- Debezium CDC unwrap.
- Arrow IPC frames.
- Avro binary records.
- Protobuf binary records with descriptor sets.
- Local file and exact-object s3:// reads.
Binary decoders produce Arrow RecordBatch objects that you pass straight to evaluate_batch, so the data never round-trips through JSON.
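The list above mentions exact-object s3:// reads. This page does not define the term. The sketch below assumes it means a URI must name one object key, and that prefixes and wildcards are not expanded. Under that assumption, a caller can validate URIs before handing them to a reader. `parse_exact_s3_uri` is a hypothetical helper, not part of blazerules_io, and its glob rule is only a heuristic.

```python
from __future__ import annotations

S3_SCHEME = "s3://"


def parse_exact_s3_uri(uri: str) -> tuple[str, str]:
    """Split an s3:// URI into (bucket, key) and reject prefix-style paths.

    Hypothetical pre-flight check; it never contacts S3.
    """
    if not uri.startswith(S3_SCHEME):
        raise ValueError(f"not an s3:// URI: {uri!r}")
    bucket, _, key = uri[len(S3_SCHEME):].partition("/")
    if not bucket or not key:
        raise ValueError(f"URI must name a bucket and an object key: {uri!r}")
    # A trailing "/" names a prefix (a "directory"), not a single object.
    if key.endswith("/"):
        raise ValueError(f"URI names a prefix, not an object: {uri!r}")
    # "*" is legal in S3 keys, but in a path like this it usually means a glob.
    if "*" in key:
        raise ValueError(f"URI looks like a glob, not one object: {uri!r}")
    return bucket, key
```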
Capability flags
Connectors and decoders depend on build flags, so check availability at runtime before using them:
import blazerules_io
print(blazerules_io.has_kafka)
print(blazerules_io.has_avro)
print(blazerules_io.has_protobuf)
Kafka
The snippet below sets up the engine, a consumer, and a producer for a consume, evaluate, and produce loop. See Kafka Streaming for the full walkthrough.
import blazerules
import blazerules_io
engine = blazerules.RuleEngine()
engine.load_rules("rules.yaml")
consumer = blazerules_io.KafkaConsumer(
    brokers="localhost:9092",
    group_id="blazerules-workers",
    topics=["transactions"],
    conf={"enable.auto.commit": "false"},  # librdkafka setting: disable periodic offset auto-commit
)
producer = blazerules_io.KafkaProducer(
    brokers="localhost:9092",
    conf={},
)
The constructor signatures are:
KafkaConsumer(brokers: str, group_id: str, topics: list[str], conf: dict[str, str] = {})
KafkaProducer(brokers: str, conf: dict[str, str] = {})
For a fully C++-driven loop, the module also exposes run_stream(engine, config):
cfg = blazerules_io.StreamRunConfig()
cfg.brokers = "localhost:9092"
cfg.group_id = "blazerules-workers"
cfg.input_topics = ["transactions"]
cfg.output_topic = "decisions"
cfg.batch_size = 8192
cfg.poll_timeout_ms = 200
cfg.flush_timeout_ms = 5000
cfg.max_messages = 0
cfg.max_batches = 0
cfg.commit_offsets = True
stats = blazerules_io.run_stream(engine, cfg)
print(stats.batches, stats.messages, stats.matched, stats.emitted, stats.eval_us)
Debezium CDC
unwrap_debezium turns Debezium change events into NDJSON that the engine can evaluate. Its op_field argument names the field that carries each event's operation and defaults to "__op".
ndjson = blazerules_io.unwrap_debezium(messages, op_field="__op")
result = engine.evaluate_ndjson(ndjson)
unwrap_debezium(...) returns one contiguous NDJSON bytes object. Pass it directly to RuleEngine.evaluate_ndjson(...).
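The following pure-Python sketch shows what flattening Debezium envelopes into NDJSON typically involves. It is not blazerules_io's implementation. It makes these assumptions:
- `messages` is a list of raw JSON message values: bytes, or None for Kafka tombstones.
- It uses Debezium's op codes ("c", "u", "d", "r").
- It takes the "before" image for deletes and "after" otherwise.
- It accepts both schema-wrapped (`{"schema": ..., "payload": ...}`) and bare envelopes.

`unwrap_one` and `unwrap_to_ndjson` are hypothetical names.

```python
from __future__ import annotations

import json
from typing import Iterable, Optional


def unwrap_one(message: Optional[bytes], op_field: str = "__op") -> Optional[dict]:
    """Flatten one Debezium JSON change event into a row dict, or None."""
    if message is None:  # Kafka tombstone: null value that follows a delete
        return None
    event = json.loads(message)
    # Schema-enabled JSON wraps the envelope as {"schema": ..., "payload": ...}.
    envelope = event.get("payload", event)
    if envelope is None:  # event with a null payload
        return None
    op = envelope.get("op")
    # Deletes carry the old row in "before"; creates ("c"), updates ("u"),
    # and snapshot reads ("r") carry the new row in "after".
    row = envelope.get("before") if op == "d" else envelope.get("after")
    if row is None:
        return None
    flat = dict(row)
    flat[op_field] = op
    return flat


def unwrap_to_ndjson(messages: Iterable[Optional[bytes]], op_field: str = "__op") -> bytes:
    """Join the unwrapped rows into one contiguous NDJSON bytes object."""
    lines = []
    for message in messages:
        row = unwrap_one(message, op_field)
        if row is not None:
            lines.append(json.dumps(row, separators=(",", ":")))
    return ("\n".join(lines) + "\n").encode() if lines else b""
```

The real unwrap_debezium may treat deletes, tombstones, or schema envelopes differently. Use the sketch to reason about the shape of the output, not as a drop-in replacement.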
Binary decoders
Each decoder turns binary frames into an Arrow RecordBatch you pass straight to evaluate_batch.
decoder = blazerules_io.ArrowIpcDecoder()
batch = decoder.decode_batch([frame_bytes])
result = engine.evaluate_batch(batch)
File readers
Read local or s3:// files into Arrow batches or NDJSON bytes.
# Iterate Arrow RecordBatches from Parquet/Arrow/CSV:
for batch in blazerules_io.read_record_batches("history/day.parquet", batch_size=65536):
    result = engine.evaluate_batch(batch)
# Read an NDJSON file (local or s3://) as bytes:
payload = blazerules_io.read_ndjson_bytes("s3://bucket/events/day.ndjson")
result = engine.evaluate_ndjson(payload)
Signatures:
read_record_batches(path: str, format: str = "auto", batch_size: int = 65536) -> list[pyarrow.RecordBatch]
read_ndjson_bytes(path: str) -> bytes
See S3 resources for AWS profile, region, and endpoint configuration.
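read_ndjson_bytes returns the whole file as one bytes object. You may prefer to evaluate a large file in bounded pieces, similar in spirit to the batch_size argument of read_record_batches. In that case, split the payload on newline boundaries first. `iter_ndjson_chunks` is a hypothetical helper, not part of blazerules_io. Whether several smaller evaluate_ndjson calls help depends on your workload.

```python
from __future__ import annotations

from typing import Iterator


def iter_ndjson_chunks(payload: bytes, max_lines: int = 65536) -> Iterator[bytes]:
    """Yield slices of an NDJSON payload with at most max_lines lines each.

    Splits only right after newline bytes, so each chunk is itself NDJSON.
    Blank lines count toward max_lines. Hypothetical helper.
    """
    if max_lines <= 0:
        raise ValueError("max_lines must be positive")
    start = pos = count = 0
    while True:
        newline = payload.find(b"\n", pos)
        if newline == -1:
            break
        count += 1
        pos = newline + 1
        if count == max_lines:
            yield payload[start:pos]
            start, count = pos, 0
    if start < len(payload):
        # Leftover lines, possibly ending without a trailing newline.
        yield payload[start:]


# Hypothetical usage with the engine and reader shown on this page:
# payload = blazerules_io.read_ndjson_bytes("s3://bucket/events/day.ndjson")
# for chunk in iter_ndjson_chunks(payload, max_lines=8192):
#     result = engine.evaluate_ndjson(chunk)
```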