Version: Next

# Operators

Tenzir comes with a wide range of built-in pipeline operators.

## Modify

| Operator | Description | Example |
|:---|:---|:---|
| `set` | Assigns a value to a field, creating it if necessary | `name = "Tenzir"` |
| `select` | Selects some values and discards the rest | `select name, id=metadata.id` |
| `drop` | Removes fields from the event | `drop name, metadata.id` |
| `enumerate` | Adds a field with the number of the event | `enumerate num` |
| `timeshift` | Adjusts timestamps relative to a given start time | `timeshift ts, start=2020-01-01` |
| `unroll` | Unrolls a field of type list, duplicating the surrounding event | `unroll names` |
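These operators compose into a single pipeline. A minimal sketch built from the examples above, assuming an inline input event with a `name` field and a nested `metadata.id`:

```tql
from {name: "tenzir", metadata: {id: 42}}
name = "Tenzir"
select name, id=metadata.id
enumerate num
```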

## Filter

| Operator | Description | Example |
|:---|:---|:---|
| `where` | Keeps only events matching a predicate | `where name.starts_with("John")` |
| `assert` | Same as `where`, but warns if the predicate is false | `assert name.starts_with("John")` |
| `taste` | Keeps only N events of each type | `taste 1` |
| `head` | Keeps only the first N events | `head 20` |
| `tail` | Keeps only the last N events | `tail 20` |
| `slice` | Keeps a range of events with an optional stride | `slice begin=10, end=30` |
| `sample` | Samples events based on load | `sample 30s, max_samples=2k` |
| `deduplicate` | Removes duplicate events | `deduplicate src_ip` |
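Filters chain naturally: each one narrows the stream further. A sketch assuming events with a `name` field are stored at the node:

```tql
export
where name.starts_with("John")
head 20
```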

## Analyze

| Operator | Description | Example |
|:---|:---|:---|
| `summarize` | Aggregates events with implicit grouping | `summarize name, sum(amount)` |
| `sort` | Sorts the events by one or more expressions | `sort name, -abs(transaction)` |
| `reverse` | Reverses the event order | `reverse` |
| `top` | Shows the most common values | `top user` |
| `rare` | Shows the least common values | `rare auth.token` |
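A minimal aggregation sketch, assuming events with `name` and `amount` fields; the `total=` assignment names the aggregate so it can be sorted on:

```tql
export
summarize name, total=sum(amount)
sort -total
```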

## Flow Control

| Operator | Description | Example |
|:---|:---|:---|
| `delay` | Delays events relative to a start time | `delay ts, speed=2.5` |
| `cron` | Runs a pipeline periodically with a cron expression | `cron "* */10 * * * MON-FRI" { from "https://example.org" }` |
| `discard` | Discards incoming bytes or events | `discard` |
| `every` | Runs a pipeline periodically at a fixed interval | `every 10s { summarize sum(amount) }` |
| `fork` | Forwards a copy of the events to another pipeline | `fork { to "copy.json" }` |
| `load_balance` | Routes the data to one of multiple subpipelines | `load_balance $over { publish $over }` |
| `pass` | Does nothing with the input | `pass` |
| `repeat` | Repeats the input after it has finished | `repeat 100` |
| `throttle` | Limits the amount of data flowing through | `throttle 100M, within=1min` |
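For instance, a nested source wrapped in `every` turns a one-shot fetch into a periodic one; the URL and topic here are placeholders:

```tql
every 10s {
  from "https://example.org/status.json"
}
publish "status"
```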

## Inputs

### Events

| Operator | Description | Example |
|:---|:---|:---|
| `from` | Reads events from a URI, or creates events from records | `from "http://example.org/file.csv.gz"`<br>`from {key: "value"}…` |
| `from_fluent_bit` | Returns results from Fluent Bit | `from_fluent_bit "opentelemetry"` |
| `from_velociraptor` | Returns results from a Velociraptor server | `from_velociraptor subscribe="Windows"` |

### Bytes

| Operator | Description | Example |
|:---|:---|:---|
| `load_amqp` | Loads bytes from an AMQP server | `load_amqp` |
| `load_azure_blob_storage` | Loads bytes from Azure Blob Storage | `load_azure_blob_storage "abfs://…` |
| `load_file` | Loads bytes from a file | `load_file "/tmp/data.json"` |
| `load_ftp` | Loads bytes via FTP | `load_ftp "ftp.example.org"` |
| `load_google_cloud_pubsub` | Listens to a Google Cloud Pub/Sub subscription | `load_google_cloud_pubsub project_id=…` |
| `load_http` | Receives bytes from an HTTP request | `load_http "example.org", params={n: 5}` |
| `load_kafka` | Receives bytes from an Apache Kafka topic | `load_kafka topic="example"` |
| `load_nic` | Receives bytes from a network interface card | `load_nic "eth0"` |
| `load_s3` | Receives bytes from an Amazon S3 object | `load_s3 "s3://my-bucket/obj.csv"` |
| `load_sqs` | Receives bytes from an Amazon SQS queue | `load_sqs "sqs://tenzir"` |
| `load_tcp` | Loads bytes from a TCP or TLS connection | `load_tcp "0.0.0.0:8090" { read_json }` |
| `load_udp` | Loads bytes from a UDP socket | `load_udp "0.0.0.0:8090"` |
| `load_zmq` | Receives bytes from ZeroMQ messages | `load_zmq` |
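Loaders produce raw bytes; a subsequent `read_*` operator from the Parsing section turns them into events. A sketch combining two rows from the tables above:

```tql
load_file "/tmp/data.json"
read_json
```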

## Outputs

### Events

| Operator | Description | Example |
|:---|:---|:---|
| `to` | Writes events to a URI | `to "s3://examplebucket/obj.json.gz"` |
| `to_azure_log_analytics` | Sends events to Azure Log Analytics | `to_azure_log_analytics tenant_id=…` |
| `to_fluent_bit` | Sends events to Fluent Bit | `to_fluent_bit "elasticsearch" …` |
| `to_hive` | Writes events using Hive partitioning | `to_hive "s3://…", partition_by=[x]` |
| `to_opensearch` | Sends incoming events to the OpenSearch Bulk API | `to_opensearch "localhost:9200", …` |
| `to_splunk` | Sends incoming events to a Splunk HEC | `to_splunk "localhost:8088", …` |
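A sketch that forwards events stored at the node to an S3 object, reusing the bucket from the example above:

```tql
export
to "s3://examplebucket/obj.json.gz"
```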

### Bytes

| Operator | Description | Example |
|:---|:---|:---|
| `save_amqp` | Saves incoming bytes to an AMQP server | `save_amqp` |
| `save_azure_blob_storage` | Saves incoming bytes to Azure Blob Storage | `save_azure_blob_storage "abfs://…` |
| `save_email` | Saves incoming bytes through an SMTP server | `save_email "user@example.org"` |
| `save_file` | Saves incoming bytes into a file | `save_file "/tmp/out.json"` |
| `save_ftp` | Saves incoming bytes via FTP | `save_ftp "ftp.example.org"` |
| `save_google_cloud_pubsub` | Publishes to a Google Cloud Pub/Sub topic | `save_google_cloud_pubsub project…` |
| `save_http` | Sends incoming bytes over an HTTP connection | `save_http "example.org/api"` |
| `save_kafka` | Saves incoming bytes to an Apache Kafka topic | `save_kafka topic="example"` |
| `save_s3` | Saves incoming bytes to an Amazon S3 object | `save_s3 "s3://my-bucket/obj.csv"` |
| `save_sqs` | Saves incoming bytes to an Amazon SQS queue | `save_sqs "sqs://tenzir"` |
| `save_tcp` | Saves incoming bytes to a TCP or TLS connection | `save_tcp "0.0.0.0:8090", tls=true` |
| `save_udp` | Saves incoming bytes to a UDP socket | `save_udp "0.0.0.0:8090"` |
| `save_zmq` | Saves incoming bytes to ZeroMQ messages | `save_zmq` |

## Parsing

| Operator | Description | Example |
|:---|:---|:---|
| `read_bitz` | Parses Tenzir's internal wire format | `read_bitz` |
| `read_cef` | Parses the Common Event Format | `read_cef` |
| `read_csv` | Parses comma-separated values | `read_csv null_value="-"` |
| `read_feather` | Parses Feather format | `read_feather` |
| `read_gelf` | Parses the Graylog Extended Log Format | `read_gelf` |
| `read_grok` | Parses events using a Grok pattern | `read_grok "%{IP:client} %{WORD:action}"` |
| `read_json` | Parses JSON objects | `read_json arrays_of_objects=true` |
| `read_kv` | Parses key-value pairs | `read_kv r"(\s+)[A-Z_]+:", r":\s*"` |
| `read_leef` | Parses the Log Event Extended Format | `read_leef` |
| `read_lines` | Parses each line into a separate event | `read_lines` |
| `read_ndjson` | Parses newline-delimited JSON | `read_ndjson` |
| `read_pcap` | Parses raw network packets in PCAP format | `read_pcap` |
| `read_parquet` | Parses Parquet format | `read_parquet` |
| `read_ssv` | Parses space-separated values | `read_ssv header="name count"` |
| `read_suricata` | Parses Suricata's EVE format | `read_suricata` |
| `read_syslog` | Parses syslog | `read_syslog` |
| `read_tsv` | Parses tab-separated values | `read_tsv auto_expand=true` |
| `read_xsv` | Parses custom-separated values | `read_xsv ";", ":", "N/A"` |
| `read_yaml` | Parses YAML | `read_yaml` |
| `read_zeek_json` | Parses Zeek JSON | `read_zeek_json` |
| `read_zeek_tsv` | Parses Zeek TSV | `read_zeek_tsv` |
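Parsers attach to byte-producing operators. For example, to ingest a Suricata EVE log into the node (the file path is a placeholder):

```tql
load_file "/var/log/suricata/eve.json"
read_suricata
import
```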

## Printing

| Operator | Description | Example |
|:---|:---|:---|
| `write_bitz` | Writes events as Tenzir's internal wire format | `write_bitz` |
| `write_csv` | Writes events as CSV | `write_csv` |
| `write_feather` | Writes events as Feather | `write_feather` |
| `write_json` | Writes events as JSON | `write_json` |
| `write_ndjson` | Writes events as newline-delimited JSON | `write_ndjson` |
| `write_lines` | Writes events as lines | `write_lines` |
| `write_parquet` | Writes events as Parquet | `write_parquet` |
| `write_pcap` | Writes events as PCAP | `write_pcap` |
| `write_ssv` | Writes events as SSV | `write_ssv` |
| `write_tsv` | Writes events as TSV | `write_tsv` |
| `write_xsv` | Writes events as XSV | `write_xsv` |
| `write_yaml` | Writes events as YAML | `write_yaml` |
| `write_zeek_tsv` | Writes events as Zeek TSV | `write_zeek_tsv` |

## Connecting Pipelines

| Operator | Description | Example |
|:---|:---|:---|
| `publish` | Publishes events to a certain topic | `publish "topic"` |
| `subscribe` | Subscribes to events of a certain topic | `subscribe "topic"` |
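Two pipelines connect through a shared topic: one publishes, the other subscribes. A sketch with a hypothetical syslog source (port and topic name are placeholders):

```tql
// Pipeline 1: parse syslog and publish it
load_udp "0.0.0.0:514"
read_syslog
publish "syslog"
```

```tql
// Pipeline 2: consume the topic and store the events
subscribe "syslog"
import
```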

## Node

### Inspection

| Operator | Description | Example |
|:---|:---|:---|
| `config` | Returns the node's configuration | `config` |
| `diagnostics` | Returns diagnostic events of managed pipelines | `diagnostics` |
| `openapi` | Returns the OpenAPI specification | `openapi` |
| `metrics` | Retrieves metrics events from a Tenzir node | `metrics "cpu"` |
| `plugins` | Lists available plugins | `plugins` |
| `version` | Shows the current version | `version` |

### Storage Engine

| Operator | Description | Example |
|:---|:---|:---|
| `export` | Retrieves events from the node | `export` |
| `fields` | Lists all fields stored at the node | `fields` |
| `import` | Stores events at the node | `import` |
| `partitions` | Retrieves metadata about events stored at the node | `partitions src_ip == 1.2.3.4` |
| `schemas` | Lists schemas for events stored at the node | `schemas` |

### Host Inspection

| Operator | Description | Example |
|:---|:---|:---|
| `files` | Lists files in a directory | `files "/var/log/", recurse=true` |
| `nics` | Lists available network interfaces | `nics` |
| `processes` | Lists running processes | `processes` |
| `sockets` | Lists open sockets | `sockets` |

## Detection

| Operator | Description | Example |
|:---|:---|:---|
| `sigma` | Matches incoming events against Sigma rules | `sigma "/tmp/rules/"` |
| `yara` | Matches the incoming byte stream against YARA rules | `yara "/path/to/rules", blockwise=true` |
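A sketch that runs stored events through a Sigma rule directory and republishes the matches (rule path and topic are placeholders):

```tql
export
sigma "/tmp/rules/"
publish "sigma-matches"
```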

## Internals

| Operator | Description | Example |
|:---|:---|:---|
| `api` | Calls Tenzir's REST API from a pipeline | `api "/pipeline/list"` |
| `batch` | Controls the batch size of events | `batch timeout=1s` |
| `buffer` | Adds additional buffering to handle spikes | `buffer 10M, policy="drop"` |
| `cache` | In-memory cache shared between pipelines | `cache "w01wyhTZm3", ttl=10min` |
| `legacy` | Provides a compatibility fallback to TQL1 pipelines | `legacy "chart area"` |
| `local` | Forces a pipeline to run locally | `local { sort foo }` |
| `measure` | Returns events describing the incoming batches | `measure` |
| `remote` | Forces a pipeline to run remotely at a node | `remote { version }` |
| `serve` | Makes events available at `/serve` | `serve "abcde12345"` |
| `unordered` | Removes ordering assumptions in a pipeline | `unordered { read_ndjson }` |

## Encode & Decode

| Operator | Description | Example |
|:---|:---|:---|
| `compress` | Compresses a stream of bytes | `compress "zstd", level=18` |
| `decompress` | Decompresses a stream of bytes | `decompress "brotli"` |
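These pair naturally with loaders and parsers. A sketch that reads a compressed JSON file, assuming a `gzip` codec and a hypothetical file path:

```tql
load_file "/tmp/data.json.gz"
decompress "gzip"
read_json
```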

## Contexts

| Function | Description | Example |
|:---|:---|:---|
| `context::create_bloom_filter` | Creates a Bloom filter context | `context::create_bloom_filter "ctx", capacity=1Mi, fp_probability=0.01` |
| `context::create_lookup_table` | Creates a lookup table context | `context::create_lookup_table "ctx"` |
| `context::create_geoip` | Creates a GeoIP context for IP-based geolocation | `context::create_geoip "ctx", db_path="GeoLite2-City.mmdb"` |
| `context::enrich` | Enriches with a context | `context::enrich "ctx", key=x` |
| `context::erase` | Removes entries from a context | `context::erase "ctx", key=x` |
| `context::inspect` | Inspects the details of a specified context | `context::inspect "ctx"` |
| `context::list` | Lists all contexts | `context::list` |
| `context::remove` | Deletes a context | `context::remove "ctx"` |
| `context::reset` | Resets the state of a specified context | `context::reset "ctx"` |
| `context::save` | Saves context state | `context::save "ctx"` |
| `context::load` | Loads context state | `context::load "ctx"` |
| `context::update` | Updates an existing context with new data | `context::update "ctx", key=x, value=y` |
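A sketch of a typical lookup-table workflow built from the rows above: create the context once, keep it updated from one pipeline, and enrich in another. The topics and the fields `x`, `y`, and `src_ip` are placeholders:

```tql
// One-time setup
context::create_lookup_table "ctx"
```

```tql
// Keep the table current from an intel feed
subscribe "intel"
context::update "ctx", key=x, value=y
```

```tql
// Enrich flow events against the table
subscribe "flows"
context::enrich "ctx", key=src_ip
```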

## Packages

| Operator | Description | Example |
|:---|:---|:---|
| `package::add` | Installs a package | `package::add "suricata-ocsf"` |
| `package::list` | Shows installed packages | `package::list` |
| `package::remove` | Uninstalls a package | `package::remove "suricata-ocsf"` |

## Escape Hatches

| Operator | Description | Example |
|:---|:---|:---|
| `python` | Executes a Python snippet for each event | `python "self.x = self.y"` |
| `shell` | Runs a shell command within the pipeline | `shell "./process.sh \| tee copy.txt"` |