Skip to content

Sends records to an Amazon Kinesis Data Streams stream.

to_amazon_kinesis stream:string, [message=blob|string, partition_key=string,
batch_size=int, batch_timeout=duration, parallel=int,
aws_region=string, aws_iam=record, endpoint=string]

The to_amazon_kinesis operator sends records to a Kinesis data stream with PutRecords.

By default, to_amazon_kinesis serializes each input event as NDJSON with this.print_ndjson(). Use the message option to send a specific string or blob expression instead. If the expression evaluates to null or another type, the operator skips that event and emits a warning.

If you omit partition_key, the operator generates a random UUID for each record. A partition key expression must produce non-empty strings up to 256 characters. Kinesis record payloads plus their partition key must fit the stream’s configured maximum record size, which defaults to 1 MiB and can be raised to 10 MiB. The operator discovers the limit at startup via kinesis:DescribeStreamSummary; without that permission it falls back to the 10 MiB API maximum and lets the service enforce the effective limit. Events that exceed these limits are skipped with a warning.

The operator retries failed PutRecords entries with exponential backoff before emitting an error.

The operator requires the kinesis:PutRecords AWS permission and uses kinesis:DescribeStreamSummary when available.

The name of the Kinesis data stream to send records to.

The expression that produces the Kinesis record payload for each event.

Defaults to this.print_ndjson().

The expression that produces the Kinesis partition key for each event.

If omitted, the operator generates a random UUID per event.

The maximum number of records per PutRecords request.

The value must be between 1 and 500.

Defaults to 500.

How long to wait before flushing a non-empty batch.

The value must be positive.

Defaults to 1s.

The maximum number of concurrent PutRecords requests.

Values greater than one overlap request round trips and multiply write throughput accordingly, but concurrent requests can complete out of order. If you use a fixed partition_key and depend on strict ordering within a shard, keep the default.

The value must be greater than zero.

Defaults to 1.

The AWS region for writing to the stream.

If omitted, the operator uses the region from aws_iam when present. Otherwise, it uses the default AWS SDK region resolution.

A custom Kinesis endpoint URL.

If omitted, the operator uses AWS_ENDPOINT_URL_KINESIS when set, then AWS_ENDPOINT_URL, then the default AWS SDK endpoint for the region.

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

{
region: string, // AWS region for API requests.
access_key_id: string, // AWS access key ID.
secret_access_key: string, // AWS secret access key.
session_token: string, // session token for temporary credentials.
assume_role: string, // ARN of IAM role to assume.
session_name: string, // session name for role assumption.
external_id: string, // external ID for role assumption.
web_identity: record, // OIDC web identity token configuration.
}

See AWS Authentication for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.

subscribe "alerts"
to_amazon_kinesis "security-events"
from {payload: "security event detected", tenant: "acme"}
to_amazon_kinesis "security-events",
message=payload,
partition_key=tenant

Send JSON strings with an explicit partition key

Section titled “Send JSON strings with an explicit partition key”
subscribe "alerts"
to_amazon_kinesis "security-events",
message=this.print_json(),
partition_key=string(src_ip)
subscribe "alerts"
to_amazon_kinesis "security-events",
batch_size=100,
batch_timeout=500ms
subscribe "alerts"
to_amazon_kinesis "security-events", aws_iam={
region: "us-east-1",
access_key_id: secret("aws-key"),
secret_access_key: secret("aws-secret")
}
from {alert: "test"}
to_amazon_kinesis "security-events",
aws_region="us-east-1",
endpoint="http://127.0.0.1:4566"

Last updated: