Sends messages to an Apache Kafka topic.
to_kafka topic:string, [message=blob|string, key=string, timestamp=time, options=record, aws_iam=record]
Description
The to_kafka operator sends one message per event to a Kafka topic.
The implementation uses the official librdkafka library from Confluent and
supports all of its configuration options. You can specify them
via the options parameter as {key: value, ...}.
The operator injects the following default librdkafka configuration values in case no configuration file is present, or when the configuration does not include them:
- bootstrap.servers: localhost
- client.id: tenzir
topic: string
The Kafka topic to send messages to.
message = blob|string (optional)
An expression that evaluates to the message content for each row.
Defaults to this.print_json() when not specified.
key = string (optional)
Sets a fixed key for all messages.
timestamp = time (optional)
Sets a fixed timestamp for all messages.
options = record (optional)
A record of key-value configuration options for
librdkafka, e.g., {"acks": "all", "batch.size": 16384}.
The to_kafka operator passes the key-value pairs directly to
librdkafka. Consult the list of available configuration
options to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific kafka.yaml so
that they are independent of the to_kafka arguments.
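As a minimal sketch of the inline form, the following pipeline passes the two settings mentioned above and also overrides the default bootstrap.servers value. The broker address broker.example.com:9092 and the subscribed topic "logs" are placeholders chosen for illustration:

```tql
subscribe "logs"
to_kafka "events", options={
  "bootstrap.servers": "broker.example.com:9092", // hypothetical broker address
  "acks": "all",
  "batch.size": 16384
}
```

Keys containing dots are quoted so that they reach librdkafka under their literal names.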
aws_iam = record (optional)
If specified, enables using AWS IAM Authentication for MSK. The keys must be non-empty when specified.
Available keys:
- region: Region of the MSK Clusters. Must be specified when using IAM.
- assume_role: Optional Role ARN to assume.
- session_name: Optional session name to use when assuming a role.
- external_id: Optional external id to use when assuming a role.
The operator will try to get credentials in the following order:
- Checks your environment variables for AWS Credentials.
- Checks your $HOME/.aws/credentials file for a profile and credentials.
- Contacts and logs in to a trusted identity provider. The login information for
  these providers can be in the environment variables AWS_ROLE_ARN,
  AWS_WEB_IDENTITY_TOKEN_FILE, and AWS_ROLE_SESSION_NAME, or in a profile in
  your $HOME/.aws/credentials.
- Checks for an external method set as part of a profile in $HOME/.aws/config
  to generate or look up credentials that are not directly supported by AWS.
- Contacts the ECS Task Role to request credentials if the environment variable
  AWS_CONTAINER_CREDENTIALS_RELATIVE_URI has been set.
- Contacts the EC2 Instance Metadata service to request credentials if
  AWS_EC2_METADATA_DISABLED is NOT set to ON.
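A minimal sketch of how the aws_iam keys might be combined, assuming an MSK cluster in us-east-1. The role ARN, session name, and topic names are hypothetical placeholders, and the broker address is assumed to come from the configuration file or the options parameter:

```tql
subscribe "logs"
to_kafka "events", aws_iam={
  region: "us-east-1", // required when using IAM
  assume_role: "arn:aws:iam::123456789012:role/example-kafka-producer", // hypothetical role ARN
  session_name: "tenzir-to-kafka" // hypothetical session name
}
```

With only region set, credentials would come from the lookup order listed above.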
Examples
Send JSON-formatted events to topic events (using default)
Stream security events to a Kafka topic with automatic JSON formatting:

    subscribe "security-alerts"
    where severity >= "high"
    select timestamp, source_ip, alert_type, details
    to_kafka "events"

This pipeline subscribes to security alerts, filters for high-severity events,
selects relevant fields, and sends them to Kafka as JSON. Each event is
automatically formatted using this.print_json(), producing messages like:

    {
      "timestamp": "2024-03-15T10:30:00.000000",
      "source_ip": "192.168.1.100",
      "alert_type": "brute_force",
      "details": "Multiple failed login attempts detected"
    }

Send JSON-formatted events with explicit message

    subscribe "logs"
    to_kafka "events", message=this.print_json()

Send specific field values with a timestamp

    subscribe "logs"
    to_kafka "alerts", message=alert_msg, timestamp=2024-01-01T00:00:00

Send data with a fixed key for partitioning

    metrics
    to_kafka "metrics", message=this.print_json(), key="server-01"