This guide shows you how to receive events from message brokers using TQL. You’ll learn to subscribe to topics and queues from Apache Kafka (including Amazon MSK), NATS JetStream, AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud Pub/Sub.

Apache Kafka is a distributed message broker commonly used for high-throughput event streaming. Use from_kafka to subscribe to topics.

from_kafka "security-events"

By default, from_kafka produces events with the raw message in a message field. Parse the message content to extract structured data:

from_kafka "security-events"
this = message.parse_json()

The offset option determines where to start reading:

| Value | Description |
| --- | --- |
| "beginning" | Start from the oldest available message |
| "end" | Start from the newest messages only |
| "stored" | Resume from the last committed offset |
| 1000 | Start from a specific offset |
| -100 | Start 100 messages before the end |

from_kafka "events", offset="beginning"
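
The numeric offset forms are passed the same way as the string forms. The following is a minimal sketch that replays the last 100 messages of a topic and parses them; the topic name is a placeholder.

```tql
// Start 100 messages before the end of the topic (relative offset).
from_kafka "events", offset=-100
// Replace the event with the parsed JSON payload.
this = message.parse_json()
```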

Kafka uses consumer groups to distribute messages across multiple consumers. Specify a group ID for coordinated consumption:

from_kafka "events", group_id="tenzir-ingest"
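
A consumer group is typically paired with resuming from the last committed offset, so that a restarted pipeline continues where the group left off. The sketch below assumes that group_id and offset can be combined in a single from_kafka invocation; check the operator reference before relying on it.

```tql
// Assumption: group_id and offset may be set together.
// "stored" resumes from the last committed offset.
from_kafka "events", group_id="tenzir-ingest", offset="stored"
this = message.parse_json()
```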

Amazon MSK is a managed Kafka service. Use the aws_iam option for IAM authentication:

from_kafka "security-logs",
  options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
  aws_iam={region: "us-east-1"}

NATS is a messaging system for services, edge deployments, and cloud-native applications. Use from_nats to consume messages from JetStream subjects.

from_nats "alerts", durable="tenzir-alerts"
this = string(message).parse_json()

The NATS server must have a JetStream stream that captures the subject you consume from.

Use metadata_field to copy NATS metadata into events:

from_nats "alerts", metadata_field=nats
parsed = string(message).parse_json()
nats_subject = nats.subject
nats_stream = nats.stream
nats_sequence = nats.stream_sequence

AMQP is supported by brokers such as RabbitMQ. Use from_amqp directly with AMQP URLs.

from_amqp "amqp://user:pass@broker:5672/vhost", queue="events"
this = string(message).parse_json()

The URL structure is amqp://user:password@host:port/vhost. Configure additional options like exchange, routing_key, and queue in the operator parameters. The operator emits each AMQP payload in the message field.
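
As a sketch of how the exchange, routing_key, and queue parameters named above might be set together, consider the following pipeline. The exchange and routing key values are hypothetical placeholders, and the exact binding behavior depends on your broker configuration.

```tql
// Hypothetical exchange and routing key names; adjust to your broker.
from_amqp "amqp://user:pass@broker:5672/vhost",
  exchange="security",
  routing_key="alerts",
  queue="events"
// The payload arrives in the message field; parse it as JSON.
this = string(message).parse_json()
```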

Amazon SQS is a managed message queue. Use from_sqs to receive events from SQS queues.

from_sqs "sqs://my-queue"
this = message.parse_json()

Use long polling via poll_time to reduce empty responses, and set batch_size to control the maximum number of messages per receive request:

from_sqs "sqs://my-queue", poll_time=5s, batch_size=10

By default, Tenzir deletes each SQS message after emitting it. Set keep_messages=true to leave messages in the queue; SQS then makes them visible again once the visibility timeout expires:

from_sqs "sqs://my-queue", keep_messages=true, visibility_timeout=30s

Google Cloud Pub/Sub provides managed messaging for Google Cloud. Use from_google_cloud_pubsub to subscribe.

from_google_cloud_pubsub project_id="my-project",
  subscription_id="my-subscription"
parsed = message.parse_json()

The operator produces events with a message field containing the raw message content. Parse it to extract structured data.
