Shape data
Tenzir comes with numerous transformation operators that change the shape of their input and produce a new output. Here is a visual overview of transformations that you can perform over a data frame:
We'll walk through examples for each depicted operator, using the
M57 dataset. All examples assume that you have imported
the M57 sample data into a node, as explained in the
quickstart. We therefore start every pipeline with
export
.
Filter rows with where
Use where
to filter rows in the
input with an expression.
Filter by metadata using the #schema
selector:
export | where #schema == "suricata.alert"
Output
{
"timestamp": "2021-11-17T13:52:05.695469",
"flow_id": 1868285155318879,
"pcap_cnt": 143,
"vlan": null,
"in_iface": null,
"src_ip": "14.1.112.177",
"src_port": 38376,
"dest_ip": "198.71.247.91",
"dest_port": 123,
"proto": "UDP",
"event_type": "alert",
"community_id": null,
"alert": {
"app_proto": null,
"action": "allowed",
"gid": 1,
"signature_id": 2017919,
"rev": 2,
"signature": "ET DOS Possible NTP DDoS Inbound Frequent Un-Authed MON_LIST Requests IMPL 0x03",
"category": "Attempted Denial of Service",
"severity": 2,
"source": {
"ip": null,
"port": null
},
"target": {
"ip": null,
"port": null
},
"metadata": {
"created_at": [
"2014_01_03"
],
"updated_at": [
"2014_01_03"
]
}
},
"flow": {
"pkts_toserver": 2,
"pkts_toclient": 0,
"bytes_toserver": 468,
"bytes_toclient": 0,
"start": "2021-11-17T13:52:05.695391",
"end": null,
"age": null,
"state": null,
"reason": null,
"alerted": null
},
"payload": null,
"payload_printable": null,
"stream": null,
"packet": null,
"packet_info": {
"linktype": null
},
"app_proto": "failed"
}
(Only 1 out of 19 shown.)
Or by using type and field extractors:
export
| where 10.10.5.0/25 && (orig_bytes > 1 Mi || duration > 30 min)
Output
{
"ts": "2021-11-19T06:30:30.918301",
"uid": "C9T8pykxdsT7iSrc9",
"id": {
"orig_h": "10.10.5.101",
"orig_p": 50046,
"resp_h": "87.120.8.190",
"resp_p": 9090
},
"proto": "tcp",
"service": null,
"duration": "5.09m",
"orig_bytes": 1394538,
"resp_bytes": 95179,
"conn_state": "S1",
"local_orig": null,
"local_resp": null,
"missed_bytes": 0,
"history": "ShADad",
"orig_pkts": 5046,
"orig_ip_bytes": 1596390,
"resp_pkts": 5095,
"resp_ip_bytes": 298983,
"tunnel_parents": null,
"community_id": "1:UPodR2krvvXUGhc/NEL9kejd7FA=",
"_write_ts": null
}
{
"ts": "2021-11-19T07:05:44.694927",
"uid": "ChnTjeQncxZrb0ZWg",
"id": {
"orig_h": "10.10.5.101",
"orig_p": 50127,
"resp_h": "87.120.8.190",
"resp_p": 9090
},
"proto": "tcp",
"service": null,
"duration": "54.81s",
"orig_bytes": 1550710,
"resp_bytes": 97122,
"conn_state": "S1",
"local_orig": null,
"local_resp": null,
"missed_bytes": 0,
"history": "ShADadww",
"orig_pkts": 5409,
"orig_ip_bytes": 1767082,
"resp_pkts": 5477,
"resp_ip_bytes": 316206,
"tunnel_parents": null,
"community_id": "1:aw0CtkT7YikUZWyqdHwgLhqJXxU=",
"_write_ts": null
}
{
"ts": "2021-11-19T06:30:15.910850",
"uid": "CxuTEOgWv2Z74FCG6",
"id": {
"orig_h": "10.10.5.101",
"orig_p": 50041,
"resp_h": "87.120.8.190",
"resp_p": 9090
},
"proto": "tcp",
"service": null,
"duration": "36.48m",
"orig_bytes": 565,
"resp_bytes": 507,
"conn_state": "S1",
"local_orig": null,
"local_resp": null,
"missed_bytes": 0,
"history": "ShADad",
"orig_pkts": 78,
"orig_ip_bytes": 3697,
"resp_pkts": 77,
"resp_ip_bytes": 3591,
"tunnel_parents": null,
"community_id": "1:r337wYxbKPDv5Vkjoz3gGuld1bs=",
"_write_ts": null
}
The above example extracts connections from the subnet 10.10.5.0/25 that either have sent more than 1 MiB or lasted longer than 30 minutes.
Tenzir's expression language uses extractors to locate fields of interest.
If you don't know a field name but have a concrete value, say an IP address,
you can apply a query over all schemas having fields of the ip
type by writing
:ip == 172.17.2.163
. The left-hand side of this predicate is a type
extractor, denoted by :T
for a type T
. The right-hand side is the IP
address literal 172.17.2.163
. You can go one step further and just write
172.17.2.163
as a query. Tenzir infers the literal type and makes a predicate
out of it, i.e., x
expands to :T == x
where T
is the type of x
. Under
the hood, the predicate expands to a comparison against every field of type ip,
and these comparisons are combined with a logical OR.
In the above example, the value 10.10.5.0/25
actually expands to the
expression :ip in 10.10.5.0/25 || :subnet == 10.10.5.0/25
, meaning that Tenzir
looks for any IP address field whose value falls within the subnet (a prefix
match on the top 25 bits), or any subnet field whose value matches exactly.
Limit the output with head
and tail
Use the head
and
tail
operators to get the first or
last N records of the input.
The first 3 Zeek logs with IPs in 10.10.5.0/25:
export
| where #schema == /zeek.*/ && 10.10.5.0/25
| head 3
Output
{
"ts": "2021-11-19T04:28:06.186626",
"cause": "violation",
"analyzer_kind": "protocol",
"analyzer_name": "GSSAPI",
"uid": "CaHAWI2k6vB6BEOh65",
"fuid": null,
"id.orig_h": "10.10.5.101",
"id.orig_p": 49847,
"id.resp_h": "10.10.5.5",
"id.resp_p": 49667,
"id.vlan": null,
"id.vlan_inner": null,
"failure_reason": "Binpac exception: binpac exception: out_of_bound: ASN1EncodingMeta:more_len: 129 > 74",
"failure_data": null
}
{
"ts": "2021-11-19T04:28:06.186853",
"cause": "violation",
"analyzer_kind": "protocol",
"analyzer_name": "GSSAPI",
"uid": "CaHAWI2k6vB6BEOh65",
"fuid": null,
"id.orig_h": "10.10.5.101",
"id.orig_p": 49847,
"id.resp_h": "10.10.5.5",
"id.resp_p": 49667,
"id.vlan": null,
"id.vlan_inner": null,
"failure_reason": "Binpac exception: binpac exception: out_of_bound: ASN1EncodingMeta:more_len: 129 > 74",
"failure_data": null
}
{
"ts": "2021-11-19T04:28:06.187119",
"cause": "violation",
"analyzer_kind": "protocol",
"analyzer_name": "GSSAPI",
"uid": "CaHAWI2k6vB6BEOh65",
"fuid": null,
"id.orig_h": "10.10.5.101",
"id.orig_p": 49847,
"id.resp_h": "10.10.5.5",
"id.resp_p": 49667,
"id.vlan": null,
"id.vlan_inner": null,
"failure_reason": "Binpac exception: binpac exception: out_of_bound: ASN1EncodingMeta:more_len: 129 > 74",
"failure_data": null
}
Note: tail
is blocking. The tail
operator must wait for its entire input, whereas head N
terminates
immediately after the first N
records have arrived. Use head
for
the majority of use cases and tail
only when you have to.
Pick fields with select
and drop
Use the select
operator to
restrict the output to a list of fields.
export
| where #schema == "suricata.alert"
| select src_ip, dest_ip, severity, signature
| head 3
Output
{
"src_ip": "8.218.64.104",
"dest_ip": "198.71.247.91",
"alert": {
"signature": "SURICATA UDPv4 invalid checksum",
"severity": 3
}
}
{
"src_ip": "14.1.112.177",
"dest_ip": "198.71.247.91",
"alert": {
"signature": "ET DOS Possible NTP DDoS Inbound Frequent Un-Authed MON_LIST Requests IMPL 0x03",
"severity": 2
}
}
{
"src_ip": "167.94.138.20",
"dest_ip": "198.71.247.91",
"alert": {
"signature": "SURICATA UDPv4 invalid checksum",
"severity": 3
}
}
Note that select
does not reorder the input fields. Use
put
for adjusting the field order.
Sample schemas with taste
The taste
operator provides a
sample of the first N events of every unique schema in the dataflow. For
example, to get one sample event from each of 5 different schemas:
export
| taste 1
| head 5
Output
{
"ts": "2021-11-17T13:54:01.721755",
"cause": "violation",
"analyzer_kind": "protocol",
"analyzer_name": "HTTP",
"uid": "Cqp7rtziLijlnrxYf",
"fuid": null,
"id.orig_h": "87.251.64.137",
"id.orig_p": 64078,
"id.resp_h": "198.71.247.91",
"id.resp_p": 80,
"id.vlan": null,
"id.vlan_inner": null,
"failure_reason": "not a http request line",
"failure_data": null
}
{
"ts": "2021-11-17T13:33:53.748229",
"ts_delta": "1.18m",
"peer": "zeek",
"gaps": 0,
"acks": 2,
"percent_lost": 0.0,
"_write_ts": null
}
{
"ts": "2021-11-17T13:32:46.565337",
"uid": "C5luJD1ATrGDOcouW2",
"id": {
"orig_h": "89.248.165.145",
"orig_p": 43831,
"resp_h": "198.71.247.91",
"resp_p": 52806
},
"proto": "tcp",
"service": null,
"duration": null,
"orig_bytes": null,
"resp_bytes": null,
"conn_state": "S0",
"local_orig": null,
"local_resp": null,
"missed_bytes": 0,
"history": "S",
"orig_pkts": 1,
"orig_ip_bytes": 40,
"resp_pkts": 0,
"resp_ip_bytes": 0,
"tunnel_parents": null,
"community_id": "1:c/CLmyk4xRElyzleEMhJ4Baf4Gk=",
"_write_ts": null
}
{
"ts": "2021-11-18T08:05:09.134638",
"uid": "Cwk5in34AvxJ8MurDh",
"id": {
"orig_h": "10.2.9.133",
"orig_p": 49768,
"resp_h": "10.2.9.9",
"resp_p": 135
},
"rtt": "254.0us",
"named_pipe": "135",
"endpoint": "epmapper",
"operation": "ept_map",
"_write_ts": null
}
{
"ts": "2021-11-18T08:00:21.486539",
"uids": [
"C4fKs01p1bdzLWvtQa"
],
"client_addr": "192.168.1.102",
"server_addr": "192.168.1.1",
"mac": "00:0b:db:63:58:a6",
"host_name": "m57-jo",
"client_fqdn": "m57-jo.",
"domain": "m57.biz",
"requested_addr": null,
"assigned_addr": "192.168.1.102",
"lease_time": "59.4m",
"client_message": null,
"server_message": null,
"msg_types": [
"REQUEST",
"ACK"
],
"duration": "163.82ms",
"trans_id": null,
"_write_ts": null
}
Add fields with put
and extend
The extend
operator appends new
fields to the input. The put
operator does the same but drops all non-referenced fields.
Here is an example that generates host pairs plus service for Zeek connection records. Think of the output as the edges in a graph, with the last column being the edge type.
export
| where #schema == "zeek.conn" && 10.10.5.0/25
| put id.orig_h, id.resp_h, service
| head
Output
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "77.75.230.91",
"service": "http"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "10.10.5.5",
"service": "dns"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "10.10.5.5",
"service": "dns"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "20.189.173.1",
"service": null
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "20.189.173.1",
"service": "ssl"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "52.109.8.21",
"service": "ssl"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "10.10.5.5",
"service": "dns"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "10.10.5.5",
"service": "dns"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "20.54.88.152",
"service": "ssl"
}
{
"id.orig_h": "10.10.5.101",
"id.resp_h": "13.107.42.16",
"service": "ssl"
}
Give schemas and fields new names with rename
The rename
operator changes field
or schema names.
For example, rename the schema and then print only the new schema name:
export
| where #schema == "zeek.conn"
| rename flow=:zeek.conn
| put schema=#schema
| head 1
Output
{
"schema": "flow"
}
Rename a field:
export
| where #schema == "zeek.conn"
| rename src=id.orig_h, dst=id.resp_h
| put src, dst
| head
Output
{"src": "89.248.165.145", "dst": "198.71.247.91"}
{"src": "128.14.134.170", "dst": "198.71.247.91"}
{"src": "60.205.181.213", "dst": "198.71.247.91"}
{"src": "31.44.185.120", "dst": "198.71.247.91"}
{"src": "91.223.67.180", "dst": "198.71.247.91"}
{"src": "185.73.126.70", "dst": "198.71.247.91"}
{"src": "183.136.225.42", "dst": "198.71.247.91"}
{"src": "71.6.135.131", "dst": "198.71.247.91"}
{"src": "172.104.138.223", "dst": "198.71.247.91"}
{"src": "185.94.111.1", "dst": "198.71.247.91"}
Aggregate records with summarize
Use summarize
to group and
aggregate data.
export
| where #schema == "suricata.alert"
| summarize count=count(src_ip) by severity
Output
{
"alert.severity": 1,
"count": 134644
}
{
"alert.severity": 2,
"count": 26780
}
{
"alert.severity": 3,
"count": 179713
}
Suricata alerts with lower severity are more critical, with severity 1 being the
highest. Let's group by alert signature containing the substring SHELLCODE
:
export
| where severity == 1
| summarize count=count(src_ip) by signature
| where /.*SHELLCODE.*/
Output
{
"alert.signature": "ET SHELLCODE Possible Call with No Offset TCP Shellcode",
"count": 2
}
{
"alert.signature": "ET SHELLCODE Possible %41%41%41%41 Heap Spray Attempt",
"count": 32
}
Reorder records with sort
Use sort
to arrange the output
records according to the order of a specific field.
export
| where #schema == "suricata.alert"
| summarize count=count(src_ip) by severity
| sort count desc
Output
{
"alert.severity": 3,
"count": 179713
}
{
"alert.severity": 1,
"count": 134644
}
{
"alert.severity": 2,
"count": 26780
}
Deduplicate with unique
Use unique
to remove adjacent
duplicates. This operator comes in handy after a
sort
that arranges the input so
that duplicates lie next to each other:
export
| where #schema == "zeek.kerberos"
| put client
| sort client
| unique
| head
Output
{
"client": "/NM"
}
{
"client": "Administrator/EAGLEFREAKS"
}
{
"client": "DEKSTOP-D9UMVWL$/SIMONSAYSGO.NET"
}
{
"client": "DEKSTOP-VVCWQF5$/POLICYBARONS.COM"
}
{
"client": "DESKTOP-1-PC$/MAXSUGER.COM"
}
{
"client": "DESKTOP-1O7QAEA$/VICTORYPUNK.COM"
}
{
"client": "DESKTOP-2P2S7WR$/VICTORYPUNK.COM"
}
{
"client": "DESKTOP-30CQ14B$/FIRGREENTECH.COM"
}
{
"client": "DESKTOP-3KI6Y6G$/JIGGEDYJACK.COM"
}
{
"client": "DESKTOP-41SH6EJ$/DUCKKISSMIXER.COM"
}
To compute a unique list of values per group, use the distinct
aggregation
function in summarize
:
export
| where #schema == "zeek.conn"
| summarize sources=distinct(id.orig_h) by id.resp_h
| rename destination=id.resp_h
| head 3
Output
{
"destination": "192.168.201.13",
"sources": [
"10.12.14.101",
"10.12.17.101"
]
}
{
"destination": "192.168.62.104",
"sources": [
"10.12.14.101",
"10.12.17.101"
]
}
{
"destination": "10.0.177.137",
"sources": [
"10.7.5.133"
]
}
Profile the pipeline with measure
Use measure
to profile the input
and replace it with runtime statistics.
For example, one way to compute a histogram over the entire persisted dataset is to perform a full scan, replace the input with statistics, and then aggregate them by schema:
export
| measure
| summarize events=sum(events) by schema
| sort events desc
Output
{
"schema": "suricata.flow",
"events": 1129992
}
{
"schema": "zeek.conn",
"events": 583838
}
{
"schema": "suricata.alert",
"events": 341137
}
{
"schema": "suricata.dns",
"events": 289117
}
{
"schema": "suricata.http",
"events": 150736
}
{
"schema": "zeek.dns",
"events": 90013
}
{
"schema": "suricata.tls",
"events": 84608
}
{
"schema": "zeek.http",
"events": 75290
}
{
"schema": "zeek.telemetry",
"events": 72853
}
{
"schema": "suricata.smb",
"events": 67943
}
{
"schema": "zeek.ssl",
"events": 42389
}
{
"schema": "suricata.fileinfo",
"events": 35968
}
{
"schema": "suricata.dcerpc",
"events": 33055
}
{
"schema": "zeek.files",
"events": 21922
}
{
"schema": "zeek.dce_rpc",
"events": 19585
}
{
"schema": "zeek.analyzer",
"events": 14755
}
{
"schema": "suricata.anomaly",
"events": 8535
}
{
"schema": "zeek.notice",
"events": 5871
}
{
"schema": "suricata.smtp",
"events": 5208
}
{
"schema": "zeek.weird",
"events": 4617
}
{
"schema": "zeek.reporter",
"events": 3528
}
{
"schema": "suricata.krb5",
"events": 3378
}
{
"schema": "zeek.ocsp",
"events": 2874
}
{
"schema": "zeek.kerberos",
"events": 2708
}
{
"schema": "zeek.x509",
"events": 2379
}
{
"schema": "zeek.smtp",
"events": 1967
}
{
"schema": "zeek.smb_mapping",
"events": 1584
}
{
"schema": "zeek.stats",
"events": 1409
}
{
"schema": "zeek.ntp",
"events": 1224
}
{
"schema": "zeek.smb_files",
"events": 1140
}
{
"schema": "suricata.ftp",
"events": 954
}
{
"schema": "suricata.sip",
"events": 936
}
{
"schema": "zeek.dpd",
"events": 926
}
{
"schema": "suricata.dhcp",
"events": 648
}
{
"schema": "zeek.tunnel",
"events": 606
}
{
"schema": "zeek.sip",
"events": 565
}
{
"schema": "zeek.loaded_scripts",
"events": 512
}
{
"schema": "zeek.capture_loss",
"events": 476
}
{
"schema": "zeek.ntlm",
"events": 429
}
{
"schema": "zeek.pe",
"events": 315
}
{
"schema": "suricata.snmp",
"events": 288
}
{
"schema": "zeek.dhcp",
"events": 267
}
{
"schema": "zeek.snmp",
"events": 132
}
{
"schema": "suricata.tftp",
"events": 62
}
{
"schema": "suricata.stats",
"events": 12
}
{
"schema": "zeek.traceroute",
"events": 9
}
{
"schema": "zeek.ftp",
"events": 4
}
{
"schema": "suricata.ikev2",
"events": 2
}
{
"schema": "suricata.ftp_data",
"events": 1
}
{
"schema": "zeek.packet_filter",
"events": 1
}
{
"schema": "zeek.radius",
"events": 1
}
The above pipeline performs a full scan over the data at the node. Tenzir's pipeline optimizer pushes down predicates to avoid scans when possible. Consider this pipeline:
export
| where *.id.orig_h in 10.0.0.0/8
The optimizer coalesces the export
and where
operators such that
the expression *.id.orig_h in 10.0.0.0/8
gets
pushed down to the index and storage layer.