This guide shows you how to receive events from message brokers using TQL. You’ll learn to subscribe to topics and queues from Apache Kafka (including Amazon MSK), NATS JetStream, AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud Pub/Sub.
Apache Kafka
Section titled “Apache Kafka”Apache Kafka is a distributed message broker commonly
used for high-throughput event streaming. Use from_kafka to subscribe to
topics.
Subscribe to a topic
Section titled “Subscribe to a topic”from_kafka "security-events"By default, from_kafka produces events with the raw message in a message
field. Parse the message content to extract structured data:
from_kafka "security-events"this = message.parse_json()Control the read offset
Section titled “Control the read offset”The offset option determines where to start reading:
| Value | Description |
|---|---|
"beginning" | Start from the oldest available message |
"end" | Start from the newest messages only |
"stored" | Resume from the last committed offset |
1000 | Start from a specific offset |
-100 | Start 100 messages before the end |
from_kafka "events", offset="beginning"Configure consumer groups
Section titled “Configure consumer groups”Kafka uses consumer groups to distribute messages across multiple consumers. Specify a group ID for coordinated consumption:
from_kafka "events", group_id="tenzir-ingest"Connect to Amazon MSK
Section titled “Connect to Amazon MSK”Amazon MSK is a managed Kafka service. Use the
aws_iam option for IAM authentication:
from_kafka "security-logs", options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"}, aws_iam={region: "us-east-1"}NATS JetStream
Section titled “NATS JetStream”NATS is a messaging system for services, edge deployments,
and cloud-native applications. Use from_nats to consume messages from
JetStream subjects.
Consume from a subject
Section titled “Consume from a subject”from_nats "alerts", durable="tenzir-alerts"this = string(message).parse_json()The NATS server must have a JetStream stream that captures the subject you consume from.
Preserve message metadata
Section titled “Preserve message metadata”Use metadata_field to copy NATS metadata into events:
from_nats "alerts", metadata_field=natsparsed = string(message).parse_json()nats_subject = nats.subjectnats_stream = nats.streamnats_sequence = nats.stream_sequenceAMQP (RabbitMQ)
Section titled “AMQP (RabbitMQ)”AMQP is supported by brokers such as RabbitMQ. Use
from_amqp directly with AMQP URLs.
Receive from a queue
Section titled “Receive from a queue”from_amqp "amqp://user:pass@broker:5672/vhost", queue="events"this = string(message).parse_json()The URL structure is amqp://user:password@host:port/vhost. Configure
additional options like exchange, routing_key, and queue in the operator
parameters. The operator emits each AMQP payload in the message field.
Amazon SQS
Section titled “Amazon SQS”Amazon SQS is a managed message queue. Use
from_sqs to receive events from SQS queues.
Receive from a queue
Section titled “Receive from a queue”from_sqs "sqs://my-queue"this = message.parse_json()Configure polling
Section titled “Configure polling”Use long polling to reduce empty responses, and set batch_size to control the
maximum number of messages per receive request:
from_sqs "sqs://my-queue", poll_time=5s, batch_size=10By default, Tenzir deletes each SQS message after emitting it. Set
keep_messages=true to leave messages in the queue so SQS makes them visible
again after the queue’s visibility timeout:
from_sqs "sqs://my-queue", keep_messages=true, visibility_timeout=30sGoogle Cloud Pub/Sub
Section titled “Google Cloud Pub/Sub”Google Cloud Pub/Sub provides managed
messaging for Google Cloud. Use from_google_cloud_pubsub to subscribe.
Receive from a subscription
Section titled “Receive from a subscription”from_google_cloud_pubsub project_id="my-project", subscription_id="my-subscription"parsed = message.parse_json()The operator produces events with a message field containing the raw message
content. Parse it to extract structured data.