This guide shows you how to migrate legacy TQL pipelines to Tenzir v6. For most pipelines, the upgrade is small: many pipelines keep working as-is, and others need only a few local updates to sources and sinks. The sections that follow help you find the pipelines that do need attention and update them without changing the intent of your data flow.

Starting with the Tenzir v6 release candidate, the new engine is the default for pipelines. The legacy engine is still available during a temporary migration window, but it is deprecated in v6 and will be removed after the window closes.

Choose one migration mode for your upgrade:

  • Default opt-out: Keep the v6 default. Pipelines use the new engine, and you fix failing pipelines as you find them. If an existing critical pipeline cannot be migrated immediately, add //neo:false temporarily.
  • Controlled opt-in: Set neo: false in your settings so the legacy engine remains the default during your migration. Add // neo to individual pipelines after you update and test them. This reduces the blast radius, but you must track the pipelines that still need migration.

Use compatibility mode only while you migrate. It buys time, but it is not a long-term migration strategy.

Starting with the Tenzir v6 RC, you can temporarily keep the legacy engine as the default by explicitly setting neo: false in your node configuration:

neo: false

With neo: false, opt individual migrated pipelines into the new engine by placing // neo on the first line of the pipeline definition:

// neo
from_http "https://api.example.com/events" {
  read_ndjson
}
import

If you keep the v6 default, you can temporarily select the legacy engine for an existing critical pipeline by placing //neo:false on the first line:

//neo:false
load_http "https://api.example.com/events"
read_ndjson
import

The // neo and //neo:false comments must be the first line of the pipeline definition. Pipeline-level comments apply to every pipeline definition form, including saved pipelines, deployed pipelines, ad-hoc CLI pipelines, and API-submitted pipelines.
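For example, an ad-hoc CLI pipeline carries the comment on its first line as well. This is a minimal sketch; the file path is a placeholder, and it assumes you pass the pipeline as a single quoted argument to the tenzir binary:

tenzir '// neo
from_file "/var/log/app/events.ndjson" {
  read_ndjson
}'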

Use this checklist to plan the migration:

  • Inventory existing pipelines.
  • Choose a migration mode: default opt-out for faster migration, or controlled opt-in with neo: false for a smaller blast radius.
  • Test pipelines against the Tenzir v6 RC.
  • Find legacy-only syntax and operators.
  • Replace legacy patterns with v6 patterns.
  • Use //neo:false only for existing critical pipelines that need temporary continuity.
  • If you use compatibility mode, track pipelines that still need // neo.
  • Deploy and monitor migrated pipelines.

The most important migration change is the move away from old byte-stream source and sink operators toward event-oriented operators with explicit subpipelines.

Message brokers now use event-based source operators. For example, from_amqp emits one event per AMQP message with the message payload in the message field:

from_amqp "amqp://broker/vhost"
this = message.parse_json()
import

This differs from load_amqp, which exposed a byte stream and could lose message boundaries before the parser saw the data.

Similarly, the respective sink operators now accept events directly, sending one message per event:

from { field: "value" }
to_amqp "amqp://broker/vhost", message=this.print_ndjson()

This differs from save_amqp, which expected a byte stream that it would send according to its chunking.

Operators that read or write bytes use subpipelines to keep transport and data format separate. Source operators such as from_http and from_file receive bytes and use a parsing subpipeline:

from_http "https://example.com/events.json.gz" {
  decompress_gzip
  read_ndjson
}

Sink operators such as to_tcp and serve_tcp receive events and use a printing subpipeline:

export
to_tcp "collector.example.com:5044" {
  write_ndjson
}

Use each when every incoming event describes a separate job. The current input event is available as $this, and the nested pipeline starts with its own source:

from {url: "https://example.com/a.json"},
{url: "https://example.com/b.json"}
each parallel=4 {
  from_http $this.url {
    read_json
  }
}

This is the v6-compatible pattern for old pipelines that used a transform such as http to start a per-event request from upstream values.

Use the tables below to find the v6-compatible pattern for legacy syntax, operators, and pipeline patterns. The notes after each table explain when a replacement needs a parsing or printing subpipeline.

Legacy operator            Use in v6
load_file                  from_file
load_s3                    from_file or from_s3
load_gcs                   from_file or from_google_cloud_storage
load_azure_blob_storage    from_file or from_azure_blob_storage
save_file                  to_file
save_s3                    to_file or to_s3
save_gcs                   to_file or to_google_cloud_storage
save_azure_blob_storage    to_file or to_azure_blob_storage

Prefer from_file and to_file for URI-style file access. Use provider-specific operators when you need provider-specific options. Put decompression, parsing, and printing in a subpipeline when automatic format detection is not enough.
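For example, the sketch below contrasts the two styles. The bucket paths are placeholders, and the first variant assumes automatic detection handles the compressed JSON extension:

// Automatic detection based on the file extension.
from_file "s3://bucket/logs/*.json.gz"
import

// Explicit subpipeline when detection is not enough.
from_file "s3://bucket/logs/*.log.gz" {
  decompress_gzip
  read_syslog
}
import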

Legacy operator             Use in v6
load_http                   from_http
save_http                   to_http
load_kafka                  from_kafka
save_kafka                  to_kafka
load_amqp                   from_amqp
save_amqp                   to_amqp
load_sqs                    from_sqs
save_sqs                    to_sqs
load_google_cloud_pubsub    from_google_cloud_pubsub
save_google_cloud_pubsub    to_google_cloud_pubsub

Use a parsing subpipeline with from_http when the response body contains encoded events. Use a printing subpipeline with to_http when you need to build the request body explicitly.

Kafka and message-broker sources preserve message boundaries. For example, from_kafka emits the payload in message, which you can parse with functions such as parse_json.

Legacy operator or mode         Use in v6
load_tcp client mode            from_tcp
load_tcp server mode            accept_tcp
save_tcp client mode            to_tcp
save_tcp server-style fanout    serve_tcp
load_udp                        accept_udp
save_udp                        to_udp
load_zmq                        from_zmq
save_zmq                        to_zmq
load_nic                        from_nic

Use accept_tcp when the pipeline should listen for incoming TCP connections, and serve_tcp when clients connect to receive pipeline output. Use parsing subpipelines for received bytes and printing subpipelines for outgoing bytes. accept_udp emits one event per datagram with the payload in data.
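As a sketch of the fanout case (the listen address is a placeholder), serve_tcp takes a printing subpipeline just like to_tcp:

export
serve_tcp "0.0.0.0:9000" {
  write_ndjson
}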

Legacy operator    Use in v6
load_stdin         from_stdin
save_stdout        to_stdout

Add a parsing subpipeline when reading encoded bytes from standard input. Add a printing subpipeline when writing encoded bytes to standard output.

Legacy pattern                    Use in v6
http as a per-event API call      each with from_http
from_http ..., metadata_field=    $response inside the parsing subpipeline
from_http ..., server=true        accept_http

Use each when the nested work should start its own source for every input event. Keep http when you need to enrich the current event with an HTTP response. Listening for incoming HTTP requests is now handled by accept_http.

The old from operator (no suffix) accepted a URI and dispatched to the appropriate load_* operator based on the scheme. Use the table below to find the v6 operator for each URI scheme.

URI scheme       Use in v6
abfs, abfss      from_azure_blob_storage
amqp             from_amqp
elasticsearch    from_opensearch
file             from_file
fluent-bit       from_fluent_bit
ftp, ftps        from_ftp
gs               from_google_cloud_storage
http, https      from_http
inproc, zmq      from_zmq
kafka            from_kafka
opensearch       from_opensearch
s3               from_s3
sqs              from_sqs
tcp              from_tcp or accept_tcp
udp              accept_udp

Move any parsing steps that followed the old from call into a parsing subpipeline on the new operator. For TCP, use from_tcp when connecting to a remote endpoint and accept_tcp when listening for incoming connections.
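For example, a legacy pipeline that combined the URI form of from with a separate parsing step moves that step into the subpipeline of the scheme-specific operator. The URL below is a placeholder:

// Legacy: scheme-based dispatch plus a separate parsing step.
from "https://example.com/events.ndjson"
read_ndjson
import

// v6: scheme-specific operator with the parser in its subpipeline.
from_http "https://example.com/events.ndjson" {
  read_ndjson
}
import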

The old to operator (no suffix) accepted a URI and dispatched to the appropriate save_* operator based on the scheme. Use the table below to find the v6 operator for each URI scheme.

URI scheme                    Use in v6
abfs, abfss                   to_azure_blob_storage
amqp                          to_amqp
elasticsearch, opensearch     to_opensearch
file                          to_file
fluent-bit                    to_fluent_bit
ftp, ftps                     to_ftp
gs                            to_google_cloud_storage
http, https                   to_http
inproc, zmq                   to_zmq
kafka                         to_kafka
s3                            to_s3
sqs                           to_sqs
tcp                           to_tcp or serve_tcp
udp                           to_udp
smtp, smtps, mailto, email    No replacement (see below).

Move any printing steps that preceded the old to call into a printing subpipeline on the new operator. For TCP, use to_tcp when connecting to a remote endpoint and serve_tcp when clients connect to receive output. Email schemes (smtp, smtps, mailto, email) have no direct replacement; use the python or shell operator to send email instead.
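As a sketch (the bucket path is a placeholder), a legacy pipeline that printed events before the URI form of to moves that printing step into the new operator's subpipeline:

// Legacy: printing step followed by scheme-based dispatch.
export
write_ndjson
to "s3://bucket/out/events.ndjson"

// v6: URI-style file access with the printer in its subpipeline.
export
to_file "s3://bucket/out/events.ndjson" {
  write_ndjson
}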

These items need redesign rather than a direct replacement:

Removed item              What to do instead
buffer                    Remove it.
legacy                    Rewrite the operation using the existing operators.
save_email                Use the python or shell operator to send an email.
Operator-level metrics    Monitor pipeline-level metrics instead.

Before:

load_file "/var/log/app/*.json"
read_ndjson
import

After:

from_file "/var/log/app/*.json" {
  read_ndjson
}
import

Before:

load_s3 "s3://bucket/logs/*.json.gz"
decompress_gzip
read_ndjson
import

After:

from_file "s3://bucket/logs/*.json.gz" {
  decompress_gzip
  read_ndjson
}
import

Before:

export
write_ndjson
save_file "/tmp/events.ndjson"

After:

export
to_file "/tmp/events.ndjson" {
  write_ndjson
}

Before:

load_http "https://api.example.com/events"
read_ndjson
import

After:

from_http "https://api.example.com/events" {
  read_ndjson
}
import

Before:

export
write_ndjson
save_http "https://collector.example.com/events"

After:

export
to_http "https://collector.example.com/events" {
  write_ndjson
}

Before:

load_tcp "0.0.0.0:5514" {
  read_syslog
}
import

After:

accept_tcp "0.0.0.0:5514" {
  read_syslog
}
import

Before:

export
write_json
save_tcp "collector.example.com:9000"

After:

export
to_tcp "collector.example.com:9000" {
  write_json
}

Before:

load_udp "0.0.0.0:5514", insert_newlines=true
read_ndjson
import

After:

accept_udp "0.0.0.0:5514"
this = data.parse_json()
import

Before:

export
write_ndjson
save_udp "127.0.0.1:5514"

After:

export
to_udp "127.0.0.1:5514"

Before:

load_kafka "security-events"
read_json
import

After:

from_kafka "security-events"
this = message.parse_json()
import

Before:

export
write_ndjson
save_kafka "security-events"

After:

export
to_kafka "security-events"

to_kafka sends one message per event and serializes it as NDJSON by default.

Before:

load_amqp "amqp://user:pass@broker:5672/vhost", queue="events"
read_json

After:

from_amqp "amqp://user:pass@broker:5672/vhost", queue="events"
this = string(message).parse_json()

Before:

export
write_json
save_amqp "amqp://user:pass@broker:5672/vhost"

After:

export
to_amqp "amqp://user:pass@broker:5672/vhost"

to_amqp sends one message per event and serializes it as NDJSON by default. Use message= to send a custom expression instead.

Use the named AMQP and SQS operators for these brokers. Prefer those forms over generic URI routing when you update pipeline definitions.

Before:

load_google_cloud_pubsub project_id="acme", subscription_id="events"
read_json

After:

from_google_cloud_pubsub project_id="acme", subscription_id="events"
this = message.parse_json()

Before:

export
write_json
save_google_cloud_pubsub project_id="acme", topic_id="events"

After:

export
to_google_cloud_pubsub project_id="acme", topic_id="events", message=this.print_json()

to_google_cloud_pubsub requires an explicit message= expression. There is no default serialization format.

Before:

load_sqs "queue"
read_json

After:

from_sqs "queue"
this = message.parse_json()

Before:

export
write_json
save_sqs "queue"

After:

export
to_sqs "queue"

to_sqs sends one SQS message per event and serializes it as NDJSON by default.

Before:

load_zmq "tcp://127.0.0.1:5555"
read_json

After:

from_zmq "tcp://127.0.0.1:5555" {
  read_json
}

Before:

export
write_json
save_zmq "tcp://127.0.0.1:5555"

After:

export
to_zmq "tcp://127.0.0.1:5555", encoding="json"

to_zmq requires an explicit encoding= argument. Use the same format names as the corresponding write_* operators, for example "json", "ndjson", or "csv".

Before:

load_stdin
read_ndjson

After:

from_stdin {
  read_ndjson
}

Before:

version
write_json
save_stdout

After:

version
to_stdout {
  write_json
}

Before:

load_nic "en0"
read_pcap

After:

from_nic "en0"

Replace from_http response metadata fields

Before:

from_http "https://api.example.com/events", metadata_field=http_meta {
  read_json
}
where http_meta.code == 200

After:

from_http "https://api.example.com/events" {
  read_json
  where $response.code == 200
  http_meta = $response
}

Before:

from_http "0.0.0.0:8080", server=true {
  read_json
}
import

After:

accept_http "0.0.0.0:8080" {
  read_json
}
import

Before:

from {url: "https://api.example.com/a"},
{url: "https://api.example.com/b"}
http url

After:

from {url: "https://api.example.com/a"},
{url: "https://api.example.com/b"}
each parallel=4 {
  from_http $this.url {
    read_json
  }
}

Before:

export live=true
buffer 100k, policy="drop"
write_ndjson
save_http "https://collector.example.com/events"

After:

export live=true
to_http "https://collector.example.com/events" {
  write_ndjson
}
