Operators
Tenzir comes with a wide range of built-in pipeline operators.
Modify
Operator | Description | Example |
---|---|---|
set | Assigns a value to a field, creating it if necessary | name = "Tenzir" |
select | Selects some values and discard the rest | select name, id=metadata.id |
drop | Removes fields from the event | drop name, metadata.id |
enumerate | Adds a field with the number of the event | enumerate num |
timeshift | Adjusts timestamps relative to a given start time | timeshift ts, start=2020-01-01 |
unroll | Unrolls a field of type list, duplicating the surrounding event | unroll names |
Filter
Operator | Description | Example |
---|---|---|
where | Keeps only events matching a predicate | where name.starts_with("John") |
assert | Same as where , but warns if predicate is false | assert name.starts_with("John") |
taste | Keeps only N events of each type | taste 1 |
head | Keeps only the first N events | head 20 |
tail | Keeps only the last N events | tail 20 |
slice | Keeps a range of events with an optional stride | slice begin=10, end=30 |
sample | Samples events based on load | sample 30s, max_samples=2k |
deduplicate | Removes duplicate events | deduplicate src_ip |
Analyze
Operator | Description | Example |
---|---|---|
summarize | Aggregates events with implicit grouping | summarize name, sum(amount) |
sort | Sorts the events by one or more expressions | sort name, -abs(transaction) |
reverse | Reverses the event order | reverse |
top | Shows the most common values | top user |
rare | Shows the least common values | rare auth.token |
Flow Control
Operator | Description | Example |
---|---|---|
delay | Delays events relative to a start time | delay ts, speed=2.5 |
cron | Runs a pipeline periodically with a cron expression | cron "* */10 * * * MON-FRI" { from "https://example.org" } |
discard | Discards incoming bytes or events | discard |
every | Runs a pipeline periodically at a fixed interval | every 10s { summarize sum(amount) } |
fork | Forwards a copy of the events to another pipeline | fork { to "copy.json" } |
load_balance | Routes the data to one of multiple subpipelines | load_balance $over { publish $over } |
pass | Does nothing with the input | pass |
repeat | Repeats the input after it has finished | repeat 100 |
throttle | Limits the amount of data flowing through | throttle 100M, within=1min |
Inputs
Events
Operator | Description | Example |
---|---|---|
from | Reads events from an URI Creates events from records | from "http://example.org/file.csv.gz" from {key: "value"}… |
from_fluent_bit | Returns results from Fluent Bit | from_fluent_bit "opentelemetry" |
from_velocira… | Returns results from a Velociraptor server | from_velociraptor subscribe="Windows" |
Bytes
Operator | Description | Example |
---|---|---|
load_amqp | Loads bytes from an AMQP server | load_amqp |
load_azure_blob… | Load bytes from an Azure Blob Storage | load_azure_blob_storage "abfs://… |
load_file | Loads bytes from a file | load_file "/tmp/data.json" |
load_ftp | Loads bytes via FTP | load_ftp "ftp.example.org" |
load_google_c… | Listen to a Google Cloud Pub/Sub subscription | load_google_cloud_pubsub project_id=… |
load_http | Receives bytes from a HTTP request | load_http "example.org", params={n: 5} |
load_kafka | Receives bytes from an Apache Kafka topic | load_kafka topic="example" |
load_nic | Receives bytes from a Network Interface Card | load_nic "eth0" |
load_s3 | Receives bytes from an Amazon S3 object | load_s3 "s3://my-bucket/obj.csv" |
load_sqs | Receives bytes from an Amazon SQS queue | load_sqs "sqs://tenzir" |
load_tcp | Loads bytes from a TCP or TLS connection | load_tcp "0.0.0.0:8090" { read_json } |
load_udp | Loads bytes from a UDP socket | load_udp "0.0.0.0:8090" |
load_zmq | Receives bytes from ZeroMQ messages | load_zmq |
Outputs
Events
Operator | Description | Example |
---|---|---|
to | Writes events to an URI | from "s3://examplebucket/obj.json.gz" |
to_azure_log_ana… | Sends events to Azure Log Analytics | to_azure_log_analytics tenant_id=… |
to_fluent_bit | Sends events to Fluent Bit | to_fluent_bit "elasticsearch" … |
to_hive | Writes events using hive partitioning | to_hive "s3://…", partition_by=[x] |
to_opensearch | Sends incoming events to the OpenSearch Bulk API | to_opensearch 'localhost:9200", ... |
to_snowflake | Sends incoming events to a Snowflake database | to_snowflake account_identifier="… |
to_splunk | Sends incoming events to a Splunk HEC | to_splunk "localhost:8088", … |
Bytes
Operator | Description | Example |
---|---|---|
save_amqp | Saves incoming bytes to an AMQP server | save_amqp |
save_azure_blob… | Saves to an Azure Blob Storage | save_azure_blob_storage "abfs://… |
save_email | Saves incoming bytes through an SMTP server | save_email "user@example.org" |
save_file | Saves incoming bytes into a file | save_file "/tmp/out.json" |
save_ftp | Saves incoming bytes via FTP | save_ftp "ftp.example.org" |
save_google_cloud… | Publishes to a Google Cloud Pub/Sub topic | save_google_cloud_pubsub project… |
save_http | Sends incoming bytes over a HTTP connection | save_http "example.org/api" |
save_kafka | Saves incoming bytes to an Apache Kafka topic | save_kafka topic="example" |
save_s3 | Saves incoming bytes to an Amazon S3 object | save_s3 "s3://my-bucket/obj.csv" |
save_sqs | Saves incoming bytes to an Amazon SQS queue | save_sqs "sqs://tenzir" |
save_tcp | Saves incoming bytes to a TCP or TLS connection | save_tcp "0.0.0.0:8090", tls=true |
save_udp | Saves incoming bytes to a UDP socket | save_udp "0.0.0.0:8090" |
save_zmq | Saves incoming bytes to ZeroMQ messages | save_zmq |
Parsing
Operator | Description | Example |
---|---|---|
read_bitz | Parses Tenzir's internal wire format | read_bitz |
read_cef | Parses the Common Event Format | read_cef |
read_csv | Parses comma-separated values | read_csv null_value="-" |
read_feather | Parses Feather format | read_feather |
read_gelf | Parses the Graylog Extended Log Format | read_gelf |
read_grok | Parses events using a Grok pattern | read_grok "%{IP:client} %{WORD:action}" |
read_json | Parses JSON objects | read_json arrays_of_objects=true |
read_kv | Parses key-value pairs | read_kv r"(\s+)[A-Z_]+:", r":\s*" |
read_leef | Parses the Log Event Extended Format | read_leef |
read_lines | Parses each line into a separate event | read_lines |
read_ndjson | Parses newline-delimited JSON | read_ndjson |
read_pcap | Parses raw network packets in PCAP format | read_pcap |
read_parquet | Parses Parquet format | read_parquet |
read_ssv | Parses space-separated values | read_ssv header="name count" |
read_suricata | Parses Suricata's Eve format | read_suricata |
read_syslog | Parses syslog | read_syslog |
read_tsv | Parses tab-separated values | read_tsv auto_expand=true |
read_xsv | Parses custom-separated values | read_xsv ";", ":", "N/A" |
read_yaml | Parses YAML | read_yaml |
read_zeek_json | Parses Zeek JSON | read_zeek_json |
read_zeek_tsv | Parses Zeek TSV | read_zeek_tsv |
Printing
Operator | Description | Example |
---|---|---|
write_bitz | Writes events as Tenzir's internal wire format | write_bitz |
write_csv | Writes events as CSV | write_csv |
write_feather | Writes events as Feather | write_feather |
write_json | Writes events as JSON | write_json |
write_ndjson | Writes events as Newline-Delimited JSON | write_ndjson |
write_lines | Writes events as lines | write_lines |
write_parquet | Writes events as Parquet | write_parquet |
write_pcap | Writes events as PCAP | write_pcap |
write_ssv | Writes events as SSV | write_ssv |
write_tsv | Writes events as TSV | write_tsv |
write_xsv | Writes events as XSV | write_xsv |
write_yaml | Writes events as YAML | write_yaml |
write_zeek_tsv | Writes events as Zeek TSV | write_zeek_tsv |
Connecting Pipelines
Operator | Description | Example |
---|---|---|
publish | Publishes events to a certain topic | publish "topic" |
subscribe | Subscribes to events of a certain topic | subscribe "topic" |
Node
Inspection
Operator | Description | Example |
---|---|---|
config | Returns the node's configuration | config |
diagnostics | Returns diagnostic events of managed pipelines | diagnostics |
openapi | Returns the OpenAPI specification | openapi |
metrics | Retrieves metrics events from a Tenzir node | metrics "cpu" |
plugins | Lists available plugins | plugins |
version | Shows the current version | version |
Storage Engine
Operator | Description | Example |
---|---|---|
export | Retrieves events from the node | export |
fields | Lists all fields stored at the node | fields |
import | Stores events at the node | import |
partitions | Retrieves metadata about events stored at the node | partitions src_ip == 1.2.3.4 |
schemas | Lists schemas for events stored at the node | schemas |
Host Inspection
Operator | Description | Example |
---|---|---|
files | Lists files in a directory | files "/var/log/", recurse=true |
nics | Lists available network interfaces | nics |
processes | Lists running processes | processes |
sockets | Lists open sockets | sockets |
Detection
Operator | Description | Example |
---|---|---|
sigma | Matches incoming events against Sigma rules | sigma "/tmp/rules/" |
yara | Matches the incoming byte stream against YARA rules | yara "/path/to/rules", blockwise=true |
Internals
Operator | Description | Example |
---|---|---|
api | Calls Tenzir's REST API from a pipeline | api "/pipeline/list" |
batch | Controls the batch size of events | batch timeout=1s |
buffer | Adds additional buffering to handle spikes | buffer 10M, policy="drop" |
cache | In-memory cache shared between pipelines | cache "w01wyhTZm3", ttl=10min |
legacy | Provides a compatibility fallback to TQL1 pipelines | legacy "chart area" |
local | Forces a pipeline to run locally | local { sort foo } |
measure | Returns events describing the incoming batches | measure |
remote | Forces a pipeline to run remotely at a node | remote { version } |
serve | Makes events available at /serve | serve "abcde12345" |
unordered | Remove ordering assumptions in a pipeline | unordered { read_ndjson } |
Encode & Decode
Operator | Description | Example |
---|---|---|
compress_brotli | Compresses bytes using Brotli compression | compress_zstd, level=10 |
compress_bz2 | Compresses bytes using Bzip compression | compress_bz2, level=9 |
compress_gzip | Compresses bytes using Gzip compression | compress_gzip, level=8 |
compress_lz4 | Compresses bytes using lz4 compression | compress_lz4, level=7 |
compress_zstd | Compresses bytes using Gzip compression | compress_zstd, level=6 |
decompress_brotli | Decompresses Brotli compressed bytes | decompress_zstd |
decompress_bz2 | Decompresses Bzip2 compressed bytes | decompress_bz2 |
decompress_gzip | Decompresses Gzip compressed bytes | decompress_gzip |
decompress_lz4 | Decompresses lz4 compressed bytes | decompress_lz4 |
decompress_zstd | Decompresses Zstd compressed bytes | decompress_zstd |
Pipelines
Operator | Description | Example |
---|---|---|
pipeline::list | Shows managed pipelines | package::list |
Contexts
Function | Description | Example |
---|---|---|
context::create_bloom_filter | Creates a Bloom filter context | context::create_bloom_filter "ctx", capacity=1Mi, fp_probability=0.01 |
context::create_lookup_table | Creates a lookup table context | context::create_lookup_table "ctx" |
context::create_geoip | Creates a GeoIP context for IP-based geolocation | context::create_geoip "ctx", db_path="GeoLite2-City.mmdb" |
context::enrich | Enriches with a context | context::enrich "ctx", key=x |
context::erase | Removes entries from a context | context::erase "ctx", key=x |
context::inspect | Inspects the details of a specified context | context::inspect "ctx" |
context::list | Lists all contexts | context::list |
context::remove | Deletes a context | context::remove "ctx" |
context::reset | Resets the state of a specified context | context::reset "ctx" |
context::save | Saves context state | context::save "ctx" |
context::load | Loads context state | context::load "ctx" |
context::update | Updates an existing context with new data | context::update "ctx", key=x, value=y |
Packages
Operator | Description | Example |
---|---|---|
package::add | Installs a package | package::add "suricata-ocsf" |
package::list | Shows installed packages | package::list |
package::remove | Uninstalls a package | package::remove "suricata-ocsf" |
Escape Hatches
Operator | Description | Example |
---|---|---|
python | Executes a Python snippet for each event | python "self.x = self.y" |
shell | Runs a shell command within the pipeline | shell "./process.sh | tee copy.txt" |