This guide shows you how to connect pipelines using publish and subscribe operators. You’ll learn to split event streams for parallel processing and merge multiple sources into a single pipeline.

The publish operator sends events to a named channel (topic) on a node. The subscribe operator receives events from that channel. Multiple subscribers can receive the same events, and multiple publishers can write to the same topic.

// Publisher pipeline
from_file "events.json"
publish "events"

A separate pipeline subscribes to receive these events:

// Subscriber pipeline
subscribe "events"
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")
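
To try the pair without a file on disk, you can publish a few synthetic events instead (a minimal sketch; the event shape here is made up for illustration):

// Quick test: publish two hand-written events
from {message: "hello"}, {message: "world"}
publish "events"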

Send the same events to multiple destinations by having multiple subscribers:

// Pipeline 1: Ingest and publish
from_file "/var/log/*.json", watch=true
publish "logs"

One subscriber archives events to storage:

// Pipeline 2: Archive to storage
subscribe "logs"
import

Another forwards only high-severity events to a SIEM:

// Pipeline 3: Forward to SIEM
subscribe "logs"
where severity in ["high", "critical"]
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")

A third sends authentication failures to a dedicated alerting channel:

// Pipeline 4: Real-time alerting
subscribe "logs"
where event_type == "auth" and outcome == "failure"
to_kafka "alerts"

All subscriber pipelines receive the same events independently.

Route events to different topics based on content:

from_file "eve.json" {
read_suricata
}
publish f"suricata.{event_type}"

Subscribers can then listen to specific event types:

// Only DNS events
subscribe "suricata.dns"

Or subscribe to alerts only:

// Only alert events
subscribe "suricata.alert"

Combine multiple sources into a single stream by publishing to the same topic:

// Pipeline 1: Zeek logs
from_file "/var/log/zeek/*.log", watch=true {
read_zeek_tsv
}
publish "network"

A second pipeline publishes Suricata alerts to the same topic:

// Pipeline 2: Suricata alerts
from_file "/var/log/suricata/eve.json", watch=true {
read_suricata
}
publish "network"

A third pipeline consumes the merged stream:

// Pipeline 3: Consume merged stream
subscribe "network"
import

The subscriber receives events from both Zeek and Suricata in a single stream.
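
If the consumer needs to tell the two sources apart, each publisher can tag its events before publishing (a sketch; the source field name is arbitrary, not part of the original example):

// Tag Zeek events before publishing so subscribers can distinguish them
from_file "/var/log/zeek/*.log", watch=true {
  read_zeek_tsv
}
source = "zeek"
publish "network"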

A single subscriber can listen to multiple topics:

subscribe "alerts", "notices", "critical"
to_kafka "all-priority-events"

Use fork with publish to send copies of events while continuing the main pipeline:

from_file "events.json"
fork {
  publish "raw-events"
}
// Continue processing in main pipeline
where severity in ["high", "critical"]
import

Subscribers propagate back pressure to publishers. If a subscribing pipeline can’t keep up, publishers slow down to match, preventing data loss.
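
If a subscriber should never slow down its publishers, you can decouple it with an in-memory buffer that drops on overflow instead (a sketch, assuming the buffer operator's drop policy and a 10M-event capacity suit your workload):

// Drop events rather than exert back pressure when the buffer fills up
subscribe "logs"
buffer 10M, policy="drop"
import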

The exception is pipelines that are not visible on the overview page at app.tenzir.com, such as ad-hoc queries: these drop data instead of slowing publishers, so a slow one-off query cannot block production pipelines.