Skip to content

Spawns a subpipeline for every incoming event, with the event bound to $this.

each [parallel=int] {}

The each operator runs a fresh subpipeline for every incoming event. The record of the current event is bound to $this inside the subpipeline, so the subpipeline can parametrize its behavior on a per-event basis.

The subpipeline takes no input from each. It either emits events—which are forwarded as the operator’s output—or ends with a sink, in which case each itself becomes a sink. The subpipeline must not produce bytes.

Use each for per-event jobs, such as running a lookup, export, or sink whose source depends on the incoming event. For keyed streams that should keep one subpipeline alive per key, use group instead.

The maximum number of subpipelines that may run concurrently. Must be at least 1. Excess events queue and start as soon as a slot frees.

Defaults to 10.

The subpipeline to spawn for each event. Must start with a source.

Inside the subpipeline, $this refers to the record of the current input event.

Use fields from the input event to parametrize a source subpipeline. This example treats the input as investigation cases and searches a historical event set for matching source IPs:

from {case_id: "C-1", ip: "10.0.0.5"},
{case_id: "C-2", ip: "10.0.0.7"}
each parallel=4 {
from {ts: 2024-01-01T10:00:00, src_ip: "10.0.0.5", action: "login"},
{ts: 2024-01-01T10:02:00, src_ip: "10.0.0.8", action: "scan"},
{ts: 2024-01-01T10:05:00, src_ip: "10.0.0.7", action: "download"}
where src_ip == $this.ip
case_id = $this.case_id
}
sort case_id, ts
{
ts: 2024-01-01T10:00:00.000000,
src_ip: "10.0.0.5",
action: "login",
case_id: "C-1",
}
{
ts: 2024-01-01T10:05:00.000000,
src_ip: "10.0.0.7",
action: "download",
case_id: "C-2",
}

When the subpipeline ends with a sink, each itself becomes a sink. Use this to write a separate output file per tenant in the input:

from {tenant: "alpha"}, {tenant: "beta"}
each {
from {tenant: $this.tenant, status: "ok"},
{tenant: $this.tenant, status: "fail"}
to_file f"/tmp/tenzir/{$this.tenant}.json" {
write_ndjson
}
}

Last updated: