Tutorial: Kafka Streaming
Consume a batch of records from Kafka, evaluate them with BlazeRules, and produce the decisions to an output topic.
Goal: run BlazeRules as a streaming operator — consume records from a Kafka topic in batches, evaluate them, and publish the decisions to an output topic.
Requires the IO module
The release wheel and the default source build include `blazerules_io` with Kafka enabled. Confirm at runtime with `blazerules_io.has_kafka`, especially if you are using a custom lean build. See Streaming & IO.
Prerequisites
- `blazerules` and `blazerules_io` on `PYTHONPATH`, built with IO + Kafka.
- A running Kafka broker and an input topic whose messages are JSON records, one single-line JSON object per message.
- A `rules.yaml` file.
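A quick preflight can confirm two things before you start: that the modules resolve on `PYTHONPATH`, and that the rules file is present. The sketch below is generic and uses only the standard library (`importlib.util.find_spec` and `pathlib`). `preflight` is a hypothetical helper, not part of BlazeRules. It does not check `has_kafka`, because that requires importing `blazerules_io`, which Step 1 does.

```python
import importlib.util
from pathlib import Path


def preflight(modules, rules_path="rules.yaml"):
    """Hypothetical helper: return a list of problems; an empty list means ready."""
    problems = []
    for name in modules:
        # find_spec returns None when a top-level module cannot be located.
        if importlib.util.find_spec(name) is None:
            problems.append(f"module not found on PYTHONPATH: {name}")
    if not Path(rules_path).is_file():
        problems.append(f"rules file not found: {rules_path}")
    return problems


for problem in preflight(["blazerules", "blazerules_io"]):
    print(problem)
```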
Step 1 — Load the engine and check Kafka
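```python
import json

import blazerules
import blazerules_io

# Fail fast if this build of the IO module was compiled without Kafka support.
# An explicit raise (rather than assert) still fires under `python -O`.
if not blazerules_io.has_kafka:
    raise RuntimeError("Kafka support is not available in this build")

engine = blazerules.RuleEngine()
engine.load_rules("rules.yaml")
```

Step 2 — Create a consumer and producer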
```python
consumer = blazerules_io.KafkaConsumer(
    brokers="localhost:9092",
    group_id="blazerules-workers",
    topics=["transactions"],
    # Auto-commit is off: offsets are committed manually after each batch (see Step 3).
    conf={"enable.auto.commit": "false"},
)
producer = blazerules_io.KafkaProducer(
    brokers="localhost:9092",
    conf={},
)
```

Step 3 — Consume, evaluate, produce
Poll a batch, evaluate it in one call, then publish per-record decisions.
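```python
def decision_record(i, decision):
    # Hypothetical helper: serialize one decision as a JSON string for the output topic.
    return json.dumps({"index": i, "decision": decision})


while True:
    # Pull up to 8192 raw message values (bytes), waiting up to 200 ms.
    messages = consumer.poll_batch(max_messages=8192, timeout_ms=200)
    if not messages:
        continue
    # Join the values into one NDJSON buffer and evaluate the whole batch in one call.
    payload = b"\n".join(messages) + b"\n"
    result = engine.evaluate_ndjson(payload)
    for i, decision in enumerate(result.decisions):
        producer.produce("decisions", value=decision_record(i, decision))
    # Flush before committing so offsets advance only after decisions are delivered.
    producer.flush()
    consumer.commit()
```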
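Joining raw message values with `b"\n"` produces valid NDJSON only if every message is one complete JSON document on a single line. The sketch below is not part of BlazeRules. `build_ndjson_payload` is a hypothetical helper that rejects values containing a raw newline, such as pretty-printed JSON, before they are batched.

```python
import json


def build_ndjson_payload(messages: list[bytes]) -> bytes:
    """Hypothetical helper: join raw message values into one NDJSON buffer."""
    for idx, value in enumerate(messages):
        # A raw newline inside a value would split it across two NDJSON lines.
        if b"\n" in value:
            raise ValueError(f"message {idx} spans multiple lines")
    # One JSON document per line, plus a trailing newline after the last one.
    return b"\n".join(messages) + b"\n"


batch = [
    json.dumps({"id": 1, "amount": 120.0}).encode(),
    json.dumps({"id": 2, "amount": 9800.5}).encode(),
]
payload = build_ndjson_payload(batch)
print(payload.decode(), end="")
```

`json.dumps` without `indent` never emits raw newlines, because newlines inside strings are escaped as `\n`. Compact producers therefore pass this check, while pretty-printed ones do not.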
Prefer the native loop for throughput
For maximum throughput, the IO module also exposes `run_stream(engine, config)`, which runs the consume → evaluate → produce loop inside the native runtime. See Streaming & IO for the `StreamRunConfig` fields.
Metadata-preserving variant
Use poll_records(...) when you need topic, partition, offset, key, and timestamp metadata in the output.
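```python
records = consumer.poll_records(max_messages=8192, timeout_ms=200)
payload = b"\n".join(r.value for r in records) + b"\n"
result = engine.evaluate_ndjson(payload)
# Decisions are paired with records by position. If result.messages_skipped
# is non-zero, records[i] may no longer correspond to decision i.
for i, decision in enumerate(result.decisions):
    source = records[i]
    out = {
        "source_topic": source.topic,
        "partition": source.partition,
        "offset": source.offset,
        "decision": decision,
        "score": result.scores[i],
        "winning_rule_id": result.winning_rule_ids[i],
    }
    # Reusing the input key keeps decisions for one key on one output partition
    # under Kafka's default partitioner.
    producer.produce("decisions", value=json.dumps(out), key=source.key)
producer.flush()
consumer.commit()
```

Expected output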
Each well-formed input record yields one decision record on the output topic. A consumer on `decisions` sees a steady stream of records, each labelled with the engine's verdict (APPROVE, REVIEW, BLOCK, …).
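The sketch below shows what one output record from the metadata-preserving variant looks like. It uses a stand-in `SourceRecord` dataclass in place of the objects returned by `poll_records(...)`. The class, the `make_output` helper, and the example values (including the rule id `high_amount`) are hypothetical. Only the output field names come from the variant above.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class SourceRecord:
    # Hypothetical stand-in for one record returned by poll_records(...).
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]
    value: bytes


def make_output(source: SourceRecord, decision: str, score: float, rule_id: str) -> str:
    # Same fields as the metadata-preserving variant, serialized as a JSON string.
    return json.dumps({
        "source_topic": source.topic,
        "partition": source.partition,
        "offset": source.offset,
        "decision": decision,
        "score": score,
        "winning_rule_id": rule_id,
    })


src = SourceRecord("transactions", 3, 1042, b"acct-77", b'{"amount": 9800.5}')
print(make_output(src, "REVIEW", 0.82, "high_amount"))
```

In the variant, `source.key` is sent as the Kafka message key rather than inside the JSON value.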
Validation
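- The output topic receives exactly `result.n_records` decisions per processed batch.
- `consumer.commit()` advances the group offset only after a batch has been produced and flushed, which gives at-least-once delivery. If a worker crashes between `flush()` and `commit()`, the batch is reprocessed and its decisions may be produced twice.
- Watch `result.messages_skipped` for malformed input.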
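Committing only after `flush()` means a crash can repeat work but never lose it. The simulation below is plain Python with no Kafka involved. `process_log` is a hypothetical model: each batch is "produced" to a list before its offset is committed, and a simulated crash before one commit causes that batch to be replayed.

```python
def process_log(log, batch_size, crash_at_offset=None):
    """Hypothetical model of commit-after-produce; returns (produced, committed)."""
    produced = []    # stands in for the output topic
    committed = 0    # stands in for the consumer group's committed offset
    crashed = False
    while committed < len(log):
        batch = log[committed:committed + batch_size]
        produced.extend(f"decision:{msg}" for msg in batch)  # produce + flush
        if not crashed and committed == crash_at_offset:
            # Worker dies before commit; on restart it resumes from `committed`.
            crashed = True
            continue
        committed += len(batch)  # consumer.commit()
    return produced, committed


log = ["t0", "t1", "t2", "t3"]
produced, offset = process_log(log, batch_size=2, crash_at_offset=2)
print(produced)
```

Every input appears at least once in `produced`, and the replayed batch appears twice. Downstream readers of `decisions` should therefore tolerate duplicates, for example by deduplicating on source topic, partition, and offset when those fields are present.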
Cleanup
Close the consumer and flush the producer on shutdown so no decisions are lost.
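One way to structure shutdown is to check a stop flag once per loop iteration and wrap the loop in a `try`/`finally` that always flushes and closes. The sketch is deliberately API-agnostic: it takes the poll, process, flush, and close steps as callables. `run_until_stopped` and the fakes in the usage are hypothetical, and this tutorial does not confirm the name of the consumer's close method.

```python
import threading
from typing import Callable, List


def run_until_stopped(
    poll: Callable[[], List[bytes]],
    process: Callable[[List[bytes]], None],
    flush: Callable[[], None],
    close: Callable[[], None],
    stop: threading.Event,
) -> None:
    """Hypothetical loop: run until `stop` is set, then always flush and close."""
    try:
        while not stop.is_set():
            batch = poll()
            if batch:
                process(batch)  # evaluate, produce, flush, commit
    finally:
        # Runs on a normal stop and on exceptions, so buffered decisions are sent.
        flush()
        close()


# Usage with fakes: three polls, then a simulated shutdown request.
calls = []
stop = threading.Event()
batches = iter([[b"a"], [], [b"b"]])


def fake_poll():
    try:
        return next(batches)
    except StopIteration:
        stop.set()  # in a real worker, a SIGTERM handler could do this
        return []


run_until_stopped(
    fake_poll,
    lambda b: calls.append(("process", b)),
    lambda: calls.append(("flush",)),
    lambda: calls.append(("close",)),
    stop,
)
print(calls)
```

In a real worker you might call `stop.set()` from a handler registered with `signal.signal(signal.SIGTERM, ...)`, so the current batch finishes before the producer is flushed.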