Recipe: Run the Native Kafka Loop
Configure StreamRunConfig and run_stream for Kafka microbatch evaluation.
For maximum throughput, keep the Kafka consume → evaluate → produce loop inside the IO module (blazerules_io) by calling run_stream.
```python
import blazerules
import blazerules_io

# Stop early if this blazerules_io build has no Kafka support
assert blazerules_io.has_kafka

engine = blazerules.RuleEngine()
engine.load_rules("rules.yaml")

cfg = blazerules_io.StreamRunConfig()
cfg.brokers = "localhost:9092"        # Kafka bootstrap servers
cfg.group_id = "blazerules-workers"   # consumer group
cfg.input_topics = ["transactions"]
cfg.output_topic = "decisions"
cfg.batch_size = 8192                 # messages per microbatch
cfg.poll_timeout_ms = 200
cfg.flush_timeout_ms = 5000
cfg.commit_offsets = True

# Run the consume → evaluate → produce loop and collect run statistics
stats = blazerules_io.run_stream(engine, cfg)
print(stats.batches, stats.messages, stats.matched, stats.emitted, stats.eval_us)
```
CLI equivalent: run the same loop as a script
```shell
python kafka_loop.py
```
Here kafka_loop.py contains the Python snippet above. The native loop is exposed through the Python API so it can share an already configured RuleEngine.
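To make the consume → evaluate → produce cycle concrete, here is a pure-Python sketch of a microbatch loop using in-memory stand-ins for the Kafka consumer, the rule engine, and the producer. It is not how run_stream is implemented (the recipe runs that loop natively); run_loop, LoopStats, microbatches, and the toy evaluate function are hypothetical names. The counters only loosely mirror the fields printed above: in this sketch, matched counts messages with at least one decision and emitted counts produced decision records, which may differ from the library's definitions.

```python
import time
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional

@dataclass
class LoopStats:
    # Hypothetical counters loosely mirroring the fields printed by the recipe
    batches: int = 0
    messages: int = 0
    matched: int = 0   # messages with at least one decision
    emitted: int = 0   # decision records handed to the producer
    eval_us: int = 0   # time spent inside evaluate(), in microseconds

def microbatches(source: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group a message stream into lists of at most batch_size messages."""
    batch: List[dict] = []
    for msg in source:
        batch.append(msg)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

def run_loop(source, evaluate, produce, commit, batch_size: int,
             max_batches: Optional[int] = None) -> LoopStats:
    stats = LoopStats()
    for batch in microbatches(source, batch_size):
        if max_batches is not None and stats.batches >= max_batches:
            break
        start = time.perf_counter_ns()
        results = evaluate(batch)  # one list of decisions per message
        stats.eval_us += (time.perf_counter_ns() - start) // 1_000
        for msg, decisions in zip(batch, results):
            if decisions:
                stats.matched += 1
            for decision in decisions:
                produce(msg, decision)
                stats.emitted += 1
        commit(batch)  # commit only after this batch's decisions were produced
        stats.batches += 1
        stats.messages += len(batch)
    return stats

def evaluate(batch):
    # Toy rules standing in for RuleEngine evaluation (hypothetical rule names)
    return [[name for name, limit in (("flag_large", 1000), ("flag_very_large", 3000))
             if m["amount"] > limit] for m in batch]

source = [{"id": i, "amount": a} for i, a in enumerate([5, 2000, 30, 4000, 10])]
produced, committed = [], []
stats = run_loop(source, evaluate,
                 produce=lambda msg, d: produced.append((msg["id"], d)),
                 commit=lambda batch: committed.append([m["id"] for m in batch]),
                 batch_size=2)
print(stats.batches, stats.messages, stats.matched, stats.emitted)  # 3 5 2 3
```

Batching amortizes per-call overhead across many messages, which is the motivation for keeping this loop in native code rather than writing it in Python as above.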
Set limits for local tests:
```python
cfg.max_messages = 1_000_000
cfg.max_batches = 100
```
CLI equivalent: pass limits through environment
```shell
MAX_MESSAGES=1000000 MAX_BATCHES=100 python kafka_loop.py
```
Read those environment variables in your script and assign them to cfg.max_messages and cfg.max_batches.
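A minimal sketch of that step, assuming only that cfg accepts plain attribute assignment as in the recipe. apply_limits_from_env is a hypothetical helper, and the SimpleNamespace object stands in for blazerules_io.StreamRunConfig so the example runs without the library installed.

```python
import os
from types import SimpleNamespace

def apply_limits_from_env(cfg, env=os.environ):
    """Copy MAX_MESSAGES / MAX_BATCHES onto cfg when they are set (hypothetical helper)."""
    for var, attr in (("MAX_MESSAGES", "max_messages"), ("MAX_BATCHES", "max_batches")):
        raw = env.get(var)
        if raw:  # unset or empty: keep whatever cfg already has
            setattr(cfg, attr, int(raw))
    return cfg

# Stand-in for blazerules_io.StreamRunConfig
cfg = SimpleNamespace(max_messages=None, max_batches=None)
apply_limits_from_env(cfg, {"MAX_MESSAGES": "1000000", "MAX_BATCHES": "100"})
print(cfg.max_messages, cfg.max_batches)  # 1000000 100
```

In kafka_loop.py you would call apply_limits_from_env(cfg) on the real config object before run_stream, so the same script runs unbounded in production and bounded in local tests.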
Production guidance:
- Key messages by entity so that every event for one entity lands in the same partition and its windows stay partition-affine.
- Commit offsets only after decisions are emitted.
- Use OutputDetail.DECISIONS unless you need per-rule bitmasks.
- Hot-reload rules only between batches, never mid-batch.
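The guidance above can be illustrated with a small in-memory simulation. Everything in it is hypothetical: partition_for uses CRC32 modulo the partition count as a stand-in for a real Kafka client's key hash (the Java client uses murmur2, so actual partition numbers will differ), PartitionWorker is not part of blazerules_io, and a running-total threshold plays the role of a rule. It shows three points: equal keys route to one partition so each entity's window state lives on a single worker, decisions are emitted before the batch's offset is committed, and a rule reload requested at any time takes effect only at the next batch boundary.

```python
import zlib
from collections import defaultdict

def partition_for(key: str, num_partitions: int = 4) -> int:
    # Stand-in key hash (not Kafka's): equal keys always map to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

class PartitionWorker:
    """Hypothetical per-partition worker illustrating the guidance; not blazerules_io."""

    def __init__(self, threshold: int):
        self.threshold = threshold      # active "rule": flag running totals above this
        self.pending = None             # requested rule change, applied at a batch boundary
        self.totals = defaultdict(int)  # per-entity window state, local to this partition
        self.output = []                # stand-in for the output topic
        self.committed = 0              # next offset to read (Kafka commit convention)

    def request_reload(self, threshold: int) -> None:
        # Safe to call at any time: an in-flight batch keeps its current rules.
        self.pending = threshold

    def process_batch(self, batch):
        """batch: list of (offset, key, amount) read from this partition."""
        if self.pending is not None:    # hot reload only between batches
            self.threshold, self.pending = self.pending, None
        decisions = []
        for _offset, key, amount in batch:
            self.totals[key] += amount
            if self.totals[key] > self.threshold:
                decisions.append((key, self.totals[key]))
        self.output.extend(decisions)       # emit decisions first...
        self.committed = batch[-1][0] + 1   # ...then commit past the last processed offset
        return decisions

# Route events by entity key so each account's window lives on exactly one worker.
events = [("acct-1", 600), ("acct-2", 100), ("acct-1", 600), ("acct-2", 100)]
routed = defaultdict(list)
for key, amount in events:
    p = partition_for(key)
    routed[p].append((len(routed[p]), key, amount))  # offsets are per partition

workers = {p: PartitionWorker(threshold=1000) for p in routed}
all_decisions = [d for p, batch in routed.items() for d in workers[p].process_batch(batch)]
print(all_decisions)  # [('acct-1', 1200)]

# A reload is only recorded here; the next process_batch call applies it.
w = workers[partition_for("acct-2")]
w.request_reload(150)
next_offset = w.committed
reload_result = w.process_batch([(next_offset, "acct-2", 10)])
print(reload_result)  # [('acct-2', 210)]
```

If acct-1's events were spread across partitions instead, no single worker would see its full 1,200 total and the decision would never fire, which is why keying by entity matters for windowed rules.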