Tutorial: Kafka Streaming

Consume a batch of records from Kafka, evaluate them with BlazeRules, and produce the decisions to an output topic.

Goal: run BlazeRules as a streaming operator that reads a Kafka topic in batches, evaluates each batch in a single call, publishes one decision per input record, and commits offsets only after those decisions are sent.


Requires the IO module

The release wheel and default source build include blazerules_io with Kafka enabled. Confirm at runtime with blazerules_io.has_kafka, especially if you are using a custom lean build. See Streaming & IO.

Prerequisites

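  • blazerules and blazerules_io importable (on PYTHONPATH), built with the IO module and Kafka support.
  • A running Kafka broker at localhost:9092, an input topic named transactions whose messages each hold one JSON object, and an output topic named decisions.
  • A rules.yaml file in the working directory.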

Step 1 — Load the engine and check Kafka

import json
import blazerules
import blazerules_io

assert blazerules_io.has_kafka, "Kafka support is not available in this build"

engine = blazerules.RuleEngine()
engine.load_rules("rules.yaml")

Step 2 — Create a consumer and producer

consumer = blazerules_io.KafkaConsumer(
    brokers="localhost:9092",
    group_id="blazerules-workers",
    topics=["transactions"],
    # Disable auto-commit: Step 3 commits offsets only after decisions are produced.
    conf={"enable.auto.commit": "false"},
)

producer = blazerules_io.KafkaProducer(
    brokers="localhost:9092",
    conf={},
)
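The conf dictionaries look like librdkafka configuration properties (enable.auto.commit is one). Assuming blazerules_io forwards them unchanged, which this tutorial does not state, a more conservative setup might look like the sketch below. The property names are standard librdkafka settings; confirm that the IO module accepts them in Streaming & IO.

```python
# Assumes blazerules_io forwards `conf` to librdkafka unchanged (not confirmed by this tutorial).
consumer_conf = {
    "enable.auto.commit": "false",    # manual commits, as in Step 2
    "auto.offset.reset": "earliest",  # a group with no committed offset starts at the oldest retained record
}

producer_conf = {
    "acks": "all",                    # wait for all in-sync replicas to acknowledge each write
    "enable.idempotence": "true",     # let the broker discard duplicates caused by producer retries
}
```

Pass them as conf=consumer_conf and conf=producer_conf in Step 2. Idempotence only removes duplicates caused by producer retries; a batch replayed after a consumer crash is still produced twice.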

Step 3 — Consume, evaluate, produce

Poll a batch, evaluate it with a single evaluate_ndjson call, publish one decision message per record, then flush the producer and commit the consumer offsets.

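while True:
    messages = consumer.poll_batch(max_messages=8192, timeout_ms=200)
    if not messages:
        continue  # nothing arrived within timeout_ms; poll again

    # evaluate_ndjson takes newline-delimited JSON: one record per line.
    payload = b"\n".join(messages) + b"\n"
    result = engine.evaluate_ndjson(payload)

    for i, decision in enumerate(result.decisions):
        out = {
            "decision": decision,
            "score": result.scores[i],
            "winning_rule_id": result.winning_rule_ids[i],
        }
        producer.produce("decisions", value=json.dumps(out))

    # Flush before committing so the group offset advances only after the batch is sent.
    producer.flush()
    consumer.commit()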
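The b"\n".join(...) framing assumes that no message value contains a raw newline. A pretty-printed JSON message would otherwise be split across several NDJSON lines and likely be counted as malformed. A minimal pre-flight sketch, assuming message values are UTF-8 JSON bytes; build_ndjson is a hypothetical helper, not part of blazerules_io. It re-serializes only values that contain a newline and leaves single-line values untouched.

```python
import json


def build_ndjson(messages: list[bytes]) -> bytes:
    """Frame message values as NDJSON, guaranteeing one line per message."""
    lines = []
    for value in messages:
        if b"\n" in value:
            try:
                # Valid JSON only has raw newlines as whitespace between tokens,
                # and compact json.dumps output never contains one.
                value = json.dumps(json.loads(value), separators=(",", ":")).encode("utf-8")
            except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass ValueError
                # Malformed input: flatten to one line so it counts as a single bad record.
                value = value.replace(b"\n", b" ")
        lines.append(value)
    return b"\n".join(lines) + b"\n"
```

In the loop, payload = build_ndjson(messages) replaces the join. The extra parsing cost applies only to the multi-line messages.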

Prefer the native loop for throughput

For maximum throughput, the IO module also exposes run_stream(engine, config), which runs the same consume → evaluate → produce loop inside the native runtime instead of in Python. See Streaming & IO for the StreamRunConfig fields.

Metadata-preserving variant

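Use poll_records(...) instead of poll_batch(...) when the output needs source metadata: each returned record exposes its value together with topic, partition, offset, key, and timestamp. The loop below copies topic, partition, and offset into each decision and reuses the source key.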

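while True:
    # Like poll_batch, but each record also carries its Kafka metadata.
    records = consumer.poll_records(max_messages=8192, timeout_ms=200)
    if not records:
        continue

    payload = b"\n".join(r.value for r in records) + b"\n"
    result = engine.evaluate_ndjson(payload)

    # Assumes one decision per polled record; check result.messages_skipped first.
    for i, decision in enumerate(result.decisions):
        source = records[i]
        out = {
            "source_topic": source.topic,
            "partition": source.partition,
            "offset": source.offset,
            "decision": decision,
            "score": result.scores[i],
            "winning_rule_id": result.winning_rule_ids[i],
        }
        # Reuse the source record's key on the output message.
        producer.produce("decisions", value=json.dumps(out), key=source.key)

    producer.flush()
    consumer.commit()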
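The code above indexes records[i] on the assumption that result.decisions has exactly one entry per polled record. If malformed input is skipped, that pairing could silently shift. A defensive sketch: decision_messages is a hypothetical helper, and the SimpleNamespace objects are stand-ins that carry only the attributes this tutorial already uses (their values are made up). It checks the lengths before zipping.

```python
import json
from types import SimpleNamespace


def decision_messages(records, result):
    """Pair polled records with decisions; yield (key, json_value) for producer.produce()."""
    n = len(records)
    if len(result.decisions) != n:
        # A skipped record would shift every later index, so refuse to guess.
        raise ValueError(f"{n} records polled but {len(result.decisions)} decisions returned")
    for source, decision, score, rule_id in zip(
        records, result.decisions, result.scores, result.winning_rule_ids
    ):
        out = {
            "source_topic": source.topic,
            "partition": source.partition,
            "offset": source.offset,
            "decision": decision,
            "score": score,
            "winning_rule_id": rule_id,
        }
        yield source.key, json.dumps(out)


# Hypothetical stand-ins for one polled record and its evaluation result.
records = [SimpleNamespace(topic="transactions", partition=0, offset=41,
                           key=b"acct-7", value=b'{"amount": 10}')]
result = SimpleNamespace(decisions=["APPROVE"], scores=[0.1], winning_rule_ids=["r1"])

for key, value in decision_messages(records, result):
    print(key, value)
```

In the real loop, each pair becomes producer.produce("decisions", value=value, key=key). Raising an error instead of guessing makes a misaligned batch fail loudly before its offsets are committed.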

Expected output

Each input record yields one decision message on the decisions topic, carrying the engine's verdict (APPROVE, REVIEW, BLOCK, …). In the metadata-preserving variant, each message also records the source topic, partition, and offset and is keyed by the source record's key.

Validation

  • The output topic receives exactly result.n_records decisions per processed batch.
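  • Because consumer.commit() runs only after producer.flush(), the group offset advances only once a batch's decisions have been sent. This gives at-least-once delivery: if the worker stops between producing and committing, the batch is consumed again on restart and its decisions may appear twice.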
  • Watch result.messages_skipped for malformed input.
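The at-least-once behaviour can be checked without Kafka or BlazeRules. The sketch below uses a hypothetical in-memory stand-in (InMemoryTopic) for one partition and a consumer group's committed offset. A crash between producing and committing makes the restarted consumer replay the uncommitted batch, so decisions are duplicated but never lost.

```python
from dataclasses import dataclass


@dataclass
class InMemoryTopic:
    """Hypothetical stand-in for one partition plus a consumer group's committed offset."""
    records: list
    committed: int = 0
    position: int = 0

    def poll_batch(self, max_messages):
        # Read from the current in-memory position, as a live consumer does.
        batch = self.records[self.position:self.position + max_messages]
        self.position += len(batch)
        return batch

    def commit(self):
        self.committed = self.position

    def restart(self):
        # A restarted consumer loses its in-memory position and resumes at the commit.
        self.position = self.committed


def process(topic, output, max_messages, crash_before_commit=False):
    batch = topic.poll_batch(max_messages)
    output.extend(f"decision:{r}" for r in batch)  # stands in for produce + flush
    if crash_before_commit:
        raise RuntimeError("worker crashed before commit()")
    topic.commit()


topic = InMemoryTopic(records=["a", "b", "c", "d"])
output = []
process(topic, output, max_messages=2)  # a, b produced and committed
try:
    process(topic, output, max_messages=2, crash_before_commit=True)  # c, d produced, not committed
except RuntimeError:
    topic.restart()
process(topic, output, max_messages=2)  # c, d replayed after the restart
print(output)
# → ['decision:a', 'decision:b', 'decision:c', 'decision:d', 'decision:c', 'decision:d']
```

Committing before producing would reverse the trade-off: no duplicates, but a crash would drop the batch (at-most-once).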

Cleanup

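On shutdown, flush the producer so buffered decisions are delivered, then close the consumer. Because offsets are committed only after a successful flush, a batch interrupted mid-way is replayed on restart rather than lost.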
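One way to make shutdown explicit is to stop the loop from a SIGTERM/SIGINT handler and flush in a finally block. This is a sketch under stated assumptions: run takes the consumer and producer from Step 2 plus a hypothetical handle_batch callable that wraps the evaluate-and-produce part of Step 3. consumer.close() is an assumed method name, because this tutorial says to close the consumer but does not name the call.

```python
import signal
import threading

# Set by the signal handler; checked by the loop between batches.
stop = threading.Event()


def request_stop(signum, frame):
    # Keep the handler trivial: ask the loop to exit after its current batch.
    stop.set()


def install_signal_handlers():
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)


def run(consumer, producer, handle_batch):
    """Consume until `stop` is set, then flush the producer and close the consumer."""
    try:
        while not stop.is_set():
            messages = consumer.poll_batch(max_messages=8192, timeout_ms=200)
            if not messages:
                continue
            handle_batch(messages)  # hypothetical: evaluate + produce, as in Step 3
            producer.flush()
            consumer.commit()
    finally:
        producer.flush()  # deliver any decisions still buffered (e.g. after an exception)
        consumer.close()  # assumed method name, not confirmed by this tutorial
```

Call install_signal_handlers() once at startup, then run(consumer, producer, handle_batch). If an exception escapes handle_batch, the finally block still flushes, and the uncommitted batch is replayed on the next start.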

Where to go next