This guide shows you how to migrate legacy TQL pipelines to Tenzir v6. For most pipelines, the upgrade is small: many pipelines keep working as-is, and others need only a few local updates to sources and sinks. The sections that follow help you find the pipelines that do need attention and update them without changing the intent of your data flow.
Starting with the Tenzir v6 release candidate, the new engine is the default for pipelines. The legacy engine is still available during a temporary migration window, but it is deprecated in v6 and will be removed after the window closes.
Choose a migration mode
Choose one migration mode for your upgrade:

- Default opt-out: Keep the v6 default. Pipelines use the new engine, and you fix failing pipelines as you find them. If an existing critical pipeline cannot be migrated immediately, add //neo:false temporarily.
- Controlled opt-in: Set neo: false in your settings so the legacy engine remains the default during your migration. Add // neo to individual pipelines after you update and test them. This reduces the blast radius, but you must track the pipelines that still need migration.
Use compatibility mode only while you migrate. It buys time, but it is not a long-term migration strategy.
Temporary compatibility controls
Starting with the Tenzir v6 RC, you can temporarily keep the legacy engine as the default by explicitly setting neo: false in your node configuration:

```yaml
neo: false
```

With neo: false, opt individual migrated pipelines into the new engine by placing // neo on the first line of the pipeline definition:
```tql
// neo
from_http "https://api.example.com/events" {
  read_ndjson
}
import
```

If you keep the v6 default, you can temporarily select the legacy engine for an existing critical pipeline by placing //neo:false on the first line:

```tql
//neo:false
load_http "https://api.example.com/events"
read_ndjson
import
```

The // neo and //neo:false comments must be the first line of the pipeline definition. Pipeline-level comments apply to every pipeline definition form, including saved pipelines, deployed pipelines, ad-hoc CLI pipelines, and API-submitted pipelines.
Migration checklist
Use this checklist to plan the migration:

- Inventory existing pipelines.
- Choose a migration mode: default opt-out for faster migration, or controlled opt-in with neo: false for a smaller blast radius.
- Test pipelines against the Tenzir v6 RC.
- Find legacy-only syntax and operators.
- Replace legacy patterns with v6 patterns.
- Use //neo:false only for existing critical pipelines that need temporary continuity.
- If you use compatibility mode, track pipelines that still need // neo.
- Deploy and monitor migrated pipelines.
New v6 execution patterns
The most important migration change is the move away from old byte-stream source and sink operators toward event-oriented operators with explicit subpipelines.
Message-based sources and destinations
Sources
Message brokers now use event-based source operators. For example, from_amqp emits one event per AMQP message with the message payload in the message field:

```tql
from_amqp "amqp://broker/vhost"
this = message.parse_json()
import
```

This differs from load_amqp, which exposed a byte stream and could lose message boundaries before the parser saw the data.
Destinations
Similarly, the respective sink operators now accept events directly, sending one message per event:

```tql
from { field: "value" }
to_amqp "amqp://broker/vhost", message=this.print_ndjson()
```

This differs from save_amqp, which expected a byte stream that it would send according to its own chunking.
Parsing and printing subpipelines
Operators that read or write bytes use subpipelines to keep transport and data format separate. Source operators such as from_http and from_file receive bytes and use a parsing subpipeline:

```tql
from_http "https://example.com/events.json.gz" {
  decompress_gzip
  read_ndjson
}
```

Sink operators such as to_tcp and serve_tcp receive events and use a printing subpipeline:

```tql
export
to_tcp "collector.example.com:5044" {
  write_ndjson
}
```

Per-event subpipelines
Use each when every incoming event describes a separate job. The current input event is available as $this, and the nested pipeline starts with its own source:

```tql
from {url: "https://example.com/a.json"}, {url: "https://example.com/b.json"}
each parallel=4 {
  from_http $this.url { read_json }
}
```

This is the v6-compatible pattern for old pipelines that used a transform such as http to start a per-event request from upstream values.
Migration reference
Use the tables below to find the v6-compatible pattern for legacy syntax, operators, and pipeline patterns. The notes after each table explain when a replacement needs a parsing or printing subpipeline.
Files and object storage
| Legacy operator | Use in v6 |
|---|---|
load_file | from_file |
load_s3 | from_file or from_s3 |
load_gcs | from_file or from_google_cloud_storage |
load_azure_blob_storage | from_file or from_azure_blob_storage |
save_file | to_file |
save_s3 | to_file or to_s3 |
save_gcs | to_file or to_google_cloud_storage |
save_azure_blob_storage | to_file or to_azure_blob_storage |
Prefer from_file and to_file for URI-style file access. Use
provider-specific operators when you need provider-specific options. Put
decompression, parsing, and printing in a subpipeline when automatic format
detection is not enough.
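For example, a minimal sketch of both forms, assuming a hypothetical S3 path holding gzip-compressed NDJSON (mirroring the examples later in this guide):

```tql
// Rely on automatic detection of compression and format where it suffices.
from_file "s3://bucket/logs/*.json.gz"
import
```

When detection is not enough, spell out the steps in a subpipeline:

```tql
from_file "s3://bucket/logs/*.json.gz" {
  decompress_gzip
  read_ndjson
}
import
```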
HTTP and message brokers
| Legacy operator | Use in v6 |
|---|---|
load_http | from_http |
save_http | to_http |
load_kafka | from_kafka |
save_kafka | to_kafka |
load_amqp | from_amqp |
save_amqp | to_amqp |
load_sqs | from_sqs |
save_sqs | to_sqs |
load_google_cloud_pubsub | from_google_cloud_pubsub |
save_google_cloud_pubsub | to_google_cloud_pubsub |
Use a parsing subpipeline with from_http when the response body contains
encoded events. Use a printing subpipeline with to_http when you need to build
the request body explicitly.
Kafka and message-broker sources preserve message boundaries. For example,
from_kafka emits the payload in message, which you can parse with functions
such as parse_json.
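As a brief sketch, assuming a topic named security-events (the same name used in the Kafka examples later in this guide):

```tql
from_kafka "security-events"
// The payload arrives in the message field; parse it into the event itself.
this = message.parse_json()
import
```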
Network transports
| Legacy operator or mode | Use in v6 |
|---|---|
load_tcp client mode | from_tcp |
load_tcp server mode | accept_tcp |
save_tcp client mode | to_tcp |
save_tcp server-style fanout | serve_tcp |
load_udp | accept_udp |
save_udp | to_udp |
load_zmq | from_zmq |
save_zmq | to_zmq |
load_nic | from_nic |
Use accept_tcp when the pipeline should listen for incoming TCP connections,
and serve_tcp when clients connect to receive pipeline output. Use parsing
subpipelines for received bytes and printing subpipelines for outgoing bytes.
accept_udp emits one event per datagram with the payload in data.
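For example, a minimal UDP listener that parses each datagram as NDJSON (the listen address and format are assumptions, matching the UDP example later in this guide):

```tql
accept_udp "0.0.0.0:5514"
// Each datagram becomes one event; its payload sits in the data field.
select this = data.parse_json()
import
```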
Standard input and output
| Legacy operator | Use in v6 |
|---|---|
load_stdin | from_stdin |
save_stdout | to_stdout |
Add a parsing subpipeline when reading encoded bytes from standard input. Add a printing subpipeline when writing encoded bytes to standard output.
Changed patterns
| Legacy pattern | Use in v6 |
|---|---|
http as a per-event API call | each with from_http |
from_http ..., metadata_field= | $response inside the parsing subpipeline |
from_http ..., server=true | accept_http |
Use each when the nested work should start its own source for every input
event. Keep http when you need to enrich the current event with an HTTP
response. Listening for incoming HTTP requests is now handled by accept_http.
Generic from URI routing
The old from operator (no suffix) accepted a URI and dispatched to the
appropriate load_* operator based on the scheme. Use the table below to find
the v6 operator for each URI scheme.
| URI scheme | Use in v6 |
|---|---|
abfs, abfss | from_azure_blob_storage |
amqp | from_amqp |
elasticsearch | from_opensearch |
file | from_file |
fluent-bit | from_fluent_bit |
ftp, ftps | from_ftp |
gs | from_google_cloud_storage |
http, https | from_http |
inproc, zmq | from_zmq |
kafka | from_kafka |
opensearch | from_opensearch |
s3 | from_s3 |
sqs | from_sqs |
tcp | from_tcp or accept_tcp |
udp | accept_udp |
Move any parsing steps that followed the old from call into a parsing
subpipeline on the new operator. For TCP, use from_tcp when connecting to a
remote endpoint and accept_tcp when listening for incoming connections.
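As a sketch of this migration, assume a legacy pipeline that fetched JSON over HTTPS through generic URI routing and parsed it in a separate step (the URL is hypothetical):

```tql
// Legacy: generic URI routing with a trailing parsing step.
from "https://example.com/events.json"
read_json
import

// v6: explicit operator with the parser moved into a parsing subpipeline.
from_http "https://example.com/events.json" {
  read_json
}
import
```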
Generic to URI routing
The old to operator (no suffix) accepted a URI and dispatched to the
appropriate save_* operator based on the scheme. Use the table below to find
the v6 operator for each URI scheme.
| URI scheme | Use in v6 |
|---|---|
abfs, abfss | to_azure_blob_storage |
amqp | to_amqp |
elasticsearch, opensearch | to_opensearch |
file | to_file |
fluent-bit | to_fluent_bit |
ftp, ftps | to_ftp |
gs | to_google_cloud_storage |
http, https | to_http |
inproc, zmq | to_zmq |
kafka | to_kafka |
s3 | to_s3 |
sqs | to_sqs |
tcp | to_tcp or serve_tcp |
udp | to_udp |
smtp, smtps, mailto, email | No replacement — see below. |
Move any printing steps that preceded the old to call into a printing
subpipeline on the new operator. For TCP, use to_tcp when connecting to a
remote endpoint and serve_tcp when clients connect to receive output. Email
schemes (smtp, smtps, mailto, email) have no direct replacement; use
the python or shell operator to send email instead.
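A corresponding sketch for the sink side, assuming a legacy pipeline that printed NDJSON before handing the bytes to generic URI routing (the collector URL is hypothetical):

```tql
// Legacy: printing step followed by generic URI routing.
export
write_ndjson
to "https://collector.example.com/events"

// v6: explicit operator with the printer moved into a printing subpipeline.
export
to_http "https://collector.example.com/events" {
  write_ndjson
}
```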
Removed without replacement
These items need redesign rather than a direct replacement:
| Removed item | What to do instead |
|---|---|
| buffer | Remove it. |
| legacy | Rewrite the operation using the existing operators. |
| save_email | Use the python or shell operator to send an email. |
| Operator-level metrics | Monitor pipeline-level metrics instead. |
Before and after examples
Section titled “Before and after examples”Read files and cloud objects
Before:

```tql
load_file "/var/log/app/*.json"
read_ndjson
import
```

After:

```tql
from_file "/var/log/app/*.json" {
  read_ndjson
}
import
```

Before:

```tql
load_s3 "s3://bucket/logs/*.json.gz"
decompress_gzip
read_ndjson
import
```

After:

```tql
from_file "s3://bucket/logs/*.json.gz" {
  decompress_gzip
  read_ndjson
}
import
```

Write files and cloud objects
Before:

```tql
export
write_ndjson
save_file "/tmp/events.ndjson"
```

After:

```tql
export
to_file "/tmp/events.ndjson" {
  write_ndjson
}
```

Fetch HTTP data
Before:

```tql
load_http "https://api.example.com/events"
read_ndjson
import
```

After:

```tql
from_http "https://api.example.com/events" {
  read_ndjson
}
import
```

Send HTTP requests
Before:

```tql
export
write_ndjson
save_http "https://collector.example.com/events"
```

After:

```tql
export
to_http "https://collector.example.com/events" {
  write_ndjson
}
```

Receive TCP data
Before:

```tql
load_tcp "0.0.0.0:5514" {
  read_syslog
}
import
```

After:

```tql
accept_tcp "0.0.0.0:5514" {
  read_syslog
}
import
```

Send TCP data
Before:

```tql
export
write_json
save_tcp "collector.example.com:9000"
```

After:

```tql
export
to_tcp "collector.example.com:9000" {
  write_json
}
```

Read and write UDP data
Before:

```tql
load_udp "0.0.0.0:5514", insert_newlines=true
read_ndjson
import
```

After:

```tql
accept_udp "0.0.0.0:5514"
select this = data.parse_json()
import
```

Before:

```tql
export
write_ndjson
save_udp "127.0.0.1:5514"
```

After:

```tql
export
to_udp "127.0.0.1:5514"
```

Message broker connectors
Kafka

Consume
Before:

```tql
load_kafka "security-events"
read_json
import
```

After:

```tql
from_kafka "security-events"
this = message.parse_json()
import
```

Produce
Before:

```tql
export
write_ndjson
save_kafka "security-events"
```

After:

```tql
export
to_kafka "security-events"
```

to_kafka sends one message per event and serializes it as NDJSON by default.
AMQP

Consume
Before:

```tql
load_amqp "amqp://user:pass@broker:5672/vhost", queue="events"
read_json
```

After:

```tql
from_amqp "amqp://user:pass@broker:5672/vhost", queue="events"
this = string(message).parse_json()
```

Produce
Before:

```tql
export
write_json
save_amqp "amqp://user:pass@broker:5672/vhost"
```

After:

```tql
export
to_amqp "amqp://user:pass@broker:5672/vhost"
```

to_amqp sends one message per event and serializes it as NDJSON by default. Use message= to send a custom expression instead.
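For example, a minimal sketch that overrides the default and sends an explicit NDJSON rendering of each event (broker URL as in the example above):

```tql
export
// Build the message body explicitly instead of using the default serialization.
to_amqp "amqp://user:pass@broker:5672/vhost", message=this.print_ndjson()
```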
Use the named AMQP and SQS operators for these brokers. Prefer those forms over generic URI routing when you update pipeline definitions.
Google Cloud PubSub
Consume
Before:

```tql
load_google_cloud_pubsub project_id="acme", subscription_id="events"
read_json
```

After:

```tql
from_google_cloud_pubsub project_id="acme", subscription_id="events"
this = message.parse_json()
```

Produce
Before:

```tql
export
write_json
save_google_cloud_pubsub project_id="acme", topic_id="events"
```

After:

```tql
export
to_google_cloud_pubsub project_id="acme", topic_id="events", message=this.print_json()
```

to_google_cloud_pubsub requires an explicit message= expression. There is no default serialization format.
SQS

Consume
Before:

```tql
load_sqs "queue"
read_json
```

After:

```tql
from_sqs "queue"
this = message.parse_json()
```

Produce
Before:

```tql
export
write_json
save_sqs "queue"
```

After:

```tql
export
to_sqs "queue"
```

to_sqs sends one SQS message per event and serializes it as NDJSON by default.
ZeroMQ

Consume
Before:

```tql
load_zmq "tcp://127.0.0.1:5555"
read_json
```

After:

```tql
from_zmq "tcp://127.0.0.1:5555" {
  read_json
}
```

Produce
Before:

```tql
export
write_json
save_zmq "tcp://127.0.0.1:5555"
```

After:

```tql
export
to_zmq "tcp://127.0.0.1:5555", encoding="json"
```

to_zmq requires an explicit encoding= argument. Use the same format names as the corresponding write_* operators, for example "json", "ndjson", or "csv".
Replace explicit stdio
Before:

```tql
load_stdin
read_ndjson
```

After:

```tql
from_stdin {
  read_ndjson
}
```

Before:

```tql
version
write_json
save_stdout
```

After:

```tql
version
to_stdout {
  write_json
}
```

Capture packets from a network interface
Before:

```tql
load_nic "en0"
read_pcap
```

After:

```tql
from_nic "en0"
```

Replace from_http response metadata fields
Before:

```tql
from_http "https://api.example.com/events", metadata_field=http_meta {
  read_json
}
where http_meta.code == 200
```

After:

```tql
from_http "https://api.example.com/events" {
  read_json
  where $response.code == 200
  http_meta = $response
}
```

Replace from_http server mode
Before:

```tql
from_http "0.0.0.0:8080", server=true {
  read_json
}
import
```

After:

```tql
accept_http "0.0.0.0:8080" {
  read_json
}
import
```

Replace per-event HTTP jobs
Before:

```tql
from {url: "https://api.example.com/a"}, {url: "https://api.example.com/b"}
http url
```

After:

```tql
from {url: "https://api.example.com/a"}, {url: "https://api.example.com/b"}
each parallel=4 {
  from_http $this.url { read_json }
}
```

Remove explicit buffers
Before:

```tql
export live=true
buffer 100k, policy="drop"
write_ndjson
save_http "https://collector.example.com/events"
```

After:

```tql
export live=true
to_http "https://collector.example.com/events" {
  write_ndjson
}
```