Recipe: Ingest HTTP Logs

Start an HTTP ingest endpoint, send NDJSON logs/events, and write compact decisions.

Use HTTP input when applications can POST log or event batches to a local BlazeRules listener.

Start the listener

blazerules_agent \
  --rules rules.yaml \
  --input http \
  --host 127.0.0.1 \
  --port 9480 \
  --batch-size 4096 \
  --flush-ms 50 \
  --output ndjson \
  --output-path decisions.ndjson \
  --service payments-api \
  --source http-json \
  --dedupe-key event_id \
  --dedupe-ttl-seconds 86400
Python equivalent: start the same listener
import subprocess

agent = subprocess.Popen([
    "blazerules_agent",
    "--rules", "rules.yaml",
    "--input", "http",
    "--host", "127.0.0.1",
    "--port", "9480",
    "--batch-size", "4096",
    "--flush-ms", "50",
    "--output", "ndjson",
    "--output-path", "decisions.ndjson",
    "--service", "payments-api",
    "--source", "http-json",
    "--dedupe-key", "event_id",
    "--dedupe-ttl-seconds", "86400",
])

The agent exposes:

EndpointMethodBody
/v1/logsPOSTNDJSON: one JSON object per line.
/healthzGETHealth response for the instance.

Send events

curl -X POST http://127.0.0.1:9480/v1/logs \
  --header 'Content-Type: application/x-ndjson' \
  --data-binary $'{"event_id":"e1","message":"payment approved","amount":30}\n{"event_id":"e2","message":"payment error","amount":900}\n'
Python equivalent: post the same NDJSON
import urllib.request

payload = (
    b'{"event_id":"e1","message":"payment approved","amount":30}\n'
    b'{"event_id":"e2","message":"payment error","amount":900}\n'
)

req = urllib.request.Request(
    "http://127.0.0.1:9480/v1/logs",
    data=payload,
    headers={"Content-Type": "application/x-ndjson"},
    method="POST",
)

with urllib.request.urlopen(req) as res:
    print(res.read().decode())

The response is:

{"ok":true}

Batch Behavior

The listener calls the engine when either condition is met:

  • batch_size records have accumulated.
  • flush_ms has elapsed since the last flush.

That keeps the HTTP path batch-first while still allowing low-latency partial flushes.

YAML Instance Form

instances:
  - name: payments-http
    rules: rules.yaml
    batch_size: 4096
    flush_ms: 50
    service: payments-api
    source: http-json
    input:
      type: http
      host: 127.0.0.1
      port: 9480
    output:
      type: ndjson
      path: decisions.ndjson
    dedupe:
      enabled: true
      key_fields: [event_id]
      ttl_seconds: 86400

Run it:

blazerules_agent --config agent.yaml
Python equivalent: run the config from a script
import subprocess

subprocess.run(["blazerules_agent", "--config", "agent.yaml"], check=True)

Output

Each decision is written as compact NDJSON:

{"ts_ms":1782150000000,"service":"payments-api","source":"http-json","batch_row":1,"decision":"REVIEW","score":40.0,"risk_band":"MEDIUM","winning_rule_id":"payment_error"}

Route this file to another service, tail it, archive it, or point the dashboard at it.