
Sends messages to an Apache Kafka topic.

to_kafka topic:string, [message=blob|string, key=string, timestamp=time, options=record, aws_iam=record, aws_region=string]

The to_kafka operator sends one message per event to a Kafka topic.

The implementation uses Confluent's official librdkafka library and supports all of its configuration options. You can specify them via the options parameter as {key: value, ...}.

The operator injects the following default librdkafka configuration values if no configuration file is present, or if the configuration does not include them:

  • bootstrap.servers: localhost
  • client.id: tenzir
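The default injection behaves like a dictionary merge in which any value you supply wins over the default. The following is a minimal Python sketch of that rule, assuming the configuration is a flat mapping from librdkafka property names to string values; `apply_defaults` and `DEFAULTS` are hypothetical names, not part of Tenzir.

```python
# Defaults listed above; used only when the configuration lacks these keys.
DEFAULTS = {
    "bootstrap.servers": "localhost",
    "client.id": "tenzir",
}


def apply_defaults(config: dict[str, str]) -> dict[str, str]:
    """Return a copy of config with missing default keys filled in."""
    # Entries from config come last, so user-provided values override defaults.
    return {**DEFAULTS, **config}


merged = apply_defaults({"bootstrap.servers": "broker-1:9092"})
print(merged)
```

Only the missing client.id is filled in here; the explicit bootstrap.servers value is kept.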

topic: string

The exact Kafka topic to send messages to. Regular expressions aren't supported for producers.

message = blob|string

An expression that evaluates to the message content for each event.

Defaults to this.print_ndjson() when not specified.

key = string

Sets a fixed key for all messages.

timestamp = time

Sets a fixed timestamp for all messages.

options = record

A record of key-value configuration options for librdkafka, e.g., {"acks": "all", "batch.size": 16384}.

The to_kafka operator passes the key-value pairs directly to librdkafka. Consult librdkafka's list of available configuration options to configure Kafka for your needs.
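librdkafka's C API sets every configuration property as a pair of strings (rd_kafka_conf_set takes a name and a value, both as C strings), so a record such as {"acks": "all", "batch.size": 16384} ultimately has to be expressed as string values. The Python sketch below illustrates that flattening. The helper `to_librdkafka_conf` is hypothetical, and the exact conversion rules the operator applies, for example for booleans, are an assumption.

```python
def to_librdkafka_conf(options: dict[str, object]) -> dict[str, str]:
    """Flatten an options record into librdkafka's string-to-string form."""
    conf: dict[str, str] = {}
    for key, value in options.items():
        # Check bool before anything else: in Python, bool is a subclass of int.
        if isinstance(value, bool):
            # librdkafka spells booleans "true"/"false", not "True"/"False".
            conf[key] = "true" if value else "false"
        else:
            conf[key] = str(value)
    return conf


print(to_librdkafka_conf({"acks": "all", "batch.size": 16384}))
```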

We recommend factoring these options into the plugin-specific kafka.yaml so that they are independent of the to_kafka arguments.

aws_iam = record

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

{
  region: string,             // AWS region for API requests.
  access_key_id: string,      // AWS access key ID.
  secret_access_key: string,  // AWS secret access key.
  session_token: string,      // Session token for temporary credentials.
  assume_role: string,        // ARN of IAM role to assume.
  session_name: string,       // Session name for role assumption.
  external_id: string,        // External ID for role assumption.
  web_identity: record,       // OIDC web identity token configuration.
}

See AWS Authentication for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.
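To catch misspelled field names before running a pipeline, the record shape above can be mirrored as a Python TypedDict. This is a sketch only: `AwsIam`, `unknown_fields`, and the placeholder role ARN are hypothetical, and the fields inside web_identity are left untyped because they are documented under AWS Authentication rather than here.

```python
from typing import TypedDict


class AwsIam(TypedDict, total=False):
    """Mirror of the aws_iam record; every field is optional."""
    region: str
    access_key_id: str
    secret_access_key: str
    session_token: str
    assume_role: str
    session_name: str
    external_id: str
    web_identity: dict  # Nested record; see AWS Authentication for its fields.


def unknown_fields(config: dict) -> set[str]:
    """Return keys that are not part of the aws_iam record."""
    return set(config) - set(AwsIam.__annotations__)


# Placeholder role-assumption configuration; all values are made up.
role_config: AwsIam = {
    "region": "us-east-1",
    "assume_role": "arn:aws:iam::123456789012:role/kafka-producer",
    "session_name": "tenzir-to-kafka",
}

print(unknown_fields(role_config))
```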

aws_region = string

The AWS region used to construct the MSK authentication URL. Required when connecting to MSK with IAM authentication.
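The region is needed because the signed authentication URL is region-specific. The Python sketch below assumes the host pattern kafka.<region>.amazonaws.com used by AWS's open-source MSK IAM SASL signer libraries; whether to_kafka builds its URL the same way is an assumption, and `msk_signing_host` is a hypothetical helper.

```python
def msk_signing_host(region: str) -> str:
    """Return the assumed host of the MSK IAM authentication URL for a region."""
    if not region:
        # Mirrors the documented requirement: IAM auth needs an explicit region.
        raise ValueError("aws_region is required for MSK IAM authentication")
    return f"kafka.{region}.amazonaws.com"


print(msk_signing_host("eu-central-1"))
```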

Send NDJSON-formatted events to topic events (using default)


Stream security events to a Kafka topic with automatic JSON formatting:

subscribe "security-alerts"
where severity == "high"
select timestamp, source_ip, alert_type, details
to_kafka "events"

This pipeline subscribes to security alerts, keeps the high-severity ones, selects the relevant fields, and sends each event to Kafka as JSON. Each event is formatted using this.print_ndjson(), which produces one compact JSON object per message. For example, an event with only timestamp and severity fields becomes:

{"timestamp":"2024-03-15T10:30:00.000000","severity":"high"}
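The compact layout above has no spaces after separators. The following Python sketch reproduces the line with the standard json module. It is only an approximation of print_ndjson(): it assumes the timestamp is already a string, whereas Tenzir formats its own time values, and `to_ndjson_line` is a hypothetical name.

```python
import json


def to_ndjson_line(event: dict) -> str:
    """Serialize one event as a single compact JSON object."""
    # The separators argument drops the spaces json.dumps inserts by default.
    return json.dumps(event, separators=(",", ":"))


event = {"timestamp": "2024-03-15T10:30:00.000000", "severity": "high"}
print(to_ndjson_line(event))
```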

Send NDJSON-formatted events with explicit message

subscribe "logs"
to_kafka "events", message=this.print_ndjson()

Send specific field values with a timestamp

subscribe "logs"
to_kafka "alerts", message=alert_msg, timestamp=2024-01-01T00:00:00
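On the wire, a Kafka message timestamp is an integer count of milliseconds since the Unix epoch. The Python sketch below shows the value that the fixed timestamp above corresponds to, assuming the literal 2024-01-01T00:00:00 is interpreted as UTC; `kafka_timestamp_ms` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def kafka_timestamp_ms(ts: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    # Dividing one timedelta by another yields an int and avoids float rounding.
    return (ts - EPOCH) // timedelta(milliseconds=1)


fixed = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(kafka_timestamp_ms(fixed))  # 1704067200000
```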

Send data with a fixed key for partitioning

metrics
to_kafka "metrics", message=this.print_ndjson(), key="server-01"
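Because the key is fixed, every message maps to the same partition. The Python sketch below shows why, assuming librdkafka's default consistent_random partitioner (CRC32 of the key, modulo the partition count) and assuming the operator does not override the partitioner property. `consistent_partition` is a hypothetical helper. Java clients default to a murmur2-based partitioner, so they may assign the same key to a different partition.

```python
import zlib


def consistent_partition(key: bytes, partition_count: int) -> int:
    """Pick a partition for a non-empty key via CRC32, as consistent_random does."""
    return zlib.crc32(key) % partition_count


# The same key always yields the same partition for a given partition count.
partition = consistent_partition(b"server-01", 6)
print(partition)
```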
