Saves a byte stream to an Apache Kafka topic.
save_kafka topic:string, [key=string, timestamp=time, options=record, aws_iam=record]
Description
The save_kafka operator saves bytes to a Kafka topic.
The implementation uses the official librdkafka from Confluent and
supports all of its configuration options. You can specify them
via the options parameter as {key: value, ...}.
The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:
- bootstrap.servers: localhost
- client.id: tenzir
- group.id: tenzir
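As an illustration, the following sketch passes the same three defaults explicitly through the options parameter. Under the behavior described above, it should be equivalent to omitting options when no configuration file is present. The surrounding pipeline (version, write_json) mirrors the examples in this document.

```tql
// Sketch: spell out the documented defaults explicitly.
version
write_json
save_kafka "tenzir", options={
  "bootstrap.servers": "localhost",
  "client.id": "tenzir",
  "group.id": "tenzir"
}
```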
topic: string
The Kafka topic to use.
key = string (optional)
Sets a fixed key for all messages.
timestamp = time (optional)
Sets a fixed timestamp for all messages.
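A minimal sketch of how key and timestamp might be combined, so that every message written by the pipeline carries the same key and the same timestamp. The key value "node-1" and the date are hypothetical placeholders, not values taken from this document.

```tql
// Sketch: fixed key and fixed timestamp for all messages.
// "node-1" and 2024-01-01 are illustrative values only.
version
write_json
save_kafka "tenzir", key="node-1", timestamp=2024-01-01
```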
options = record (optional)
A record of key-value configuration options for
librdkafka, e.g., {"auto.offset.reset" : "earliest", "enable.partition.eof": true}.
The save_kafka operator passes the key-value pairs directly to
librdkafka. Consult the list of available configuration
options to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific kafka.yaml so
that they are independent of the save_kafka arguments.
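For a one-off pipeline, options can also be passed inline. The sketch below overrides the default broker address and enables compression. The broker host name is a hypothetical placeholder, and "compression.type" is a librdkafka producer property chosen here only for illustration.

```tql
// Sketch: override librdkafka settings inline.
// "broker.example.com:9092" is a hypothetical broker address.
version
write_json
save_kafka "tenzir", options={
  "bootstrap.servers": "broker.example.com:9092",
  "compression.type": "gzip"
}
```

Keys given here are passed to librdkafka as-is, so a misspelled property name is a librdkafka configuration error rather than something the operator interprets.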
aws_iam = record (optional)
If specified, enables using AWS IAM Authentication for MSK. The keys must be non-empty when specified.
Available keys:
- region: Region of the MSK Clusters. Must be specified when using IAM.
- assume_role: Optional Role ARN to assume.
- session_name: Optional session name to use when assuming a role.
- external_id: Optional external id to use when assuming a role.
The operator will try to get credentials in the following order:
- Checks your environment variables for AWS credentials.
- Checks your $HOME/.aws/credentials file for a profile and credentials.
- Contacts and logs in to a trusted identity provider. The login information for these providers can either be in the environment variables AWS_ROLE_ARN, AWS_WEB_IDENTITY_TOKEN_FILE, and AWS_ROLE_SESSION_NAME, or in a profile in your $HOME/.aws/credentials.
- Checks for an external method set as part of a profile in $HOME/.aws/config to generate or look up credentials that are not directly supported by AWS.
- Contacts the ECS Task Role to request credentials if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI has been set.
- Contacts the EC2 Instance Metadata service to request credentials if AWS_EC2_METADATA_DISABLED is NOT set to ON.
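A minimal sketch of an aws_iam record using the keys listed above. The region, role ARN, and session name are hypothetical placeholders. In practice, bootstrap.servers would also need to point at the MSK brokers, via options or kafka.yaml, which is omitted here.

```tql
// Sketch: enable AWS IAM authentication for MSK.
// Region, ARN, and session name are hypothetical values.
version
write_json
save_kafka "tenzir", aws_iam={
  region: "us-east-1",
  assume_role: "arn:aws:iam::123456789012:role/example-msk-producer",
  session_name: "tenzir-example"
}
```

With only region set, credentials would come from whichever step of the lookup order above succeeds first.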
Examples
Write the Tenzir version to topic tenzir with timestamp from the past

    version
    write_json
    save_kafka "tenzir", timestamp=1984-01-01

Follow a CSV file and publish it to topic data

    load_file "/tmp/data.csv"
    read_csv
    write_json
    save_kafka "data"