Skip to content

This guide shows you how to map events to Elastic Common Schema (ECS) in TQL. You’ll learn how to populate required ECS fields, choose event categorization values, map common network fieldsets, preserve source-specific details, and prepare records for Elasticsearch-compatible destinations.

The TQL examples in this guide use nested records that serialize to ECS field paths. For example, source.ip in ECS becomes source: {ip: ...} in TQL.

Install the tenzir-ecs skill when you want an agent to help with ECS schema decisions. See Use agent skills for installation and usage examples.

Ask the agent to choose event.kind, event.category, and event.type values before it maps fieldsets. ECS treats event.category and event.type as lists, and values such as allowed or denied belong in event.type or event.action, not automatically in event.outcome.

Start by setting the fields that define the document contract and search behavior:

  • @timestamp: The time when the event originated.
  • ecs.version: The ECS version used for the mapping.
  • event.kind: The high-level kind, such as event, alert, metric, or pipeline_error.
  • event.category: The broad category list, such as network.
  • event.type: The subtype list, such as connection, allowed, or denied.

For a firewall connection event, choose event.kind: event, event.category: [network], and an event.type list that includes connection plus the normalized disposition when the source records one.

The following example maps a parsed firewall connection event to ECS. It keeps the source data under fw, sets ECS categorization fields, maps network participants, normalizes the transport protocol, and preserves unmapped residue under a custom namespace.

from {
ts: 2024-01-15T10:30:45Z,
action: "allowed",
src_ip: "10.0.0.5",
src_port: 51544,
dst_ip: "203.0.113.10",
dst_port: 443,
proto: "tcp",
bytes_out: 1280,
bytes_in: 8192,
device: "edge-fw-01",
}
this = {fw: this}
let $event_types = {
allow: ["connection", "allowed"],
allowed: ["connection", "allowed"],
block: ["connection", "denied"],
blocked: ["connection", "denied"],
deny: ["connection", "denied"],
denied: ["connection", "denied"],
drop: ["connection", "denied"],
dropped: ["connection", "denied"],
}
ecs["@timestamp"] = move fw.ts
ecs.ecs.version = "9.4.0"
ecs.event.kind = "event"
ecs.event.category = ["network"]
ecs.event.type = $event_types[fw.action.to_lower()]? else ["connection"]
ecs.event.action = (move fw.action).to_lower()
ecs.source.ip = move fw.src_ip
ecs.source.port = move fw.src_port
ecs.source.bytes = move fw.bytes_out
ecs.destination.ip = move fw.dst_ip
ecs.destination.port = move fw.dst_port
ecs.destination.bytes = move fw.bytes_in
ecs.network.transport = (move fw.proto).to_lower()
ecs.network.bytes = ecs.source.bytes + ecs.destination.bytes
ecs.observer.hostname = move fw.device
ecs.observer.vendor = "Example Networks"
ecs.observer.product = "Example Firewall"
this = {...ecs, example: {unmapped: fw}}
{
"@timestamp": 2024-01-15T10:30:45Z,
ecs: {
version: "9.4.0",
},
event: {
kind: "event",
category: [
"network",
],
type: [
"connection",
"allowed",
],
action: "allowed",
},
source: {
ip: "10.0.0.5",
port: 51544,
bytes: 1280,
},
destination: {
ip: "203.0.113.10",
port: 443,
bytes: 8192,
},
network: {
transport: "tcp",
bytes: 9472,
},
observer: {
hostname: "edge-fw-01",
vendor: "Example Networks",
product: "Example Firewall",
},
example: {
unmapped: {},
},
}

Use the same structure for larger mappings:

  • Keep a source namespace: Move the parsed event under a short namespace such as fw, dns, edr, or event before you create ECS fields.
  • Populate required fields first: Set @timestamp and ecs.version before you map event-specific fieldsets.
  • Choose categorization before fields: Let event.kind, event.category, and event.type drive which fieldsets you populate.
  • Use role-specific fieldsets: Use source and destination for packet or flow direction, and use client and server when the protocol role matters.
  • Normalize categorical values explicitly: Map source strings to ECS categorization values such as network, connection, allowed, or denied.
  • Preserve custom data deliberately: Put source-specific details that don’t have an ECS target into a custom namespace, not into a field that ECS already defines for another purpose.

Send ECS to Elastic-compatible destinations

Section titled “Send ECS to Elastic-compatible destinations”

After you map events to ECS, send them to Elasticsearch or OpenSearch with the Bulk API operators:

my_source::ecs::map
to_elasticsearch "https://elasticsearch.example.com:9200",
action="index",
index="ecs-events"

Use to_opensearch instead when the destination is OpenSearch.

Last updated: