Recipe: Run the Native Kafka Loop

Configure StreamRunConfig and run_stream for Kafka microbatch evaluation.

For maximum throughput, keep the Kafka consume → evaluate → produce loop inside the IO module.

import blazerules
import blazerules_io

assert blazerules_io.has_kafka

engine = blazerules.RuleEngine()
engine.load_rules("rules.yaml")

cfg = blazerules_io.StreamRunConfig()
cfg.brokers = "localhost:9092"
cfg.group_id = "blazerules-workers"
cfg.input_topics = ["transactions"]
cfg.output_topic = "decisions"
cfg.batch_size = 8192
cfg.poll_timeout_ms = 200
cfg.flush_timeout_ms = 5000
cfg.commit_offsets = True

stats = blazerules_io.run_stream(engine, cfg)
print(stats.batches, stats.messages, stats.matched, stats.emitted, stats.eval_us)
CLI equivalent: run the same loop as a script
python kafka_loop.py

kafka_loop.py contains the Python snippet above. The native loop is exposed through the Python API so it can share an already configured RuleEngine.

Set limits for local tests:

cfg.max_messages = 1_000_000
cfg.max_batches = 100
CLI equivalent: pass limits through environment
MAX_MESSAGES=1000000 MAX_BATCHES=100 python kafka_loop.py

Read those environment variables in your script and assign them to cfg.max_messages / cfg.max_batches.

Production guidance:

  • Key messages by entity so windows stay partition-affine.
  • Commit offsets only after decisions are emitted.
  • Use OutputDetail.DECISIONS unless you need per-rule bitmasks.
  • Keep rules hot-reloaded between batches, not mid-batch.