This guide shows you how to map events to Elastic Common Schema (ECS) in TQL. You’ll learn how to populate required ECS fields, choose event categorization values, map common network fieldsets, preserve source-specific details, and prepare records for Elasticsearch-compatible destinations.
The TQL examples in this guide use nested records that serialize to ECS field
paths. For example, source.ip in ECS becomes source: {ip: ...} in TQL.
Use the ECS skill
Section titled “Use the ECS skill”Install the tenzir-ecs skill when you want an agent to help with ECS schema
decisions. See Use agent skills
for installation and usage examples.
Ask the agent to choose event.kind, event.category, and event.type values
before it maps fieldsets. ECS treats event.category and event.type as lists,
and values such as allowed or denied belong in event.type or
event.action, not automatically in event.outcome.
Choose ECS categorization
Section titled “Choose ECS categorization”Start by setting the fields that define the document contract and search behavior:
@timestamp: The time when the event originated.ecs.version: The ECS version used for the mapping.event.kind: The high-level kind, such asevent,alert,metric, orpipeline_error.event.category: The broad category list, such asnetwork.event.type: The subtype list, such asconnection,allowed, ordenied.
For a firewall connection event, choose event.kind: event,
event.category: [network], and an event.type list that includes
connection plus the normalized disposition when the source records one.
Write a small mapping
Section titled “Write a small mapping”The following example maps a parsed firewall connection event to ECS. It keeps
the source data under fw, sets ECS categorization fields, maps network
participants, normalizes the transport protocol, and preserves unmapped residue
under a custom namespace.
from { ts: 2024-01-15T10:30:45Z, action: "allowed", src_ip: "10.0.0.5", src_port: 51544, dst_ip: "203.0.113.10", dst_port: 443, proto: "tcp", bytes_out: 1280, bytes_in: 8192, device: "edge-fw-01",}
this = {fw: this}
let $event_types = { allow: ["connection", "allowed"], allowed: ["connection", "allowed"], block: ["connection", "denied"], blocked: ["connection", "denied"], deny: ["connection", "denied"], denied: ["connection", "denied"], drop: ["connection", "denied"], dropped: ["connection", "denied"],}
ecs["@timestamp"] = move fw.tsecs.ecs.version = "9.4.0"ecs.event.kind = "event"ecs.event.category = ["network"]ecs.event.type = $event_types[fw.action.to_lower()]? else ["connection"]ecs.event.action = (move fw.action).to_lower()ecs.source.ip = move fw.src_ipecs.source.port = move fw.src_portecs.source.bytes = move fw.bytes_outecs.destination.ip = move fw.dst_ipecs.destination.port = move fw.dst_portecs.destination.bytes = move fw.bytes_inecs.network.transport = (move fw.proto).to_lower()ecs.network.bytes = ecs.source.bytes + ecs.destination.bytesecs.observer.hostname = move fw.deviceecs.observer.vendor = "Example Networks"ecs.observer.product = "Example Firewall"
this = {...ecs, example: {unmapped: fw}}{ "@timestamp": 2024-01-15T10:30:45Z, ecs: { version: "9.4.0", }, event: { kind: "event", category: [ "network", ], type: [ "connection", "allowed", ], action: "allowed", }, source: { ip: "10.0.0.5", port: 51544, bytes: 1280, }, destination: { ip: "203.0.113.10", port: 443, bytes: 8192, }, network: { transport: "tcp", bytes: 9472, }, observer: { hostname: "edge-fw-01", vendor: "Example Networks", product: "Example Firewall", }, example: { unmapped: {}, },}Apply the mapping pattern
Section titled “Apply the mapping pattern”Use the same structure for larger mappings:
- Keep a source namespace: Move the parsed event under a short namespace
such as
fw,dns,edr, oreventbefore you create ECS fields. - Populate required fields first: Set
@timestampandecs.versionbefore you map event-specific fieldsets. - Choose categorization before fields: Let
event.kind,event.category, andevent.typedrive which fieldsets you populate. - Use role-specific fieldsets: Use
sourceanddestinationfor packet or flow direction, and useclientandserverwhen the protocol role matters. - Normalize categorical values explicitly: Map source strings to ECS
categorization values such as
network,connection,allowed, ordenied. - Preserve custom data deliberately: Put source-specific details that don’t have an ECS target into a custom namespace, not into a field that ECS already defines for another purpose.
Send ECS to Elastic-compatible destinations
Section titled “Send ECS to Elastic-compatible destinations”After you map events to ECS, send them to Elasticsearch or OpenSearch with the Bulk API operators:
my_source::ecs::mapto_elasticsearch "https://elasticsearch.example.com:9200", action="index", index="ecs-events"Use to_opensearch instead when the destination is OpenSearch.