Receives messages from an AMQP queue.
from_amqp url:secret, [channel=int, exchange=str, routing_key=str, queue=str, options=record, queue_arguments=record, passive=bool, durable=bool, exclusive=bool, no_auto_delete=bool, no_local=bool, ack=bool]Description
Section titled “Description”The from_amqp operator is an AMQP 0-9-1 client that
receives messages from a queue. It emits one event per AMQP message with the
schema tenzir.amqp and a message: blob field containing the message body.
url: secret
Section titled “url: secret”The URL of the AMQP server. The URL must have the following format:
amqp://[USERNAME[:PASSWORD]@]HOSTNAME[:PORT]/[VHOST]The URL is required. Values in the URL override corresponding entries from
options.
channel = int (optional)
Section titled “channel = int (optional)”The channel number to use.
The value must fit into an unsigned 16-bit integer.
Defaults to 1.
exchange = str (optional)
Section titled “exchange = str (optional)”The exchange to interact with.
Defaults to "amq.direct".
routing_key = str (optional)
Section titled “routing_key = str (optional)”The routing key to bind the queue to the exchange.
Defaults to the empty string.
queue = str (optional)
Section titled “queue = str (optional)”The name of the queue to declare and bind.
Defaults to the empty string, which lets the broker generate a queue name, such
as "amq.gen-XNTLF0FwabIn9FFKKtQHzg".
options = record (optional)
Section titled “options = record (optional)”An option record for the AMQP connection. Values must be numbers, booleans, strings, or secrets.
Use options to configure the AMQP connection. Use queue_arguments to
configure the queue declaration.
Available options are:
hostname: 127.0.0.1port: 5672ssl: falsevhost: /max_channels: 2047frame_size: 131072heartbeat: 0sasl_method: plainusername: guestpassword: guestqueue_arguments = record (optional)
Section titled “queue_arguments = record (optional)”A record of AMQP field-table arguments to pass to the queue declaration.
Use this parameter for broker-specific queue settings, such as RabbitMQ quorum queues, queue length limits, message TTLs, single active consumers, and dead-lettering settings.
Values must be numbers, booleans, or strings. Nested records, lists, blobs, nulls, and secrets are not supported.
passive = bool (optional)
Section titled “passive = bool (optional)”If true, the broker replies with OK if a queue already exists with the same
name and raises an error otherwise.
Defaults to false.
durable = bool (optional)
Section titled “durable = bool (optional)”If true, a newly created queue is durable and remains active when the
broker restarts.
Defaults to false.
exclusive = bool (optional)
Section titled “exclusive = bool (optional)”If true, the queue is exclusive to the current connection and is deleted when
the connection closes.
Defaults to false.
no_auto_delete = bool (optional)
Section titled “no_auto_delete = bool (optional)”If true, the queue is not deleted when all consumers have finished using it.
Defaults to false.
no_local = bool (optional)
Section titled “no_local = bool (optional)”If true, the broker doesn’t send messages to the connection that published
them.
Defaults to false.
ack = bool (optional)
Section titled “ack = bool (optional)”If true, the broker expects acknowledgements for messages. If false, the
broker assumes delivery will succeed and immediately dequeues the message.
Defaults to false.
Examples
Section titled “Examples”Receive messages from an AMQP queue
Section titled “Receive messages from an AMQP queue”from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"Parse JSON messages
Section titled “Parse JSON messages”from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"this = string(message).parse_json()Configure connection options
Section titled “Configure connection options”from_amqp "amqp://broker/vhost", options={ username: "tenzir", password: secret("amqp-password"), heartbeat: 30,}Declare a RabbitMQ quorum queue
Section titled “Declare a RabbitMQ quorum queue”from_amqp "amqp://broker/vhost", queue="events", durable=true, no_auto_delete=true, queue_arguments={ "x-queue-type": "quorum", "x-quorum-initial-group-size": 1, }