kafka
Loads bytes from and saves bytes to Kafka.
Synopsis
Loader:
kafka [-t <topic>] [-c|--count <n>] [-e|--exit] [-o|--offset <offset>]
[-X|--set <key=value>,...]
Saver:
kafka [-t <topic>] [-k|--key <key>] [-T|--timestamp <time>]
[-X|--set <key=value>,...]
Description
The kafka loader reads bytes from a Kafka topic. The kafka saver writes
bytes to a Kafka topic.
The implementation uses the official librdkafka from Confluent and
supports all of its configuration options. You can specify them
via -X <key=value>,... on the command line. We recommend putting your Kafka
options into the dedicated kafka.yaml plugin config file.
This way you can configure all your environment-specific options once,
independent of the per-connector invocations.
The connector injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:
bootstrap.servers: localhost
client.id: tenzir
group.id: tenzir
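As a rough sketch, a kafka.yaml that spells out these defaults explicitly might look like the following. The flat layout, with librdkafka option names as top-level keys, is an assumption here; consult the plugin's example configuration for the authoritative structure.

```yaml
# kafka.yaml (sketch; the flat key layout is an assumption)
# The values below mirror the defaults that the connector injects.
bootstrap.servers: localhost
client.id: tenzir
group.id: tenzir
```

Any value set in this file replaces the corresponding injected default, so only the options that differ in your environment need to appear.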
The default format for the kafka connector is json.
-t|--topic <topic>
(Loader, Saver)
The Kafka topic to use.
Defaults to tenzir.
-c|--count <n>
(Loader)
Exit successfully after having consumed n messages.
-e|--exit
(Loader)
Exit successfully after having received the last message.
Without this option, the loader waits for new messages after having consumed the last one.
-o|--offset <offset>
(Loader)
The offset to start consuming from. Possible values are:
beginning: first offset
end: last offset
stored: stored offset
<value>: absolute offset
-<value>: relative offset from end
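To illustrate, the following sketches combine -o with the other loader options described on this page. They reuse the zeek topic from the Examples section, and the absolute offset 1000 is an arbitrary illustrative value. In order, they resume from the stored offset, read 10 messages starting at offset 1000, and read the topic from the beginning and exit after the last message.

```
from kafka -t zeek -o stored read json

from kafka -t zeek -o 1000 -c 10 read json

from kafka -t zeek -o beginning -e read json
```

Each line is a separate pipeline. Without -c or -e, the loader keeps waiting for new messages after reaching the end of the topic.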
-X|--set <key=value>
(Loader, Saver)
A comma-separated list of key-value configuration options for
librdkafka, e.g., -X auto.offset.reset=earliest,enable.partition.eof=true.
The kafka connector passes the key-value pairs directly to
librdkafka. Consult the list of available librdkafka configuration
options to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific kafka.yaml
so that they are independent of the kafka connector arguments.
-k|--key <key>
(Saver)
Sets a fixed key for all messages.
-T|--timestamp <time>
(Saver)
Sets a fixed timestamp for all messages.
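For instance, a saver invocation that sets both options could look like the sketch below. The key sensor-1 is a hypothetical placeholder, while the topic data and the date format are borrowed from the Examples section.

```
version | to kafka -t data -k sensor-1 -T 1984-01-01
```

Because both values are fixed, every message written by this pipeline carries the same key and the same timestamp.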
Examples
Read 100 JSON messages from the topic tenzir:
from kafka -c 100 read json
Read Zeek Streaming JSON logs from the topic zeek, starting at the beginning:
from kafka -t zeek -o beginning read zeek-json
Write the Tenzir version to the topic tenzir with a timestamp from the past:
version | to kafka -T 1984-01-01
Follow a CSV file and publish it to the topic data:
from file -f /tmp/data.csv read csv | to kafka -t data