A Tenzir pipeline is a chain of operators that represents a dataflow.
Operators are the atomic building blocks that produce, transform, or consume
data. Think of them as Unix or PowerShell commands where the result of one
command feeds into the next.
Tenzir pipelines have three types of operators: inputs that produce data,
outputs that consume data, and transformations that do both.
Tenzir pipelines make one more distinction: the elements that the operators push
through the pipeline are typed. Every operator has an upstream type and a
downstream type.
When composing pipelines out of operators, upstream/downstream types of adjacent
operators have to match. Otherwise the pipeline is malformed. We call any
void-to-void operator sequence a closed pipeline. Only closed pipelines
can execute.
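
The matching rule can be sketched in a few lines of Python. This is an
illustrative model only, not how Tenzir implements it: the Operator class and
the check_pipeline and is_closed helpers are hypothetical names, and the element
types are modeled as plain strings.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Operator:
    name: str
    upstream: str    # element type consumed: "void", "bytes", or "events"
    downstream: str  # element type produced: "void", "bytes", or "events"


def check_pipeline(ops):
    """Return the (upstream, downstream) type of the whole operator sequence.

    Raises ValueError if two adjacent operators do not match.
    """
    if not ops:
        raise ValueError("empty pipeline")
    for left, right in zip(ops, ops[1:]):
        if left.downstream != right.upstream:
            raise ValueError(
                f"{left.name} emits {left.downstream}, "
                f"but {right.name} expects {right.upstream}"
            )
    return ops[0].upstream, ops[-1].downstream


def is_closed(ops):
    """A pipeline is closed if it is void-to-void as a whole."""
    return check_pipeline(ops) == ("void", "void")


source = Operator("from", "void", "events")
filt = Operator("where", "events", "events")
sink = Operator("to", "events", "void")

print(is_closed([source, filt, sink]))  # closed: void-to-void
print(is_closed([filt, sink]))          # well-formed, but not closed
```

In this model, a sequence such as where followed by to is well-formed but open
on the input side, whereas to followed by where is malformed and raises an
error.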
If a pipeline is not closed, Tenzir attempts to auto-complete missing input and
output operators. When you run a pipeline on the command line, Tenzir
implicitly reads JSON from stdin and writes JSON to stdout. When you run a
pipeline in the app and do not provide a sink, Tenzir appends just the serve
operator, which turns the pipeline into a REST API so that you can extract data
piecemeal through your browser.
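
A rough sketch of the auto-completion idea follows. It assumes that operators
are modeled as (name, upstream, downstream) tuples and that the default input
and output fit the open ends of the pipeline. The auto_complete function and
the descriptive operator names are hypothetical and only mirror the
command-line behavior described above.

```python
def auto_complete(ops, default_input, default_output):
    """Close a pipeline by adding a default input and/or output if missing.

    Each operator is a (name, upstream, downstream) tuple.
    """
    completed = list(ops)
    # No operator, or the first one expects data: prepend the default input.
    if not completed or completed[0][1] != "void":
        completed.insert(0, default_input)
    # The last operator still produces data: append the default output.
    if completed[-1][2] != "void":
        completed.append(default_output)
    return completed


# Hypothetical stand-ins for the implicit command-line defaults.
stdin_json = ("read JSON from stdin", "void", "events")
stdout_json = ("write JSON to stdout", "events", "void")
where = ("where", "events", "events")

closed = auto_complete([where], stdin_json, stdout_json)
print([name for name, _, _ in closed])
```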
Operators can be polymorphic in that they can have more than a single upstream
and downstream type. For example, buffer accepts
both bytes and events.
Many Tenzir pipelines use the from and
to operators to get data in and out, respectively.
For example, to load data from a JSON file on the local file system, filter
events where a certain field matches a predicate, and store the result in an S3
bucket in Parquet format, you can write the following pipeline:
This pipeline consists of three operators:
The operator from is a void-to-events
input operator, where is an events-to-events
transformation operator, and to is an events-to-void
output operator.
Other inputs provide bytes first, and you need to parse those bytes into events
before you can transform them.
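
To illustrate the two-step shape of such inputs, here is a minimal Python
sketch of a void-to-bytes loader followed by a bytes-to-events parser for
newline-delimited JSON. The function names load_bytes and parse_ndjson are
made up for this example and do not correspond to Tenzir operators.

```python
import json


def load_bytes(chunks):
    """void-to-bytes: yield raw chunks without knowing their format."""
    yield from chunks


def parse_ndjson(byte_chunks):
    """bytes-to-events: turn newline-delimited JSON bytes into events."""
    buffer = b""
    for chunk in byte_chunks:
        buffer += chunk
        # Everything before the last newline is complete; keep the remainder.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                yield json.loads(line)
    # Flush a trailing line that has no final newline.
    if buffer.strip():
        yield json.loads(buffer)


# Chunk boundaries do not have to align with event boundaries.
chunks = [b'{"id": 1}\n{"i', b'd": 2}\n', b'{"id": 3}']
events = list(parse_ndjson(load_bytes(chunks)))
print(events)
```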
With these building blocks in place, you can create all kinds of pipelines, as
long as they follow the two principal rules of (1) sequencing inputs,
transformations, and outputs, and (2) ensuring that operator upstream/downstream
types match. Here are examples of other valid pipelines:
Every event that flows through a pipeline is part of a data frame with a
schema. Internally, these data frames are represented as Apache Arrow record
batches, each potentially encoding tens of thousands of events at once. This
innate batching is the reason why pipelines can achieve high throughput.
What is unique about Tenzir is that a single pipeline can run with multiple
schemas, even though the events are data frames internally. Tenzir parsers
(bytes-to-events operators) are capable of emitting events with changing
schemas. This behavior differs from other engines that work with data frames,
where operators can only execute on a single schema. In this light, Tenzir
combines the performance of structured query engines with the flexibility of
document-oriented engines.
If an operator detects a schema change, it creates a new batch of events. In
terms of performance, the worst case for Tenzir is an ordered stream of
schema-switching events, where every event has a different schema than the
previous one. But even in those scenarios, operators can efficiently build
homogeneous batches when the inter-event order does not matter. Similar to
predicate pushdown, Tenzir operators support "ordering pushdown" to signal to
upstream operators that the event order only matters intra-schema but not
inter-schema. In this case, the operator transparently "demultiplexes" a
heterogeneous event stream into N homogeneous streams. The sort operator
is an example of such an operator; it pushes its ordering requirements upstream,
allowing parsers to efficiently create multiple event streams in parallel.
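
The difference between the two batching modes can be sketched in Python. This
is a simplified model under stated assumptions: events are plain dictionaries,
a schema is approximated by the sorted field names and value types, and the
helper names schema_of, batch_ordered, and batch_unordered are hypothetical.
Real batches are Arrow record batches, not Python lists.

```python
from itertools import groupby


def schema_of(event):
    """Approximate a schema by the sorted (field name, type name) pairs."""
    return tuple(sorted((key, type(value).__name__) for key, value in event.items()))


def batch_ordered(events):
    """Preserve the global order: start a new batch at every schema change."""
    return [list(group) for _, group in groupby(events, key=schema_of)]


def batch_unordered(events):
    """Ignore inter-schema order: build one homogeneous stream per schema.

    The order of events within the same schema is preserved.
    """
    streams = {}
    for event in events:
        streams.setdefault(schema_of(event), []).append(event)
    return list(streams.values())


# Worst case: the schema switches with every single event.
events = [{"src": "10.0.0.1"}, {"port": 80}, {"src": "10.0.0.2"}, {"port": 443}]

ordered = batch_ordered(events)    # one batch per event
demuxed = batch_unordered(events)  # one batch per schema
print(len(ordered), len(demuxed))
```

With strict ordering, the alternating input degenerates into single-event
batches. Once inter-schema order is relaxed, the same input collapses into one
batch per schema.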
Some operators only work with exactly one instance per schema internally, such
as write_csv, which first writes a header
and then all subsequent rows have to adhere to the emitted schema. Such
operators cannot handle events with changing schemas.
It's important to mention that most of the time you don't have to worry about
schemas. They are there for you when you want to work with them, but it's often
enough to just specify the fields that you want to work with, e.g., where
id.orig_h in 10.0.0.0/8, or select src_ip, dest_ip, proto. Schemas are
inferred automatically in parsers, but you can also seed a parser with a schema
that you define explicitly.
Unified Live Stream Processing and Historical Queries
Engines for event stream processing and batch processing of historical data have
vastly different requirements. We believe that we found a sweet spot with our
language and accompanying execution engine that makes working with both types of
workloads easy: just pick an input operator at the beginning of a pipeline that
points to your data source, be it an infinite stream or a stored dataset.
Tenzir will figure out the rest.
Our desired user experience for interacting with historical data looks like
this:
Ingest: to store data at a node, create a pipeline that ends with
import.
Query: to run a historical query over data at the node, create a pipeline
that begins with export.
For example, to ingest JSON from a Kafka topic, you write from "kafka://topic" |
import. To query the stored data, you write export | where file == 42.
The example with export suggests that the pipeline first exports everything,
and only then starts filtering with where, performing a full scan over the
stored data. But this is not what's happening. Pipelines support predicate
pushdown for every operator. This means that export receives the filter
expression before it starts executing, enabling index lookups or other
optimizations to efficiently execute queries with high selectivity where scans
would be sub-optimal.
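
Conceptually, the rewrite looks like the following Python sketch. It assumes a
pipeline plan modeled as a list of (operator, argument) tuples with predicates
kept as opaque strings. The push_down function is a hypothetical illustration
of the idea and does not reflect Tenzir's actual optimizer.

```python
def push_down(plan):
    """Fold every `where` that directly follows `export` into the export step.

    The plan is a list of (operator name, argument) tuples. The export
    argument is a tuple of predicates that the storage layer can evaluate
    before it materializes any events.
    """
    if not plan or plan[0][0] != "export":
        return list(plan)
    filters = list(plan[0][1])
    rest = list(plan[1:])
    while rest and rest[0][0] == "where":
        filters.append(rest.pop(0)[1])
    return [("export", tuple(filters))] + rest


plan = [("export", ()), ("where", "file == 42"), ("select", "src_ip")]
optimized = push_down(plan)
print(optimized)
```

After the rewrite, the export step knows the filter expression up front and
can decide how to satisfy it, for example by consulting an index instead of
scanning.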
The key insight here is to realize that optimizations like predicate pushdown
extend to the storage engine and do not only apply to the streaming executor.
The Tenzir native storage engine is not a full-fledged database, but rather a
catalog with a thin indexing layer over a set of Parquet/Feather files. These
sparse indexes (sketch data structures, such as min-max synopses, Bloom filters,
etc.) avoid full scans for every query. The catalog tracks evolving schemas,
performs expression binding, and provides a transactional interface to add and
replace partitions during compaction.
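
As an illustration of how a sparse index avoids full scans, the following
Python sketch uses a min-max synopsis per partition to select candidate
partitions for an equality lookup. The data layout and the names build_catalog
and candidate_partitions are assumptions for this example, not the actual
catalog interface.

```python
def build_catalog(partitions, field):
    """Compute a (min, max) synopsis of `field` for every partition."""
    catalog = {}
    for name, events in partitions.items():
        values = [event[field] for event in events if field in event]
        if values:
            catalog[name] = (min(values), max(values))
    return catalog


def candidate_partitions(catalog, value):
    """Return the partitions whose value range may contain `value`.

    A min-max synopsis can yield false positives but never false negatives,
    so every candidate still needs to be scanned.
    """
    return [name for name, (low, high) in catalog.items() if low <= value <= high]


# Hypothetical partitions, each standing in for one Parquet/Feather file.
partitions = {
    "p1": [{"file": 1}, {"file": 17}],
    "p2": [{"file": 40}, {"file": 42}, {"file": 50}],
    "p3": [{"file": 100}, {"file": 250}],
}

catalog = build_catalog(partitions, "file")
hits = candidate_partitions(catalog, 42)
print(hits)
```

Only the candidate partitions need to be opened and scanned. A value such as 45
would still select p2 even though no event matches, which is the kind of false
positive that a sketch data structure trades for its small size.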
The diagram below shows the main components of the storage engine:
Because of this transparent optimization, you can just exchange the input
operator of a pipeline to switch between historical and streaming execution,
and everything works as expected. A typical use case begins with some
exploratory data analysis involving a few export pipelines, and then deploys the
pipeline on streaming data by replacing the input with a Kafka stream.
Tenzir pipelines have built-in network communication, allowing you to create a
distributed fabric of dataflows to express intricate use cases that go beyond
single-machine processing. There are two types of network connections:
implicit and explicit.
An implicit network connection exists, for example, when you use the tenzir
binary on the command line to run a pipeline that ends in
import:
Tenzir pipelines eschew networking where possible to minimize latency and
maximize throughput, which results in the following operator placement for the
above examples:
The executor generally transfers ownership of operators between
processes as late as possible to prefer local, high-bandwidth communication. For
maximum control over placement of computation, you can override the automatic
operator location with the local and
remote operators.
The above examples are implicit network connections because they're not visible
in the pipeline definition. An explicit network connection terminates a pipeline
as an input or output operator:
The fictitious data fabric above consists of a heterogeneous set of
technologies, interconnected by pipelines. Because you have full control over
the location where you run the pipeline, you can push it all the way to the
"last mile." This helps especially when there are compliance and data residency
concerns that must be properly addressed.