Amazon Kinesis Data Streams is a managed streaming data service on AWS.
Tenzir can receive records from Kinesis streams with
from_amazon_kinesis and send records to Kinesis streams with
to_amazon_kinesis.
When Tenzir reads from Kinesis, it emits one event per Kinesis record. The event
uses the tenzir.amazon_kinesis schema and contains the raw record payload in
the message field as a blob, together with metadata such as the stream,
shard, sequence number, partition key, arrival time, and lag.
Tenzir does not parse or decompress Kinesis payloads automatically. Convert the
message blob explicitly in TQL when the stream contains structured data:
from_amazon_kinesis "security-events", start="trim_horizon"this = string(message).parse_json()The source operator lists shards during startup and discovers new shards when an existing shard closes, such as after a resharding operation, so running pipelines follow reshards without a restart. Snapshots store per-shard sequence numbers and resume with at-least-once semantics.
Configuration
Section titled “Configuration”Follow the Amazon integration configuration to authenticate with your AWS credentials.
Alternatively, use the aws_iam parameter to provide explicit credentials:
from_amazon_kinesis "security-events", aws_iam={ region: "us-east-1", access_key_id: secret("aws-key"), secret_access_key: secret("aws-secret")}You can also use aws_iam to assume an IAM role:
from_amazon_kinesis "security-events", aws_iam={ region: "eu-west-1", assume_role: "arn:aws:iam::123456789012:role/my-kinesis-role", session_name: "tenzir-session"}Set endpoint to use a Kinesis-compatible endpoint, such as LocalStack:
from_amazon_kinesis "security-events", aws_region="us-east-1", endpoint="http://127.0.0.1:4566"When endpoint is omitted, Tenzir checks AWS_ENDPOINT_URL_KINESIS first, then
AWS_ENDPOINT_URL, then uses the default AWS SDK endpoint for the region.
Tenzir needs these Kinesis permissions:
| Operator | Required permissions |
|---|---|
from_amazon_kinesis | kinesis:ListShards, kinesis:GetShardIterator, kinesis:GetRecords |
to_amazon_kinesis | kinesis:PutRecords |
Examples
Section titled “Examples”Send events to a stream
Section titled “Send events to a stream”subscribe "alerts"to_amazon_kinesis "security-events"This sends one NDJSON record per input event by using the default
message=this.print_ndjson() expression.
Send a custom payload
Section titled “Send a custom payload”from {payload: "security event detected", tenant: "acme"}to_amazon_kinesis "security-events", message=payload, partition_key=tenantIf you omit partition_key, Tenzir generates a random UUID per event.
Receive and parse JSON records
Section titled “Receive and parse JSON records”from_amazon_kinesis "security-events", start="trim_horizon", count=100, exit=truethis = string(message).parse_json()Read from a timestamp
Section titled “Read from a timestamp”from_amazon_kinesis "security-events", start=2026-01-01T00:00:00Z