Version: v4.24

Operators

Tenzir comes with a wide range of built-in pipeline operators.

Modify

| Operator | Description | Example |
| --- | --- | --- |
| set | Assigns a value to a field, creating it if necessary | name = "Tenzir" |
| select | Selects some values and discards the rest | select name, id=metadata.id |
| drop | Removes fields from the event | drop name, metadata.id |
| enumerate | Adds a field with the number of the event | enumerate num |
| unroll | Unrolls a field of type list, duplicating the surrounding event | unroll names |

Filter

| Operator | Description | Example |
| --- | --- | --- |
| where | Keeps only events matching a predicate | where name.starts_with("John") |
| assert | Same as where, but warns if predicate is false | assert name.starts_with("John") |
| taste | Keeps only N events of each type | taste 1 |
| head | Keeps only the first N events | head 20 |
| tail | Keeps only the last N events | tail 20 |
| slice | Keeps a range of events with an optional stride | slice begin=10, end=30 |
| sample | Samples events based on load | sample 30s, max_samples=2k |

Analyze

| Operator | Description | Example |
| --- | --- | --- |
| summarize | Aggregates events with implicit grouping | summarize name, sum(amount) |
| sort | Sorts the events by one or more expressions | sort name, -abs(transaction) |
| reverse | Reverses the event order | reverse |
| top | Shows the most common values | top user |
| rare | Shows the least common values | rare auth.token |
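
The Modify, Filter, and Analyze operators compose by placing one operator per line. The following is a minimal sketch that chains examples from the tables above. It assumes that events with `name` and `amount` fields are stored at the node (both field names are placeholders taken from the examples, not a real schema) and uses `export` from the Input table as its source.

```tql
// Read stored events from the node.
export
// Keep only events whose (assumed) name field starts with "John".
where name.starts_with("John")
// Sum the (assumed) amount field, grouped by name.
summarize name, sum(amount)
// Order the aggregated events by name.
sort name
```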

Flow Control

| Operator | Description | Example |
| --- | --- | --- |
| every | Restarts a pipeline periodically | every 10s { summarize sum(amount) } |
| fork | Forwards a copy of the events to another pipeline | fork { to "copy.json" } |
| if | Splits the flow based on a predicate | if transaction > 0 { … } else { … } |
| load_balance | Routes the data to one of multiple subpipelines | load_balance $over { publish $over } |

Input

| Operator | Description | Example |
| --- | --- | --- |
| diagnostics | Returns diagnostic events of managed pipelines | diagnostics |
| export | Retrieves events from the node | export |
| from_velocira… | Returns results from a Velociraptor server | from_velociraptor subscribe="Windows" |
| load_amqp | Loads bytes from an AMQP server | load_amqp |
| load_file | Loads bytes from a file | load_file "/tmp/data.json" |
| load_ftp | Loads bytes via FTP | load_ftp "ftp.example.org" |
| load_google_c… | Listens to a Google Cloud Pub/Sub subscription | load_google_cloud_pubsub project_id=… |
| load_http | Receives bytes from an HTTP request | load_http "example.org", params={n: 5} |
| load_kafka | Receives bytes from an Apache Kafka topic | load_kafka topic="example" |
| load_nic | Receives bytes from a Network Interface Card | load_nic "eth0" |
| load_s3 | Receives bytes from an Amazon S3 object | load_s3 "s3://my-bucket/obj.csv" |
| load_sqs | Receives bytes from an Amazon SQS queue | load_sqs "sqs://tenzir" |
| load_tcp | Loads bytes from a TCP or TLS connection | load_tcp "0.0.0.0:8090" { read_json } |
| load_udp | Loads bytes from a UDP socket | load_udp "0.0.0.0:8090" |
| load_zmq | Receives bytes from ZeroMQ messages | load_zmq |
| metrics | Retrieves metrics events from a Tenzir node | metrics "cpu" |
| subscribe | Subscribes to events of a certain topic | subscribe "topic" |

Output

| Operator | Description | Example |
| --- | --- | --- |
| publish | Publishes events to a certain topic | publish "topic" |
| import | Stores events at the node | import |
| discard | Discards incoming bytes or events | discard |
| save_amqp | Saves incoming bytes to an AMQP server | save_amqp |
| save_email | Saves incoming bytes through an SMTP server | save_email "user@example.org" |
| save_file | Saves incoming bytes into a file | save_file "/tmp/out.json" |
| save_ftp | Saves incoming bytes via FTP | save_ftp "ftp.example.org" |
| save_google_cloud… | Publishes to a Google Cloud Pub/Sub topic | save_google_cloud_pubsub project… |
| save_http | Sends incoming bytes over an HTTP connection | save_http "example.org/api" |
| save_kafka | Saves incoming bytes to an Apache Kafka topic | save_kafka topic="example" |
| save_s3 | Saves incoming bytes to an Amazon S3 object | save_s3 "s3://my-bucket/obj.csv" |
| save_sqs | Saves incoming bytes to an Amazon SQS queue | save_sqs "sqs://tenzir" |
| save_tcp | Saves incoming bytes to a TCP or TLS connection | save_tcp "0.0.0.0:8090", tls=true |
| save_udp | Saves incoming bytes to a UDP socket | save_udp "0.0.0.0:8090" |
| save_zmq | Saves incoming bytes to ZeroMQ messages | save_zmq |
| serve | Makes events available at /serve | serve "abcde12345" |
| to_azure_log_ana… | Sends events to Azure Log Analytics | to_azure_log_analytics tenant_id=… |
| to_hive | Writes events using hive partitioning | to_hive "s3://…", partition_by=[x] |
| to_splunk | Sends incoming events to a Splunk HEC | to_splunk "localhost:8088", … |
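
Because `publish` and `subscribe` share a topic name, they can decouple the pipeline that ingests data from the pipelines that consume it. The sketch below reuses the `load_tcp` and `publish` examples from the tables above. The listening address and the topic name `"topic"` are the placeholder values from those examples, not recommended settings.

```tql
// Accept TCP connections and parse each incoming stream as JSON.
load_tcp "0.0.0.0:8090" { read_json }
// Hand the resulting events to all subscribers of "topic".
publish "topic"
```

A second pipeline that begins with `subscribe "topic"` and ends with `import` could then store those events at the node.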

Parsing

| Operator | Description | Example |
| --- | --- | --- |
| read_bitz | Parses Tenzir's internal wire format | read_bitz |
| read_cef | Parses the Common Event Format | read_cef |
| read_csv | Parses comma-separated values | read_csv null_value="-" |
| read_feather | Parses Feather format | read_feather |
| read_gelf | Parses the Graylog Extended Log Format | read_gelf |
| read_grok | Parses events using a Grok pattern | read_grok "%{IP:client} %{WORD:action}" |
| read_json | Parses JSON objects | read_json arrays_of_objects=true |
| read_kv | Parses key-value pairs | read_kv r"(\s+)[A-Z_]+:", r":\s*" |
| read_leef | Parses the Log Event Extended Format | read_leef |
| read_lines | Parses each line into a separate event | read_lines |
| read_ndjson | Parses newline-delimited JSON | read_ndjson |
| read_pcap | Parses raw network packets in PCAP format | read_pcap |
| read_parquet | Parses Parquet format | read_parquet |
| read_ssv | Parses space-separated values | read_ssv header="name count" |
| read_suricata | Parses Suricata's Eve format | read_suricata |
| read_syslog | Parses syslog | read_syslog |
| read_tsv | Parses tab-separated values | read_tsv auto_expand=true |
| read_xsv | Parses custom-separated values | read_xsv ";", ":", "N/A" |
| read_yaml | Parses YAML | read_yaml |
| read_zeek_json | Parses Zeek JSON | read_zeek_json |
| read_zeek_tsv | Parses Zeek TSV | read_zeek_tsv |

Printing

| Operator | Description | Example |
| --- | --- | --- |
| write_bitz | Writes events as Tenzir's internal wire format | write_bitz |
| write_csv | Writes events as CSV | write_csv |
| write_feather | Writes events as Feather | write_feather |
| write_json | Writes events as JSON | write_json |
| write_ndjson | Writes events as newline-delimited JSON | write_ndjson |
| write_lines | Writes events as lines | write_lines |
| write_parquet | Writes events as Parquet | write_parquet |
| write_pcap | Writes events as PCAP | write_pcap |
| write_ssv | Writes events as SSV | write_ssv |
| write_tsv | Writes events as TSV | write_tsv |
| write_xsv | Writes events as XSV | write_xsv |
| write_yaml | Writes events as YAML | write_yaml |
| write_zeek_tsv | Writes events as Zeek TSV | write_zeek_tsv |
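
The Input, Parsing, Printing, and Output tables describe four stages that fit together: `load_*` operators produce bytes, `read_*` operators turn bytes into events, `write_*` operators turn events back into bytes, and `save_*` operators deliver those bytes. As an illustrative sketch built only from the example invocations listed above (the file paths are the placeholders from those examples), a complete round trip might look like this:

```tql
// Bytes in: load the raw file contents.
load_file "/tmp/data.json"
// Bytes to events: parse the contents as JSON objects.
read_json
// Events to events: keep only the first 20 events.
head 20
// Events to bytes: print the remaining events as JSON.
write_json
// Bytes out: write the result to a file.
save_file "/tmp/out.json"
```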

Node Inspection

| Operator | Description | Example |
| --- | --- | --- |
| config | Returns the node's configuration | config |
| fields | Lists all fields stored at the node | fields |
| openapi | Returns the OpenAPI specification | openapi |
| partitions | Retrieves metadata about events stored at the node | partitions src_ip == 1.2.3.4 |
| plugins | Lists available plugins | plugins |
| schemas | Lists schemas for events stored at the node | schemas |
| version | Shows the current version | version |

Host Inspection

| Operator | Description | Example |
| --- | --- | --- |
| files | Lists files in a directory | files "/var/log/", recurse=true |
| nics | Lists available network interfaces | nics |
| processes | Lists running processes | processes |
| sockets | Lists open sockets | sockets |

Detection

| Operator | Description | Example |
| --- | --- | --- |
| sigma | Matches incoming events against Sigma rules | sigma "/tmp/rules/" |
| yara | Matches the incoming byte stream against YARA rules | yara "/path/to/rules", blockwise=true |

Internals

| Operator | Description | Example |
| --- | --- | --- |
| api | Calls Tenzir's REST API from a pipeline | api "/pipeline/list" |
| batch | Controls the batch size of events | batch timeout=1s |
| buffer | Adds additional buffering to handle spikes | buffer 10M, policy="drop" |
| cache | In-memory cache shared between pipelines | cache "w01wyhTZm3", ttl=10min |
| legacy | Provides a compatibility fallback to TQL1 pipelines | legacy "chart area" |
| local | Forces a pipeline to run locally | local { sort foo } |
| measure | Returns events describing the incoming batches | measure |
| remote | Forces a pipeline to run remotely at a node | remote { version } |
| throttle | Limits the amount of data flowing through | throttle 100M, within=1min |
| unordered | Removes ordering assumptions in a pipeline | unordered { read_ndjson } |

Encode & Decode

| Operator | Description | Example |
| --- | --- | --- |
| compress | Compresses a stream of bytes | compress "zstd", level=18 |
| decompress | Decompresses a stream of bytes | decompress "brotli" |
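
Both operators work on byte streams rather than events, so they sit between a `load_*` and a `save_*` operator, or next to a `read_*` or `write_*` operator. The following sketch compresses a file without parsing it. The output path `/tmp/data.json.zst` is a hypothetical name chosen for illustration; the other arguments are the example values from the tables above.

```tql
// Load the raw bytes of the input file.
load_file "/tmp/data.json"
// Compress the byte stream with Zstandard at level 18.
compress "zstd", level=18
// Write the compressed bytes to a (hypothetical) output path.
save_file "/tmp/data.json.zst"
```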

Contexts

| Operator | Description | Example |
| --- | --- | --- |
| context::create_bloom_filter | Creates a Bloom filter context | context::create_bloom_filter "ctx", capacity=1Mi, fp_probability=0.01 |
| context::create_lookup_table | Creates a lookup table context | context::create_lookup_table "ctx" |
| context::create_geoip | Creates a GeoIP context for IP-based geolocation | context::create_geoip "ctx", db_path="GeoLite2-City.mmdb" |
| context::enrich | Enriches with a context | context::enrich "ctx", key=x |
| context::inspect | Inspects the details of a specified context | context::inspect "ctx" |
| context::list | Lists all contexts | context::list |
| context::remove | Deletes a context | context::remove "ctx" |
| context::reset | Resets the state of a specified context | context::reset "ctx" |
| context::save | Saves context state | context::save "ctx" |
| context::load | Loads context state | context::load "ctx" |
| context::update | Updates an existing context with new data | context::update "ctx", key=x, value=y |
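
A typical use of these operators is to enrich a stream of events against an existing context. The sketch below assumes that a context named `"ctx"` was created beforehand in a separate pipeline with `context::create_lookup_table "ctx"` and populated with `context::update`. The field `x` and the topic `"topic"` are the placeholders from the examples on this page, not real names.

```tql
// Receive events published to "topic" by another pipeline.
subscribe "topic"
// Look up the (assumed) field x in the context "ctx" and attach any match.
context::enrich "ctx", key=x
// Store the enriched events at the node.
import
```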

Packages

| Operator | Description | Example |
| --- | --- | --- |
| package::add | Installs a package | package::add "suricata-ocsf" |
| package::list | Shows installed packages | package::list |
| package::remove | Uninstalls a package | package::remove "suricata-ocsf" |

Uncategorized

| Operator | Description | Example |
| --- | --- | --- |
| delay | Delays events relative to a start time | delay ts, speed=2.5 |
| pass | Does nothing with the input | pass |
| repeat | Repeats the input after it has finished | repeat 100 |
| timeshift | Adjusts timestamps relative to a given start time | timeshift ts, start=2020-01-01 |
| python | Executes a Python snippet for each event | python "self.x = self.y" |
| shell | Runs a shell command within the pipeline | shell "./process.sh \| tee copy.txt" |