Receives records from an Amazon Kinesis Data Streams stream.
from_amazon_kinesis stream:string, [start=string|time, count=int, exit=bool, records_per_call=int, poll_idle=duration, aws_region=string, aws_iam=record, endpoint=string]Description
Section titled “Description”The from_amazon_kinesis operator reads records from the existing shards of a
Kinesis data stream and emits one event per Kinesis record. All shards are
read concurrently, so throughput scales with the shard count. Record order is
preserved within a shard but interleaved across shards, matching the ordering
guarantees of Kinesis itself.
The emitted events use the tenzir.amazon_kinesis schema with these fields:
| Field | Type | Description |
|---|---|---|
message | blob | The raw Kinesis record payload. |
stream | string | The stream name. |
shard_id | string | The shard that contained the record. |
sequence_number | string | The Kinesis sequence number. |
partition_key | string | The record partition key. |
arrival_time | time | The approximate time when Kinesis received it. |
encryption_type | string | The server-side encryption type, when available. |
behind_latest | duration | The shard lag reported by Kinesis. |
The arrival_time and encryption_type fields are optional because Kinesis
only returns them when they are present on the record.
The operator lists shards during startup and discovers new shards when an
existing shard closes, such as after a resharding operation. Pipeline
snapshots store the next sequence number per shard, and restarts resume with
AFTER_SEQUENCE_NUMBER. This gives at-least-once restart behavior.
The operator requires these AWS permissions:
kinesis:ListShardskinesis:GetShardIteratorkinesis:GetRecords
stream: string
Section titled “stream: string”The name of the Kinesis data stream to receive records from.
start = string | time (optional)
Section titled “start = string | time (optional)”The position for the initial shard iterator when no snapshot is available.
The value can be one of:
"latest": start after the latest record"trim_horizon": start at the oldest available record<timestamp>: start at the given timestamp
Defaults to "latest".
count = int (optional)
Section titled “count = int (optional)”Exit successfully after emitting count records.
The value must be greater than zero.
exit = bool (optional)
Section titled “exit = bool (optional)”Exit successfully after the operator is caught up on all shards.
Without this option, the operator waits for new records after consuming the currently available records.
Defaults to false.
records_per_call = int (optional)
Section titled “records_per_call = int (optional)”The maximum number of records to fetch in one GetRecords request.
The value must be between 1 and 10000.
Defaults to 1000.
poll_idle = duration (optional)
Section titled “poll_idle = duration (optional)”How long to wait after a GetRecords request returns no records.
The value must be non-negative and less than 5min.
Defaults to 1s.
aws_region = string (optional)
Section titled “aws_region = string (optional)”The AWS region for reading from the stream.
If omitted, the operator uses the region from aws_iam when present. Otherwise,
it uses the default AWS SDK region resolution.
endpoint = string (optional)
Section titled “endpoint = string (optional)”A custom Kinesis endpoint URL.
If omitted, the operator uses AWS_ENDPOINT_URL_KINESIS when set, then
AWS_ENDPOINT_URL, then the default AWS SDK endpoint for the region.
aws_iam = record (optional)
Section titled “aws_iam = record (optional)”Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.
{ region: string, // AWS region for API requests. access_key_id: string, // AWS access key ID. secret_access_key: string, // AWS secret access key. session_token: string, // session token for temporary credentials. assume_role: string, // ARN of IAM role to assume. session_name: string, // session name for role assumption. external_id: string, // external ID for role assumption. web_identity: record, // OIDC web identity token configuration.}See AWS Authentication for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.
Examples
Section titled “Examples”Read from the latest position
Section titled “Read from the latest position”from_amazon_kinesis "security-events"Read from the oldest available record
Section titled “Read from the oldest available record”from_amazon_kinesis "security-events", start="trim_horizon"Parse JSON payloads
Section titled “Parse JSON payloads”from_amazon_kinesis "security-events", start="trim_horizon"this = string(message).parse_json()Read a bounded number of records
Section titled “Read a bounded number of records”from_amazon_kinesis "security-events", start="trim_horizon", count=100, exit=trueUse explicit credentials
Section titled “Use explicit credentials”from_amazon_kinesis "security-events", aws_iam={ region: "us-east-1", access_key_id: secret("aws-key"), secret_access_key: secret("aws-secret")}Use a LocalStack endpoint
Section titled “Use a LocalStack endpoint”from_amazon_kinesis "security-events", aws_region="us-east-1", endpoint="http://127.0.0.1:4566"