
Receives records from an Amazon Kinesis Data Streams stream.

from_amazon_kinesis stream:string, [start=string|time, count=int, exit=bool,
records_per_call=int, poll_idle=duration,
aws_region=string, aws_iam=record, endpoint=string]

The from_amazon_kinesis operator reads records from the existing shards of a Kinesis data stream and emits one event per Kinesis record. All shards are read concurrently, so throughput scales with the shard count. Record order is preserved within a shard but interleaved across shards, matching the ordering guarantees of Kinesis itself.
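The ordering guarantee can be illustrated with a small simulation. In the sketch below, a round-robin merge stands in for the concurrent shard readers. Real interleaving depends on timing and is not deterministic. The shard IDs and record values are made up. The check shows the property that holds: if you filter the output by shard, you get each shard's original order back, even though the combined stream has no global order.

```python
from itertools import zip_longest

# Made-up per-shard record sequences; within a Kinesis shard, records are ordered.
shards = {
    "shardId-000000000000": ["a1", "a2", "a3"],
    "shardId-000000000001": ["b1", "b2"],
}


def interleave(shards):
    """Round-robin merge standing in for concurrent shard readers (a simplification)."""
    out = []
    for batch in zip_longest(*shards.values()):
        for shard_id, rec in zip(shards.keys(), batch):
            if rec is not None:
                out.append((shard_id, rec))
    return out


def per_shard_order_preserved(events, shards):
    """True if filtering the merged events by shard yields each shard's original order."""
    for shard_id, expected in shards.items():
        seen = [rec for sid, rec in events if sid == shard_id]
        if seen != expected:
            return False
    return True


events = interleave(shards)
print(events)
print(per_shard_order_preserved(events, shards))
```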

The emitted events use the tenzir.amazon_kinesis schema with these fields:

Field            Type      Description
message          blob      The raw Kinesis record payload.
stream           string    The stream name.
shard_id         string    The shard that contained the record.
sequence_number  string    The Kinesis sequence number of the record.
partition_key    string    The partition key of the record.
arrival_time     time      The approximate time at which Kinesis received the record.
encryption_type  string    The server-side encryption type, when available.
behind_latest    duration  The shard lag reported by Kinesis, i.e., how far the reader is behind the tip of the shard.

The arrival_time and encryption_type fields are optional: Kinesis returns them only when they are set on the record.
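The following minimal sketch shows how one record from a GetRecords response might map onto the fields above. It assumes boto3-style response keys (`Data`, `SequenceNumber`, `PartitionKey`, `ApproximateArrivalTimestamp`, `EncryptionType`, and the response-level `MillisBehindLatest`). The helper `to_event` is hypothetical and does not come from Tenzir's implementation. The record values are made up.

```python
from datetime import datetime, timedelta, timezone


def to_event(stream, shard_id, record, millis_behind_latest):
    """Map a boto3-style GetRecords record to tenzir.amazon_kinesis-like fields.

    Hypothetical helper: the mapping is inferred from the schema table only.
    """
    event = {
        "message": record["Data"],  # raw bytes, i.e. the blob payload
        "stream": stream,
        "shard_id": shard_id,
        "sequence_number": record["SequenceNumber"],
        "partition_key": record["PartitionKey"],
        # Assumption: behind_latest comes from the response-level MillisBehindLatest.
        "behind_latest": timedelta(milliseconds=millis_behind_latest),
    }
    # Optional fields are copied only when the record carries them.
    if "ApproximateArrivalTimestamp" in record:
        event["arrival_time"] = record["ApproximateArrivalTimestamp"]
    if "EncryptionType" in record:
        event["encryption_type"] = record["EncryptionType"]
    return event


plain = {"Data": b'{"user": "alice"}', "SequenceNumber": "1001", "PartitionKey": "host-1"}
full = dict(
    plain,
    ApproximateArrivalTimestamp=datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc),
    EncryptionType="KMS",
)
minimal_event = to_event("security-events", "shardId-000000000000", plain, 1500)
full_event = to_event("security-events", "shardId-000000000000", full, 0)
```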

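The operator lists the shards of the stream during startup and discovers new shards when an existing shard closes, for example after a resharding operation. Pipeline snapshots store the next sequence number for each shard. After a restart, the operator resumes each shard with an AFTER_SEQUENCE_NUMBER shard iterator. This gives at-least-once delivery across restarts: records emitted after the most recent snapshot may be emitted a second time.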
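The simulation below illustrates why resuming from a snapshot gives at-least-once rather than exactly-once delivery. It models a single shard with made-up, increasing sequence numbers. It takes a snapshot after the second record and crashes after the fourth. For simplicity, the stored position is modeled as a plain sequence number. The hypothetical helper `read_after` mimics an AFTER_SEQUENCE_NUMBER iterator by returning only records strictly after that position.

```python
def read_after(shard_records, after_seq):
    """Mimic an AFTER_SEQUENCE_NUMBER iterator: records strictly after after_seq."""
    if after_seq is None:
        return list(shard_records)
    # Kinesis sequence numbers are numeric strings that increase within a shard.
    return [r for r in shard_records if int(r) > int(after_seq)]


shard = ["1", "2", "3", "4", "5"]  # made-up sequence numbers for one shard
emitted = []
snapshot = {}

# First run: emit records 1..4, snapshot after record 2, then crash.
for seq in shard[:4]:
    emitted.append(seq)
    if seq == "2":
        snapshot["shardId-000000000000"] = seq

# Restart: resume strictly after the snapshotted position.
emitted.extend(read_after(shard, snapshot.get("shardId-000000000000")))
print(emitted)  # records 3 and 4 appear twice; none are lost
```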

The operator requires these AWS permissions:

  • kinesis:ListShards
  • kinesis:GetShardIterator
  • kinesis:GetRecords
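As a sketch, an identity-based IAM policy that grants only these three actions on a single stream could look like the document built below. The region, account ID, and stream name are placeholders. The helper `kinesis_read_policy` is hypothetical. Kinesis stream ARNs take the form `arn:aws:kinesis:<region>:<account-id>:stream/<name>`.

```python
import json


def kinesis_read_policy(region: str, account_id: str, stream: str) -> dict:
    """Build a minimal IAM policy allowing the operator's three Kinesis actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                # Scope the grant to one stream instead of "*".
                "Resource": f"arn:aws:kinesis:{region}:{account_id}:stream/{stream}",
            }
        ],
    }


policy = kinesis_read_policy("us-east-1", "123456789012", "security-events")
print(json.dumps(policy, indent=2))
```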

stream: string

The name of the Kinesis data stream to read records from.

start = string|time (optional)

The position for the initial shard iterator when no snapshot is available.

The value can be one of:

  • "latest": start after the latest record
  • "trim_horizon": start at the oldest available record
  • <timestamp>: start at the given timestamp

Defaults to "latest".
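The three start values correspond to the Kinesis shard iterator types LATEST, TRIM_HORIZON, and AT_TIMESTAMP. The hypothetical helper below builds boto3-style `get_shard_iterator` arguments based on that correspondence. That the operator maps the values exactly this way is an assumption. The helper switches to AFTER_SEQUENCE_NUMBER when a resume position from a snapshot exists.

```python
from datetime import datetime, timezone


def shard_iterator_args(stream, shard_id, start="latest", resume_after=None):
    """Hypothetical helper: kwargs for a boto3 Kinesis get_shard_iterator call."""
    args = {"StreamName": stream, "ShardId": shard_id}
    if resume_after is not None:
        # A snapshot exists, so start ignores and resumes after the stored position.
        args.update(ShardIteratorType="AFTER_SEQUENCE_NUMBER", StartingSequenceNumber=resume_after)
    elif isinstance(start, datetime):
        args.update(ShardIteratorType="AT_TIMESTAMP", Timestamp=start)
    elif start == "latest":
        args["ShardIteratorType"] = "LATEST"
    elif start == "trim_horizon":
        args["ShardIteratorType"] = "TRIM_HORIZON"
    else:
        raise ValueError(f"invalid start: {start!r}")
    return args


print(shard_iterator_args("security-events", "shardId-000000000000", "trim_horizon"))
```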

count = int (optional)

Exit successfully after emitting count records.

The value must be greater than zero.

exit = bool (optional)

Exit successfully once the operator has caught up on all shards.

Without this option, the operator waits for new records after consuming the currently available records.

Defaults to false.
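One way to read how count and exit interact is as a stop check that runs after each batch. The sketch below is hypothetical. It treats a shard as caught up when its latest GetRecords response reported a `MillisBehindLatest` of 0. That definition of "caught up" is an assumption.

```python
def should_exit(emitted, count=None, exit_when_caught_up=False, lag_ms_by_shard=None):
    """Hypothetical stop check combining the count and exit options.

    lag_ms_by_shard maps shard IDs to the MillisBehindLatest value of the most
    recent GetRecords response. Treating 0 on every shard as "caught up" is an
    assumption, not documented operator behavior.
    """
    if count is not None:
        if count <= 0:
            raise ValueError("count must be greater than zero")
        if emitted >= count:
            return True
    if exit_when_caught_up and lag_ms_by_shard:
        return all(lag == 0 for lag in lag_ms_by_shard.values())
    return False


print(should_exit(10, count=100, exit_when_caught_up=True, lag_ms_by_shard={"a": 0, "b": 0}))
```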

records_per_call = int (optional)

The maximum number of records to fetch in a single GetRecords request.

The value must be between 1 and 10000.

Defaults to 1000.

poll_idle = duration (optional)

How long to wait before polling a shard again after a GetRecords request returns no records.

The value must be non-negative and less than 5min.

Defaults to 1s.
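The two polling options can be illustrated with a single-shard loop over a boto3-style client. The sketch checks both ranges from the text. It passes records_per_call as the `Limit` of each `get_records` call and sleeps for poll_idle only after an empty response. It stops when no `NextShardIterator` is returned, which is how Kinesis signals a closed shard. The in-memory `FakeKinesis` client is hypothetical and exists only so the example runs without AWS. Throttling, retries, and child-shard discovery are omitted.

```python
import time
from datetime import timedelta


def poll_shard(client, iterator, records_per_call=1000,
               poll_idle=timedelta(seconds=1), sleep=time.sleep):
    """Yield records from one shard using a boto3-style get_records client (sketch)."""
    if not 1 <= records_per_call <= 10000:
        raise ValueError("records_per_call must be between 1 and 10000")
    if not timedelta(0) <= poll_idle < timedelta(minutes=5):
        raise ValueError("poll_idle must be non-negative and less than 5min")
    while iterator is not None:
        response = client.get_records(ShardIterator=iterator, Limit=records_per_call)
        records = response["Records"]
        yield from records
        if not records:
            sleep(poll_idle.total_seconds())  # back off only when the shard was empty
        iterator = response.get("NextShardIterator")  # None once the shard is closed


class FakeKinesis:
    """In-memory stand-in returning one canned page per iterator (hypothetical)."""

    def __init__(self, pages):
        self.pages = pages
        self.limits = []

    def get_records(self, ShardIterator, Limit):
        self.limits.append(Limit)
        i = int(ShardIterator)
        response = {"Records": self.pages[i], "MillisBehindLatest": 0}
        if i + 1 < len(self.pages):
            response["NextShardIterator"] = str(i + 1)
        return response


sleeps = []
fake = FakeKinesis([["r1", "r2"], [], ["r3"]])
records = list(poll_shard(fake, "0", records_per_call=2, sleep=sleeps.append))
print(records, sleeps)
```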

aws_region = string (optional)

The AWS region of the stream.

If omitted, the operator uses the region from aws_iam when present. Otherwise, it uses the default AWS SDK region resolution.
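The region precedence described above can be written as a small hypothetical helper. It returns None to mean "defer to the AWS SDK's default region resolution".

```python
def resolve_region(aws_region=None, aws_iam=None):
    """Explicit aws_region first, then aws_iam.region, else None (SDK default)."""
    if aws_region:
        return aws_region
    if aws_iam and aws_iam.get("region"):
        return aws_iam["region"]
    return None


print(resolve_region(None, {"region": "us-east-1"}))
```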

endpoint = string (optional)

A custom Kinesis endpoint URL.

If omitted, the operator uses AWS_ENDPOINT_URL_KINESIS when set, then AWS_ENDPOINT_URL, then the default AWS SDK endpoint for the region.
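The endpoint fallback order, sketched as a hypothetical helper that reads from an injectable environment mapping so it can be exercised without touching the real process environment. A return value of None stands for the SDK's default regional endpoint.

```python
import os


def resolve_endpoint(endpoint=None, env=None):
    """Explicit endpoint, then AWS_ENDPOINT_URL_KINESIS, then AWS_ENDPOINT_URL, else None."""
    env = os.environ if env is None else env
    if endpoint:
        return endpoint
    for name in ("AWS_ENDPOINT_URL_KINESIS", "AWS_ENDPOINT_URL"):
        if env.get(name):
            return env[name]
    return None  # defer to the AWS SDK's default endpoint for the region


print(resolve_endpoint(env={"AWS_ENDPOINT_URL": "http://127.0.0.1:4566"}))
```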

aws_iam = record (optional)

Configures explicit AWS credentials or IAM role assumption. If not specified, the operator uses the AWS SDK's default credential chain.

{
region: string, // AWS region for API requests.
access_key_id: string, // AWS access key ID.
secret_access_key: string, // AWS secret access key.
session_token: string, // session token for temporary credentials.
assume_role: string, // ARN of IAM role to assume.
session_name: string, // session name for role assumption.
external_id: string, // external ID for role assumption.
web_identity: record, // OIDC web identity token configuration.
}

See AWS Authentication for a description of every field, the default credential chain, web identity configuration, and local authentication with the AWS CLI.
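For illustration only, most aws_iam fields correspond to standard AWS SDK inputs. The hypothetical helper below maps such a record onto keyword arguments for `boto3.client("kinesis", ...)` (`region_name`, `aws_access_key_id`, `aws_secret_access_key`, `aws_session_token`). When `assume_role` is set, it also builds the arguments for STS `assume_role` (`RoleArn`, `RoleSessionName`, `ExternalId`). It does not reproduce how Tenzir resolves credentials, and it ignores `web_identity`. The default session name is a made-up placeholder.

```python
def credential_kwargs(aws_iam=None):
    """Map an aws_iam-style record to boto3 client kwargs and STS assume_role kwargs.

    Hypothetical helper; returns (client_kwargs, assume_role_kwargs_or_None).
    """
    aws_iam = aws_iam or {}
    kwargs = {}
    if aws_iam.get("region"):
        kwargs["region_name"] = aws_iam["region"]
    for src, dst in (("access_key_id", "aws_access_key_id"),
                     ("secret_access_key", "aws_secret_access_key"),
                     ("session_token", "aws_session_token")):
        if aws_iam.get(src):
            kwargs[dst] = aws_iam[src]
    assume = None
    if aws_iam.get("assume_role"):
        assume = {
            "RoleArn": aws_iam["assume_role"],
            # STS requires a session name; "example-session" is a placeholder default.
            "RoleSessionName": aws_iam.get("session_name", "example-session"),
        }
        if aws_iam.get("external_id"):
            assume["ExternalId"] = aws_iam["external_id"]
    return kwargs, assume


print(credential_kwargs({"region": "us-east-1", "assume_role": "arn:aws:iam::123456789012:role/reader"}))
```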

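Read new records from a stream:

from_amazon_kinesis "security-events"

Read a stream starting at the oldest available record:

from_amazon_kinesis "security-events", start="trim_horizon"

Parse JSON payloads into structured events:

from_amazon_kinesis "security-events", start="trim_horizon"
this = string(message).parse_json()

Read at most 100 records from the beginning of the stream, then exit:

from_amazon_kinesis "security-events",
  start="trim_horizon",
  count=100,
  exit=true

Use explicit credentials stored as secrets:

from_amazon_kinesis "security-events", aws_iam={
  region: "us-east-1",
  access_key_id: secret("aws-key"),
  secret_access_key: secret("aws-secret")
}

Read from a local Kinesis-compatible endpoint, such as LocalStack:

from_amazon_kinesis "security-events",
  aws_region="us-east-1",
  endpoint="http://127.0.0.1:4566"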
