This guide shows you how to fan out an event stream into subpipelines with `each` and `group`. You'll learn when to spawn one subpipeline per event, when to keep one subpipeline per key, and how these operators differ from fixed fan-out operators like `fork`, `parallel`, and `load_balance`.

Tenzir has several operators that send events into subpipelines. Choose the operator based on how many subpipelines you need and how events should flow into them:

| Operator | Subpipelines | Event flow | Use case |
| --- | --- | --- | --- |
| `fork` | One fixed side branch | Every event goes to the main pipeline and the side branch | Archive or publish a copy while continuing processing |
| `parallel` | A fixed number of workers | Each event goes to one worker running the same subpipeline | Speed up CPU-heavy or I/O-heavy work |
| `load_balance` | One branch per configured target | Each event goes to one target | Distribute load across equivalent sinks |
| `each` | One fresh subpipeline per input event | The input event is available as `$this`; it is not passed as input | Run a per-event job, such as a lookup or export |
| `group` | One subpipeline per key | Matching events are passed to the same keyed subpipeline | Keep per-tenant, per-host, or per-session processing isolated |

Use regular transformations when every event can flow through the same linear pipeline. Use subpipeline fan-out when the pipeline structure itself depends on each event or key.
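For contrast with the per-event and per-key operators below, the fixed fan-out of `fork` duplicates every event into one predefined side branch while the main pipeline continues unchanged. A minimal sketch, assuming a `publish` sink in the side branch; the `"archive"` topic name is illustrative:

```tql
from {src: "10.0.0.5", action: "login"},
     {src: "10.0.0.8", action: "scan"}
fork {
  // Side branch: publish a copy of every event to an archive topic.
  publish "archive"
}
// Main pipeline: the same events continue here.
where action == "login"
```

The side branch is fixed at pipeline definition time, which is exactly what distinguishes `fork` from `each` and `group`, where the number of subpipelines depends on the data.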

Use `each` when every input event describes a job to run. The nested pipeline must start with a source because `each` does not pass the input event into the subpipeline. Instead, it binds the current event record to `$this`.

The following pipeline treats incoming cases as lookup requests. Each case queries the same historical dataset for matching source IPs and annotates the matches with the case ID:

```tql
from {case_id: "C-1", ip: "10.0.0.5"},
     {case_id: "C-2", ip: "10.0.0.7"}
each parallel=4 {
  from {ts: 2024-01-01T10:00:00, src_ip: "10.0.0.5", action: "login"},
       {ts: 2024-01-01T10:02:00, src_ip: "10.0.0.8", action: "scan"},
       {ts: 2024-01-01T10:05:00, src_ip: "10.0.0.7", action: "download"}
  where src_ip == $this.ip
  case_id = $this.case_id
}
sort case_id, ts
```
sort case_id, ts
```tql
{
  ts: 2024-01-01T10:00:00.000000,
  src_ip: "10.0.0.5",
  action: "login",
  case_id: "C-1",
}
{
  ts: 2024-01-01T10:05:00.000000,
  src_ip: "10.0.0.7",
  action: "download",
  case_id: "C-2",
}
```

The `parallel` option limits how many per-event jobs run at the same time. When more events arrive, `each` queues them and applies backpressure upstream until a running subpipeline finishes. Keep this value bounded for external APIs, expensive exports, or destinations with rate limits.
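As a concrete illustration of a bounded `each`, the following sketch exports each job to its own file with at most two subpipelines in flight at once; the file paths are illustrative:

```tql
from {id: "a"}, {id: "b"}, {id: "c"}
each parallel=2 {
  // At most two per-event subpipelines run concurrently; the third
  // event queues until a running subpipeline finishes.
  from {id: $this.id}
  to_file f"/tmp/tenzir/job-{$this.id}.json" {
    write_ndjson
  }
}
```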

Use `group` when events with the same key must go through the same subpipeline. Unlike `each`, the nested pipeline receives input: Tenzir sends all matching events for a key to that key's subpipeline. The key is available as `$group` inside the subpipeline.

The following pipeline keeps tenant streams separate and computes a summary per tenant:

```tql
from {tenant: "alpha", bytes: 120},
     {tenant: "beta", bytes: 90},
     {tenant: "alpha", bytes: 80}
group tenant {
  summarize events=count(), bytes=sum(bytes)
  tenant = $group
}
sort tenant
```
```tql
{
  events: 2,
  bytes: 200,
  tenant: "alpha",
}
{
  events: 1,
  bytes: 90,
  tenant: "beta",
}
```

For a pure aggregation, `summarize` is usually shorter. Use `group` when the per-key subpipeline does more than aggregate, such as keeping state, applying a keyed transformation, or writing to a key-specific sink.
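For the tenant totals above, the pure-aggregation form lists the key as a group field of `summarize`. A sketch that should produce the same summaries:

```tql
from {tenant: "alpha", bytes: 120},
     {tenant: "beta", bytes: 90},
     {tenant: "alpha", bytes: 80}
// Group by tenant and aggregate in a single operator.
summarize tenant, events=count(), bytes=sum(bytes)
sort tenant
```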

A common `group` pattern is to write each tenant, host, or sensor to its own file. The subpipeline ends with a sink, so `group` itself becomes a sink:

```tql
from {tenant: "alpha", message: "login"},
     {tenant: "beta", message: "scan"},
     {tenant: "alpha", message: "logout"}
group tenant {
  to_file f"/tmp/tenzir/{$group}.json" {
    write_ndjson
  }
}
```

This creates one subpipeline per tenant and writes matching events to that subpipeline’s file.
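Keyed state works the same way: because every key gets its own subpipeline, stateful operators inside it are automatically scoped to that key. A sketch using a `deduplicate` operator on the `message` field (assuming it drops repeated values per subpipeline):

```tql
from {tenant: "alpha", message: "login"},
     {tenant: "beta", message: "login"},
     {tenant: "alpha", message: "login"}
group tenant {
  // Deduplication state is per tenant: alpha's repeated "login" is
  // dropped, while beta's first "login" passes through untouched.
  deduplicate message
}
```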

- Don't use `each` for ordinary per-event transformations. Use regular TQL statements or `parallel` when every event follows the same processing steps.
- Don't use `group` only to calculate grouped totals. Use `summarize` unless you need a full subpipeline per key.
- Don't leave `each` unbounded for external systems. Set `parallel` to match the concurrency that the downstream service can handle.
- Remember that `each` subpipelines must start with a source, while `group` subpipelines receive the grouped input stream.
- Neither `each` nor `group` can use subpipelines that produce bytes as output.