Tenzir comes with a wide range of built-in pipeline operators.
Analyze
rare
→Shows the least common values.
rare auth.token
sort
→Sorts events by the given expressions.
sort name, -abs(transaction)
summarize
→Groups events and applies aggregate functions to each group.
summarize name, sum(amount)
top
→Shows the most common values.
top user
Charts
chart_area
→Plots events on an area chart.
chart_area …
chart_bar
→Plots events on a bar chart.
chart_bar …
chart_line
→Plots events on a line chart.
chart_line …
chart_pie
→Plots events on a pie chart.
chart_pie …
Connecting Pipelines
publish
→Publishes events to a channel with a topic.
publish "topic"
subscribe
→Subscribes to events from a channel with a topic.
subscribe "topic"
Contexts
context::create_bloom_filter
→Creates a Bloom filter context.
context::create_bloom_filter "ctx", capacity=1Mi, fp_probability=0.01
context::create_geoip
→Creates a GeoIP context.
context::create_geoip "ctx", db_path="GeoLite2-City.mmdb"
context::create_lookup_table
→Creates a lookup table context.
context::create_lookup_table "ctx"
context::enrich
→Enriches events with a context.
context::enrich "ctx", key=x
context::erase
→Removes entries from a context.
context::erase "ctx", key=x
context::inspect
→Inspects a context.
context::inspect "ctx"
context::list
→Lists all contexts.
context::list
context::load
→Loads context state.
context::load "ctx"
context::remove
→Deletes a context.
context::remove "ctx"
context::reset
→Resets a context.
context::reset "ctx"
context::save
→Saves context state.
context::save "ctx"
context::update
→Updates a context with new data.
context::update "ctx", key=x, value=y
package::remove
→Uninstalls a package.
package::remove "suricata-ocsf"
Detection
sigma
→Filters the input with Sigma rules and outputs matching events.
sigma "/tmp/rules/"
yara
→Executes YARA rules on byte streams.
yara "/path/to/rules", blockwise=true
Encode & Decode
compress
→Compresses a stream of bytes.
compress "zstd"
compress_brotli
→Compresses a stream of bytes using Brotli compression.
compress_brotli, level=10
compress_bz2
→Compresses a stream of bytes using bz2 compression.
compress_bz2, level=9
compress_gzip
→Compresses a stream of bytes using gzip compression.
compress_gzip, level=8
compress_lz4
→Compresses a stream of bytes using lz4 compression.
compress_lz4, level=7
compress_zstd
→Compresses a stream of bytes using zstd compression.
compress_zstd, level=6
decompress
→Decompresses a stream of bytes.
decompress "gzip"
decompress_brotli
→Decompresses a stream of bytes in the Brotli format.
decompress_brotli
Escape Hatches
python
→Executes Python code against each event of the input.
python "self.x = self.y"
Filter
assert
→Drops events and emits a warning if the invariant is violated.
assert name.starts_with("John")
assert_throughput
→Emits a warning if the pipeline does not have the expected throughput.
assert_throughput 1000, within=1s
deduplicate
→Removes duplicate events based on a common key.
deduplicate src_ip
sample
→Dynamically samples events from an event stream.
sample 30s, max_samples=2k
slice
→Keeps a range of events within the interval [begin, end), stepping by stride.
slice begin=10, end=30
where
→Keeps only events for which the given predicate is true.
where name.starts_with("John")
Flow Control
cron
→Runs a pipeline periodically according to a cron expression.
cron "* */10 * * * MON-FRI" { from "https://example.org" }
delay
→Delays events relative to a given start time, with an optional speedup.
delay ts, speed=2.5
every
→Runs a pipeline periodically at a fixed interval.
every 10s { summarize sum(amount) }
fork
→Executes a subpipeline with a copy of the input.
fork { to "copy.json" }
load_balance
→Routes the data to one of multiple subpipelines.
load_balance $over { publish $over }
throttle
→Limits the bandwidth of a pipeline.
throttle 100M, within=1min
Host Inspection
files
→Shows file information for a given directory.
files "/var/log/", recurse=true
Internals
api
→Uses Tenzir's REST API directly from a pipeline.
api "/pipeline/list"
batch
→The batch operator controls the batch size of events.
batch timeout=1s
buffer
→An in-memory buffer to improve handling of data spikes in upstream operators.
buffer 10M, policy="drop"
cache
→An in-memory cache shared between pipelines.
cache "w01wyhTZm3", ttl=10min
legacy
→Provides a compatibility fallback to TQL1 pipelines.
legacy "chart area"
local
→Forces a pipeline to run locally.
local { sort foo }
remote
→Forces a pipeline to run remotely at a node.
remote { version }
serve
→Makes events available under the /serve REST API endpoint.
serve "abcde12345"
strict
→Treats all warnings as errors.
strict { assert false }
unordered
→Removes ordering assumptions from a pipeline.
unordered { read_ndjson }
Modify
drop
→Removes fields from the event.
drop name, metadata.id
http
→Sends HTTP/1.1 requests and forwards the response.
http "example.com"
move
→Moves values from one field to another, removing the original field.
move id=parsed_id, ctx.message=incoming.status
select
→Selects some values and discards the rest.
select name, id=metadata.id
set
→Assigns a value to a field, creating it if necessary.
name = "Tenzir"
timeshift
→Adjusts timestamps relative to a given start time, with an optional speedup.
timeshift ts, start=2020-01-01
Packages
package::list
→Shows installed packages.
package::list
package::add
→Installs a package.
package::add "suricata-ocsf"
pipeline::list
→Shows managed pipelines.
pipeline::list
Parsing
read_bitz
→Parses bytes as BITZ format.
read_bitz
read_csv
→Reads CSV (Comma-Separated Values) from a byte stream.
read_csv null_value="-"
read_grok
→Parses lines of input with a grok pattern.
read_grok "%{IP:client} %{WORD:action}"
read_json
→Parses an incoming JSON stream into events.
read_json arrays_of_objects=true
read_kv
→Reads key-value pairs from a byte stream.
read_kv r"(\s+)[A-Z_]+:", r":\s*"
read_ssv
→Reads SSV (Space-Separated Values) from a byte stream.
read_ssv header="name count"
read_tsv
→Reads TSV (Tab-Separated Values) from a byte stream.
read_tsv auto_expand=true
read_xsv
→Reads XSV from a byte stream.
read_xsv ";", ":", "N/A"
Pipelines
pipeline::activity
→Summarizes the activity of pipelines.
pipeline::activity range=1d, interval=1h
pipeline::detach
→Starts a pipeline in the node.
pipeline::detach { … }
pipeline::list
→Shows managed pipelines.
pipeline::list
pipeline::run
→Starts a pipeline in the node and waits for it to complete.
pipeline::run { … }
Printing
write_bitz
→Writes events in BITZ format.
write_bitz
write_ndjson
→Transforms the input event stream to a Newline-Delimited JSON byte stream.
write_ndjson
write_syslog
→Writes events as syslog.
write_syslog
Inputs
load_azure_blob_storage
→Loads bytes from Azure Blob Storage.
load_azure_blob_storage "abfs://container/file"
load_file
→Loads the contents of the file at path as a byte stream.
load_file "/tmp/data.json"
load_ftp
→Loads a byte stream via FTP.
load_ftp "ftp.example.org"
load_gcs
→Loads bytes from a Google Cloud Storage object.
load_gcs "gs://bucket/object.json"
load_google_cloud_pubsub
→Subscribes to a Google Cloud Pub/Sub subscription and obtains bytes.
load_google_cloud_pubsub project_id="my-project"
load_http
→Loads a byte stream via HTTP.
load_http "example.org", params={n: 5}
load_kafka
→Loads a byte stream from an Apache Kafka topic.
load_kafka topic="example"
load_nic
→Loads bytes from a network interface card (NIC).
load_nic "eth0"
load_s3
→Loads from an Amazon S3 object.
load_s3 "s3://my-bucket/obj.csv"
load_sqs
→Loads bytes from Amazon SQS queues.
load_sqs "sqs://tenzir"
load_tcp
→Loads bytes from a TCP or TLS connection.
load_tcp "0.0.0.0:8090" { read_json }
load_udp
→Loads bytes from a UDP socket.
load_udp "0.0.0.0:8090"
load_zmq
→Receives ZeroMQ messages.
load_zmq
Events
from_file
→Reads one or multiple files from a filesystem.
from_file "s3://data/**.json"
from_fluent_bit
→Receives events via Fluent Bit.
from_fluent_bit "opentelemetry"
from_http
→Sends and receives HTTP/1.1 requests.
from_http "0.0.0.0:8080"
from_opensearch
→Receives events via the OpenSearch Bulk API.
from_opensearch
from_velociraptor
→Submits VQL to a Velociraptor server and returns the response as events.
from_velociraptor subscribe="Windows"
Inspection
metrics
→Retrieves metrics events from a Tenzir node.
metrics "cpu"
Storage Engine
partitions
→Retrieves metadata about events stored at a node.
partitions src_ip == 1.2.3.4
Outputs
save_azure_blob_storage
→Saves bytes to Azure Blob Storage.
save_azure_blob_storage "abfs://container/file"
save_email
→Saves bytes through an SMTP server.
save_email "user@example.org"
save_file
→Writes a byte stream to a file.
save_file "/tmp/out.json"
save_ftp
→Saves a byte stream via FTP.
save_ftp "ftp.example.org"
save_gcs
→Saves bytes to a Google Cloud Storage object.
save_gcs "gs://bucket/object.json"
save_google_cloud_pubsub
→Publishes to a Google Cloud Pub/Sub topic.
save_google_cloud_pubsub project_id="my-project"
save_http
→Sends a byte stream via HTTP.
save_http "example.org/api"
save_kafka
→Saves a byte stream to an Apache Kafka topic.
save_kafka topic="example"
save_s3
→Saves bytes to an Amazon S3 object.
save_s3 "s3://my-bucket/obj.csv"
save_sqs
→Saves bytes to Amazon SQS queues.
save_sqs "sqs://tenzir"
save_tcp
→Saves bytes to a TCP or TLS connection.
save_tcp "0.0.0.0:8090", tls=true
save_udp
→Saves bytes to a UDP socket.
save_udp "0.0.0.0:8090"
Events
to_asl
→Sends events to Amazon Security Lake (ASL).
to_asl "s3://…"
to_azure_log_analytics
→Sends events to the Microsoft Azure Logs Ingestion API.
to_azure_log_analytics tenant_id="...", workspace_id="..."
to_clickhouse
→Sends events to a ClickHouse table.
to_clickhouse table="my_table"
to_fluent_bit
→Sends events via Fluent Bit.
to_fluent_bit "elasticsearch" …
to_google_cloud_logging
→Sends events to Google Cloud Logging.
to_google_cloud_logging …
to_google_secops
→Sends unstructured events to a Google SecOps Chronicle instance.
to_google_secops …
to_hive
→Writes events to a URI using hive partitioning.
to_hive "s3://…", partition_by=[x]
to_opensearch
→Sends events to an OpenSearch-compatible Bulk API.
to_opensearch "localhost:9200", …
to_snowflake
→Sends events to a Snowflake database.
to_snowflake account_identifier="…"
to_splunk
→Sends events to a Splunk HTTP Event Collector (HEC).
to_splunk "localhost:8088", …