Version: v4.24

load_kafka

Loads a byte stream from an Apache Kafka topic.

load_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record]

Description

The load_kafka operator reads bytes from a Kafka topic.

The implementation uses the official librdkafka library from Confluent and supports all of its configuration options. You can specify them via the options parameter as {key: value, ...}.

We recommend putting your Kafka options into the dedicated kafka.yaml plugin config file. This way you can configure all your environment-specific options once, independent of the per-connector invocations.

The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:

  • bootstrap.servers: localhost
  • client.id: tenzir
  • group.id: tenzir
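As a minimal sketch of how these defaults interact with the options parameter, the pipeline below overrides two of them for a single invocation and leaves client.id at its injected default. The broker address kafka.example.org:9092 and the group name my-consumers are hypothetical placeholders, not values from this page.

```tql
// Override two injected defaults for this invocation only.
// The broker address and group name are hypothetical placeholders.
load_kafka "tenzir", options={"bootstrap.servers": "kafka.example.org:9092", "group.id": "my-consumers"}
read_json
```

Values that should apply to every invocation are better kept in kafka.yaml, as recommended above.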

topic: string

The Kafka topic to use.

count = int (optional)

Exit successfully after having consumed count messages.

exit = bool (optional)

Exit successfully after having received the last message.

Without this option, the operator waits for new messages after having consumed the last one.
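To illustrate the difference, the following sketch reads the topic from the first offset and terminates once it has received the last message, instead of waiting for new ones. The topic name tenzir and the read_json parser are assumptions carried over from the examples on this page.

```tql
// Consume from the first offset, then exit at the last message.
load_kafka "tenzir", offset="beginning", exit=true
read_json
```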

offset = int|string (optional)

The offset to start consuming from. Possible values are:

  • "beginning": first offset
  • "end": last offset
  • "stored": stored offset
  • <value>: absolute offset
  • -<value>: relative offset from end
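As a sketch of the relative form, the pipeline below starts 10 messages before the end and exits after consuming 10 messages. It assumes that a negative integer literal is accepted as written for offset, and it makes no claim about how the offset applies across multiple partitions.

```tql
// Start 10 messages before the end and stop after consuming 10 messages.
// Assumes a negative integer literal is valid here.
load_kafka "tenzir", offset=-10, count=10
read_json
```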

options = record (optional)

A record of key-value configuration options for librdkafka, e.g., {"auto.offset.reset": "earliest", "enable.partition.eof": true}.

The load_kafka operator passes the key-value pairs directly to librdkafka. Consult the list of available configuration options to configure Kafka according to your needs.
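For instance, the example record above could be used in a complete pipeline as follows. This is only a sketch: the topic name tenzir and the read_json parser are illustrative, and both keys are standard librdkafka settings that are passed through unchanged.

```tql
// Pass librdkafka settings for this invocation only.
load_kafka "tenzir", options={"auto.offset.reset": "earliest", "enable.partition.eof": true}
read_json
```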

We recommend factoring these options into the plugin-specific kafka.yaml so that they are independent of the load_kafka arguments.

Examples

Read 100 JSON messages from the topic tenzir

load_kafka "tenzir", count=100
read_json

Read Zeek Streaming JSON logs starting at the beginning

load_kafka "zeek", offset="beginning"
read_zeek_json