Loads a byte stream from an Apache Kafka topic.
load_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record, aws_iam=record, commit_batch_size=int, commit_timeout=duration]

Description
The load_kafka operator reads bytes from a Kafka topic.
The implementation uses the official librdkafka library from Confluent and
supports all of its configuration options. You can specify them
via the options parameter as {key: value, ...}.
The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:
- bootstrap.servers: localhost
- client.id: tenzir
- group.id: tenzir
- enable.auto.commit: false (this option cannot be changed)
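To connect to a different broker, you can override bootstrap.servers through the options parameter. A minimal sketch, assuming a hypothetical broker address:

```tql
load_kafka "tenzir", options={"bootstrap.servers": "broker.example.com:9092"}
read_json
```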
topic: string
The Kafka topic to use.
count = int (optional)
Exit successfully after having consumed count messages.
exit = bool (optional)
Exit successfully after having received the last message.
Without this option, the operator waits for new messages after consuming the last one.
offset = int|string (optional)
The offset to start consuming from. Possible values are:
- "beginning": first offset
- "end": last offset
- "stored": stored offset
- <value>: absolute offset
- -<value>: relative offset from the end
The default is "stored".
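For instance, a negative offset replays only the most recent messages. The following sketch, assuming a hypothetical topic named events, starts 100 messages before the end of the topic:

```tql
load_kafka "events", offset=-100
read_json
```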
options = record (optional)
A record of key-value configuration options for
librdkafka, e.g., {"auto.offset.reset": "earliest", "enable.partition.eof": true}.
The load_kafka operator passes the key-value pairs directly to
librdkafka. Consult the list of available configuration
options to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific kafka.yaml so
that they are independent of the load_kafka arguments.
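As a sketch of such a file, the following kafka.yaml sets librdkafka options once so that individual pipelines need not repeat them. The broker addresses and security settings below are illustrative assumptions; the option names themselves are standard librdkafka configuration keys:

```yaml
# Hypothetical kafka.yaml: librdkafka options applied to Kafka pipelines.
bootstrap.servers: "broker-1:9092,broker-2:9092"
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-256"
auto.offset.reset: "earliest"
```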
commit_batch_size = int (optional)
The operator commits offsets after receiving commit_batch_size messages
to improve throughput. If you need to ensure exactly-once semantics for your
pipeline, set this option to 1 to commit every message individually.
Defaults to 1000.
commit_timeout = duration (optional)
A timeout after which the operator commits messages, even if it accepted fewer than commit_batch_size. This helps with long-running, low-volume pipelines.
Defaults to 10s.
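To illustrate how the two commit options interact, the following sketch, assuming a hypothetical topic named metrics, commits offsets every 100 messages, or after 5 seconds when fewer than 100 arrive:

```tql
load_kafka "metrics", commit_batch_size=100, commit_timeout=5s
read_json
```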
aws_iam = record (optional)
If specified, enables AWS IAM authentication for MSK. The keys must be non-empty when specified.
Available keys:
- region: Region of the MSK cluster. Must be specified when using IAM.
- assume_role: Optional role ARN to assume.
- session_name: Optional session name to use when assuming a role.
- external_id: Optional external ID to use when assuming a role.
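A hypothetical invocation for an MSK cluster; the topic name, region, and role ARN below are placeholders for illustration:

```tql
load_kafka "logs", aws_iam={region: "us-east-1", assume_role: "arn:aws:iam::123456789012:role/msk-reader"}
read_json
```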
The operator tries to get credentials in the following order:
- Checks your environment variables for AWS credentials.
- Checks your $HOME/.aws/credentials file for a profile and credentials.
- Contacts and logs in to a trusted identity provider. The login information for
  these providers can either be in the environment variables AWS_ROLE_ARN,
  AWS_WEB_IDENTITY_TOKEN_FILE, and AWS_ROLE_SESSION_NAME, or in a profile in
  your $HOME/.aws/credentials.
- Checks for an external method set as part of a profile in $HOME/.aws/config
  to generate or look up credentials that isn't directly supported by AWS.
- Contacts the ECS Task Role to request credentials if the environment variable
  AWS_CONTAINER_CREDENTIALS_RELATIVE_URI has been set.
- Contacts the EC2 Instance Metadata Service to request credentials if
  AWS_EC2_METADATA_DISABLED is not set to ON.
Examples
Read 100 JSON messages from the topic tenzir

```tql
load_kafka "tenzir", count=100
read_json
```

Read Zeek Streaming JSON logs starting at the beginning

```tql
load_kafka "zeek", offset="beginning"
read_zeek_json
```