Map Data to OCSF
In this tutorial you'll learn how to map events to the Open Cybersecurity Schema Framework (OCSF). We walk through an example with events from a network monitor and show how to use Tenzir pipelines to transform them into OCSF-compliant events.
The diagram above illustrates the data lifecycle and where the OCSF mapping takes place: you collect data from various data sources, each of which has a different shape, and then convert it to a standardized representation. The primary benefit is that normalization decouples data acquisition from downstream analytics, allowing the two processes to scale independently.
OCSF Primer
OCSF is a vendor-agnostic event schema (a.k.a. "taxonomy") that defines structure and semantics for security events. Here are some key terms you need to know to map events:
- Attribute: a unique identifier for a specific type, e.g., parent_folder of type String or observables of type Observable Array.
- Event Class: the description of an event defined in terms of attributes, e.g., HTTP Activity and Detection Finding.
- Category: a group of event classes, e.g., System Activity or Findings.
The diagram below illustrates how subsets of attributes form an event class:
The Base Event Class is a special event class that's part of every event class. Think of it as a mixin of attributes that get automatically added:
For this tutorial, we look at OCSF from the perspective of the mapper persona, i.e., as someone who converts existing events into the OCSF schema. OCSF also defines three other personas, author, producer, and analyst. These are out of scope. Our mission as mapper is now to study the event semantics of the data source we want to map, and translate the event to the appropriate OCSF event class.
Case Study: Zeek Logs
Let's map some Zeek logs to OCSF!
The Zeek network monitor turns raw network traffic into detailed, structured logs. The logs range across the OSI stack from link layer activity to application-specific messages. In addition, Zeek provides a powerful scripting language to act on network events, making it a versatile tool for writing network-based detections to raise alerts.
Zeek generates logs in tab-separated values (TSV) or JSON format. Here's an example of a connection log in TSV format:
Step 1: Parse the input
We first need to parse the log file into structured form so that we can work with the individual fields. Thanks to Tenzir's Zeek support, we can quickly turn TSV logs into structured data using a single operator:
Run the pipeline as follows:
Output
Step 2: Map to OCSF
Now that we have structured data to work with, our objective is to map the fields from the Zeek conn.log to OCSF. The corresponding event class in OCSF is Network Activity. We will be using OCSF v1.3.0 throughout this tutorial.
To make the mapping process more organized, we map per attribute group:
- Classification: Important for the taxonomy and schema itself
- Occurrence: Temporal characteristics about when the event happened
- Context: Enhancing information about the event
- Primary: Defines the key semantics of the given event
Within each attribute group, we go through the attributes in the order of the three requirement flags: required, recommended, and optional.
Here's a template for the mapping pipeline:
// (1) Move original event into dedicated field.
this = { event: this }
// (2) Assign some intermediate values for use in the next step, e.g., because
// they're used multiple times.
class_uid = 4001
activity_id = 6
...
// (3) Populate the OCSF event.
this = {
  // --- Classification ---
  activity_id: activity_id,
  class_uid: class_uid,
  type_id: class_uid * 100 + activity_id,
  ...
  // --- Occurrence ---
  ...
  // --- Context ---
  unmapped: event, // (4) Explicitly assign unmapped.
  ...
  // --- Primary ---
  ...
}
// (5) Drop all mapped fields, with the effect that the remaining fields remain
// in unmapped.
drop(
  unmapped.id,
  ...
)
// (6) Assign a new schema name to the transformed event.
@name = "ocsf.network_activity"
Let's unpack this:
- (1) With this = { event: this } we move the original event into the field event. This also has the benefit that we avoid name clashes when creating new fields in the next steps.
- (2) There are several fields we want to reference in expressions in the subsequent assignment, so we precompute them here.
- (3) The giant this = { ... } assignment creates the OCSF event, with a field order that matches the official OCSF documentation.
- (4) We copy the original event into unmapped.
- (5) After mapping all fields, we explicitly remove them from unmapped. This has the effect that everything we didn't touch automatically ends up here.
- (6) We give the event a new schema name so that we can easily filter by its shape in further Tenzir pipelines.
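The six steps can also be traced outside Tenzir. The following is a minimal Python sketch, not Tenzir code: it models an event as a plain dictionary, and both the helper name map_conn and the sample record are made up for illustration. It only aims to show why copying the event into unmapped and then dropping the mapped fields leaves exactly the untouched fields behind.

```python
import copy


def map_conn(original: dict) -> dict:
    """Model the template's six steps on a plain dict (illustration only)."""
    # (1) Move the original event into a dedicated field.
    this = {"event": original}
    event = this["event"]
    # (2) Precompute values that are used more than once.
    class_uid = 4001
    activity_id = 6
    # (3) Populate the OCSF event; (4) copy the original into unmapped.
    ocsf = {
        "activity_id": activity_id,
        "class_uid": class_uid,
        "type_id": class_uid * 100 + activity_id,
        "time": event["ts"],
        "unmapped": copy.deepcopy(event),
    }
    # (5) Drop every field that was mapped above.
    for mapped in ("ts",):
        ocsf["unmapped"].pop(mapped, None)
    # (6) Attach a schema name (the TQL template uses @name for this).
    return {"name": "ocsf.network_activity", "data": ocsf}


# Made-up sample: `history` is never mapped, so it survives in unmapped.
sample = {"ts": "2024-01-01T00:00:00", "history": "ShADadFf"}
result = map_conn(sample)
print(result["data"]["unmapped"])  # {'history': 'ShADadFf'}
```

Because the sketch works on a deep copy, the original record stays unchanged, which loosely mirrors how the TQL template keeps event and unmapped as separate fields until the final assignment.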
Now that we know the general structure, let's get our hands dirty and go deep into the actual mapping.
Classification Attributes
The classification attributes are important for the schema. Mapping them is pretty mechanical and mostly involves going through the docs.
class_uid = 4001
activity_id = 6
activity_name = "Traffic"
this = {
  // --- Classification (required) ---
  activity_id: activity_id,
  category_uid: 4,
  class_uid: class_uid,
  type_id: class_uid * 100 + activity_id,
  severity_id: 1,
  // --- Classification (optional) ---
  activity_name: activity_name,
  category_name: "Network Activity",
  class_name: "Network Activity",
  severity: "Informational",
}
// TODO: provide a function for this and make it possible to reference
// `type_id` from the same assignment.
//type_name: ocsf_type_name(type_id),
Because we want to compute the type_id as class_uid * 100 + activity_id, we assign these values to dedicated fields beforehand.
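As a quick check of the arithmetic with the values used in this tutorial, here is the same computation in plain Python (an illustration, not TQL):

```python
class_uid = 4001   # Network Activity
activity_id = 6    # Traffic

# type_id combines both identifiers into a single number.
type_id = class_uid * 100 + activity_id
print(type_id)  # 400106

# The two parts can be recovered again with integer division and remainder.
print(divmod(type_id, 100))  # (4001, 6)
```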
Occurrence Attributes
Let's tackle the next group: Occurrence. These attributes are all about time. We won't repeat the above record fields in the assignment, but the idea is to incrementally construct a giant statement with the assignment this = { ... }:
this = {
  // --- Classification ---
  ...
  // --- Occurrence (required) ---
  time: event.ts,
  // --- Occurrence (recommended) ---
  // TODO: provide a function for this.
  // timezone_offset: ...
  // --- Occurrence (optional) ---
  duration: event.duration,
  end_time: event.ts + event.duration,
  start_time: event.ts,
}
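To make the time arithmetic concrete, the sketch below reproduces the Occurrence fields in plain Python. The timestamp and duration are made-up sample values, and Python's datetime and timedelta merely stand in for the time and duration types that the pipeline operates on.

```python
from datetime import datetime, timedelta, timezone

# Made-up sample values standing in for Zeek's ts and duration fields.
ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
duration = timedelta(seconds=2.5)

occurrence = {
    "time": ts,
    "duration": duration,
    "start_time": ts,
    # The connection ends `duration` after its first packet.
    "end_time": ts + duration,
}
print(occurrence["end_time"].isoformat())  # 2024-01-01T12:00:02.500000+00:00
```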
Context Attributes
The Context attributes provide enhancing information. Most notably, the metadata attribute holds data-source specific information and the unmapped attribute collects all fields that we cannot map directly to their semantic counterparts in OCSF.
this = {
  // --- Classification, Occurrence ---
  ...
  // --- Context (required) ---
  metadata: {
    log_name: "conn", // Zeek calls it "path"
    logged_time: event._write_ts,
    product: {
      name: "Zeek",
      vendor_name: "Zeek",
    },
    uid: event.uid,
    version: "1.3.0",
  },
  // --- Context (optional) ---
  unmapped: event
}
Note that we're copying the original event into unmapped so that we can remove all mapped fields from it in a later step.
Primary Attributes
The primary attributes define the semantics of the event class itself. This is where the core value of the data is, as we are mapping the most event-specific information.
For this, we still need to precompute several attributes that we are going to use in the this = { ... } assignment. You can see the use of if/else here to create a constant field based on values in the original event.
if event.local_orig and event.local_resp {
  direction = "Lateral"
  direction_id = 3
} else if event.local_orig {
  direction = "Outbound"
  direction_id = 2
} else if event.local_resp {
  direction = "Inbound"
  direction_id = 1
} else {
  direction = "Unknown"
  direction_id = 0
}
if event.proto == "tcp" {
  protocol_num = 6
} else if event.proto == "udp" {
  protocol_num = 17
} else if event.proto == "icmp" {
  protocol_num = 1
} else {
  protocol_num = -1
}
if event.id.orig_h.is_v6() or event.id.resp_h.is_v6() {
  protocol_ver_id = 6
} else {
  protocol_ver_id = 4
}
this = {
  // --- Classification, Occurrence, Context ---
  ...
  // --- Primary (required) ---
  dst_endpoint: {
    ip: event.id.resp_h,
    port: event.id.resp_p,
    // TODO: start a conversation in the OCSF Slack to figure out how to
    // assign the entire connection a protocol. We use svc_name as the
    // next best thing, but it clearly can't be different between
    // endpoints for the service semantics that Zeek has.
    svc_name: event.service,
  },
  // --- Primary (recommended) ---
  connection_info: {
    uid: event.community_id,
    direction: direction,
    direction_id: direction_id,
    protocol_ver_id: protocol_ver_id,
    protocol_name: event.proto,
    protocol_num: protocol_num,
  },
  src_endpoint: {
    ip: event.id.orig_h,
    port: event.id.orig_p,
    svc_name: event.service,
  },
  // TODO: we actually could go deeper into the `conn_state` field and
  // choose a more accurate status. But this would require string
  // manipulations and/or regex matching, which TQL doesn't have yet.
  status: "Other",
  status_code: event.conn_state,
  status_id: 99,
  traffic: {
    bytes_in: event.resp_bytes,
    bytes_out: event.orig_bytes,
    packets_in: event.resp_pkts,
    packets_out: event.orig_pkts,
    total_bytes: event.orig_bytes + event.resp_bytes,
    total_packets: event.orig_pkts + event.resp_pkts,
  },
  // --- Primary (optional) ---
  // TODO
  // - `ja4_fingerprint_list`: once we have some sample logs with JA4
  //   fingerprints, which requires an additional Zeek package, we should
  //   populate them here.
  // - `tls`: if we buffer ssl log for this connection, we could add the
  //   information in here.
}
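The precomputed values are the part of this mapping where a branch can most easily go wrong, so it can help to exercise that logic in isolation. The sketch below is plain Python rather than TQL; the function names are hypothetical and the sample addresses are made up. It mirrors the three if/else chains above so that each branch can be checked with concrete inputs.

```python
import ipaddress


def direction(local_orig: bool, local_resp: bool) -> tuple:
    """Return (direction, direction_id) from Zeek's local_orig/local_resp."""
    if local_orig and local_resp:
        return ("Lateral", 3)
    if local_orig:
        return ("Outbound", 2)
    if local_resp:
        return ("Inbound", 1)
    return ("Unknown", 0)


# Same protocol numbers as in the pipeline above.
PROTOCOL_NUMBERS = {"tcp": 6, "udp": 17, "icmp": 1}


def protocol_num(proto: str) -> int:
    """Return the protocol number, or -1 for anything not listed."""
    return PROTOCOL_NUMBERS.get(proto, -1)


def protocol_ver_id(orig_h: str, resp_h: str) -> int:
    """Return 6 if either endpoint is an IPv6 address, otherwise 4."""
    hosts = (ipaddress.ip_address(orig_h), ipaddress.ip_address(resp_h))
    return 6 if any(host.version == 6 for host in hosts) else 4


print(direction(True, False))                    # ('Outbound', 2)
print(protocol_num("udp"), protocol_num("sctp"))  # 17 -1
print(protocol_ver_id("10.0.0.1", "fe80::1"))    # 6
```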
Recap
Phew, that's a wrap. Here's the entire pipeline in a single piece:
Complete pipeline definition
Let's run the pipeline:
You should get the following output:
Step 3: Combine multiple pipelines
So far we've mapped just a single event type. Zeek has dozens of different event types, though, and we need to write one mapping pipeline for each. How do we combine the individual pipelines?
Tenzir's answer for this is topic-based publish-subscribe. The publish and subscribe operators send events to, and read events from, a topic, respectively. Here's an illustration of the conceptual approach we are going to use:
The first pipeline publishes to the zeek topic:
read_zeek_tsv
publish "zeek"
Then we have one pipeline per Zeek event type X and OCSF event class C:
subscribe "zeek"
where @name == "zeek.X"
// map to OCSF
publish "ocsf.C"
The idea is that all Zeek logs arrive at the topic zeek, and all mapping pipelines start there by subscribing to the same topic.
You may think that copying the full feed of the zeek topic to every mapping pipeline is inefficient. The good news is that it is not, for two reasons:
- Data transfers between publish and subscribe use the same zero-copy mechanism that pipelines use internally for sharing of events.
- Pipelines of the form subscribe ... | where <predicate> perform predicate pushdown and send predicate upstream so that the filtering can happen as early as possible.
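To build intuition for these two points, here is a toy in-process broker in Python. It is a conceptual sketch only and says nothing about how Tenzir implements publish and subscribe: the Broker class, its methods, and the sample event are all hypothetical. Each subscriber registers its predicate with the broker, so filtering happens on the publishing side, and matching events are handed over by reference instead of being copied.

```python
from collections import defaultdict
from typing import Callable

Event = dict
Predicate = Callable[[Event], bool]


class Broker:
    """Toy topic broker (hypothetical, not Tenzir's implementation)."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, predicate: Predicate) -> list:
        # The subscriber pushes its `where` predicate to the broker, so that
        # events are filtered before they are delivered.
        inbox: list = []
        self._subscribers[topic].append((predicate, inbox))
        return inbox

    def publish(self, topic: str, event: Event) -> None:
        for predicate, inbox in self._subscribers[topic]:
            if predicate(event):
                # Deliver a reference to the event, not a copy.
                inbox.append(event)


broker = Broker()
conn = broker.subscribe("zeek", lambda e: e["name"] == "zeek.conn")
dns = broker.subscribe("zeek", lambda e: e["name"] == "zeek.dns")

event = {"name": "zeek.conn", "uid": "C1"}  # made-up sample event
broker.publish("zeek", event)
print(len(conn), len(dns))  # 1 0
```

Only the subscriber whose predicate matches ever sees the event, and what it receives is the very same object that was published.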
Summary
In this tutorial, we demonstrated how to map logs to OCSF event classes. We used the Zeek network monitor as a case study to illustrate the general mapping pattern. Finally, we explained how to use Tenzir's pub-sub mechanism to scale from one to many pipelines, each of which handles a specific OCSF event class.