# Operators

Tenzir comes with a wide range of built-in pipeline operators.

## Modify

| Operator | Description | Example |
| --- | --- | --- |
| `set` | Assigns a value to a field, creating it if necessary | `name = "Tenzir"` |
| `select` | Selects some values and discards the rest | `select name, id=metadata.id` |
| `drop` | Removes fields from the event | `drop name, metadata.id` |
| `enumerate` | Adds a field with the number of the event | `enumerate num` |
| `unroll` | Unrolls a field of type list, duplicating the surrounding event | `unroll names` |
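
As a rough sketch of how these shaping operators compose, the fragment below runs downstream of an input operator such as `export` and assumes events that carry `name`, `metadata.id`, and a list-valued `names` field; the field names are illustrative only.

```tql
// Sketch only: the field names are hypothetical.
select name, id=metadata.id, names  // keep just these fields
name = "Tenzir"                     // assign (or create) the `name` field
enumerate num                       // add a running event number as `num`
unroll names                        // emit one event per element of `names`
```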

## Filter

| Operator | Description | Example |
| --- | --- | --- |
| `where` | Keeps only events matching a predicate | `where name.starts_with("John")` |
| `assert` | Same as `where`, but warns if the predicate is false | `assert name.starts_with("John")` |
| `taste` | Keeps only N events of each type | `taste 1` |
| `head` | Keeps only the first N events | `head 20` |
| `tail` | Keeps only the last N events | `tail 20` |
| `slice` | Keeps a range of events with an optional stride | `slice begin=10, end=30` |
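
Filtering operators chain naturally. A minimal sketch, assuming events with a `name` field:

```tql
// Sketch only: assumes a `name` field exists.
where name.starts_with("John")  // keep matching events
head 20                         // then keep only the first 20 of them
```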

## Analyze

| Operator | Description | Example |
| --- | --- | --- |
| `summarize` | Aggregates events with implicit grouping | `summarize name, sum(amount)` |
| `sort` | Sorts the events by one or more expressions | `sort name, -abs(transaction)` |
| `reverse` | Reverses the event order | `reverse` |
| `top` | Shows the most common values | `top user` |
| `rare` | Shows the least common values | `rare auth.token` |
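
A minimal sketch of aggregation followed by ordering, assuming events that carry `name` and `amount` fields:

```tql
// Sketch only: `name` and `amount` are hypothetical fields.
summarize name, sum(amount)  // aggregate, implicitly grouped by `name`
sort name                    // order the aggregated results by `name`
```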

## Flow Control

| Operator | Description | Example |
| --- | --- | --- |
| `every` | Restarts a pipeline periodically | `every 10s { summarize sum(amount) }` |
| `fork` | Forwards a copy of the events to another pipeline | `fork { to "copy.json" }` |
| `if` | Splits the flow based on a predicate | `if transaction > 0 { … } else { … }` |
| `load_balance` | Routes the data to one of multiple subpipelines | `load_balance $over { publish $over }` |
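
Flow-control operators wrap other pipelines. As a sketch, the fragment below recomputes an aggregate every 10 seconds and forwards a copy of the results to a file; the field and file names are illustrative.

```tql
// Sketch only: `amount` and "copy.json" are hypothetical.
every 10s { summarize sum(amount) }  // restart the aggregation every 10 seconds
fork { to "copy.json" }              // also write a copy of the results
```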

## Input

| Operator | Description | Example |
| --- | --- | --- |
| `diagnostics` | Retrieves diagnostic events of managed pipelines | `diagnostics` |
| `export` | Retrieves events from the node | `export` |
| `from_velociraptor` | Returns results from a Velociraptor server | `from_velociraptor subscribe="Windows"` |
| `load_file` | Loads bytes from a file | `load_file "/tmp/data.json"` |
| `load_google_cloud_pubsub` | Listens to a Google Cloud Pub/Sub subscription | `load_google_cloud_pubsub "…", "…"` |
| `load_http` | Receives bytes from an HTTP request | `load_http "example.org", params={n: 5}` |
| `load_kafka` | Receives bytes from an Apache Kafka topic | `load_kafka topic="example"` |
| `load_tcp` | Loads bytes from a TCP or TLS connection | `load_tcp "0.0.0.0:8090" { read_json }` |
| `metrics` | Retrieves metrics events from a Tenzir node | `metrics "cpu"` |
| `subscribe` | Subscribes to events of a certain topic | `subscribe "topic"` |
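
Byte-producing inputs are typically followed by a parser that turns the bytes into events. A minimal sketch, assuming a JSON file at a hypothetical path:

```tql
// Sketch only: the path is hypothetical.
load_file "/tmp/data.json"  // load raw bytes from a file
read_json                   // parse the bytes into events
```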

## Output

| Operator | Description | Example |
| --- | --- | --- |
| `publish` | Publishes events to a certain topic | `publish "topic"` |
| `import` | Stores events at the node | `import` |
| `discard` | Discards incoming bytes or events | `discard` |
| `save_file` | Saves incoming bytes into a file | `save_file "/tmp/out.json"` |
| `save_google_cloud_pubsub` | Publishes to a Google Cloud Pub/Sub topic | `save_google_cloud_pubsub "…", "…"` |
| `save_http` | Sends incoming bytes over an HTTP connection | `save_http "example.org/api"` |
| `save_kafka` | Saves incoming bytes to an Apache Kafka topic | `save_kafka topic="example"` |
| `serve` | Makes events available at `/serve` | `serve "abcde12345"` |
| `to_azure_log_analytics` | Sends events to Azure Log Analytics | `to_azure_log_analytics tenant_id=…` |
| `to_hive` | Writes events using hive partitioning | `to_hive "s3://…", partition_by=[x]` |
| `to_splunk` | Sends incoming events to a Splunk HEC | `to_splunk "https://localhost:8088", …` |
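
Output operators sit at the end of a pipeline. The sketch below, purely illustrative, stores a copy of the incoming events at the node and also publishes them under a topic:

```tql
// Sketch only: the topic name is hypothetical.
fork { import }  // store a copy of the events at the node
publish "topic"  // hand the events to subscribers of "topic"
```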

## Parsing

| Operator | Description | Example |
| --- | --- | --- |
| `read_bitz` | Parses Tenzir's internal wire format | `read_bitz` |
| `read_cef` | Parses the Common Event Format | `read_cef` |
| `read_csv` | Parses comma-separated values | `read_csv null_value="-"` |
| `read_gelf` | Parses the Graylog Extended Log Format | `read_gelf` |
| `read_grok` | Parses events using a Grok pattern | `read_grok "%{IP:client} %{WORD:action}"` |
| `read_json` | Parses JSON objects | `read_json arrays_of_objects=true` |
| `read_kv` | Parses key-value pairs | `read_kv r"(\s+)[A-Z_]+:", r":\s*"` |
| `read_leef` | Parses the Log Event Extended Format | `read_leef` |
| `read_lines` | Parses each line into a separate event | `read_lines` |
| `read_ndjson` | Parses newline-delimited JSON | `read_ndjson` |
| `read_ssv` | Parses space-separated values | `read_ssv header="name count"` |
| `read_suricata` | Parses Suricata's Eve format | `read_suricata` |
| `read_syslog` | Parses syslog | `read_syslog` |
| `read_tsv` | Parses tab-separated values | `read_tsv auto_expand=true` |
| `read_xsv` | Parses custom-separated values | `read_xsv ";", ":", "N/A"` |
| `read_yaml` | Parses YAML | `read_yaml` |
| `read_zeek_json` | Parses Zeek JSON | `read_zeek_json` |
| `read_zeek_tsv` | Parses Zeek TSV | `read_zeek_tsv` |
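
Parsers consume bytes produced by an input operator. A minimal sketch, assuming a CSV file at a hypothetical path that uses "-" for missing values:

```tql
// Sketch only: the path is hypothetical.
load_file "/tmp/data.csv"  // load raw bytes
read_csv null_value="-"    // parse CSV, treating "-" as null
```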

## Printing

| Operator | Description | Example |
| --- | --- | --- |
| `write_bitz` | Writes events as Tenzir's internal wire format | `write_bitz` |
| `write_json` | Writes events as JSON | `write_json ndjson=true` |
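
Printers are the inverse of parsers: they turn events back into bytes that a saver can write. A minimal sketch, with a hypothetical output path:

```tql
// Sketch only: the output path is hypothetical.
write_json ndjson=true     // render each event as one JSON line
save_file "/tmp/out.json"  // write the resulting bytes to a file
```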

## Internals

| Operator | Description | Example |
| --- | --- | --- |
| `api` | Calls Tenzir's REST API from a pipeline | `api "/pipeline/list"` |
| `batch` | Controls the batch size of events | `batch timeout=1s` |
| `buffer` | Adds additional buffering to handle spikes | `buffer 10M, policy="drop"` |
| `measure` | Returns events describing the incoming batches | `measure` |
| `throttle` | Limits the amount of data flowing through | `throttle 100M, within=1min` |
| `cache` | In-memory cache shared between pipelines | `cache "w01wyhTZm3", ttl=10min` |
| `legacy` | Provides a compatibility fallback to TQL1 pipelines | `legacy "chart area"` |
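
These operators tune how data moves rather than what it contains. A sketch of combining buffering with rate limiting; the sizes and policy are illustrative:

```tql
// Sketch only: sizes and policy are illustrative.
buffer 10M, policy="drop"   // absorb bursts, dropping on overflow
throttle 100M, within=1min  // limit the flow to 100M within one minute
```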

## Node Inspection

| Operator | Description | Example |
| --- | --- | --- |
| `config` | Returns the node's configuration | `config` |
| `fields` | Lists all fields stored at the node | `fields` |
| `openapi` | Returns the OpenAPI specification | `openapi` |
| `partitions` | Retrieves metadata about events stored at the node | `partitions src_ip == 1.2.3.4` |
| `plugins` | Lists available plugins | `plugins` |
| `schemas` | Lists schemas for events stored at the node | `schemas` |
| `version` | Shows the current version | `version` |
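
These introspection operators act as sources, so they compose with downstream filters like any other input. A minimal sketch:

```tql
// Sketch only.
fields   // list all fields stored at the node
head 20  // keep a small sample
```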

## Host Inspection

| Operator | Description | Example |
| --- | --- | --- |
| `files` | Lists files in a directory | `files "/var/log/", recurse=true` |
| `nics` | Lists available network interfaces | `nics` |
| `processes` | Lists running processes | `processes` |
| `sockets` | Lists open sockets | `sockets` |
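
Host inspection operators are likewise sources that emit one event per item. A sketch that snapshots a directory listing and stores it at the node; the directory is illustrative:

```tql
// Sketch only: the directory is illustrative.
files "/var/log/", recurse=true  // one event per file under /var/log
import                           // store the listing at the node
```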

## Uncategorized

| Operator | Description | Example |
| --- | --- | --- |
| `compress` | Compresses a stream of bytes | `compress "zstd", level=18` |
| `decompress` | Decompresses a stream of bytes | `decompress "brotli"` |
| `delay` | Delays events relative to a start time | `delay ts, speed=2.5` |
| `pass` | Does nothing with the input | `pass` |
| `repeat` | Repeats the input after it has finished | `repeat 100` |
| `sigma` | Matches incoming events against Sigma rules | `sigma "/tmp/rules/"` |
| `timeshift` | Adjusts timestamps relative to a given start time | `timeshift ts, start=2020-01-01` |
| `yara` | Matches the incoming byte stream against YARA rules | `yara "/path/to/rules", blockwise=true` |
| `python` | Executes a Python snippet for each event | `python "self.x = self.y"` |
| `shell` | Runs a shell command within the pipeline | `shell "./process.sh \| tee copy.txt"` |
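
Several of these operators work on byte streams rather than events, so they pair with a parser. A sketch, assuming a zstd-compressed JSON stream and a hypothetical Sigma rule directory:

```tql
// Sketch only: the codec and rule path are illustrative.
decompress "zstd"    // turn the compressed byte stream into plain bytes
read_json            // parse the bytes into events
sigma "/tmp/rules/"  // match the events against Sigma rules
```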