
This guide shows you how to write OCSF mapping operators in TQL. You’ll learn to organize mappings by attribute groups, handle unmapped fields, and validate your output. The guide assumes you’ve already identified your target OCSF event class and profiles.

Install the tenzir-ocsf skill when you want an agent to help with OCSF schema decisions. See “Use agent skills” for installation and usage examples.

Ask the agent to choose the OCSF version, event class, and profiles before it maps attributes. OCSF mappings should produce the required classification and occurrence attributes first, then add primary, context, and profile-specific attributes.

This section uses Palo Alto NGFW traffic logs as a running example. The namespace follows the vendor::product:: convention, resulting in paloalto::ngfw::ocsf::map.

Use this annotated template as a starting point for creating a user-defined operator as part of a TQL package.

We recommend organizing fields by OCSF attribute group, in the order the template uses: classification, occurrence, context, primary, and profile-specific.

// --- Preamble ---------------------------------
// Keep source data in a source-specific working namespace.
this = { panos: this }
// --- OCSF: classification attributes ----------
@name = "ocsf.network_activity"
ocsf.category_uid = 4
ocsf.class_uid = 4001
ocsf.activity_id = 6
ocsf.severity_id = 1
ocsf.type_uid = ocsf.class_uid * 100 + ocsf.activity_id
// --- OCSF: occurrence attributes --------------
ocsf.time = move panos.time_generated
ocsf.start_time = move panos.start
if panos.elapsed != null {
  ocsf.end_time = ocsf.start_time + panos.elapsed
  ocsf.duration = count_milliseconds(move panos.elapsed).round()
}
// --- OCSF: context attributes -----------------
// Metadata about the event source.
ocsf.metadata = {
  log_name: "traffic",
  product: {
    cpe_name: "cpe:/a:paloaltonetworks:pan-os",
    name: "NGFW",
    vendor_name: "Palo Alto Networks",
  },
  original_event_uid: move panos.sessionid,
  version: "1.8.0",
}
ocsf.app_name = move panos.app
// --- OCSF: primary attributes -----------------
// Primary attributes reflect the core semantic meaning of the event.
ocsf.src_endpoint = {
  ip: move panos.src,
  port: move panos.sport,
}
ocsf.dst_endpoint = {
  ip: move panos.dst,
  port: move panos.dport,
}
let $proto_nums = {tcp: 6, udp: 17, icmp: 1}
ocsf.connection_info = {
  protocol_name: (move panos.proto).to_lower(),
}
ocsf.connection_info.protocol_num = $proto_nums[ocsf.connection_info.protocol_name]? else -1
// --- OCSF: profile-specific attributes --------
// Add fields for declared profiles (host, network_proxy, etc.)
// ocsf.device = {...}
// ocsf.proxy = {...}
// --- Epilogue ---------------------------------
// Return the mapped OCSF event and preserve mapping residue.
this = {...ocsf, unmapped: panos}
// Derive sibling fields and validate the final shape.
ocsf::derive
ocsf::cast
  • Use a source namespace: Move the input under a short descriptor such as panos, zeek, or okta, or under the generic name event, before mapping. This prevents name clashes with OCSF fields and keeps the remaining source fields ready for unmapped.
  • Use move: Transfer fields with move to assign the value and remove it from the source in one step, for example ocsf.time = move panos.time_generated.
  • Only use drop for multi-use fields: When a field appears in multiple mappings, drop it after the last use. Prefer move and single assignments.
  • Keep unmapped residue: Fields left under the source namespace still need review or an intentional decision to preserve source-specific data.
  • Produce minimal OCSF: Map required identifiers, required attributes, and source-specific semantics. Don’t hand-write derived sibling fields such as activity_name, category_name, or severity; let ocsf::derive expand the minimal event before validation.
  • Validate the result: Run ocsf::derive and ocsf::cast after the mapper returns the OCSF event.
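
The namespace, move, and residue conventions can be tried on a toy event. This is a minimal sketch, not real PAN-OS output: the input record and its note field are made up for illustration, and only src and sport mirror the template above.

```tql
// Hypothetical input; `note` stands in for any field without an OCSF home.
from {src: 10.0.0.1, sport: 51234, note: "kept for review"}
// Move everything under the source namespace.
this = { panos: this }
// `move` assigns the value and removes it from `panos` in one step.
ocsf.src_endpoint = {
  ip: move panos.src,
  port: move panos.sport,
}
// Whatever is still under `panos` becomes mapping residue.
this = {...ocsf, unmapped: panos}
```

After this pipeline, only note should remain under unmapped, which makes it easy to see which source fields still need a mapping decision.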

Organize OCSF mappings as a package with a dispatcher and per-event-type operators:

  • paloalto/
    • operators/
      • ngfw/
        • ocsf/
          • map.tql (Main dispatcher)
          • base.tql (Fallback to OCSF base event)
          • events/
            • network.tql (Traffic logs → Network Activity)
            • dns.tql (DNS logs → DNS Activity)
            • threat.tql (Threat logs → Security Finding)
    • tests/
      • inputs/
        • traffic.json (Input shared across multiple tests)
      • ngfw/
        • ocsf/
          • map.tql
          • map.txt
          • base.tql
          • base.txt
          • events/
            • network.input (Log sample(s))
            • network.tql (Mapping for network event type)
            • network.txt (Mapped OCSF event(s))
            • dns.input
            • dns.tql
            • dns.txt
    • package.yaml

Your package should include one main mapping operator. This operator performs source-specific cleanup and shared OCSF setup, dispatches events based on the event type, and returns the mapped OCSF event:

this = { panos: this }
ocsf.metadata = {
  product: {
    cpe_name: "cpe:/a:paloaltonetworks:pan-os",
    name: "NGFW",
    vendor_name: "Palo Alto Networks",
  },
  version: "1.8.0",
}
ocsf.severity_id = 1
match panos.type {
  "TRAFFIC" => {
    paloalto::ngfw::ocsf::events::network
  }
  "DNS" => {
    paloalto::ngfw::ocsf::events::dns
  }
  "THREAT" => {
    paloalto::ngfw::ocsf::events::threat
  }
  _ => {
    paloalto::ngfw::ocsf::base
  }
}
this = {...ocsf, unmapped: panos}
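
The fallback operator that the dispatcher calls for unrecognized log types is not spelled out in this guide. A minimal sketch of what base.tql might contain follows, assuming the dispatcher has already moved the input under panos and set the shared metadata and severity. The schema name ocsf.base_event is an assumption modeled on ocsf.network_activity, and the presence of panos.time_generated on every log type is likewise assumed.

```tql
// Sketch of operators/ngfw/ocsf/base.tql (assumed contents).
// The dispatcher has already set `ocsf.metadata` and `ocsf.severity_id`.
// Assumption: the schema name follows the same pattern as
// "ocsf.network_activity".
@name = "ocsf.base_event"
// OCSF Base Event: category 0, class 0; activity 0 means "Unknown".
ocsf.category_uid = 0
ocsf.class_uid = 0
ocsf.activity_id = 0
ocsf.type_uid = ocsf.class_uid * 100 + ocsf.activity_id
// Assumption: every PAN-OS log type carries `time_generated`.
ocsf.time = move panos.time_generated
```

Because the fallback maps almost nothing, most source fields stay under panos and surface in unmapped, which keeps unrecognized log types visible for later review.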

If the parser package does not set a type field, dispatch on a different field in the log that differentiates the event types.
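
As a rough sketch of that alternative, the dispatch can branch on fields that only one log type carries. The field names panos.query and panos.threat_id are hypothetical placeholders, not confirmed parser output; substitute whatever actually distinguishes your log types.

```tql
// Hypothetical field names: `query` and `threat_id` are placeholders.
// The trailing `?` tolerates events where the field is absent.
if panos.query? != null {
  paloalto::ngfw::ocsf::events::dns
} else if panos.threat_id? != null {
  paloalto::ngfw::ocsf::events::threat
} else if panos.sport? != null {
  paloalto::ngfw::ocsf::events::network
} else {
  paloalto::ngfw::ocsf::base
}
```

Order the branches from most to least specific so that a log type carrying several of these fields reaches the intended operator.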

Create one test file per event type:

from_file env("TENZIR_INPUT") {
  read_json
}
paloalto::ngfw::ocsf::map
ocsf::derive
ocsf::cast

This requires that your test file has a sibling .input file that the test framework exposes through TENZIR_INPUT. Use the reader that matches your fixture format.
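
For instance, if the sibling .input file holds one JSON object per line, the same test might swap in the NDJSON reader. This is a sketch that assumes such a fixture; nothing else in the test changes.

```tql
// Assumes the .input fixture is newline-delimited JSON.
from_file env("TENZIR_INPUT") {
  read_ndjson
}
paloalto::ngfw::ocsf::map
ocsf::derive
ocsf::cast
```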

The ocsf::cast operator is the primary schema validation gate. It ensures that your mapping produces schema-compliant output.

Your mapping is complete once ocsf::cast no longer emits warnings.
