Version: Next

from

Obtains events from a URI, inferring the source, compression, and format.

from uri:string, [loader_args… { … }]
from events…
Use from if you can

The from operator is designed as an easy way to get data into Tenzir without having to write the separate steps of data ingestion manually.

Description

The from operator is an easy way to get data into Tenzir. It will try to infer the connector, compression and format based on the given URI.

Alternatively, it can be used to create events from records.

uri: string

The URI to load from.

loader_args… (optional)

An optional set of arguments passed to the loader. For example, this can be used to pass credentials to a connector:

from "https://example.org/file.json", headers={Token: "XYZ"}

{ … } (optional)

The optional pipeline argument allows for explicitly specifying how from decompresses and parses data. By default, the pipeline is inferred based on a set of rules.

If inference is not possible or not sufficient, this argument can be used to control the decompression and parsing. Providing this pipeline disables the inference. See the Examples section.

events…

Instead of a URI, you can also provide one or more records, which will be the operator's output. This is mostly useful for testing pipelines without loading actual data.

Explanation

Loading a resource into Tenzir consists of three steps: loading the raw bytes, optionally decompressing them, and reading them as events.

The from operator tries to infer all three steps from the given URI.

Loading

The connector is inferred based on the URI scheme://. See the URI schemes section for supported schemes. If no scheme is present, the connector attempts to load from the filesystem.

Decompressing

The compression is inferred from the "file-ending" in the URI. Under the hood, this uses the decompress operator. Supported compressions can be found in the list of supported codecs.

The decompression step is optional and will only happen if a compression could be inferred. If you know that the source is compressed and the compression cannot be inferred, you can use the pipeline argument to specify the decompression manually.
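
As a minimal sketch of that manual override, assume a gzip-compressed JSON file whose name gives no hint about the compression. The path below is hypothetical; decompress "gzip" and read_json are the operators used elsewhere on this page. Because the pipeline argument disables inference, the parser has to be named as well:

```tql
// Hypothetical path: ".dat" matches no known compression or format ending.
from "path/to/export.dat" {
  decompress "gzip" // compression stated explicitly
  read_json         // parser stated explicitly, since inference is off
}
```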

Reading

Like the compression, the format to read is inferred from the file ending. The supported file endings are the common ones for the read_* operators.

If you want to provide additional arguments to the parser, you can use the pipeline argument to specify the parsing manually. This is useful if, for example, you know that the input is Suricata or NDJSON rather than plain JSON.
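
For instance, a file with a .json ending might actually hold newline-delimited JSON. A possible sketch, with a hypothetical file name, swaps in read_ndjson while keeping the gzip decompression that inference would otherwise have added:

```tql
// Hypothetical file: ".json.gz" would be inferred as gzip + read_json.
from "path/to/events.json.gz" {
  decompress "gzip" // must be repeated, the pipeline argument replaces inference
  read_ndjson       // parse one JSON object per line instead of plain JSON
}
```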

The pipeline argument & its relation to the loader

Some loaders, such as the load_tcp operator, accept a sub-pipeline directly. If the selected loader accepts a sub-pipeline, the from operator dispatches decompression and parsing into that sub-pipeline. If an explicit pipeline argument is provided, it is forwarded as-is. If the loader does not accept a sub-pipeline, the decompression and parsing steps are performed as part of the regular pipeline.

Example transformation:

from operator
from "myfile.json.gz"
Effective pipeline
load_file "myfile.json.gz"
decompress "gzip"
read_json

Example with pipeline argument:

from operator
from "tcp://0.0.0.0:12345", parallel=10 {
  read_gelf
}
Effective pipeline
load_tcp "tcp://0.0.0.0:12345", parallel=10 {
  read_gelf
}
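
The remaining case, an explicit pipeline on a loader without sub-pipeline support, might look as follows. This is a sketch that assumes load_file takes no sub-pipeline, as the first transformation above suggests; the file name is hypothetical:

```tql
// Explicit pipeline on a filesystem path.
from "path/to/data.bin" {
  decompress "gzip"
  read_json
}
// Expected effective pipeline under that assumption:
//   load_file "path/to/data.bin"
//   decompress "gzip"
//   read_json
```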

Supported Deductions

URI schemes

| Scheme | Operator | Example |
| --- | --- | --- |
| abfs, abfss | load_azure_blob_storage | from "abfs://path/to/file.json" |
| amqp | load_amqp | from "amqp://… |
| file | load_file | from "file://path/to/file.json" |
| fluent-bit | from_fluent_bit | from "fluent-bit://elasticsearch" |
| ftp, ftps | load_ftp | from "ftp://example.com/file.json" |
| gcps | load_google_cloud_pubsub | from "gcps://project_id/subscription_id" { … } |
| http, https | load_http | from "http://example.com/file.json" |
| inproc | load_zmq | from "inproc://127.0.0.1:56789" { read_json } |
| kafka | load_kafka | from "kafka://topic" { read_json } |
| s3 | load_s3 | from "s3://bucket/file.json" |
| sqs | load_sqs | from "sqs://my-queue" { read_json } |
| tcp | load_tcp | from "tcp://127.0.0.1:13245" { read_json } |
| udp | load_udp | from "udp://127.0.0.1:56789" { read_json } |
| zmq | load_zmq | from "zmq://127.0.0.1:56789" { read_json } |

Please see the respective operator pages for details on the URI's locator format.

File extensions

Format

The from operator can deduce the file format based on these file-endings:

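| Format | File Endings | Operator |
| --- | --- | --- |
| CSV | .csv | read_csv |
| Feather | .feather, .arrow | read_feather |
| JSON | .json | read_json |
| NDJSON | .ndjson, .jsonl | read_ndjson |
| Parquet | .parquet | read_parquet |
| Pcap | .pcap | read_pcap |
| SSV | .ssv | read_ssv |
| TSV | .tsv | read_tsv |
| YAML | .yaml | read_yaml |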

Compression

The from operator can deduce the following compressions based on these file-endings:

| Compression | File Endings |
| --- | --- |
| Brotli | .br, .brotli |
| Bzip2 | .bz2 |
| Gzip | .gz, .gzip |
| LZ4 | .lz4 |
| Zstd | .zst, .zstd |
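
Putting both tables together, a name such as the hypothetical events.ndjson.gz should resolve as sketched below: the .gz ending selects Gzip and the .ndjson ending selects read_ndjson. The expanded form illustrates the rules on this page and is not output captured from Tenzir:

```tql
from "path/to/events.ndjson.gz"
// Expected effective pipeline:
//   load_file "path/to/events.ndjson.gz"
//   decompress "gzip"
//   read_ndjson
```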

Examples

Load a local file

from "path/to/my/load/file.csv"

Load a compressed file

from "path/to/my/load/file.json.bz2"

Load a file with parser arguments

Provide an explicit header to the CSV parser:

from "path/to/my/load/file.csv.br" {
  decompress "brotli" // this is now necessary due to the pipeline argument
  read_csv header="col1,col2,col3"
}

Pick a more suitable parser

The file eve.json contains Suricata logs, but the from operator does not know this. We provide an explicit read_suricata instead:

from "path/to/my/load/eve.json" {
  read_suricata
}

Load from HTTP with a header

from "https://example.org/file.json", headers={Token: "1234"}

Create events from records

from {message: "Value", endpoint: {ip: 127.0.0.1, port: 42}},
     {message: "Value", endpoint: {ip: 127.0.0.1, port: 42}, raw: "text"},
     {message: "Value", endpoint: null}
{
  message: "Value",
  endpoint: {
    ip: 127.0.0.1,
    port: 42
  }
}
{
  message: "Value",
  endpoint: {
    ip: 127.0.0.1,
    port: 42
  },
  raw: "text"
}
{
  message: "Value",
  endpoint: null
}

See Also

to