save_kafka
Saves a byte stream to a Apache Kafka topic.
save_kafka topic:string, [key=string, timestamp=time, options=record,
aws_iam=record]
Description
The save_kafka
operator saves bytes to a Kafka topic.
The implementation uses the official librdkafka from Confluent and
supports all configuration options. You can specify them
via options
parameter as {key: value, ...}
.
We recommend putting your Kafka options into the dedicated kafka.yaml
plugin
config file. This way you can configure
your all your environment-specific options once, independent of the
per-connector invocations.
The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:
bootstrap.servers
:localhost
client.id
:tenzir
group.id
:tenzir
topic: string
The Kafka topic to use.
key = string (optional)
Sets a fixed key for all messages.
timestamp = time (optional)
Sets a fixed timestamp for all messages.
options = record (optional)
A record of key-value configuration options for
librdkafka, e.g., {"auto.offset.reset" : "earliest",
"enable.partition.eof": true}
.
The save_kafka
operator passes the key-value pairs directly to
librdkafka. Consult the list of available configuration
options to configure Kafka according to your needs.
We recommand factoring these options into the plugin-specific kafka.yaml
so
that they are indpendent of the save_kafka
arguments.
aws_iam = record (optional)
If specified, enables using AWS IAM Authentication for MSK. The keys must be non-empty when specified.
Available keys:
region
: Region of the MSK Clusters. Must be specified when using IAM.assume_role
: Optional Role ARN to assume.session_name
: Optional session name to use when assuming a role.external_id
: Optional external id to use when assuming a role.
The operator will try to get credentials in the following order:
- Checks your environment variables for AWS Credentials.
- Checks your
$HOME/.aws/credentials
file for a profile and credentials - Contacts and logs in to a trusted identity provider. The login information to
these providers can either be on the environment variables:
AWS_ROLE_ARN
,AWS_WEB_IDENTITY_TOKEN_FILE
,AWS_ROLE_SESSION_NAME
or on a profile in your$HOME/.aws/credentials
. - Checks for an external method set as part of a profile on
$HOME/.aws/config
to generate or look up credentials that are not directly supported by AWS. - Contacts the ECS Task Role to request credentials if Environment variable
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
has been set. - Contacts the EC2 Instance Metadata service to request credentials if
AWS_EC2_METADATA_DISABLED
is NOT set to ON.
Examples
Write the Tenzir version to topic tenzir
with timestamp from the past
version
write_json
save_kafka "tenzir", timestamp=1984-01-01
Follow a CSV file and publish it to topic data
load_file "/tmp/data.csv"
read_csv
write_json
save_kafka "data"