This guide shows you how to receive events from message brokers using TQL. You’ll learn to subscribe to topics and queues from Apache Kafka (including Amazon MSK), AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud Pub/Sub.
Apache Kafka
Apache Kafka is a distributed message broker commonly used for high-throughput event streaming. Use from_kafka to subscribe to topics.
Subscribe to a topic

    from_kafka "security-events"

By default, from_kafka produces events with the raw message in a message field. Parse the message content to extract structured data:

    from_kafka "security-events"
    this = message.parse_json()

Control the read offset

The offset option determines where to start reading:

| Value | Description |
|---|---|
| "beginning" | Start from the oldest available message |
| "end" | Start from the newest messages only |
| "stored" | Resume from the last committed offset |
| 1000 | Start from a specific offset |
| -100 | Start 100 messages before the end |
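
Numeric offsets are passed as integers rather than strings. As a minimal sketch based on the table above (the topic name is a placeholder), the following starts reading 100 messages before the end of the topic:

```tql
// Negative offsets count backwards from the end of the topic.
from_kafka "events", offset=-100
```

String values such as "beginning" are passed in quotes instead:
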
    from_kafka "events", offset="beginning"

Configure consumer groups

Kafka uses consumer groups to distribute messages across multiple consumers. Specify a group ID for coordinated consumption:

    from_kafka "events", group_id="tenzir-ingest"

Connect to Amazon MSK
Amazon MSK is a managed Kafka service. Use the aws_iam option for IAM authentication:

    from_kafka "security-logs", options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"}, aws_iam={region: "us-east-1"}

AMQP (RabbitMQ)
AMQP is supported by brokers like RabbitMQ. Use AMQP URLs with from or load_amqp directly.
Receive from a queue

    from "amqp://user:pass@broker:5672/vhost"

The URL structure is amqp://user:password@host:port/vhost. Configure additional options like exchange and routing key in the operator parameters.
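
As a rough sketch of what such parameters might look like, the pipeline below calls load_amqp directly. It assumes that load_amqp accepts exchange and routing_key as named arguments and that the messages are JSON; the exchange and routing key values are hypothetical, so check the operator reference for the exact parameter names before relying on it:

```tql
// Assumption: exchange and routing_key are named arguments of load_amqp.
// "security" and "alerts" are placeholder values for illustration only.
load_amqp "amqp://user:pass@broker:5672/vhost", exchange="security", routing_key="alerts"
// Assumption: each message body is a JSON document.
read_json
```

Because load_amqp yields raw bytes, a parsing operator such as read_json follows it to turn the messages into events.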
Amazon SQS
Amazon SQS is a managed message queue. Use load_sqs or the sqs:// URL scheme.
Receive from a queue

    from "sqs://my-queue" {
      read_json
    }

Configure polling

Use long polling to reduce API calls and receive messages in batches:

    from "sqs://my-queue", poll_interval=5s {
      read_json
    }

Messages are deleted from the queue after successful receipt.
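
The examples above use the sqs:// URL scheme. For comparison, here is a minimal sketch of the load_sqs form, assuming the operator takes the queue name as its first argument and that the messages are JSON:

```tql
// Assumption: load_sqs accepts the plain queue name as a positional argument.
load_sqs "my-queue"
// Assumption: each message body is a JSON document.
read_json
```

This mirrors the from "sqs://my-queue" pipeline, with the parsing step written as a separate operator instead of a nested block.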
Google Cloud Pub/Sub
Google Cloud Pub/Sub provides managed messaging for Google Cloud. Use from_google_cloud_pubsub to subscribe.
Receive from a subscription

    from_google_cloud_pubsub project_id="my-project", subscription_id="my-subscription"
    parsed = message.parse_json()

The operator produces events with a message field containing the raw message content. Parse it to extract structured data.
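
If the raw message is not needed afterwards, the parsed record can replace the whole event, as in the Kafka example. A minimal sketch, assuming the payloads are JSON:

```tql
from_google_cloud_pubsub project_id="my-project", subscription_id="my-subscription"
// Replace the entire event with the parsed content instead of adding a field.
this = message.parse_json()
```

Assigning to parsed keeps the original message field next to the structured data, while assigning to this discards it.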