Receives events from an Apache Kafka topic.
```tql
from_kafka topic:string, [count=int, exit=bool, offset=int|string,
           options=record, aws_iam=record, aws_region=string,
           commit_batch_size=int]
```

## Description

The `from_kafka` operator consumes messages from a Kafka topic and produces
events containing the message payload as a string field.
The implementation uses the official librdkafka from Confluent and
supports all of its configuration options. You can specify them
via the `options` parameter as `{key: value, ...}`.
The operator injects the following default librdkafka configuration values when
no configuration file is present or when the configuration does not set them:
- `bootstrap.servers`: `localhost`
- `client.id`: `tenzir`
- `group.id`: `tenzir`
- `enable.auto.commit`: `false` (this option cannot be changed)
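For example, to consume as part of a different consumer group than the default, you can override `group.id` inline; the topic and group names in this sketch are placeholders:

```tql
from_kafka "logs", options={"group.id": "my-pipeline"}
```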
Each consumed message is produced as an event with the following schema:
```tql
{
  message: string
}
```
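For instance, a consumed JSON payload arrives as a single unparsed string; the payload in this sketch is hypothetical:

```tql
{message: "{\"severity\": \"high\", \"source\": \"fw-1\"}"}
```

Use functions such as `parse_json` to turn the payload into structured data, as shown in the Examples section below.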
### topic: string

The Kafka topic to consume from.
### count = int (optional)

Exit successfully after having consumed `count` messages.
### exit = bool (optional)

Exit successfully after having received the last message from all partitions.
Without this option, the operator waits for new messages after consuming the last one.
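To drain a topic once and then terminate, you can combine `exit` with the `offset` option described below; the topic name in this sketch is a placeholder:

```tql
from_kafka "events", offset="beginning", exit=true
```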
### offset = int|string (optional)

The offset to start consuming from. Possible values are:
"beginning": first offset"end": last offset"stored": stored offset<value>: absolute offset-<value>: relative offset from end
The default is `"stored"`.
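For example, a negative offset starts reading that many messages before the end. This sketch, with a placeholder topic name, picks up the most recent 100 messages and then waits for new ones:

```tql
from_kafka "metrics", offset=-100
```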
### options = record (optional)

A record of key-value configuration options for
librdkafka, e.g., `{"auto.offset.reset": "earliest", "enable.partition.eof": true}`.
The `from_kafka` operator passes the key-value pairs directly to
librdkafka. Consult the list of available configuration
options to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific `kafka.yaml` so
that they are independent of the `from_kafka` arguments.
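As an inline alternative to `kafka.yaml`, you can pass the same pairs from the example above directly to the operator; the topic name here is a placeholder:

```tql
from_kafka "logs", options={"auto.offset.reset": "earliest", "enable.partition.eof": true}
```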
### commit_batch_size = int (optional)

The operator commits offsets after receiving `commit_batch_size` messages
to improve throughput. If you need to ensure exactly-once semantics for your
pipeline, set this option to 1 to commit every message individually.
Defaults to 1000.
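A sketch of the exactly-once setup described above, with a placeholder topic name:

```tql
from_kafka "transactions", commit_batch_size=1
```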
## Amazon MSK

The operator supports Amazon MSK with IAM authentication.
### aws_iam = record (optional)

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the default AWS credential chain.

```tql
{
  region: string,            // AWS region for API requests.
  access_key_id: string,     // AWS access key ID.
  secret_access_key: string, // AWS secret access key.
  session_token: string,     // Session token for temporary credentials.
  assume_role: string,       // ARN of IAM role to assume.
  session_name: string,      // Session name for role assumption.
  external_id: string,       // External ID for role assumption.
}
```

The `access_key_id` and `secret_access_key` must be specified together. If
neither is specified, the operator uses the default AWS credential chain:
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
- Shared credentials file (`~/.aws/credentials`)
- IAM role for Amazon EC2 or ECS task role
- Instance metadata service
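For example, a sketch of explicit role assumption using the fields above; the role ARN, session name, and broker address are placeholders:

```tql
from_kafka "security-logs",
  options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"},
  aws_iam={
    region: "us-east-1",
    assume_role: "arn:aws:iam::123456789012:role/kafka-consumer",
    session_name: "tenzir"
  }
```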
### aws_region = string (optional)

The AWS region used to construct the MSK authentication URL. Required when connecting to MSK with IAM authentication.
### Consume from MSK using AWS IAM authentication

```tql
from_kafka "security-logs", options={"bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098"}, aws_iam={region: "us-east-1"}
```

## Examples
### Consume JSON messages and parse them

```tql
from_kafka "logs"
message = message.parse_json()
```

### Consume 100 messages starting from the beginning

```tql
from_kafka "events", count=100, offset="beginning"
```

### Consume messages and exit when caught up

```tql
from_kafka "alerts", exit=true
```