In this tutorial, you’ll learn how to map events to the Open Cybersecurity Schema Framework (OCSF). We walk you through example events from a network monitor and show how you can use Tenzir pipelines to transform them into OCSF-compliant events.

[Diagram: the data lifecycle. Stages: Collect, Normalize, Enrich, Route. Data sources: Endpoint, Network, Cloud. Destinations: SIEM, SOAR, Storage.]

The diagram above illustrates the data lifecycle and shows where the OCSF mapping takes place: you collect data from various data sources, each of which has a different shape, and then convert them to a standardized representation. Normalization decouples data acquisition from downstream analytics, so you can scale each process independently.

OCSF is a vendor-agnostic event schema, also known as a taxonomy, that defines structure and semantics for security events. Here are some key terms you need to know to map events:

  • Attribute: a unique identifier for a specific type, such as parent_folder of type String or observables of type Observable Array.
  • Event Class: a description of an event that uses specific attributes, such as HTTP Activity and Detection Finding.
  • Category: a group of event classes, such as System Activity or Findings.

The diagram below illustrates how subsets of attributes form an event class:

[Diagram: the attribute dictionary holds attributes, each with a name and a data type. A fixed subset of attributes defines an event class. Example event class Network Activity: Source Endpoint (src_endpoint, object), Destination Endpoint (dst_endpoint, object), Connection Info (connection_info, object), and more. There are three attribute requirement flags: required, recommended, and optional.]

The Base Event Class is a special event class that appears in every event class. Think of it as a mixin of attributes that OCSF automatically includes:

[Diagram: an event class consists of its own attributes plus the attributes of the Base Event Class. Attributes are grouped into categories within an event class, and base attributes are part of every event class:]

  • Classification (important for the taxonomy and schema itself): Activity ID, activity_id, Integer, required
  • Occurrence (temporal characteristics about when the event happened): Event Time, time, Timestamp, required
  • Context (enhancements that add extra data, e.g., enrichments): Raw Data, raw_data, String, optional
  • Primary (defines the key semantics of the given event): Message, message, String, recommended
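To make the composition concrete, here is a minimal Python sketch of the idea that an event class is the Base Event Class attributes plus a class-specific subset of the attribute dictionary. It is a mental model only, not how OCSF or Tenzir represent the schema; the `Attribute` type and `event_class` helper are hypothetical names, and the requirement flags of the Network Activity attributes are deliberately left unset.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Attribute:
    name: str                          # dictionary key, e.g. "activity_id"
    caption: str                       # human-readable name, e.g. "Activity ID"
    data_type: str                     # e.g. "Integer", "Timestamp", "String"
    requirement: Optional[str] = None  # "required", "recommended", or "optional"


# Base Event Class attributes as named in this tutorial.
BASE_EVENT = (
    Attribute("activity_id", "Activity ID", "Integer", "required"),
    Attribute("time", "Event Time", "Timestamp", "required"),
    Attribute("raw_data", "Raw Data", "String", "optional"),
    Attribute("message", "Message", "String", "recommended"),
)


def event_class(*own: Attribute) -> dict:
    """Compose an event class from the base attributes plus its own subset."""
    return {attr.name: attr for attr in (*BASE_EVENT, *own)}


# Illustrative subset of Network Activity. Requirement flags are left unset
# on purpose; look them up in the schema docs.
network_activity = event_class(
    Attribute("src_endpoint", "Source Endpoint", "Object"),
    Attribute("dst_endpoint", "Destination Endpoint", "Object"),
    Attribute("connection_info", "Connection Info", "Object"),
)

print(sorted(network_activity))
```

Every event class built this way carries time and activity_id, which is why the mapping later in this tutorial always sets them.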

For this tutorial, we look at OCSF from the perspective of the mapper persona, i.e., as someone who converts existing events into the OCSF schema. OCSF also defines three other personas: author, producer, and analyst. This tutorial doesn’t cover them. Our mission as mapper is to study the event semantics of the data source we want to map, and translate each event to the appropriate OCSF event class.

Let’s map some Zeek logs to OCSF!

Zeek generates logs in tab-separated values (TSV) or JSON format. Here’s an example of a connection log in TSV format:

conn.log (TSV)
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path conn
#open 2023-03-07-10-23-46
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p id.vlan id.vlan_inner proto service duration orig_bytes resp_bytes conn_state local_orig local_resp missed_bytes history orig_pkts orig_ip_bytes resp_pkts resp_ip_bytes tunnel_parents vlan inner_vlan orig_l2_addr resp_l2_addr geo.orig.country_code geo.orig.region geo.orig.city geo.orig.latitude geo.orig.longitude geo.resp.country_code geo.resp.region geo.resp.city geo.resp.latitude geo.resp.longitude community_id
#types time string addr port addr port int int enum string interval count count string bool bool count string count count count count set[string] int int string string string string string double double string string string double double string
1637155963.237882 CZwqhx3td8eTfCSwJb 128.14.134.170 57468 198.71.247.91 80 - - tcp http 5.162805 205 278 SF - - 0 ShADadfF 6 525 5 546 - - - 64:9e:f3:be:db:66 00:16:3c:f1:fd:6d US CA Los Angeles 34.0544 -118.2441 US VA Ashburn 39.0469 -77.4903 1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=
1637157758.165570 CnrwFesjfOhI3fuu1 45.137.23.27 47958 198.71.247.91 53 - - udp dns - - - S0 - - 0 D 1 58 0 0 - - - 64:9e:f3:be:db:66 00:16:3c:f1:fd:6d BD - - 23.7018 90.3742 US VA Ashburn 39.0469 -77.4903 1:0nZC/6S/pr+IceCZ04RjDZbX+KI=
1637229399.549141 CBTne9tomX1ktuCQa 10.4.21.101 53824 107.23.103.216 587 - - tcp smtp 606.747526 975904 11950 SF - - 0 ShAdDaTtTfF 1786 1069118 1070 55168 - - - 00:08:02:1c:47:ae 20:e5:2a:b6:93:f1 - - - - - US VA Ashburn 39.0469 -77.4903 1:I6VoTvbCqaKvPrlFnNbRRbjlMsc=

You can also download this sample to avoid dealing with tabs and spaces in the snippet above.

We first parse the log file into a structured form so that we can work with the individual fields. The read_zeek_tsv operator parses the above structure out of the box:

Terminal window
tenzir 'read_zeek_tsv' < conn.log
Output
{
ts: 2021-11-17T13:32:43.237881856Z,
uid: "CZwqhx3td8eTfCSwJb",
id: {
orig_h: 128.14.134.170,
orig_p: 57468,
resp_h: 198.71.247.91,
resp_p: 80,
},
proto: "tcp",
service: "http",
duration: 5.162805s,
orig_bytes: 205,
resp_bytes: 278,
conn_state: "SF",
local_orig: null,
local_resp: null,
missed_bytes: 0,
history: "ShADadfF",
orig_pkts: 6,
orig_ip_bytes: 525,
resp_pkts: 5,
resp_ip_bytes: 546,
tunnel_parents: null,
community_id: "1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=",
_write_ts: null,
}
{
ts: 2021-11-17T14:02:38.165570048Z,
uid: "CnrwFesjfOhI3fuu1",
id: {
orig_h: 45.137.23.27,
orig_p: 47958,
resp_h: 198.71.247.91,
resp_p: 53,
},
proto: "udp",
service: "dns",
duration: null,
orig_bytes: null,
resp_bytes: null,
conn_state: "S0",
local_orig: null,
local_resp: null,
missed_bytes: 0,
history: "D",
orig_pkts: 1,
orig_ip_bytes: 58,
resp_pkts: 0,
resp_ip_bytes: 0,
tunnel_parents: null,
community_id: "1:0nZC/6S/pr+IceCZ04RjDZbX+KI=",
_write_ts: null,
}
{
ts: 2021-11-18T09:56:39.549140992Z,
uid: "CBTne9tomX1ktuCQa",
id: {
orig_h: 10.4.21.101,
orig_p: 53824,
resp_h: 107.23.103.216,
resp_p: 587,
},
proto: "tcp",
service: "smtp",
duration: 10.112458766666666min,
orig_bytes: 975904,
resp_bytes: 11950,
conn_state: "SF",
local_orig: null,
local_resp: null,
missed_bytes: 0,
history: "ShAdDaTtTfF",
orig_pkts: 1786,
orig_ip_bytes: 1069118,
resp_pkts: 1070,
resp_ip_bytes: 55168,
tunnel_parents: null,
community_id: "1:I6VoTvbCqaKvPrlFnNbRRbjlMsc=",
_write_ts: null,
}

Now that we have decomposed the data into its atomic values, we can map them to the corresponding OCSF fields.

To map fields, you must first identify the appropriate OCSF event class. In our example, the corresponding event class in OCSF is Network Activity. We use OCSF v1.8.0 throughout this tutorial.

To make the mapping process more organized, we map per attribute group. The schema has four groups:

  1. Classification: Important for the taxonomy and schema itself
  2. Occurrence: Temporal characteristics about when the event happened
  3. Context: Auxiliary information about the event
  4. Primary: Defines the key semantics of the given event

Here’s a template for the mapping pipeline:

// --- Preamble ---------------------------------
// (1) Keep the source event in a source-specific working namespace.
this = { zeek: this }
// (2) Populate the OCSF event. Style-wise, we recommend using one coherent
// block of TQL per OCSF attribute group to provide a bit of structure for the
// reader.
// --- OCSF: classification attributes ----------
// (3) Name the event after its OCSF event class so that downstream pipelines
// can filter by schema.
@name = "ocsf.network_activity"
ocsf.category_uid = 4
ocsf.class_uid = 4001
ocsf.activity_id = 6
ocsf.severity_id = 1
// ...fill out remaining classification attributes.
// --- OCSF: occurrence attributes --------------
ocsf.time = move zeek.ts // 👈 remove source field while mapping
if zeek.duration != null {
  ocsf.end_time = ocsf.time + zeek.duration
  ocsf.duration = count_milliseconds(move zeek.duration).round()
}
ocsf.start_time = ocsf.time
// ...fill out remaining occurrence attributes; add timezone_offset if known.
// --- OCSF: context attributes -----------------
ocsf.metadata = {
  product: {
    name: "Zeek",
    vendor_name: "Zeek",
  },
  profiles: [], // 👈 add profiles as needed, such as ["host", "network_proxy"]
  original_event_uid: move zeek.uid,
  version: "1.8.0",
}
// ...fill out remaining context attributes.
// --- OCSF: primary attributes -----------------
ocsf.src_endpoint = {
  ip: move zeek.id.orig_h,
  port: move zeek.id.orig_p,
}
ocsf.dst_endpoint = {
  ip: move zeek.id.resp_h,
  port: move zeek.id.resp_p,
}
ocsf.is_src_dst_assignment_known = true
// ...fill out remaining primary attributes.
drop zeek.id // 👈 remove source field after mapping
// --- OCSF: profile-specific attributes --------
// Add profile-specific fields here based on metadata.profiles.
// For the host profile: ocsf.actor.user, ocsf.device, etc.
// ...
// --- Epilogue ---------------------------------
// (4) Return the mapped OCSF event and preserve mapping residue.
this = {...ocsf, unmapped: zeek}
// (5) Expand minimal OCSF and validate the final schema shape.
ocsf::derive
ocsf::cast

Let’s unpack this:

  1. With this = { zeek: this }, we keep the original event in a source-specific working namespace. This approach avoids name clashes when we create new OCSF fields in the next steps.
  2. The main work takes place here. Our approach is structured: for every field in the source event, (1) map it, and (2) remove it. Ideally, use the move keyword to perform (1) and (2) together, for example, ocsf.x = move zeek.y. If a field needs to be used multiple times, read it without move and use drop afterwards.
  3. Assign @name to the target OCSF event class so that downstream pipelines can filter by schema.
  4. The assignment this = {...ocsf, unmapped: zeek} returns the mapped OCSF event and stores the remaining source fields as unmapped for review.
  5. The mapper intentionally produces minimal OCSF: it maps identifiers and source-derived semantics, but does not hand-write derived sibling fields. ocsf::derive expands the event with fields such as activity_name, category_name, and severity before ocsf::cast validates the final event against the OCSF schema.
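The wrap, move, and residue pattern described above can be modeled in a few lines of Python. This is only a sketch of the bookkeeping, under the assumption that `dict.pop` stands in for TQL's `move` and that plain dictionaries stand in for events; `map_event` is a hypothetical name, and the sketch does not model `ocsf::derive` or `ocsf::cast`.

```python
def map_event(source: dict) -> dict:
    """Model of the mapping template: wrap, move fields, return the residue."""
    zeek = dict(source)  # working namespace, like `this = { zeek: this }`
    ocsf = {}

    # pop() reads and removes in one step, like `ocsf.time = move zeek.ts`.
    ocsf["time"] = zeek.pop("ts")
    ocsf["metadata"] = {"original_event_uid": zeek.pop("uid")}

    # The `id` record is read several times, so remove it once at the end,
    # like `drop zeek.id` in the template.
    conn_id = zeek["id"]
    ocsf["src_endpoint"] = {"ip": conn_id["orig_h"], "port": conn_id["orig_p"]}
    ocsf["dst_endpoint"] = {"ip": conn_id["resp_h"], "port": conn_id["resp_p"]}
    del zeek["id"]

    # Whatever is left was not mapped; keep it for review.
    return {**ocsf, "unmapped": zeek}


sample = {
    "ts": "2021-11-17T13:32:43.237881856Z",
    "uid": "CZwqhx3td8eTfCSwJb",
    "id": {"orig_h": "128.14.134.170", "orig_p": 57468,
           "resp_h": "198.71.247.91", "resp_p": 80},
    "orig_ip_bytes": 525,
}
result = map_event(sample)
print(result["unmapped"])
```

Because every mapped field leaves the working namespace, the size of unmapped is a direct measure of how much of the source the mapping still ignores.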

Now that we have a template, let’s get our hands dirty and go deep into the actual mapping.

The classification attributes are important for the schema. Mapping them is mechanical and mostly involves reviewing the schema docs.

ocsf.activity_id = 6
ocsf.activity_name = "Traffic"
ocsf.category_uid = 4
ocsf.category_name = "Network Activity"
ocsf.class_uid = 4001
ocsf.class_name = "Network Activity"
ocsf.severity_id = 1
ocsf.severity = "Informational"
ocsf.type_uid = ocsf.class_uid * 100 + ocsf.activity_id

Note that computing the field type_uid requires simple arithmetic. In package mappers, prefer minimal OCSF: set the IDs and source-derived values, but let ocsf::derive populate sibling fields. This keeps mappings concise and prevents repetitive label assignments. The shorter form is:

ocsf.activity_id = 6
ocsf.category_uid = 4
ocsf.class_uid = 4001
ocsf.severity_id = 1
ocsf.type_uid = ocsf.class_uid * 100 + ocsf.activity_id
ocsf::derive

The computed sibling fields for <field>_id often have the pattern <field>_name or simply <field>. Run ocsf::derive after the mapper returns the minimal OCSF event so downstream pipelines still receive the comprehensive schema shape.
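As a quick check of the type_uid arithmetic, the following Python sketch reproduces the formula from the snippets above. The helper names are hypothetical, and the inverse function assumes that activity IDs stay below 100, which is what the factor of 100 in the formula implies.

```python
def type_uid(class_uid: int, activity_id: int) -> int:
    """Combine class and activity into one identifier: class_uid * 100 + activity_id."""
    return class_uid * 100 + activity_id


def split_type_uid(uid: int) -> tuple:
    """Recover (class_uid, activity_id); assumes 0 <= activity_id < 100."""
    return divmod(uid, 100)


# Network Activity (4001) with activity Traffic (6).
print(type_uid(4001, 6))
print(split_type_uid(400106))
```

For Network Activity with the Traffic activity, this yields 400106, the value of type_uid in the mapped events.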

Let’s tackle the occurrence group. These attributes are all about time.

ocsf.time = move zeek.ts
if zeek.duration != null {
ocsf.end_time = ocsf.time + zeek.duration
ocsf.duration = count_milliseconds(move zeek.duration).round()
}
ocsf.start_time = ocsf.time

When Zeek records a connection duration, using + with a value of type time and duration yields a new time value, just as you’d expect. The OCSF duration attribute itself is an integer millisecond count, so we assign that field only after deriving end_time.
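To sanity-check the occurrence values, here is a small Python sketch of the same arithmetic on integer nanoseconds. It is a model rather than TQL: the function name is hypothetical, timestamps are nanoseconds since the Unix epoch, and ties are rounded half up, which is an assumption about tie-breaking and not a statement about how round() behaves in TQL.

```python
from typing import Optional

NS_PER_MS = 1_000_000


def occurrence(ts_ns: int, duration_ns: Optional[int]) -> dict:
    """Derive OCSF occurrence attributes from a start time and optional duration."""
    out = {"time": ts_ns, "start_time": ts_ns}
    if duration_ns is not None:
        # time + duration yields a new point in time.
        out["end_time"] = ts_ns + duration_ns
        # OCSF duration is an integer number of milliseconds (rounded half up).
        out["duration"] = (duration_ns + NS_PER_MS // 2) // NS_PER_MS
    return out


# First sample connection: 5.162805 s starting at 2021-11-17T13:32:43.237881856Z.
first = occurrence(1_637_155_963_237_881_856, 5_162_805_000)
# Second sample connection: Zeek recorded no duration.
second = occurrence(1_637_157_758_165_570_048, None)
print(first["duration"], first["end_time"] - first["start_time"])
print(sorted(second))
```

The second call shows why the TQL guards on zeek.duration: without a duration there is no end_time and no duration attribute to emit.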

The context attributes provide auxiliary information. Most notably, the metadata attribute holds data-source specific information. Even though unmapped belongs to this group, we deal with it at the very end.

ocsf.metadata = {
log_name: "conn.log",
logged_time: move zeek._write_ts?,
product: {
name: "Zeek",
vendor_name: "Zeek",
cpe_name: "cpe:2.3:a:zeek:zeek",
},
original_event_uid: move zeek.uid,
version: "1.8.0",
}
drop zeek._path? // implied in metadata.log_name
ocsf.app_protocol_name = move zeek.service

We use ? when accessing fields that are not always present, such as zeek._write_ts?. If you omit the ?, the pipeline emits a warning when _write_ts is missing.

Zeek’s service field identifies the application-layer protocol, so map it to app_protocol_name. Reserve app_name for a specific network application, such as an App-ID result.

The primary attributes define the semantics of the event class itself. This is where the core value of the data is, as we are mapping the most event-specific information.

ocsf.src_endpoint = {
ip: move zeek.id.orig_h,
port: move zeek.id.orig_p,
}
ocsf.dst_endpoint = {
ip: move zeek.id.resp_h,
port: move zeek.id.resp_p,
}
ocsf.is_src_dst_assignment_known = true
ocsf.src_endpoint.mac = move zeek.orig_l2_addr?
ocsf.dst_endpoint.mac = move zeek.resp_l2_addr?
if zeek.geo.orig? != null {
ocsf.src_endpoint.location = {
country: move zeek.geo.orig.country_code?,
region: move zeek.geo.orig.region?,
city: move zeek.geo.orig.city?,
lat: move zeek.geo.orig.latitude?,
long: move zeek.geo.orig.longitude?,
}
}
if zeek.geo.resp? != null {
ocsf.dst_endpoint.location = {
country: move zeek.geo.resp.country_code?,
region: move zeek.geo.resp.region?,
city: move zeek.geo.resp.city?,
lat: move zeek.geo.resp.latitude?,
long: move zeek.geo.resp.longitude?,
}
}
drop zeek.geo?
// Here, we use `drop` because we simply want to get rid of the intermediate
// `id` record that we already mapped above.
drop zeek.id
// Locality of reference: we define the protocol numbers close to where they
// are used for easier readability.
let $proto_nums = {
tcp: 6,
udp: 17,
icmp: 1,
icmpv6: 58,
ipv6: 41,
}
ocsf.connection_info = {
community_uid: move zeek.community_id?,
flag_history: move zeek.history,
protocol_name: move zeek.proto,
}
ocsf.connection_info.protocol_num = $proto_nums[ocsf.connection_info.protocol_name]? else -1
// Use `if` for binary predicates and `match` for finite case dispatch.
if ocsf.src_endpoint.ip.is_v6() or ocsf.dst_endpoint.ip.is_v6() {
ocsf.connection_info.protocol_ver_id = 6
} else {
ocsf.connection_info.protocol_ver_id = 4
}
match ({orig: zeek.local_orig, resp: zeek.local_resp}) {
{orig: true, resp: true} => {
ocsf.connection_info.direction = "Lateral"
ocsf.connection_info.direction_id = 3
}
{orig: true, resp: false} => {
ocsf.connection_info.direction = "Outbound"
ocsf.connection_info.direction_id = 2
}
{orig: false, resp: true} => {
ocsf.connection_info.direction = "Inbound"
ocsf.connection_info.direction_id = 1
}
_ => {
ocsf.connection_info.direction = "Unknown"
ocsf.connection_info.direction_id = 0
}
}
drop zeek.local_orig, zeek.local_resp
// The `status` attribute in OCSF is a success indicator. While we could use
// `zeek.conn_state` to extract success/failure, this would go beyond
// the tutorial.
ocsf.status_id = 99
ocsf.status = "Other"
ocsf.status_code = move zeek.conn_state
ocsf.traffic = {
bytes_in: move zeek.resp_bytes,
bytes_out: move zeek.orig_bytes,
bytes_missed: move zeek.missed_bytes,
packets_in: move zeek.resp_pkts,
packets_out: move zeek.orig_pkts,
}
if ocsf.traffic.bytes_out? != null and ocsf.traffic.bytes_in? != null {
ocsf.traffic.bytes = ocsf.traffic.bytes_out + ocsf.traffic.bytes_in
}
ocsf.traffic.packets = ocsf.traffic.packets_out + ocsf.traffic.packets_in
if zeek.tunnel_parents? == null {
drop zeek.tunnel_parents?
}
if zeek.vlan? == null {
drop zeek.vlan?
}
if zeek.inner_vlan? == null {
drop zeek.inner_vlan?
}

Here’s what happens:

  • The expression $proto_nums[ocsf.connection_info.protocol_name] takes the moved Zeek proto value and uses it as an index into a static record $proto_nums. Add a ? at the end to avoid warnings when the lookup returns null, and use the inline else expression for the fallback value.
  • Set is_src_dst_assignment_known because Zeek’s id.orig_* fields identify the connection initiator and id.resp_* fields identify the responder.
  • Move Layer 2 addresses and GeoIP enrichment into endpoint attributes instead of leaving them in unmapped.
  • Use is_v6 on the connection IPs to identify IPv6 connections.
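The lookup, fallback, and dispatch logic from the list above can be cross-checked with a short Python model. This is a sketch under stated assumptions, not TQL: `connection_info` is a hypothetical helper, `dict.get` with a default plays the role of the `? else -1` fallback, and the standard `ipaddress` module stands in for is_v6.

```python
import ipaddress

# Same static table as $proto_nums in the TQL snippet.
PROTO_NUMS = {"tcp": 6, "udp": 17, "icmp": 1, "icmpv6": 58, "ipv6": 41}

# (local_orig, local_resp) -> (direction_id, direction)
DIRECTIONS = {
    (True, True): (3, "Lateral"),
    (True, False): (2, "Outbound"),
    (False, True): (1, "Inbound"),
}


def connection_info(proto, src_ip, dst_ip, local_orig=None, local_resp=None):
    """Model the protocol lookup, IP version check, and direction dispatch."""
    is_v6 = any(ipaddress.ip_address(ip).version == 6 for ip in (src_ip, dst_ip))
    # Any combination not listed above, including nulls, falls back to Unknown.
    direction_id, direction = DIRECTIONS.get((local_orig, local_resp), (0, "Unknown"))
    return {
        "protocol_name": proto,
        "protocol_num": PROTO_NUMS.get(proto, -1),  # -1 when the name is unknown
        "protocol_ver_id": 6 if is_v6 else 4,
        "direction_id": direction_id,
        "direction": direction,
    }


# First sample connection: TCP over IPv4 with unset local_orig/local_resp.
info = connection_info("tcp", "128.14.134.170", "198.71.247.91")
print(info)
```

With local_orig and local_resp unset, as in the sample log, the direction falls through to Unknown, which matches the mapped events in this tutorial.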

When we combine all TQL snippets from above, we get the following output:

{
activity_id: 6,
activity_name: "Traffic",
app_protocol_name: "http",
category_name: "Network Activity",
category_uid: 4,
class_name: "Network Activity",
class_uid: 4001,
connection_info: {
community_uid: "1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=",
direction: "Unknown",
direction_id: 0,
flag_history: "ShADadfF",
protocol_name: "tcp",
protocol_num: 6,
protocol_ver: "Internet Protocol version 4 (IPv4)",
protocol_ver_id: 4,
},
dst_endpoint: {
ip: 198.71.247.91,
location: {
city: "Ashburn",
country: "US",
lat: 39.0469,
long: -77.4903,
region: "VA",
},
mac: "00:16:3c:f1:fd:6d",
port: 80,
},
duration: 5163,
end_time: 2021-11-17T13:32:48.400686856Z,
is_src_dst_assignment_known: true,
metadata: {
log_name: "conn.log",
logged_time: null,
original_event_uid: "CZwqhx3td8eTfCSwJb",
product: {
cpe_name: "cpe:2.3:a:zeek:zeek",
name: "Zeek",
vendor_name: "Zeek",
},
version: "1.8.0",
},
severity: "Informational",
severity_id: 1,
src_endpoint: {
ip: 128.14.134.170,
location: {
city: "Los Angeles",
country: "US",
lat: 34.0544,
long: -118.2441,
region: "CA",
},
mac: "64:9e:f3:be:db:66",
port: 57468,
},
start_time: 2021-11-17T13:32:43.237881856Z,
status: "Other",
status_code: "SF",
status_id: 99,
time: 2021-11-17T13:32:43.237881856Z,
traffic: {
bytes: 483,
bytes_in: 278,
bytes_missed: 0,
bytes_out: 205,
packets: 11,
packets_in: 5,
packets_out: 6,
},
type_name: "Network Activity: Traffic",
type_uid: 400106,
unmapped: {
orig_ip_bytes: 525,
resp_ip_bytes: 546,
},
}
{
activity_id: 6,
activity_name: "Traffic",
app_protocol_name: "dns",
category_name: "Network Activity",
category_uid: 4,
class_name: "Network Activity",
class_uid: 4001,
connection_info: {
community_uid: "1:0nZC/6S/pr+IceCZ04RjDZbX+KI=",
direction: "Unknown",
direction_id: 0,
flag_history: "D",
protocol_name: "udp",
protocol_num: 17,
protocol_ver: "Internet Protocol version 4 (IPv4)",
protocol_ver_id: 4,
},
dst_endpoint: {
ip: 198.71.247.91,
location: {
city: "Ashburn",
country: "US",
lat: 39.0469,
long: -77.4903,
region: "VA",
},
mac: "00:16:3c:f1:fd:6d",
port: 53,
},
is_src_dst_assignment_known: true,
metadata: {
log_name: "conn.log",
logged_time: null,
original_event_uid: "CnrwFesjfOhI3fuu1",
product: {
cpe_name: "cpe:2.3:a:zeek:zeek",
name: "Zeek",
vendor_name: "Zeek",
},
version: "1.8.0",
},
severity: "Informational",
severity_id: 1,
src_endpoint: {
ip: 45.137.23.27,
location: {
city: null,
country: "BD",
lat: 23.7018,
long: 90.3742,
region: null,
},
mac: "64:9e:f3:be:db:66",
port: 47958,
},
start_time: 2021-11-17T14:02:38.165570048Z,
status: "Other",
status_code: "S0",
status_id: 99,
time: 2021-11-17T14:02:38.165570048Z,
traffic: {
bytes_in: null,
bytes_missed: 0,
bytes_out: null,
packets: 1,
packets_in: 0,
packets_out: 1,
},
type_name: "Network Activity: Traffic",
type_uid: 400106,
unmapped: {
orig_ip_bytes: 58,
resp_ip_bytes: 0,
},
}

There are still several fields that we can map to the schema, but we’ll leave this as an exercise for the reader.

Recap: Understand the OCSF pipeline architecture

Most pipelines (1) onboard data from a source, (2) transform it, and (3) send it somewhere. This tutorial focuses on the middle piece, with transformation being the mapping to OCSF.

We’ve addressed data onboarding by reading a log file and decomposing the raw Zeek TSV contents into a structured record. The built-in read_zeek_tsv operator made this trivial, but other data sources often require a lot more elbow grease. Check out our extensive guide on parsing string fields for more details.

We haven’t yet addressed the other end of the pipeline: data offboarding. Our examples run tenzir on the command line, relying on an implicit output operator that writes the result of the last transformation to the terminal.

In other words, we have a sandwich structure in our pipeline. To make it explicit:

// (1) Onboard data (reading from stdin was implicit on the command line)
from_stdin
read_zeek_tsv
// (2) Map to OCSF
// ...
// Lots of TQL here!
// ...
// (3) Offboard data (writing to stdout was implicit on the command line)
to_stdout

Hard-coding both ends of the pipeline like this is impractical because data may arrive via multiple channels: log files, Kafka messages, or Syslog over the wire. Even the encoding may vary: Zeek can write TSV, but it can also write JSON. Similarly, users may want to consume the data in various ways.

In theory, we’re done now. We have a working mapping. It’s just not yet very (re)usable. To turn it into a package capability, we split the pipeline into independently usable snippets. The next section describes how to achieve this.

To make our OCSF mapping more reusable, we extract it as a user-defined operator and put it into a package. There is an entire tutorial on writing packages, but all you need to know right now is that packages are one-click installable bundles of operators, examples, tests, and deployable pipelines. After installation, you can call the newly introduced mapping operators from any pipeline.

Break down complexity with user-defined operators

Let’s work towards a package that comes with a user-defined operator called zeek::ocsf::map that maps Zeek connection logs to OCSF:

from_stdin
read_zeek_tsv
zeek::ocsf::map // 👈 Returns OCSF events with a package UDO.
ocsf::derive
ocsf::cast
to_stdout

All you have to do to get there is create a package with the following directory structure:

  • zeek/
    • examples/
    • operators/ 👈 user-defined operators go here
    • pipelines/
    • tests/
    • package.yaml

Notice that the name zeek::ocsf::map contains two modules, zeek and ocsf, separated by double colons. The directory structure in the package determines the module hierarchy:

  • zeek/
    • operators/
      • ocsf/
        • map.tql 👈 Exposes zeek::ocsf::map as operator

Since operators fully compose, you can implement zeek::ocsf::map as one main mapper that performs source-specific cleanup, sets shared fields in the ocsf record, and dispatches to event-specific operators for each log type.

zeek/operators/ocsf/map.tql
// Keep the parsed source event around while event-specific operators move
// fields into their OCSF homes.
this = { zeek: this }
ocsf.metadata = {
product: {
name: "Zeek",
vendor_name: "Zeek",
cpe_name: "cpe:2.3:a:zeek:zeek",
},
version: "1.8.0",
}
ocsf.severity_id = 1
// Dispatch mappings based on schema name. This assumes that you've parsed the
// input logs and added the appropriate @name based on the log type. The
// read_zeek_tsv operator does this automatically. You could also dispatch based
// on any other stable discriminator, such as _path or event_type.
match @name {
"zeek.conn" => {
// Map "conn.log" events
zeek::ocsf::events::conn
}
_ => {
// Base Event: unknown or unsupported Zeek log type
zeek::ocsf::base
}
}
// Return the mapped OCSF event from the operator.
this = {...ocsf, unmapped: zeek}

In this layout, you’d put the mapping operators in the following directories:

  • zeek/
    • operators/
      • ocsf/
        • events/ 👈 Exposes zeek::ocsf::events::* operators
          • conn.tql
        • base.tql
        • map.tql

The mapper keeps intermediate results under ocsf internally and returns a minimal OCSF event. Callers then run shared OCSF helpers such as ocsf::derive and ocsf::cast to expand and validate the final shape. This mirrors how larger package mappers handle cleanup, shared fields, fallback Base Event mapping, and event-specific mappings.
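The dispatch structure of the main mapper can also be pictured as a lookup table with a fallback. The Python sketch below is only an analogy for the match @name block: all function names are hypothetical, the event-specific mappers are reduced to stubs, and the Base Event identifier 0 is taken from the OCSF schema rather than from this tutorial.

```python
def map_conn(zeek: dict, ocsf: dict) -> None:
    """Stub for the conn.log mapping (the role of zeek::ocsf::events::conn)."""
    ocsf["class_uid"] = 4001
    ocsf["time"] = zeek.pop("ts")


def map_base(zeek: dict, ocsf: dict) -> None:
    """Stub fallback for unknown log types (the role of zeek::ocsf::base)."""
    ocsf["class_uid"] = 0  # Base Event
    ocsf["time"] = zeek.pop("ts")


# Schema name -> event-specific mapper; extend this table per log type.
MAPPERS = {"zeek.conn": map_conn}


def map_zeek(name: str, source: dict) -> dict:
    zeek = dict(source)
    # Shared fields that every Zeek event receives before dispatch.
    ocsf = {"severity_id": 1,
            "metadata": {"product": {"name": "Zeek", "vendor_name": "Zeek"}}}
    MAPPERS.get(name, map_base)(zeek, ocsf)  # unknown names use the fallback
    return {**ocsf, "unmapped": zeek}


conn = map_zeek("zeek.conn", {"ts": 1, "uid": "C1"})
other = map_zeek("zeek.weird", {"ts": 2, "name": "bad_checksum"})
print(conn["class_uid"], other["class_uid"])
```

Supporting another log type then means adding one mapper and one table entry, which mirrors adding one file under events/ and one match arm in map.tql.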

Write tests for production-grade reliability

To achieve production-grade quality for your mappings, you must ensure that they do what they promise. In practice, this means shipping tests alongside the mappings: given a mapping, test whether a provided input produces the expected output.

This is where our test framework comes into play. Put your test scenarios in tests/ and sample data into tests/inputs:

  • zeek/
    • tests/
      • inputs/ 👈 raw log samples
        • conn.log
      • map_one.tql 👈 test scenario
      • map_one.txt 👈 expected output for test scenario

Here’s an example of the test:

zeek/tests/map_one.tql
from_file f"{env("TENZIR_INPUTS")}/conn.log" {
read_zeek_tsv
}
zeek::ocsf::map
ocsf::derive
ocsf::cast
sort time
head 2

Now run the test framework in the package directory:

Terminal window
uvx tenzir-test
i executing project: zeek (.)
i running 1 tests (44 jobs) in project .
i 1× tenzir (v5.16.0+gc0a0c3ba49)
✘ tests/map_one.tql
└─▶ Failed to find ref file: "../docs/public/packages/zeek/tests/map_one.txt"
i ran 1 test: 0 passed (0%) / 1 failed (100%)

There is no baseline for the test yet. Let’s generate it via --update:

Terminal window
uvx tenzir-test --update

Now there’s a *.txt file next to the test scenario. Verify that it has the expected output. Alternatively, use uvx tenzir-test --passthrough to print the output to the terminal for inline inspection.
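The workflow above is a form of baseline (golden file) testing. The following Python sketch illustrates the general idea only; it is not how tenzir-test is implemented, and the function name, return values, and file handling are assumptions made for illustration.

```python
import tempfile
from pathlib import Path


def check_baseline(scenario: Path, actual: str, update: bool = False) -> str:
    """Compare a scenario's output with the .txt baseline stored next to it."""
    baseline = scenario.with_suffix(".txt")
    if update:
        baseline.write_text(actual)  # accept the current output as the new truth
        return "updated"
    if not baseline.exists():
        return "missing baseline"
    return "passed" if baseline.read_text() == actual else "failed"


with tempfile.TemporaryDirectory() as tmp:
    scenario = Path(tmp) / "map_one.tql"
    output = '{activity_id: 6, class_uid: 4001}\n'
    steps = [
        check_baseline(scenario, output),               # no baseline yet
        check_baseline(scenario, output, update=True),  # generate it
        check_baseline(scenario, output),               # same output again
        check_baseline(scenario, output + "x"),         # output drifted
    ]
print(steps)
```

The important habit is the manual review after an update: a baseline only protects you if the output you accepted was correct in the first place.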

Generated *.txt test scenario baseline
zeek/tests/map_one.txt
{
activity_id: 6,
activity_name: "Traffic",
app_protocol_name: "http",
category_name: "Network Activity",
category_uid: 4,
class_name: "Network Activity",
class_uid: 4001,
connection_info: {
community_uid: "1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=",
direction: "Unknown",
direction_id: 0,
flag_history: "ShADadfF",
protocol_name: "tcp",
protocol_num: 6,
protocol_ver: "Internet Protocol version 4 (IPv4)",
protocol_ver_id: 4,
},
dst_endpoint: {
ip: 198.71.247.91,
location: {
city: "Ashburn",
country: "US",
lat: 39.0469,
long: -77.4903,
region: "VA",
},
mac: "00:16:3c:f1:fd:6d",
port: 80,
},
duration: 5163,
end_time: 2021-11-17T13:32:48.400686856Z,
is_src_dst_assignment_known: true,
metadata: {
log_name: "conn.log",
logged_time: null,
original_event_uid: "CZwqhx3td8eTfCSwJb",
product: {
cpe_name: "cpe:2.3:a:zeek:zeek",
name: "Zeek",
vendor_name: "Zeek",
},
version: "1.8.0",
},
severity: "Informational",
severity_id: 1,
src_endpoint: {
ip: 128.14.134.170,
location: {
city: "Los Angeles",
country: "US",
lat: 34.0544,
long: -118.2441,
region: "CA",
},
mac: "64:9e:f3:be:db:66",
port: 57468,
},
start_time: 2021-11-17T13:32:43.237881856Z,
status: "Other",
status_code: "SF",
status_id: 99,
time: 2021-11-17T13:32:43.237881856Z,
traffic: {
bytes: 483,
bytes_in: 278,
bytes_missed: 0,
bytes_out: 205,
packets: 11,
packets_in: 5,
packets_out: 6,
},
type_name: "Network Activity: Traffic",
type_uid: 400106,
unmapped: {
orig_ip_bytes: 525,
resp_ip_bytes: 546,
},
}
{
activity_id: 6,
activity_name: "Traffic",
app_protocol_name: "dns",
category_name: "Network Activity",
category_uid: 4,
class_name: "Network Activity",
class_uid: 4001,
connection_info: {
community_uid: "1:0nZC/6S/pr+IceCZ04RjDZbX+KI=",
direction: "Unknown",
direction_id: 0,
flag_history: "D",
protocol_name: "udp",
protocol_num: 17,
protocol_ver: "Internet Protocol version 4 (IPv4)",
protocol_ver_id: 4,
},
dst_endpoint: {
ip: 198.71.247.91,
location: {
city: "Ashburn",
country: "US",
lat: 39.0469,
long: -77.4903,
region: "VA",
},
mac: "00:16:3c:f1:fd:6d",
port: 53,
},
is_src_dst_assignment_known: true,
metadata: {
log_name: "conn.log",
logged_time: null,
original_event_uid: "CnrwFesjfOhI3fuu1",
product: {
cpe_name: "cpe:2.3:a:zeek:zeek",
name: "Zeek",
vendor_name: "Zeek",
},
version: "1.8.0",
},
severity: "Informational",
severity_id: 1,
src_endpoint: {
ip: 45.137.23.27,
location: {
city: null,
country: "BD",
lat: 23.7018,
long: 90.3742,
region: null,
},
mac: "64:9e:f3:be:db:66",
port: 47958,
},
start_time: 2021-11-17T14:02:38.165570048Z,
status: "Other",
status_code: "S0",
status_id: 99,
time: 2021-11-17T14:02:38.165570048Z,
traffic: {
bytes_in: null,
bytes_missed: 0,
bytes_out: null,
packets: 1,
packets_in: 0,
packets_out: 1,
},
type_name: "Network Activity: Traffic",
type_uid: 400106,
unmapped: {
orig_ip_bytes: 58,
resp_ip_bytes: 0,
},
}

After running uvx tenzir-test again without any flags, you get the following output:

i executing project: zeek (.)
i running 1 tests (44 jobs) in project .; update
i 1× tenzir (v5.16.0+gc0a0c3ba49)
✔ tests/map_one.tql
i ran 1 test: 1 passed (100%) / 0 failed (0%)

Perfect. Now proceed with all log types and you have a production-grade package.

After you have fleshed out the complete package, install it, either interactively via package::add, or IaC-style by putting it into a git repo and pointing the config option tenzir.package-dirs to it.

This tutorial showed you how to map security data to OCSF using TQL pipelines. You learned:

  • What OCSF events look like at a high level
  • How to structure your mapping pipeline using OCSF attribute groups (classification, occurrence, context, and primary)
  • How to use TQL operators and expressions to transform raw events into OCSF-compliant records
  • How to package your mappings as reusable operators in a Tenzir package

The key to successful OCSF mapping is systematic organization: keep source residue in zeek, map attributes group by group while removing source fields as you go, return the residue as unmapped, and produce minimal OCSF that ocsf::derive expands before ocsf::cast validates it. This approach keeps mappings maintainable while ensuring downstream pipelines receive the comprehensive schema shape.

For more examples and ready-to-use OCSF mappings, check out the Tenzir Community Library.
