
Receives events from an Apache Kafka topic.

from_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record,
           aws_iam=record, aws_region=string, batch_size=int]

The from_kafka operator consumes messages from a Kafka topic and produces events containing the message payload as a string field.

The implementation uses Confluent's official librdkafka library and supports all of its configuration options. You can specify them through the options parameter as {key: value, ...}.

The operator injects the following default librdkafka configuration values when no configuration file is present or when the configuration does not set them:

  • bootstrap.servers: localhost
  • client.id: tenzir
  • group.id: tenzir
  • enable.auto.commit: false (This option cannot be changed)
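The defaulting rule above works like a dictionary merge. The Python sketch below is a hypothetical model, not Tenzir's implementation. It assumes the configuration file and the options record have already been combined into one dict, and the names DEFAULTS and effective_config are illustrative.

```python
from typing import Dict

# Hypothetical model of the documented defaults; not Tenzir's actual code.
DEFAULTS: Dict[str, str] = {
    "bootstrap.servers": "localhost",
    "client.id": "tenzir",
    "group.id": "tenzir",
}


def effective_config(user_config: Dict[str, str]) -> Dict[str, str]:
    """Return user_config with the documented defaults filled in.

    user_config is assumed to already combine the configuration file
    and the options record.
    """
    # Keys the user set take precedence over the defaults.
    config = {**DEFAULTS, **user_config}
    # enable.auto.commit is always forced to false, whatever the user set.
    config["enable.auto.commit"] = "false"
    return config
```

For example, effective_config({"bootstrap.servers": "broker:9092"}) keeps the custom broker and fills in client.id, group.id, and enable.auto.commit.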

Each consumed message is produced as an event with the following schema:

{
message: string
}

topic: string

The Kafka topic to consume from.

If the topic starts with ^, from_kafka treats it as a regular expression and subscribes to all matching topics. The regular expression must match the complete topic name, so use patterns like ^tenant-.*\.alerts$.
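The full-match requirement can be illustrated with Python's re.fullmatch. This is a hypothetical model of which topics a subscription covers. It uses Python's regex engine, whose syntax may differ in details from the engine librdkafka uses. The function name matching_topics is illustrative.

```python
import re
from typing import List


def matching_topics(topic: str, available: List[str]) -> List[str]:
    """Return the topics a subscription to `topic` would cover (hypothetical model)."""
    if not topic.startswith("^"):
        # Plain topic names are taken literally.
        return [t for t in available if t == topic]
    # A leading ^ turns the argument into a regular expression that must
    # match the complete topic name, which re.fullmatch enforces.
    return [t for t in available if re.fullmatch(topic, t)]


topics = ["tenant-a.alerts", "tenant-b.alerts.archive", "tenant-c.logs"]
print(matching_topics(r"^tenant-.*\.alerts$", topics))  # ['tenant-a.alerts']
print(matching_topics(r"^tenant-", topics))              # [] (a prefix alone does not match)
```

The second call shows why a pattern like ^tenant- selects nothing: it matches only the beginning of each name, not the complete name.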

count = int

Exit successfully after consuming count messages.

exit = bool

Exit successfully after receiving the last message from all partitions. This option isn’t supported with regular expression topic subscriptions.

Without this option, the operator waits for new messages after consuming the last one.
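The following Python sketch models how count and exit end consumption. It is a hypothetical simplification, not Tenzir's code. It assumes the consumer sees a stream of ("msg", partition, payload) and ("eof", partition, None) events, similar to librdkafka's partition-EOF notifications. It also assumes a partition leaves the end-of-partition state when a new message arrives on it. The names consume and exit_at_eof are illustrative.

```python
from typing import Iterable, List, Optional, Set, Tuple

# Each event is (kind, partition, payload); kind is "msg" or "eof".
Event = Tuple[str, int, Optional[str]]


def consume(events: Iterable[Event], partitions: Set[int],
            count: Optional[int] = None,
            exit_at_eof: bool = False) -> List[dict]:
    """Hypothetical model of how count and exit end consumption."""
    out: List[dict] = []
    at_eof: Set[int] = set()
    for kind, partition, payload in events:
        if kind == "eof":
            at_eof.add(partition)
            # exit: stop once every partition has reported its end.
            if exit_at_eof and at_eof >= partitions:
                break
            continue
        # A new message means this partition is no longer at its end.
        at_eof.discard(partition)
        out.append({"message": payload})
        # count: stop after the requested number of messages.
        if count is not None and len(out) >= count:
            break
    # Without either option, a real consumer keeps waiting; this finite
    # model simply runs out of events.
    return out
```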

offset = int|string

The offset to start consuming from. Possible values are:

  • "beginning": first offset
  • "end": last offset
  • "stored": stored offset
  • <value>: absolute offset
  • -<value>: relative offset from end

The default is "stored".
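The following Python sketch shows one way to map these values to a concrete position within a single partition. It is a hypothetical model. It assumes "end" corresponds to the partition's high watermark (the offset the next produced message will receive), as librdkafka's logical end offset does, and it treats negative integers as counting back from that point. The names resolve_offset, low, high, and stored are illustrative.

```python
from typing import Union


def resolve_offset(offset: Union[int, str], low: int, high: int, stored: int) -> int:
    """Map the offset argument to a position in one partition (hypothetical model).

    low is the first available offset, high the offset the next produced
    message will receive, and stored the consumer group's committed offset.
    """
    if offset == "beginning":
        return low
    if offset == "end":
        return high
    if offset == "stored":
        return stored
    if isinstance(offset, int):
        # Negative values count back from the end of the partition.
        return high + offset if offset < 0 else offset
    raise ValueError(f"unsupported offset: {offset!r}")
```

With low=5 and high=100, an offset of -10 starts reading at offset 90, which yields the last ten messages.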

options = record

A record of key-value configuration options for librdkafka, e.g., {"auto.offset.reset": "earliest", "enable.partition.eof": true}.

The from_kafka operator passes the key-value pairs directly to librdkafka. Consult the list of available configuration options to configure Kafka according to your needs.

We recommend factoring these options into the plugin-specific kafka.yaml so that they are independent of the from_kafka arguments.
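librdkafka's configuration API takes every value as a string. A record with booleans or numbers therefore needs converting before it is handed over. The Python sketch below shows one plausible conversion. It is an assumption about the mechanics, not Tenzir's documented behavior, and the name to_librdkafka_conf is illustrative.

```python
from typing import Dict, Union

Value = Union[str, int, float, bool]


def to_librdkafka_conf(options: Dict[str, Value]) -> Dict[str, str]:
    """Convert an options record into librdkafka's string key-value form (hypothetical)."""
    conf: Dict[str, str] = {}
    for key, value in options.items():
        if isinstance(value, bool):
            # librdkafka expects lowercase true/false, but str(True) gives "True".
            conf[key] = "true" if value else "false"
        else:
            conf[key] = str(value)
    return conf
```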

batch_size = int

The number of messages to accumulate before emitting a batch. The operator commits offsets once per batch rather than once per message, which improves throughput.

Defaults to 10,000.
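The following Python sketch models batching with one commit per batch for a single partition. It is a hypothetical simplification. It assumes that a final partial batch is flushed when input ends, whereas a live operator probably also flushes on timeouts. It follows Kafka's convention that the committed offset is the next offset to read. The names consume_in_batches and commit are illustrative.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def consume_in_batches(messages: Iterable[Tuple[int, str]], batch_size: int,
                       commit: Callable[[int], None]) -> List[List[dict]]:
    """Group (offset, payload) pairs into batches and commit once per batch."""
    batches: List[List[dict]] = []
    batch: List[dict] = []
    last_offset: Optional[int] = None
    for offset, payload in messages:
        batch.append({"message": payload})
        last_offset = offset
        if len(batch) == batch_size:
            batches.append(batch)
            # Kafka stores the offset of the *next* message to read.
            commit(last_offset + 1)
            batch = []
    if batch and last_offset is not None:
        # Flush the final partial batch (an assumption of this model).
        batches.append(batch)
        commit(last_offset + 1)
    return batches
```

With five messages and batch_size=2, the sketch emits batches of 2, 2, and 1 messages and commits three times instead of five.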

aws_iam = record

The operator supports Amazon MSK with IAM authentication.

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

{
region: string, // AWS region for API requests.
access_key_id: string, // AWS access key ID.
secret_access_key: string, // AWS secret access key.
session_token: string, // session token for temporary credentials.
assume_role: string, // ARN of IAM role to assume.
session_name: string, // session name for role assumption.
external_id: string, // external ID for role assumption.
web_identity: record, // OIDC web identity token configuration.
}

See AWS Authentication for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.

aws_region = string

The AWS region used to construct the MSK authentication URL. Required when connecting to MSK with IAM authentication.

Examples

Consume from MSK using AWS IAM authentication

from_kafka "security-logs",
  options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
  aws_iam={region: "us-east-1"}

Consume JSON messages and parse them

from_kafka "logs"
message = message.parse_json()

Consume 100 messages starting from the beginning

from_kafka "events", count=100, offset="beginning"

Consume until the end of all partitions, then exit

from_kafka "alerts", exit=true

Consume from topics that match a regular expression

from_kafka "^tenant-.*\\.alerts$", offset="beginning"
