
This guide shows you how to receive events from message brokers using TQL. You’ll learn to subscribe to topics and queues from Apache Kafka (including Amazon MSK), AMQP-based brokers (like RabbitMQ), Amazon SQS, and Google Cloud Pub/Sub.

Apache Kafka is a distributed message broker commonly used for high-throughput event streaming. Use from_kafka to subscribe to topics.

from_kafka "security-events"

By default, from_kafka produces events with the raw message in a message field. Parse the message content to extract structured data:

from_kafka "security-events"
this = message.parse_json()

The offset option determines where to start reading:

| Value | Description |
| --- | --- |
| "beginning" | Start from the oldest available message |
| "end" | Start from the newest messages only |
| "stored" | Resume from the last committed offset |
| 1000 | Start from a specific offset |
| -100 | Start 100 messages before the end |

from_kafka "events", offset="beginning"

Kafka uses consumer groups to distribute messages across multiple consumers. Specify a group ID for coordinated consumption:

from_kafka "events", group_id="tenzir-ingest"
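As a minimal sketch, the options above can be combined so that a pipeline in a consumer group resumes where it left off and parses each message. It only reuses the group_id, offset, and parse_json() constructs already shown in this guide. That the two options can be passed together in one call is an assumption, and the topic and group names are placeholders.

```tql
// Sketch: join the consumer group and resume from its last committed offset.
// Assumption: group_id and offset can be combined in a single call.
from_kafka "events", group_id="tenzir-ingest", offset="stored"
// Replace the event with the parsed JSON payload.
this = message.parse_json()
```

With "stored", a restarted pipeline should continue from the group's committed position rather than re-reading the topic from the beginning.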

Amazon MSK is a managed Kafka service. Use the aws_iam option for IAM authentication:

from_kafka "security-logs",
options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
aws_iam={region: "us-east-1"}

AMQP is supported by brokers like RabbitMQ. Use AMQP URLs with from or load_amqp directly.

from "amqp://user:pass@broker:5672/vhost"

The URL structure is amqp://user:password@host:port/vhost. Configure additional options like exchange and routing key in the operator parameters.
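The following sketch shows how the exchange and routing key might be set when calling load_amqp directly. The parameter names exchange and routing_key, their values, and the assumption that the messages contain JSON are illustrative; check the operator reference for the exact names before relying on them.

```tql
// Sketch only: `exchange` and `routing_key` are assumed parameter names,
// and "security" / "alerts" are placeholder values.
load_amqp "amqp://user:pass@broker:5672/vhost",
  exchange="security",
  routing_key="alerts"
// load_amqp yields raw bytes, so a parser must follow.
read_json
```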

Amazon SQS is a managed message queue. Use load_sqs or the sqs:// URL scheme.

from "sqs://my-queue" {
read_json
}

Use long polling to reduce API calls and receive messages in batches:

from "sqs://my-queue", poll_interval=5s {
read_json
}

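Messages are deleted from the queue after successful receipt. The consumer issues this deletion: SQS itself only removes a message once it is explicitly deleted or its retention period expires.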

Google Cloud Pub/Sub provides managed messaging for Google Cloud. Use from_google_cloud_pubsub to subscribe.

from_google_cloud_pubsub project_id="my-project",
subscription_id="my-subscription"
parsed = message.parse_json()

The operator produces events with a message field containing the raw message content. Parse it to extract structured data.
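The example above keeps the raw message alongside the parsed data in a separate parsed field. If only the structured content is needed, the assignment pattern from the Kafka section should apply here as well. This is a sketch that assumes the payload is JSON:

```tql
from_google_cloud_pubsub project_id="my-project",
  subscription_id="my-subscription"
// Replace the whole event with the parsed payload instead of adding a field.
this = message.parse_json()
```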
