Saves a byte stream to a Apache Kafka topic.
save_kafka topic:string, [key=string, timestamp=time, options=record, aws_iam=record, aws_region=string]Description
Section titled “Description”The save_kafka operator saves bytes to a Kafka topic.
The implementation uses the official librdkafka from Confluent and
supports all configuration options. You can specify them
via options parameter as {key: value, ...}.
The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:
bootstrap.servers:localhostclient.id:tenzirgroup.id:tenzir
topic: string
Section titled “topic: string”The Kafka topic to use.
key = string (optional)
Section titled “key = string (optional)”Sets a fixed key for all messages.
timestamp = time (optional)
Section titled “timestamp = time (optional)”Sets a fixed timestamp for all messages.
options = record (optional)
Section titled “options = record (optional)”A record of key-value configuration options for
librdkafka, e.g., {"auto.offset.reset" : "earliest", "enable.partition.eof": true}.
The save_kafka operator passes the key-value pairs directly to
librdkafka. Consult the list of available configuration
options to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific kafka.yaml so
that they are independent of the save_kafka arguments.
aws_iam = record (optional)
Section titled “aws_iam = record (optional)”Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the default AWS credential chain.
{ region: string, // AWS region for API requests. access_key_id: string, // AWS access key ID. secret_access_key: string, // AWS secret access key. session_token: string, // session token for temporary credentials. assume_role: string, // ARN of IAM role to assume. session_name: string, // session name for role assumption. external_id: string, // external ID for role assumption.}The access_key_id and secret_access_key must be specified together. If
neither is specified, the operator uses the default AWS credential chain:
- Environment variables (
AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY) - Shared credentials file (
~/.aws/credentials) - IAM role for Amazon EC2 or ECS task role
- Instance metadata service
aws_region = string (optional)
Section titled “aws_region = string (optional)”The AWS region used to construct the MSK authentication URL. Required when connecting to MSK with IAM authentication.
Examples
Section titled “Examples”Write the Tenzir version to topic tenzir with timestamp from the past
Section titled “Write the Tenzir version to topic tenzir with timestamp from the past”versionwrite_jsonsave_kafka "tenzir", timestamp=1984-01-01Follow a CSV file and publish it to topic data
Section titled “Follow a CSV file and publish it to topic data”load_file "/tmp/data.csv"read_csvwrite_jsonsave_kafka "data"