This guide shows you how to connect pipelines using the `publish` and `subscribe` operators. You’ll learn to split event streams for parallel processing and merge multiple sources into a single pipeline.
How publish/subscribe works
The `publish` operator sends events to a named channel (topic) on a node. The `subscribe` operator receives events from that channel. Multiple subscribers can receive the same events, and multiple publishers can write to the same topic.
```tql
// Publisher pipeline
from_file "events.json"
publish "events"
```

A separate pipeline subscribes to receive these events:

```tql
// Subscriber pipeline
subscribe "events"
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")
```

Fan-out: split streams
Send the same events to multiple destinations by having multiple subscribers:
Multiple subscribers on one topic
```tql
// Pipeline 1: Ingest and publish
from_file "/var/log/*.json", watch=true
publish "logs"
```

One subscriber archives events to storage:

```tql
// Pipeline 2: Archive to storage
subscribe "logs"
import
```

Another forwards only high-severity events to a SIEM:

```tql
// Pipeline 3: Forward to SIEM
subscribe "logs"
where severity in ["high", "critical"]
to_splunk "https://splunk:8088", hec_token=secret("HEC_TOKEN")
```

A third sends authentication failures to a dedicated alerting channel:

```tql
// Pipeline 4: Real-time alerting
subscribe "logs"
where event_type == "auth" and outcome == "failure"
to_kafka "alerts"
```

All subscriber pipelines receive the same events independently.
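Because `publish` and `subscribe` are ordinary operators, a subscriber can also publish again, chaining topics together. A minimal sketch using only operators from this guide (the topic name and the `event_type` value are illustrative):

```tql
// Pipeline 5 (sketch): re-publish a filtered copy under a new topic
subscribe "logs"
where event_type == "network"
publish "network-logs"
```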
Dynamic topics
Route events to different topics based on content:

```tql
from_file "eve.json" {
  read_suricata
}
publish f"suricata.{event_type}"
```

Subscribers can then listen to specific event types:

```tql
// Only DNS events
subscribe "suricata.dns"
```

Or subscribe to alerts only:

```tql
// Only alert events
subscribe "suricata.alert"
```

Fan-in: merge streams
Combine multiple sources into a single stream by publishing to the same topic:

```tql
// Pipeline 1: Zeek logs
from_file "/var/log/zeek/*.log", watch=true {
  read_zeek_tsv
}
publish "network"
```

A second pipeline publishes Suricata alerts to the same topic:

```tql
// Pipeline 2: Suricata alerts
from_file "/var/log/suricata/eve.json", watch=true {
  read_suricata
}
publish "network"
```

A third pipeline consumes the merged stream:

```tql
// Pipeline 3: Consume merged stream
subscribe "network"
import
```

The subscriber receives events from both Zeek and Suricata in a single stream.
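If downstream consumers need to tell the two sources apart, one option is to tag events before publishing. A minimal sketch, assuming TQL’s field-assignment statement; the `source` field name is arbitrary:

```tql
// Pipeline 1 variant: label Zeek events before they enter the shared topic
from_file "/var/log/zeek/*.log", watch=true {
  read_zeek_tsv
}
source = "zeek"  // hypothetical tag field, not part of the Zeek schema
publish "network"
```

A subscriber on "network" can then filter with `where source == "zeek"`.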
Subscribe to multiple topics
A single subscriber can listen to multiple topics:
subscribe "alerts", "notices", "critical"to_kafka "all-priority-events"Combining with fork
Combining with fork

Use `fork` with `publish` to send copies of events while continuing the main pipeline:
from_file "events.json"fork { publish "raw-events"}// Continue processing in main pipelinewhere severity >= "high"importBack pressure behavior
Back pressure behavior

Subscribers propagate back pressure to publishers. If a subscribing pipeline can’t keep up, publishers slow down to match, preventing data loss.
Pipelines not visible on the overview page at app.tenzir.com drop data instead of slowing publishers. This prevents slow ad-hoc queries from blocking production pipelines.