This guide shows you how to receive events from message brokers using TQL. You’ll learn to subscribe to topics and queues from Apache Kafka (including Amazon MSK), Amazon Kinesis Data Streams, NATS JetStream, AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud Pub/Sub.
Apache Kafka
Section titled “Apache Kafka”Apache Kafka is a distributed message broker commonly
used for high-throughput event streaming. Use from_kafka to subscribe to
topics.
Subscribe to a topic
Section titled “Subscribe to a topic”from_kafka "security-events"By default, from_kafka produces events with the raw message in a message
field. Parse the message content to extract structured data:
from_kafka "security-events"this = message.parse_json()Control the read offset
Section titled “Control the read offset”The offset option determines where to start reading:
| Value | Description |
|---|---|
"beginning" | Start from the oldest available message |
"end" | Start from the newest messages only |
"stored" | Resume from the last committed offset |
1000 | Start from a specific offset |
-100 | Start 100 messages before the end |
from_kafka "events", offset="beginning"Configure consumer groups
Section titled “Configure consumer groups”Kafka uses consumer groups to distribute messages across multiple consumers. Specify a group ID for coordinated consumption:
from_kafka "events", group_id="tenzir-ingest"Connect to Amazon MSK
Section titled “Connect to Amazon MSK”Amazon MSK is a managed Kafka service. Use the
aws_iam option for IAM authentication:
from_kafka "security-logs", options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"}, aws_iam={region: "us-east-1"}Amazon Kinesis
Section titled “Amazon Kinesis”Amazon Kinesis Data Streams is a managed
streaming service on AWS. Use from_amazon_kinesis to receive records from a
stream.
Receive from a stream
Section titled “Receive from a stream”from_amazon_kinesis "security-events", start="trim_horizon"this = string(message).parse_json()The operator emits one event per Kinesis record. The raw payload is stored in
the message field as a blob, together with stream, shard, sequence number,
partition key, and lag metadata.
NATS JetStream
Section titled “NATS JetStream”NATS is a messaging system for services, edge deployments,
and cloud-native applications. Use from_nats to consume messages from
JetStream subjects.
Consume from a subject
Section titled “Consume from a subject”from_nats "alerts", durable="tenzir-alerts"this = string(message).parse_json()The NATS server must have a JetStream stream that captures the subject you consume from.
Preserve message metadata
Section titled “Preserve message metadata”Use metadata_field to copy NATS metadata into events:
from_nats "alerts", metadata_field=natsparsed = string(message).parse_json()nats_subject = nats.subjectnats_stream = nats.streamnats_sequence = nats.stream_sequenceAMQP (RabbitMQ)
Section titled “AMQP (RabbitMQ)”AMQP is supported by brokers such as RabbitMQ. Use
from_amqp directly with AMQP URLs.
Receive from a queue
Section titled “Receive from a queue”from_amqp "amqp://user:pass@broker:5672/vhost", queue="events"this = string(message).parse_json()The URL structure is amqp://user:password@host:port/vhost. Configure
additional options like exchange, routing_key, and queue in the operator
parameters. The operator emits each AMQP payload in the message field.
Amazon SQS
Section titled “Amazon SQS”Amazon SQS is a managed message queue. Use
from_amazon_sqs to receive events from SQS queues.
Receive from a queue
Section titled “Receive from a queue”from_amazon_sqs "sqs://my-queue"this = message.parse_json()Configure polling
Section titled “Configure polling”Use long polling to reduce empty responses, and set batch_size to control the
maximum number of messages per receive request:
from_amazon_sqs "sqs://my-queue", poll_time=5s, batch_size=10By default, Tenzir deletes each SQS message after emitting it. Set
keep_messages=true to leave messages in the queue so SQS makes them visible
again after the queue’s visibility timeout:
from_amazon_sqs "sqs://my-queue", keep_messages=true, visibility_timeout=30sGoogle Cloud Pub/Sub
Section titled “Google Cloud Pub/Sub”Google Cloud Pub/Sub provides managed
messaging for Google Cloud. Use from_google_cloud_pubsub to subscribe.
Receive from a subscription
Section titled “Receive from a subscription”from_google_cloud_pubsub project_id="my-project", subscription_id="my-subscription"parsed = message.parse_json()The operator produces events with a message field containing the raw message
content. Parse it to extract structured data.