Version: v4.29


Loads a byte stream from an Apache Kafka topic.

load_kafka topic:string, [count=int, exit=bool, offset=int|string,
          options=record, aws_iam=record]


The load_kafka operator reads bytes from a Kafka topic.

The implementation uses Confluent's official librdkafka library and supports all of its configuration options. You can specify them via the options parameter as {key: value, ...}.

We recommend putting your Kafka options into the dedicated kafka.yaml plugin config file. This way you can configure all your environment-specific options once, independent of the per-connector invocations.

The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:

  • bootstrap.servers: localhost
  • tenzir
  • tenzir

topic: string

The Kafka topic to use.

count = int (optional)

Exit successfully after having consumed count messages.

exit = bool (optional)

Exit successfully after having received the last message.

Without this option, the operator waits for new messages after having consumed the last one.
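As a sketch that uses only the parameters documented on this page, the following pipeline reads a topic from its first offset and stops once it has caught up, instead of waiting for new messages. The topic name `tenzir` is a placeholder, and the `read_json` step assumes the messages contain JSON.

```tql
// Start at the first offset and exit after the last available message.
load_kafka "tenzir", offset="beginning", exit=true
read_json
```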

offset = int|string (optional)

The offset to start consuming from. Possible values are:

  • "beginning": first offset
  • "end": last offset
  • "stored": stored offset
  • <value>: absolute offset
  • -<value>: relative offset from end
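A relative offset can be combined with exit to take a quick look at the most recent data. The sketch below assumes a topic named `tenzir` that holds JSON messages; the value 10 is illustrative only.

```tql
// Start 10 messages before the end of the topic and exit once caught up.
load_kafka "tenzir", offset=-10, exit=true
read_json
```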

options = record (optional)

A record of key-value configuration options for librdkafka, e.g., {"auto.offset.reset": "earliest", "enable.partition.eof": true}.

The load_kafka operator passes the key-value pairs directly to librdkafka. Consult the list of available configuration options to configure Kafka according to your needs.

We recommend factoring these options into the plugin-specific kafka.yaml so that they are independent of the load_kafka arguments.
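For a one-off invocation, options can also be passed inline. In the sketch below, `bootstrap.servers` and `group.id` are standard librdkafka configuration properties, while the broker address `kafka.example.com:9092` and the group name `my-group` are hypothetical placeholders.

```tql
// Override librdkafka settings for this invocation only.
load_kafka "tenzir", options={"bootstrap.servers": "kafka.example.com:9092", "group.id": "my-group"}
read_json
```

Keeping such values in kafka.yaml instead avoids repeating them in every pipeline.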

aws_iam = record (optional)

If specified, enables AWS IAM authentication for Amazon MSK. The keys must be non-empty when specified.

Available keys:

  • region: Region of the MSK cluster. Must be specified when using IAM.
  • assume_role: Optional role ARN to assume.
  • session_name: Optional session name to use when assuming a role.
  • external_id: Optional external ID to use when assuming a role.
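A minimal sketch of reading from an MSK cluster with IAM authentication might look as follows. It assumes the broker address is already configured in kafka.yaml; the topic, region, and role ARN are hypothetical placeholders.

```tql
// Authenticate to MSK via IAM, assuming a (hypothetical) reader role.
load_kafka "tenzir", aws_iam={region: "us-east-1", assume_role: "arn:aws:iam::123456789012:role/tenzir-reader"}
read_json
```

Omitting assume_role should make the operator use the credentials it resolves directly, following the lookup order described below.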

The operator tries to obtain credentials in the following order:

  1. Checks your environment variables for AWS credentials.
  2. Checks your $HOME/.aws/credentials file for a profile and credentials.
  3. Contacts and logs in to a trusted identity provider. The login information for these providers can be provided either in the environment variables AWS_ROLE_ARN, AWS_WEB_IDENTITY_TOKEN_FILE, and AWS_ROLE_SESSION_NAME, or in a profile in your $HOME/.aws/credentials file.
  4. Checks for an external method configured in a profile in $HOME/.aws/config that generates or looks up credentials in a way not directly supported by AWS.
  5. Contacts the ECS task role to request credentials if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is set.
  6. Contacts the EC2 Instance Metadata Service to request credentials if AWS_EC2_METADATA_DISABLED is not set to ON.


Read 100 JSON messages from the topic tenzir

load_kafka "tenzir", count=100
read_json

Read Zeek Streaming JSON logs starting at the beginning

load_kafka "zeek", offset="beginning"
read_zeek_json