In this tutorial, you’ll learn how to map events to the Open Cybersecurity Schema Framework (OCSF). We walk you through an example of events from a network monitor and show how you can use Tenzir pipelines to transform them into OCSF-compliant events.
The diagram above illustrates the data lifecycle and shows where the OCSF mapping takes place: you collect data from various data sources, each of which has a different shape, and then convert them to a standardized representation. Normalization decouples data acquisition from downstream analytics, so you can scale each process independently.
OCSF Primer
OCSF is a vendor-agnostic event schema, also known as a taxonomy, that defines structure and semantics for security events. Here are some key terms you need to know to map events:
- Attribute: a unique identifier for a specific type, such as parent_folder of type String or observables of type Observable Array.
- Event Class: a description of an event that uses specific attributes, such as HTTP Activity and Detection Finding.
- Category: a group of event classes, such as System Activity or Findings.
The diagram below illustrates how subsets of attributes form an event class:
The Base Event Class is a special event class that appears in every event class. Think of it as a mixin of attributes that OCSF automatically includes:
For this tutorial, we look at OCSF from the perspective of the mapper persona, i.e., as someone who converts existing events into the OCSF schema. OCSF also defines three other personas: author, producer, and analyst. This tutorial doesn’t cover them. Our mission as mapper is to study the event semantics of the data source we want to map, and translate each event to the appropriate OCSF event class.
Case Study: Zeek Logs
Let’s map some Zeek logs to OCSF!
Zeek generates logs in tab-separated values (TSV) or JSON format. Here’s an example of a connection log in TSV format:
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path conn
#open 2023-03-07-10-23-46
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p id.vlan id.vlan_inner proto service duration orig_bytes resp_bytes conn_state local_orig local_resp missed_bytes history orig_pkts orig_ip_bytes resp_pkts resp_ip_bytes tunnel_parents vlan inner_vlan orig_l2_addr resp_l2_addr geo.orig.country_code geo.orig.region geo.orig.city geo.orig.latitude geo.orig.longitude geo.resp.country_code geo.resp.region geo.resp.city geo.resp.latitude geo.resp.longitude community_id
#types time string addr port addr port int int enum string interval count count string bool bool count string count count count count set[string] int int string string string string string double double string string string double double string
1637155963.237882 CZwqhx3td8eTfCSwJb 128.14.134.170 57468 198.71.247.91 80 - - tcp http 5.162805 205 278 SF - - 0 ShADadfF 6 525 5 546 - - - 64:9e:f3:be:db:66 00:16:3c:f1:fd:6d US CA Los Angeles 34.0544 -118.2441 US VA Ashburn 39.0469 -77.4903 1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=
1637157758.165570 CnrwFesjfOhI3fuu1 45.137.23.27 47958 198.71.247.91 53 - - udp dns - - - S0 - - 0 D 1 58 0 0 - - - 64:9e:f3:be:db:66 00:16:3c:f1:fd:6d BD - - 23.7018 90.3742 US VA Ashburn 39.0469 -77.4903 1:0nZC/6S/pr+IceCZ04RjDZbX+KI=
1637229399.549141 CBTne9tomX1ktuCQa 10.4.21.101 53824 107.23.103.216 587 - - tcp smtp 606.747526 975904 11950 SF - - 0 ShAdDaTtTfF 1786 1069118 1070 55168 - - - 00:08:02:1c:47:ae 20:e5:2a:b6:93:f1 - - - - - US VA Ashburn 39.0469 -77.4903 1:I6VoTvbCqaKvPrlFnNbRRbjlMsc=

You can also download this sample to avoid dealing with tabs and spaces in the snippet above.
Step 1: Parse the input
We first parse the log file into a structured form so that we can work with the
individual fields. The read_zeek_tsv operator parses the above
structure out of the box:
tenzir 'read_zeek_tsv' < conn.log

Output

{
  ts: 2021-11-17T13:32:43.237881856Z,
  uid: "CZwqhx3td8eTfCSwJb",
  id: {
    orig_h: 128.14.134.170,
    orig_p: 57468,
    resp_h: 198.71.247.91,
    resp_p: 80,
  },
  proto: "tcp",
  service: "http",
  duration: 5.162805s,
  orig_bytes: 205,
  resp_bytes: 278,
  conn_state: "SF",
  local_orig: null,
  local_resp: null,
  missed_bytes: 0,
  history: "ShADadfF",
  orig_pkts: 6,
  orig_ip_bytes: 525,
  resp_pkts: 5,
  resp_ip_bytes: 546,
  tunnel_parents: null,
  community_id: "1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=",
  _write_ts: null,
}
{
  ts: 2021-11-17T14:02:38.165570048Z,
  uid: "CnrwFesjfOhI3fuu1",
  id: {
    orig_h: 45.137.23.27,
    orig_p: 47958,
    resp_h: 198.71.247.91,
    resp_p: 53,
  },
  proto: "udp",
  service: "dns",
  duration: null,
  orig_bytes: null,
  resp_bytes: null,
  conn_state: "S0",
  local_orig: null,
  local_resp: null,
  missed_bytes: 0,
  history: "D",
  orig_pkts: 1,
  orig_ip_bytes: 58,
  resp_pkts: 0,
  resp_ip_bytes: 0,
  tunnel_parents: null,
  community_id: "1:0nZC/6S/pr+IceCZ04RjDZbX+KI=",
  _write_ts: null,
}
{
  ts: 2021-11-18T09:56:39.549140992Z,
  uid: "CBTne9tomX1ktuCQa",
  id: {
    orig_h: 10.4.21.101,
    orig_p: 53824,
    resp_h: 107.23.103.216,
    resp_p: 587,
  },
  proto: "tcp",
  service: "smtp",
  duration: 10.112458766666666min,
  orig_bytes: 975904,
  resp_bytes: 11950,
  conn_state: "SF",
  local_orig: null,
  local_resp: null,
  missed_bytes: 0,
  history: "ShAdDaTtTfF",
  orig_pkts: 1786,
  orig_ip_bytes: 1069118,
  resp_pkts: 1070,
  resp_ip_bytes: 55168,
  tunnel_parents: null,
  community_id: "1:I6VoTvbCqaKvPrlFnNbRRbjlMsc=",
  _write_ts: null,
}

Now that we have decomposed the data into its atomic values, we can map them to the corresponding OCSF fields.
Step 2: Map to OCSF
To map fields, you must first identify the appropriate OCSF event class. In our example, the corresponding event class in OCSF is Network Activity. We use OCSF v1.8.0 throughout this tutorial.
To make the mapping process more organized, we map per attribute group. The schema has four groups:
- Classification: Important for the taxonomy and schema itself
- Occurrence: Temporal characteristics about when the event happened
- Context: Auxiliary information about the event
- Primary: Defines the key semantics of the given event
Here’s a template for the mapping pipeline:
// --- Preamble ---------------------------------
// Keep the source event in a source-specific working namespace.
this = { zeek: this }

// (2) Populate the OCSF event. Style-wise, we recommend using one coherent
// block of TQL per OCSF attribute group to provide a bit of structure for the
// reader.

// --- OCSF: classification attributes ----------
@name = "ocsf.network_activity"
ocsf.category_uid = 4
ocsf.class_uid = 4001
ocsf.activity_id = 6
ocsf.severity_id = 1
// ...fill out remaining classification attributes.

// --- OCSF: occurrence attributes --------------
ocsf.time = move zeek.ts // 👈 remove source field while mapping
if zeek.duration != null {
  ocsf.end_time = ocsf.time + zeek.duration
  ocsf.duration = count_milliseconds(move zeek.duration).round()
}
ocsf.start_time = ocsf.time
// ...fill out remaining occurrence attributes; add timezone_offset if known.

// --- OCSF: context attributes -----------------
ocsf.metadata = {
  product: {
    name: "Zeek",
    vendor_name: "Zeek",
  },
  profiles: [], // 👈 add profiles as needed, such as ["host", "network_proxy"]
  original_event_uid: move zeek.uid,
  version: "1.8.0",
}
// ...fill out remaining context attributes.

// --- OCSF: primary attributes -----------------
ocsf.src_endpoint = {
  ip: move zeek.id.orig_h,
  port: move zeek.id.orig_p,
}
ocsf.dst_endpoint = {
  ip: move zeek.id.resp_h,
  port: move zeek.id.resp_p,
}
ocsf.is_src_dst_assignment_known = true
// ...fill out remaining primary attributes.
drop zeek.id // 👈 remove source field after mapping

// --- OCSF: profile-specific attributes --------
// Add profile-specific fields here based on metadata.profiles.
// For the host profile: ocsf.actor.user, ocsf.device, etc.
// ...

// --- Epilogue ---------------------------------
// (3) Return the mapped OCSF event and preserve mapping residue.
this = {...ocsf, unmapped: zeek}

// (4) Expand minimal OCSF and validate the final schema shape.
ocsf::derive
ocsf::cast

Let’s unpack this:
- With this = { zeek: this }, we keep the original event in a source-specific working namespace. This approach avoids name clashes when we create new OCSF fields in the next steps.
- The main work takes place here. Our approach is structured: for every field in the source event, (1) map it, and (2) remove it. Ideally, use the move keyword to perform (1) and (2) together, for example, ocsf.x = move zeek.y. If a field is needed more than once, read it without move and remove it with drop afterwards.
- Assign @name to the target OCSF event class so that downstream pipelines can filter by schema.
- The assignment this = {...ocsf, unmapped: zeek} returns the mapped OCSF event and stores the remaining source fields as unmapped for review.
- The mapper intentionally produces minimal OCSF: it maps identifiers and source-derived semantics, but does not hand-write derived sibling fields. ocsf::derive expands the event with fields such as activity_name, category_name, and severity before ocsf::cast validates the final event against the OCSF schema.
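
The bookkeeping behind this template can be tried outside of Tenzir. The following is a minimal sketch in plain Python, not TQL: dict.pop stands in for move, and the function name map_conn and the trimmed sample event are assumptions made for illustration only.

```python
# Plain-Python model of the template's bookkeeping (not TQL).
# dict.pop() plays the role of `move`: it reads a field and removes it.

def map_conn(event: dict) -> dict:
    """Hypothetical mapper that mirrors the preamble and epilogue."""
    zeek = dict(event)  # like `this = { zeek: this }`: a separate working copy
    ocsf = {}

    ocsf["class_uid"] = 4001
    ocsf["time"] = zeek.pop("ts")  # ocsf.time = move zeek.ts
    ocsf["metadata"] = {"original_event_uid": zeek.pop("uid")}

    conn_id = zeek.pop("id")  # map the nested record, then it is gone
    ocsf["src_endpoint"] = {"ip": conn_id["orig_h"], "port": conn_id["orig_p"]}
    ocsf["dst_endpoint"] = {"ip": conn_id["resp_h"], "port": conn_id["resp_p"]}

    # like `this = {...ocsf, unmapped: zeek}`: anything never moved is residue
    return {**ocsf, "unmapped": zeek}


event = {
    "ts": "2021-11-17T13:32:43.237882Z",
    "uid": "CZwqhx3td8eTfCSwJb",
    "id": {"orig_h": "128.14.134.170", "orig_p": 57468,
           "resp_h": "198.71.247.91", "resp_p": 80},
    "orig_ip_bytes": 525,
    "resp_ip_bytes": 546,
}
result = map_conn(event)
print(result["unmapped"])  # {'orig_ip_bytes': 525, 'resp_ip_bytes': 546}
```

Because every mapped field is removed as it is read, the residue is an exact list of what the mapper has not handled yet, which makes gaps easy to review.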
Now that we have a template, let’s get our hands dirty and go deep into the actual mapping.
Classification Attributes
The classification attributes are important for the schema. Mapping them is mechanical and mostly involves reviewing the schema docs.
ocsf.activity_id = 6
ocsf.activity_name = "Traffic"
ocsf.category_uid = 4
ocsf.category_name = "Network Activity"
ocsf.class_uid = 4001
ocsf.class_name = "Network Activity"
ocsf.severity_id = 1
ocsf.severity = "Informational"
ocsf.type_uid = ocsf.class_uid * 100 + ocsf.activity_id

Note that computing the field type_uid requires simple arithmetic. In package
mappers, prefer minimal OCSF: set the IDs and source-derived values, but let
ocsf::derive populate sibling fields. This keeps mappings concise and
prevents repetitive label assignments. The shorter form is:
ocsf.activity_id = 6
ocsf.category_uid = 4
ocsf.class_uid = 4001
ocsf.severity_id = 1
ocsf.type_uid = ocsf.class_uid * 100 + ocsf.activity_id
ocsf::derive

The computed sibling fields for <field>_id often have the pattern
<field>_name or simply <field>. Run ocsf::derive after the mapper
returns the minimal OCSF event so downstream pipelines still receive the
comprehensive schema shape.
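
To make the type_uid arithmetic concrete, here is a small sketch in plain Python rather than TQL. The helper name type_uid is an assumption for illustration; the formula is the one used in the snippet above.

```python
def type_uid(class_uid: int, activity_id: int) -> int:
    """Combine class and activity IDs the way the mapping does."""
    return class_uid * 100 + activity_id


uid = type_uid(4001, 6)
print(uid)  # 400106

# The encoding is reversible as long as activity_id stays below 100.
class_uid, activity_id = divmod(uid, 100)
print(class_uid, activity_id)  # 4001 6
```

The result matches the type_uid value 400106 of the Network Activity: Traffic events in this tutorial.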
Occurrence Attributes
Let’s tackle the occurrence group. These attributes are all about time.
ocsf.time = move zeek.ts
if zeek.duration != null {
  ocsf.end_time = ocsf.time + zeek.duration
  ocsf.duration = count_milliseconds(move zeek.duration).round()
}
ocsf.start_time = ocsf.time

When Zeek records a connection duration, using + with a value of type time
and duration yields a new time value, just as you’d expect. The OCSF
duration attribute itself is an integer millisecond count, so we assign that
field only after deriving end_time.
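
The same time arithmetic can be checked by hand. This is a rough sketch in plain Python, not TQL, that uses the first sample connection (start 13:32:43.237882 UTC, duration 5.162805 seconds). Python's datetime has microsecond resolution, so the sub-microsecond digits of the Tenzir output are not reproduced.

```python
from datetime import datetime, timedelta, timezone

# Values taken from the first connection in the sample log.
start = datetime(2021, 11, 17, 13, 32, 43, 237882, tzinfo=timezone.utc)
duration = timedelta(seconds=5.162805)

# time + duration yields a new time value.
end_time = start + duration

# The OCSF duration attribute is an integer number of milliseconds.
duration_ms = round(duration / timedelta(milliseconds=1))

print(end_time.isoformat())  # 2021-11-17T13:32:48.400687+00:00
print(duration_ms)           # 5163
```

The rounded value 5163 agrees with the duration field in the mapped event.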
Context Attributes
The context attributes provide auxiliary information. Most notably, the
metadata attribute holds data-source specific information. Even though
unmapped belongs to this group, we deal with it at the very end.
ocsf.metadata = {
  log_name: "conn.log",
  logged_time: move zeek._write_ts?,
  product: {
    name: "Zeek",
    vendor_name: "Zeek",
    cpe_name: "cpe:2.3:a:zeek:zeek",
  },
  original_event_uid: move zeek.uid,
  version: "1.8.0",
}
drop zeek._path? // implied in metadata.log_name
ocsf.app_protocol_name = move zeek.service

We use ? when accessing fields that are not always present, such as
zeek._write_ts?. If you omit the ?, the pipeline emits a warning
when _write_ts is missing.
Zeek’s service field identifies the application-layer protocol, so map it to
app_protocol_name. Reserve app_name for a specific network application, such
as an App-ID result.
Primary Attributes
The primary attributes define the semantics of the event class itself. This is where the core value of the data is, as we are mapping the most event-specific information.
ocsf.src_endpoint = {
  ip: move zeek.id.orig_h,
  port: move zeek.id.orig_p,
}
ocsf.dst_endpoint = {
  ip: move zeek.id.resp_h,
  port: move zeek.id.resp_p,
}
ocsf.is_src_dst_assignment_known = true
ocsf.src_endpoint.mac = move zeek.orig_l2_addr?
ocsf.dst_endpoint.mac = move zeek.resp_l2_addr?
if zeek.geo.orig? != null {
  ocsf.src_endpoint.location = {
    country: move zeek.geo.orig.country_code?,
    region: move zeek.geo.orig.region?,
    city: move zeek.geo.orig.city?,
    lat: move zeek.geo.orig.latitude?,
    long: move zeek.geo.orig.longitude?,
  }
}
if zeek.geo.resp? != null {
  ocsf.dst_endpoint.location = {
    country: move zeek.geo.resp.country_code?,
    region: move zeek.geo.resp.region?,
    city: move zeek.geo.resp.city?,
    lat: move zeek.geo.resp.latitude?,
    long: move zeek.geo.resp.longitude?,
  }
}
drop zeek.geo?
// Here, we use `drop` because we simply want to get rid of the intermediate
// `id` record that we already mapped above.
drop zeek.id
// Locality of reference: we define the protocol numbers close to where they
// are used for easier readability.
let $proto_nums = {
  tcp: 6,
  udp: 17,
  icmp: 1,
  icmpv6: 58,
  ipv6: 41,
}
ocsf.connection_info = {
  community_uid: move zeek.community_id?,
  protocol_name: move zeek.proto,
}
ocsf.connection_info.protocol_num = $proto_nums[ocsf.connection_info.protocol_name]? else -1
// Use `if` for binary predicates and `match` for finite case dispatch.
if ocsf.src_endpoint.ip.is_v6() or ocsf.dst_endpoint.ip.is_v6() {
  ocsf.connection_info.protocol_ver_id = 6
} else {
  ocsf.connection_info.protocol_ver_id = 4
}
match ({orig: zeek.local_orig, resp: zeek.local_resp}) {
  {orig: true, resp: true} => {
    ocsf.connection_info.direction = "Lateral"
    ocsf.connection_info.direction_id = 3
  }
  {orig: true, resp: false} => {
    ocsf.connection_info.direction = "Outbound"
    ocsf.connection_info.direction_id = 2
  }
  {orig: false, resp: true} => {
    ocsf.connection_info.direction = "Inbound"
    ocsf.connection_info.direction_id = 1
  }
  _ => {
    ocsf.connection_info.direction = "Unknown"
    ocsf.connection_info.direction_id = 0
  }
}
drop zeek.local_orig, zeek.local_resp
// The `status` attribute in OCSF is a success indicator. While we could use
// `zeek.conn_state` to extract success/failure, this would go beyond
// the tutorial.
ocsf.status_id = 99
ocsf.status = "Other"
ocsf.status_code = move zeek.conn_state
ocsf.traffic = {
  bytes_in: move zeek.resp_bytes,
  bytes_out: move zeek.orig_bytes,
  bytes_missed: move zeek.missed_bytes,
  packets_in: move zeek.resp_pkts,
  packets_out: move zeek.orig_pkts,
}
if ocsf.traffic.bytes_out? != null and ocsf.traffic.bytes_in? != null {
  ocsf.traffic.bytes = ocsf.traffic.bytes_out + ocsf.traffic.bytes_in
}
ocsf.traffic.packets = ocsf.traffic.packets_out + ocsf.traffic.packets_in
if zeek.tunnel_parents? == null {
  drop zeek.tunnel_parents?
}
if zeek.vlan? == null {
  drop zeek.vlan?
}
if zeek.inner_vlan? == null {
  drop zeek.inner_vlan?
}

Here’s what happens:
- The expression $proto_nums[ocsf.connection_info.protocol_name] takes the moved Zeek proto value and uses it as an index into a static record $proto_nums. Add a ? at the end to avoid warnings when the lookup returns null, and use the inline else expression for the fallback value.
- Set is_src_dst_assignment_known because Zeek’s id.orig_* fields identify the connection initiator and id.resp_* fields identify the responder.
- Move Layer 2 addresses and GeoIP enrichment into endpoint attributes instead of leaving them in unmapped.
- Use is_v6 on the connection IPs to identify IPv6 connections.
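
The lookup-with-fallback, the direction dispatch, and the guarded byte sum are easy to model in isolation. The following is a sketch in plain Python, not TQL. The function names are hypothetical, and returning None from total_bytes stands in for leaving traffic.bytes unset.

```python
# Same table as $proto_nums in the mapping.
PROTO_NUMS = {"tcp": 6, "udp": 17, "icmp": 1, "icmpv6": 58, "ipv6": 41}

# (local_orig, local_resp) -> (direction_id, direction)
DIRECTIONS = {
    (True, True): (3, "Lateral"),
    (True, False): (2, "Outbound"),
    (False, True): (1, "Inbound"),
}


def protocol_num(name):
    """Look up the protocol number, falling back to -1 like `else -1`."""
    return PROTO_NUMS.get(name, -1)


def direction(local_orig, local_resp):
    """Finite case dispatch; anything else, including None, is Unknown."""
    return DIRECTIONS.get((local_orig, local_resp), (0, "Unknown"))


def total_bytes(bytes_out, bytes_in):
    """Sum the byte counters only when both are known."""
    if bytes_out is None or bytes_in is None:
        return None
    return bytes_out + bytes_in


print(protocol_num("tcp"), protocol_num("gre"))  # 6 -1
print(direction(True, False))                    # (2, 'Outbound')
print(direction(None, None))                     # (0, 'Unknown')
print(total_bytes(205, 278), total_bytes(None, None))  # 483 None
```

The sample log has no local_orig or local_resp values, which is why both mapped events end up with direction Unknown, and the DNS event has no byte total.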
Putting it together
When we combine all TQL snippets from above, we get the following output:
{
  activity_id: 6,
  activity_name: "Traffic",
  app_protocol_name: "http",
  category_name: "Network Activity",
  category_uid: 4,
  class_name: "Network Activity",
  class_uid: 4001,
  connection_info: {
    community_uid: "1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=",
    direction: "Unknown",
    direction_id: 0,
    flag_history: "ShADadfF",
    protocol_name: "tcp",
    protocol_num: 6,
    protocol_ver: "Internet Protocol version 4 (IPv4)",
    protocol_ver_id: 4,
  },
  dst_endpoint: {
    ip: 198.71.247.91,
    location: {
      city: "Ashburn",
      country: "US",
      lat: 39.0469,
      long: -77.4903,
      region: "VA",
    },
    mac: "00:16:3c:f1:fd:6d",
    port: 80,
  },
  duration: 5163,
  end_time: 2021-11-17T13:32:48.400686856Z,
  is_src_dst_assignment_known: true,
  metadata: {
    log_name: "conn.log",
    logged_time: null,
    original_event_uid: "CZwqhx3td8eTfCSwJb",
    product: {
      cpe_name: "cpe:2.3:a:zeek:zeek",
      name: "Zeek",
      vendor_name: "Zeek",
    },
    version: "1.8.0",
  },
  severity: "Informational",
  severity_id: 1,
  src_endpoint: {
    ip: 128.14.134.170,
    location: {
      city: "Los Angeles",
      country: "US",
      lat: 34.0544,
      long: -118.2441,
      region: "CA",
    },
    mac: "64:9e:f3:be:db:66",
    port: 57468,
  },
  start_time: 2021-11-17T13:32:43.237881856Z,
  status: "Other",
  status_code: "SF",
  status_id: 99,
  time: 2021-11-17T13:32:43.237881856Z,
  traffic: {
    bytes: 483,
    bytes_in: 278,
    bytes_missed: 0,
    bytes_out: 205,
    packets: 11,
    packets_in: 5,
    packets_out: 6,
  },
  type_name: "Network Activity: Traffic",
  type_uid: 400106,
  unmapped: {
    orig_ip_bytes: 525,
    resp_ip_bytes: 546,
  },
}
{
  activity_id: 6,
  activity_name: "Traffic",
  app_protocol_name: "dns",
  category_name: "Network Activity",
  category_uid: 4,
  class_name: "Network Activity",
  class_uid: 4001,
  connection_info: {
    community_uid: "1:0nZC/6S/pr+IceCZ04RjDZbX+KI=",
    direction: "Unknown",
    direction_id: 0,
    flag_history: "D",
    protocol_name: "udp",
    protocol_num: 17,
    protocol_ver: "Internet Protocol version 4 (IPv4)",
    protocol_ver_id: 4,
  },
  dst_endpoint: {
    ip: 198.71.247.91,
    location: {
      city: "Ashburn",
      country: "US",
      lat: 39.0469,
      long: -77.4903,
      region: "VA",
    },
    mac: "00:16:3c:f1:fd:6d",
    port: 53,
  },
  is_src_dst_assignment_known: true,
  metadata: {
    log_name: "conn.log",
    logged_time: null,
    original_event_uid: "CnrwFesjfOhI3fuu1",
    product: {
      cpe_name: "cpe:2.3:a:zeek:zeek",
      name: "Zeek",
      vendor_name: "Zeek",
    },
    version: "1.8.0",
  },
  severity: "Informational",
  severity_id: 1,
  src_endpoint: {
    ip: 45.137.23.27,
    location: {
      city: null,
      country: "BD",
      lat: 23.7018,
      long: 90.3742,
      region: null,
    },
    mac: "64:9e:f3:be:db:66",
    port: 47958,
  },
  start_time: 2021-11-17T14:02:38.165570048Z,
  status: "Other",
  status_code: "S0",
  status_id: 99,
  time: 2021-11-17T14:02:38.165570048Z,
  traffic: {
    bytes_in: null,
    bytes_missed: 0,
    bytes_out: null,
    packets: 1,
    packets_in: 0,
    packets_out: 1,
  },
  type_name: "Network Activity: Traffic",
  type_uid: 400106,
  unmapped: {
    orig_ip_bytes: 58,
    resp_ip_bytes: 0,
  },
}

There are still several fields that we can map to the schema, but we’ll leave this as an exercise for the reader.
Recap: Understand the OCSF pipeline architecture
Most pipelines (1) onboard data from a source, (2) transform it, and (3) send it somewhere. This tutorial focuses on the middle piece, with transformation being the mapping to OCSF.
We’ve addressed data onboarding by reading a log file and decomposing the
unstructured Zeek TSV contents into a structured record. The
built-in read_zeek_tsv operator made this trivial. But it often requires a lot
more elbow grease to get there. Check out our extensive guide on parsing string
fields for more details. We haven’t yet
addressed the other end of the pipeline: data offboarding. Our examples run
tenzir on the command line, relying on an implicit output operator that writes
the result of the last transformation to the terminal.
In other words, we have a sandwich structure in our pipeline. To make it explicit:
// (1) Onboard data (explicit)
from_stdin
read_zeek_tsv

// (2) Map to OCSF
// ...
// Lots of TQL here!
// ...

// (3) Offboard data (implicit)
to_stdout

Such a pipeline is impractical because data may arrive via multiple channels: log files, Kafka messages, or via Syslog over the wire. Even the encoding may vary. Zeek TSV is one way you can configure Zeek, but JSON output is another format. Similarly, users may want to consume the data in various ways.
In theory, we’re done now. We have a working mapping. It’s just not yet very (re)usable. To turn it into a package capability, we split the pipeline into independently usable snippets. The next section describes how to achieve this.
Step 3: Package the mapping
To make our OCSF mapping more reusable, we extract it as a user-defined operator and put it into a package. There is an entire tutorial on writing packages, but all you need to know right now is that packages are one-click installable bundles of operators, examples, tests, and deployable pipelines. After installation, you can call the newly introduced mapping operators from any pipeline.
Break down complexity with user-defined operators
Let’s work towards a package that comes with a user-defined operator called
zeek::ocsf::map that maps Zeek connection logs to OCSF:
from_stdin
read_zeek_tsv
zeek::ocsf::map // 👈 Returns OCSF events with a package UDO.
ocsf::derive
ocsf::cast
to_stdout

All you have to do to get there is create a package with the following directory structure:
- zeek/
  - examples/
    - …
  - operators/ 👈 user-defined operators go here
    - …
  - pipelines/
    - …
  - tests/
    - …
  - package.yaml
Notice how zeek::ocsf::map has two modules that are colon-separated: zeek
and ocsf. The directory structure in the package determines the module
hierarchy:
- zeek/
  - operators/
    - ocsf/
      - map.tql 👈 Exposes zeek::ocsf::map as operator
Since operators fully compose, you can implement zeek::ocsf::map as one main
mapper that performs source-specific cleanup, sets shared fields in the ocsf
record, and dispatches to event-specific operators for each log type.
// Keep the parsed source event around while event-specific operators move
// fields into their OCSF homes.
this = { zeek: this }

ocsf.metadata = {
  product: {
    name: "Zeek",
    vendor_name: "Zeek",
    cpe_name: "cpe:2.3:a:zeek:zeek",
  },
  version: "1.8.0",
}

ocsf.severity_id = 1

// Dispatch mappings based on schema name. This assumes that you've parsed the
// input logs and added the appropriate @name based on the log type. The
// read_zeek_tsv operator does this automatically. You could also dispatch based
// on any other stable discriminator, such as _path or event_type.
match @name {
  "zeek.conn" => {
    // Map "conn.log" events
    zeek::ocsf::events::conn
  }
  _ => {
    // Base Event: unknown or unsupported Zeek log type
    zeek::ocsf::base
  }
}

// Return the mapped OCSF event from the operator.
this = {...ocsf, unmapped: zeek}

In this layout, you’d put the mapping operators in the following directories:
- zeek/
  - operators/
    - ocsf/
      - events/ 👈 Exposes zeek::ocsf::events::* operators
        - conn.tql
        - …
      - base.tql
      - map.tql
The mapper keeps intermediate results under ocsf internally and returns a
minimal OCSF event. Callers then run shared OCSF helpers such as
ocsf::derive and ocsf::cast to expand and validate the final
shape. This mirrors how larger package mappers handle cleanup, shared fields,
fallback Base Event mapping, and event-specific mappings.
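
The dispatch structure can be pictured with ordinary functions. Below is a loose sketch in plain Python, not TQL. The handler functions are hypothetical stand-ins for zeek::ocsf::events::conn and zeek::ocsf::base, and the schema name zeek.weird only serves as an example of a log type without a dedicated mapping.

```python
def map_conn(zeek: dict) -> dict:
    """Stand-in for the conn.log mapping: Network Activity (class 4001)."""
    return {"class_uid": 4001, "status_code": zeek.pop("conn_state", None)}


def map_base(zeek: dict) -> dict:
    """Stand-in for the fallback: Base Event (class 0)."""
    return {"class_uid": 0}


# Schema name -> event-specific mapper.
HANDLERS = {"zeek.conn": map_conn}


def zeek_ocsf_map(name: str, event: dict) -> dict:
    zeek = dict(event)  # working copy of the source event
    # Shared fields that every Zeek event receives.
    ocsf = {
        "severity_id": 1,
        "metadata": {"product": {"name": "Zeek"}, "version": "1.8.0"},
    }
    handler = HANDLERS.get(name, map_base)  # the `_ =>` branch
    ocsf.update(handler(zeek))
    return {**ocsf, "unmapped": zeek}


conn = zeek_ocsf_map("zeek.conn", {"conn_state": "SF", "orig_ip_bytes": 525})
other = zeek_ocsf_map("zeek.weird", {"name": "bad_checksum"})
print(conn["class_uid"], conn["unmapped"])    # 4001 {'orig_ip_bytes': 525}
print(other["class_uid"], other["unmapped"])  # 0 {'name': 'bad_checksum'}
```

Supporting another log type then amounts to adding one handler and one table entry, while the shared fields and the residue handling stay in a single place.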
Write tests for production-grade reliability
To achieve production-grade quality of your mappings, you must ensure that they do what they promise. In practice, this means shipping tests alongside the mappings: given a mapping, test whether a provided input produces a valid output.
This is where our test framework comes into play.
Put your test scenarios in tests/ and sample data into tests/inputs:
- zeek/
  - tests/
    - inputs/ 👈 raw log samples
      - conn.log
      - …
    - map_one.tql 👈 test scenario
    - map_one.txt 👈 expected output for test scenario
    - …
Here’s an example of the test:
from_file f"{env("TENZIR_INPUTS")}/conn.log" {
  read_zeek_tsv
}
zeek::ocsf::map
ocsf::derive
ocsf::cast
sort time
head 2

Now run the test framework in the package directory:
uvx tenzir-test
i executing project: zeek (.)
i running 1 tests (44 jobs) in project .
i 1× tenzir (v5.16.0+gc0a0c3ba49)
✘ tests/map_one.tql
└─▶ Failed to find ref file: "../docs/public/packages/zeek/tests/map_one.txt"
i ran 1 test: 0 passed (0%) / 1 failed (100%)

There is no baseline for the test yet. Let’s generate it via --update:

uvx tenzir-test --update

Now there’s a *.txt file next to the test scenario. Verify that it has the
expected output. Alternatively, use uvx tenzir-test --passthrough to print the
output to the terminal for inline inspection.
Generated *.txt test scenario baseline
{
  activity_id: 6,
  activity_name: "Traffic",
  app_protocol_name: "http",
  category_name: "Network Activity",
  category_uid: 4,
  class_name: "Network Activity",
  class_uid: 4001,
  connection_info: {
    community_uid: "1:YXWfTYEyYLKVv5Ge4WqijUnKTrM=",
    direction: "Unknown",
    direction_id: 0,
    flag_history: "ShADadfF",
    protocol_name: "tcp",
    protocol_num: 6,
    protocol_ver: "Internet Protocol version 4 (IPv4)",
    protocol_ver_id: 4,
  },
  dst_endpoint: {
    ip: 198.71.247.91,
    location: {
      city: "Ashburn",
      country: "US",
      lat: 39.0469,
      long: -77.4903,
      region: "VA",
    },
    mac: "00:16:3c:f1:fd:6d",
    port: 80,
  },
  duration: 5163,
  end_time: 2021-11-17T13:32:48.400686856Z,
  is_src_dst_assignment_known: true,
  metadata: {
    log_name: "conn.log",
    logged_time: null,
    original_event_uid: "CZwqhx3td8eTfCSwJb",
    product: {
      cpe_name: "cpe:2.3:a:zeek:zeek",
      name: "Zeek",
      vendor_name: "Zeek",
    },
    version: "1.8.0",
  },
  severity: "Informational",
  severity_id: 1,
  src_endpoint: {
    ip: 128.14.134.170,
    location: {
      city: "Los Angeles",
      country: "US",
      lat: 34.0544,
      long: -118.2441,
      region: "CA",
    },
    mac: "64:9e:f3:be:db:66",
    port: 57468,
  },
  start_time: 2021-11-17T13:32:43.237881856Z,
  status: "Other",
  status_code: "SF",
  status_id: 99,
  time: 2021-11-17T13:32:43.237881856Z,
  traffic: {
    bytes: 483,
    bytes_in: 278,
    bytes_missed: 0,
    bytes_out: 205,
    packets: 11,
    packets_in: 5,
    packets_out: 6,
  },
  type_name: "Network Activity: Traffic",
  type_uid: 400106,
  unmapped: {
    orig_ip_bytes: 525,
    resp_ip_bytes: 546,
  },
}
{
  activity_id: 6,
  activity_name: "Traffic",
  app_protocol_name: "dns",
  category_name: "Network Activity",
  category_uid: 4,
  class_name: "Network Activity",
  class_uid: 4001,
  connection_info: {
    community_uid: "1:0nZC/6S/pr+IceCZ04RjDZbX+KI=",
    direction: "Unknown",
    direction_id: 0,
    flag_history: "D",
    protocol_name: "udp",
    protocol_num: 17,
    protocol_ver: "Internet Protocol version 4 (IPv4)",
    protocol_ver_id: 4,
  },
  dst_endpoint: {
    ip: 198.71.247.91,
    location: {
      city: "Ashburn",
      country: "US",
      lat: 39.0469,
      long: -77.4903,
      region: "VA",
    },
    mac: "00:16:3c:f1:fd:6d",
    port: 53,
  },
  is_src_dst_assignment_known: true,
  metadata: {
    log_name: "conn.log",
    logged_time: null,
    original_event_uid: "CnrwFesjfOhI3fuu1",
    product: {
      cpe_name: "cpe:2.3:a:zeek:zeek",
      name: "Zeek",
      vendor_name: "Zeek",
    },
    version: "1.8.0",
  },
  severity: "Informational",
  severity_id: 1,
  src_endpoint: {
    ip: 45.137.23.27,
    location: {
      city: null,
      country: "BD",
      lat: 23.7018,
      long: 90.3742,
      region: null,
    },
    mac: "64:9e:f3:be:db:66",
    port: 47958,
  },
  start_time: 2021-11-17T14:02:38.165570048Z,
  status: "Other",
  status_code: "S0",
  status_id: 99,
  time: 2021-11-17T14:02:38.165570048Z,
  traffic: {
    bytes_in: null,
    bytes_missed: 0,
    bytes_out: null,
    packets: 1,
    packets_in: 0,
    packets_out: 1,
  },
  type_name: "Network Activity: Traffic",
  type_uid: 400106,
  unmapped: {
    orig_ip_bytes: 58,
    resp_ip_bytes: 0,
  },
}

After running uvx tenzir-test again without any flags, you get the following
output:
i executing project: zeek (.)
i running 1 tests (44 jobs) in project .; update
i 1× tenzir (v5.16.0+gc0a0c3ba49)
✔ tests/map_one.tql
i ran 1 test: 1 passed (100%) / 0 failed (0%)

Perfect. Now proceed with all log types and you have a production-grade package.
Step 4: Install and use the package
After you have fleshed out the complete package, install
it, either interactively
via package::add, or IaC-style by putting it into a git repo and pointing
the config option tenzir.package-dirs to it.
Summary
This tutorial showed you how to map security data to OCSF using TQL pipelines. You learned:
- What OCSF events look like in high-level terms
- How to structure your mapping pipeline using OCSF attribute groups (classification, occurrence, context, and primary)
- How to use TQL operators and expressions to transform raw events into OCSF-compliant records
- How to package your mappings as reusable operators in a Tenzir package
The key to successful OCSF mapping is systematic organization: keep source
residue in zeek, map attributes group by group while removing source fields
as you go, return the residue as unmapped, and produce minimal OCSF that
ocsf::derive expands before ocsf::cast validates it. This
approach keeps mappings maintainable while ensuring downstream pipelines receive
the comprehensive schema shape.
For more examples and ready-to-use OCSF mappings, check out the Tenzir Community Library.