Streaming & IO

Use blazerules_io for Kafka, Debezium CDC, binary decoders, local files, and s3:// reads.

blazerules_io adds streaming sources/sinks and binary decoders. The release wheel and default source build include it, while custom lean builds can still disable it.

Check capabilities at runtime

Full builds enable IO, Kafka, Avro, and Protobuf through the build flags below. Custom lean builds can turn any of them off, so confirm what your build supports before wiring production code:

BLAZERULES_IO=ON
BLAZERULES_IO_KAFKA=ON
BLAZERULES_IO_AVRO=ON
BLAZERULES_IO_PROTOBUF=ON

See Installation and Configuration Reference.

What it provides

  • Kafka source/sink through librdkafka.
  • Debezium CDC unwrap.
  • Arrow IPC frames.
  • Avro binary records.
  • Protobuf binary records with descriptor sets.
  • Local and exact-object s3:// file reads.

Binary decoders produce Arrow RecordBatch objects that go straight to evaluate_batch, with no intermediate JSON conversion.

Capability flags

Connectors and decoders depend on build flags, so check availability at runtime before using them:

import blazerules_io

print(blazerules_io.has_kafka)
print(blazerules_io.has_avro)
print(blazerules_io.has_protobuf)
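
To fail fast at startup rather than at first use, the flags above can be wrapped in a small guard. This is a minimal sketch: require_capabilities is a hypothetical helper, not part of blazerules_io, and it assumes the has_* flags are plain boolean attributes, as the print calls above suggest. A SimpleNamespace stands in for the module so the example runs without a build installed.

```python
from types import SimpleNamespace


def require_capabilities(io_module, *names):
    """Raise RuntimeError if any requested has_<name> flag is missing or false."""
    # Assumption: each flag is a boolean attribute named has_<name>.
    missing = [n for n in names if not getattr(io_module, f"has_{n}", False)]
    if missing:
        raise RuntimeError("blazerules_io built without: " + ", ".join(missing))


# Stand-in for a lean build with Kafka only (hypothetical values).
lean_build = SimpleNamespace(has_kafka=True, has_avro=False, has_protobuf=False)

require_capabilities(lean_build, "kafka")  # passes silently

try:
    require_capabilities(lean_build, "kafka", "avro")
except RuntimeError as exc:
    message = str(exc)
```

In real code, pass the imported blazerules_io module in place of lean_build.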

Kafka

A typical worker consumes a batch, evaluates it, and produces decisions. The snippet below sets up the engine, consumer, and producer for that loop. See the full walkthrough in Kafka Streaming.

import blazerules, blazerules_io

engine = blazerules.RuleEngine()
engine.load_rules("rules.yaml")

consumer = blazerules_io.KafkaConsumer(
    brokers="localhost:9092",
    group_id="blazerules-workers",
    topics=["transactions"],
    conf={"enable.auto.commit": "false"},
)
producer = blazerules_io.KafkaProducer(
    brokers="localhost:9092",
    conf={},
)

The constructor signatures are:

KafkaConsumer(brokers: str, group_id: str, topics: list[str], conf: dict[str, str] = {})
KafkaProducer(brokers: str, conf: dict[str, str] = {})
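
Because conf is typed as dict[str, str], booleans and numbers have to be passed as strings, as in "enable.auto.commit": "false" above. The sketch below shows one way to normalize Python values first. to_kafka_conf is a hypothetical helper and not part of blazerules_io. The property names are standard librdkafka settings chosen for illustration.

```python
def to_kafka_conf(settings):
    """Convert Python values to the string form a dict[str, str] conf expects."""
    conf = {}
    for key, value in settings.items():
        if isinstance(value, bool):
            # librdkafka spells booleans in lowercase.
            conf[key] = "true" if value else "false"
        else:
            conf[key] = str(value)
    return conf


conf = to_kafka_conf(
    {
        "enable.auto.commit": False,
        "session.timeout.ms": 30000,
        "auto.offset.reset": "earliest",
    }
)
```

The resulting dictionary can then be passed as the conf argument of either constructor.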

For a fully C++-driven loop, the module also exposes run_stream(engine, config), which is configured through a StreamRunConfig:

cfg = blazerules_io.StreamRunConfig()
cfg.brokers = "localhost:9092"
cfg.group_id = "blazerules-workers"
cfg.input_topics = ["transactions"]
cfg.output_topic = "decisions"
cfg.batch_size = 8192
cfg.poll_timeout_ms = 200
cfg.flush_timeout_ms = 5000
cfg.max_messages = 0
cfg.max_batches = 0
cfg.commit_offsets = True

stats = blazerules_io.run_stream(engine, cfg)
print(stats.batches, stats.messages, stats.matched, stats.emitted, stats.eval_us)
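
The raw counters are easier to monitor as ratios. The sketch below derives a few from the fields printed above. It assumes eval_us is the total evaluation time in microseconds and that matched counts messages. Neither is stated here, so treat the derived numbers accordingly. summarize is a hypothetical helper, and a SimpleNamespace stands in for the stats object.

```python
from types import SimpleNamespace


def summarize(stats):
    """Derive per-batch, match-rate, and throughput figures from run stats."""
    eval_s = stats.eval_us / 1_000_000  # assumption: eval_us is microseconds
    return {
        "messages_per_batch": stats.messages / stats.batches if stats.batches else 0.0,
        # Assumption: matched counts messages, so this is a fraction of input.
        "match_rate": stats.matched / stats.messages if stats.messages else 0.0,
        "messages_per_eval_second": stats.messages / eval_s if eval_s else 0.0,
    }


# Hypothetical numbers standing in for the object returned by run_stream.
sample = SimpleNamespace(
    batches=4, messages=32768, matched=1024, emitted=1024, eval_us=500_000
)
summary = summarize(sample)
```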

Debezium CDC

unwrap_debezium turns Debezium change events into NDJSON that the engine can evaluate. The op_field argument names the operation field and defaults to __op.

ndjson = blazerules_io.unwrap_debezium(messages, op_field="__op")
result = engine.evaluate_ndjson(ndjson)

unwrap_debezium(...) returns one contiguous NDJSON bytes object. Pass that directly to RuleEngine.evaluate_ndjson(...).
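
For intuition about what unwrapping involves, here is a pure-Python sketch of flattening a Debezium envelope. It is not the library's implementation. unwrap_envelopes is a hypothetical function, and the handling of deletes (emit the before image) and tombstones (skip) is an assumption made for illustration. unwrap_debezium may behave differently.

```python
import json


def unwrap_envelopes(messages, op_field="__op"):
    """Flatten Debezium change events into NDJSON bytes, one row per line."""
    lines = []
    for raw in messages:
        if raw is None:  # Kafka tombstone: nothing to evaluate
            continue
        event = json.loads(raw)
        # Events may or may not carry the schema/payload wrapper.
        envelope = event.get("payload", event)
        op = envelope.get("op")
        # Assumption: deletes expose the prior row image, other ops the new one.
        row = envelope.get("before") if op == "d" else envelope.get("after")
        if row is None:
            continue
        flat = dict(row)
        flat[op_field] = op
        lines.append(json.dumps(flat, separators=(",", ":")))
    return ("\n".join(lines) + "\n").encode("utf-8") if lines else b""


messages = [
    json.dumps({"before": None, "after": {"id": 1, "amount": 50}, "op": "c"}).encode(),
    json.dumps({"payload": {"before": {"id": 2, "amount": 7}, "after": None, "op": "d"}}).encode(),
    None,
]
ndjson = unwrap_envelopes(messages)
```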

Binary decoders

Each decoder turns binary frames into an Arrow RecordBatch you pass straight to evaluate_batch.

decoder = blazerules_io.ArrowIpcDecoder()
batch = decoder.decode_batch([frame_bytes])
result = engine.evaluate_batch(batch)

File readers

Read local or s3:// files into Arrow batches or NDJSON bytes.

# Iterate Arrow RecordBatches from Parquet/Arrow/CSV:
for batch in blazerules_io.read_record_batches("history/day.parquet", batch_size=65536):
    result = engine.evaluate_batch(batch)

# Read an NDJSON file (local or s3://) as bytes:
payload = blazerules_io.read_ndjson_bytes("s3://bucket/events/day.ndjson")
result = engine.evaluate_ndjson(payload)

Signatures:

read_record_batches(path: str, format: str = "auto", batch_size: int = 65536) -> list[pyarrow.RecordBatch]
read_ndjson_bytes(path: str) -> bytes

See S3 resources for AWS profile/region/endpoint configuration.
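
When a job handles both NDJSON and columnar files, the two readers can sit behind one entry point. The sketch below routes on the file extension. evaluate_file, FakeIO, and FakeEngine are hypothetical names introduced so the example runs on its own. In real use, pass blazerules_io and a loaded RuleEngine instead of the fakes.

```python
def evaluate_file(engine, io_module, path, batch_size=65536):
    """Route a path to the NDJSON or record-batch reader and collect results."""
    if path.lower().endswith(".ndjson"):
        payload = io_module.read_ndjson_bytes(path)
        return [engine.evaluate_ndjson(payload)]
    return [
        engine.evaluate_batch(batch)
        for batch in io_module.read_record_batches(path, batch_size=batch_size)
    ]


class FakeIO:
    """Stand-in exposing the two reader names documented above."""

    def read_ndjson_bytes(self, path):
        return b'{"id":1}\n'

    def read_record_batches(self, path, format="auto", batch_size=65536):
        return ["batch-0", "batch-1"]


class FakeEngine:
    """Stand-in that records which evaluate method was called."""

    def evaluate_ndjson(self, payload):
        return ("ndjson", len(payload))

    def evaluate_batch(self, batch):
        return ("batch", batch)


ndjson_results = evaluate_file(FakeEngine(), FakeIO(), "s3://bucket/events/day.ndjson")
batch_results = evaluate_file(FakeEngine(), FakeIO(), "history/day.parquet")
```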

Where to go next