Version: Next

save_kafka

Saves a byte stream to an Apache Kafka topic.

save_kafka [topic=str, key=str, timestamp=time, options=record]

Description

The save_kafka operator saves bytes to a Kafka topic.

The implementation uses the official librdkafka from Confluent and supports all of its configuration options. You can specify them via the options parameter as {key: value, ...}.

We recommend putting your Kafka options into the dedicated kafka.yaml plugin config file. This way you can configure all your environment-specific options once, independent of the per-connector invocations.

The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:

  • bootstrap.servers: localhost
  • client.id: tenzir
  • group.id: tenzir

topic = str (optional)

The Kafka topic to use.

Defaults to "tenzir".

key = str (optional)

Sets a fixed key for all messages.

timestamp = time (optional)

Sets a fixed timestamp for all messages.

options = record (optional)

A record of key-value configuration options for librdkafka, e.g., {"auto.offset.reset": "earliest", "enable.partition.eof": true}.

The save_kafka operator passes the key-value pairs directly to librdkafka. Consult the list of available configuration options to configure Kafka according to your needs.

We recommend factoring these options into the plugin-specific kafka.yaml so that they are independent of the save_kafka arguments.
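
As a minimal sketch of passing options inline, the following pipeline overrides the default bootstrap.servers value and sets one additional producer property. The broker address broker.example.org:9092 and the topic name events are placeholders, and compression.type is a standard librdkafka property chosen here only for illustration.

```tql
// Placeholder broker address and topic name; adjust to your environment.
version
write_json
save_kafka topic="events", options={"bootstrap.servers": "broker.example.org:9092", "compression.type": "gzip"}
```

Options given this way apply to this invocation only, so settings shared across pipelines are likely better kept in kafka.yaml.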

Examples

Write the Tenzir version to topic tenzir with a timestamp from the past

version
write_json
save_kafka timestamp=1984-01-01

Follow a CSV file and publish it to topic data

load_file "/tmp/data.csv"
read_csv
write_json
save_kafka topic="data"
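
The key parameter described above can be combined with topic in the same way. The following is a minimal sketch in which the topic name alerts and the key sensor-1 are placeholder values. Kafka producers typically use the message key to route related messages to the same partition, so a fixed key sends all messages from this pipeline to one partition under the default partitioner.

```tql
// Placeholder topic and key; every message gets the same key.
version
write_json
save_kafka topic="alerts", key="sensor-1"
```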