Operators
Tenzir comes with a wide range of built-in pipeline operators.
Modify
Operator | Description | Example |
---|---|---|
set | Assigns a value to a field, creating it if necessary | name = "Tenzir" |
select | Selects some values and discards the rest | select name, id=metadata.id |
drop | Removes fields from the event | drop name, metadata.id |
enumerate | Adds a field with the number of the event | enumerate num |
Filter
Operator | Description | Example |
---|---|---|
where | Keeps only events matching a predicate | where name.starts_with("John") |
assert | Same as where, but warns if the predicate is false | assert name.starts_with("John") |
taste | Keeps only N events of each type | taste 1 |
head | Keeps only the first N events | head 20 |
tail | Keeps only the last N events | tail 20 |
slice | Keeps a range of events with an optional stride | slice begin=10, end=30 |
Analyze
Operator | Description | Example |
---|---|---|
summarize | Aggregates events with implicit grouping | summarize name, sum(amount) |
sort | Sorts the events by one or more expressions | sort name, -abs(transaction) |
reverse | Reverses the event order | reverse |
top | Shows the most common values | top user |
rare | Shows the least common values | rare auth.token |
Flow Control
Operator | Description | Example |
---|---|---|
every | Restarts a pipeline periodically | every 10s { summarize sum(amount) } |
fork | Forwards a copy of the events to another pipeline | fork { to "copy.json" } |
if | Splits the flow based on a predicate | if transaction > 0 { … } else { … } |
load_balance | Routes the data to one of multiple subpipelines | load_balance $over { publish $over } |
Input
Operator | Description | Example |
---|---|---|
diagnostics | Retrieves diagnostic events of managed pipelines | diagnostics |
export | Retrieves events from the node | export |
from_velociraptor | Returns results from a Velociraptor server | from_velociraptor subscribe="Windows" |
load_file | Loads bytes from a file | load_file "/tmp/data.json" |
load_google… | Listens to a Google Cloud Pub/Sub subscription | load_google_cloud_pubsub "…", "…" |
load_http | Receives bytes from an HTTP request | load_http "example.org", params={n: 5} |
load_kafka | Receives bytes from an Apache Kafka topic | load_kafka topic="example" |
load_tcp | Loads bytes from a TCP or TLS connection | load_tcp "0.0.0.0:8090" { read_json } |
metrics | Retrieves metrics events from a Tenzir node | metrics "cpu" |
subscribe | Subscribes to events of a certain topic | subscribe "topic" |
Output
Operator | Description | Example |
---|---|---|
publish | Publishes events to a certain topic | publish "topic" |
import | Stores events at the node | import |
discard | Discards incoming bytes or events | discard |
save_file | Saves incoming bytes into a file | save_file "/tmp/out.json" |
save_google_cloud… | Publishes to a Google Cloud Pub/Sub topic | save_google_cloud_pubsub "…", "…" |
save_http | Sends incoming bytes over an HTTP connection | save_http "example.org/api" |
save_kafka | Saves incoming bytes to an Apache Kafka topic | save_kafka topic="example" |
serve | Makes events available at /serve | serve "abcde12345" |
to_azure_log_analytics | Sends events to Azure Log Analytics | to_azure_log_analytics tenant_id=… |
to_hive | Writes events using hive partitioning | to_hive "s3://…", partition_by=[x] |
to_splunk | Sends incoming events to a Splunk HEC | to_splunk "https://localhost:8088", … |
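
As a rough sketch of how `publish` and `subscribe` pair up, the two pipelines below reuse example invocations from the Input, Filter, and Output tables. The TCP endpoint, the topic name, and the `name` field are the placeholder values from those examples, not required settings.

```tql
// Pipeline 1: accept JSON over TCP and publish the parsed events to a topic.
load_tcp "0.0.0.0:8090" { read_json }
publish "topic"
```

```tql
// Pipeline 2: subscribe to the same topic, filter, and store events at the node.
subscribe "topic"
where name.starts_with("John")
import
```

Splitting the work this way keeps ingestion separate from filtering and storage, so additional pipelines can subscribe to the same topic without touching the first one.
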
Parsing
Operator | Description | Example |
---|---|---|
read_bitz | Parses Tenzir's internal wire format | read_bitz |
read_cef | Parses the Common Event Format | read_cef |
read_csv | Parses comma-separated values | read_csv null_value="-" |
read_gelf | Parses the Graylog Extended Log Format | read_gelf |
read_grok | Parses events using a Grok pattern | read_grok "%{IP:client} %{WORD:action}" |
read_json | Parses JSON objects | read_json arrays_of_objects=true |
read_kv | Parses key-value pairs | read_kv r"(\s+)[A-Z_]+:", r":\s*" |
read_leef | Parses the Log Event Extended Format | read_leef |
read_lines | Parses each line into a separate event | read_lines |
read_ndjson | Parses newline-delimited JSON | read_ndjson |
read_ssv | Parses space-separated values | read_ssv header="name count" |
read_suricata | Parses Suricata's EVE format | read_suricata |
read_syslog | Parses syslog | read_syslog |
read_tsv | Parses tab-separated values | read_tsv auto_expand=true |
read_xsv | Parses custom-separated values | read_xsv ";", ":", "N/A" |
read_yaml | Parses YAML | read_yaml |
read_zeek_json | Parses Zeek JSON | read_zeek_json |
read_zeek_tsv | Parses Zeek TSV | read_zeek_tsv |
Printing
Operator | Description | Example |
---|---|---|
write_bitz | Writes events as Tenzir's internal wire format | write_bitz |
write_json | Writes events as JSON | write_json ndjson=true |
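
To illustrate how the categories compose, here is a minimal sketch of a single pipeline that chains an input, a parser, two filters, a printer, and an output. Every invocation is copied from the example column of the tables above. The file paths and the `name` field are those placeholder values, and the input file is assumed to contain JSON objects.

```tql
// Load raw bytes from a file and parse them into events.
load_file "/tmp/data.json"
read_json
// Keep matching events, then only the first 20 of them.
where name.starts_with("John")
head 20
// Turn events back into bytes (newline-delimited JSON) and write them out.
write_json ndjson=true
save_file "/tmp/out.json"
```

The `load_*` and `save_*` operators work on bytes while the operators in between work on events, which is why a `read_*` operator follows the input and a `write_*` operator precedes the output.
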
Internals
Operator | Description | Example |
---|---|---|
api | Calls Tenzir's REST API from a pipeline | api "/pipeline/list" |
batch | Controls the batch size of events | batch timeout=1s |
buffer | Adds buffering to handle spikes | buffer 10M, policy="drop" |
measure | Returns events describing the incoming batches | measure |
throttle | Limits the amount of data flowing through | throttle 100M, within=1min |
cache | In-memory cache shared between pipelines | cache "w01wyhTZm3", ttl=10min |
legacy | Provides a compatibility fallback to TQL1 pipelines | legacy "chart area" |
Node Inspection
Operator | Description | Example |
---|---|---|
config | Returns the node's configuration | config |
fields | Lists all fields stored at the node | fields |
openapi | Returns the OpenAPI specification | openapi |
partitions | Retrieves metadata about events stored at the node | partitions src_ip == 1.2.3.4 |
plugins | Lists available plugins | plugins |
schemas | Lists schemas for events stored at the node | schemas |
version | Shows the current version | version |
Host Inspection
Operator | Description | Example |
---|---|---|
files | Lists files in a directory | files "/var/log/", recurse=true |
nics | Lists available network interfaces | nics |
processes | Lists running processes | processes |
sockets | Lists open sockets | sockets |
Uncategorized
Operator | Description | Example |
---|---|---|
compress | Compresses a stream of bytes | compress "zstd", level=18 |
decompress | Decompresses a stream of bytes | decompress "brotli" |
delay | Delays events relative to a start time | delay ts, speed=2.5 |
pass | Does nothing with the input | pass |
repeat | Repeats the input after it has finished | repeat 100 |
sigma | Matches incoming events against Sigma rules | sigma "/tmp/rules/" |
timeshift | Adjusts timestamps relative to a given start time | timeshift ts, start=2020-01-01 |
yara | Matches the incoming byte stream against YARA rules | yara "/path/to/rules", blockwise=true |
python | Executes a Python snippet for each event | python "self.x = self.y" |
shell | Runs a shell command within the pipeline | shell "./process.sh \| tee copy.txt" |