[Figure: OpenSearch integration overview. Labels: index main; to_opensearch (Bulk API, HTTP(S)); from_http (Search API, HTTP(S)); client and accept_opensearch (HTTP(S)).]

OpenSearch is a search and observability suite for unstructured data. Tenzir can send events to OpenSearch and emulate an OpenSearch-compatible Bulk API endpoint.

When sending data to OpenSearch, Tenzir uses the Bulk API and batches as many events as possible for throughput, accumulating multiple events before shipping them in a single API call. You can control batching behavior with the max_content_length and buffer_timeout options.
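The size-based half of this batching can be pictured with a small Python sketch. It is an illustration under stated assumptions, not Tenzir's implementation: the helper name batch_bulk_bodies is hypothetical, the byte limit only plays the role of max_content_length, and the time-based flush that buffer_timeout controls is left out. Each event becomes two newline-delimited JSON lines, an action line and the document, which is the body format the Bulk API expects.

```python
import json


def batch_bulk_bodies(events, index, max_content_length):
    """Group events into Bulk API request bodies of bounded size.

    Hypothetical helper for illustration only; it does not mirror
    Tenzir's internals. Each event contributes two NDJSON lines:
    an action line and the document itself.
    """
    bodies, current, size = [], [], 0
    for event in events:
        action = json.dumps({"create": {"_index": index}})
        doc = json.dumps(event)
        chunk = f"{action}\n{doc}\n"
        chunk_size = len(chunk.encode("utf-8"))
        # Flush the current batch if adding this event would exceed the limit.
        if current and size + chunk_size > max_content_length:
            bodies.append("".join(current))
            current, size = [], 0
        current.append(chunk)
        size += chunk_size
    if current:
        bodies.append("".join(current))
    return bodies


events = [{"event": "example", "n": i} for i in range(5)]
bodies = batch_bulk_bodies(events, index="main", max_content_length=120)
```

A larger limit means fewer, bigger requests; a smaller one bounds the memory held per request at the cost of more round trips.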

Use to_opensearch to send events to OpenSearch Bulk API endpoints.

For more details, see the documentation for the to_opensearch operator.

Tenzir can also present an OpenSearch-compatible REST API via the accept_opensearch operator.

Use from_http to query the Search API. The Search API supports GET and POST requests to /{index}/_search. When you provide a request body, from_http uses POST by default.

from_http "https://localhost:9200/main/_search",
  body={
    size: 100,
    query: {match_all: {}},
    sort: [{"@timestamp": {order: "desc"}}],
  } {
  read_json
}
unroll hits.hits
this = hits.hits._source

Add an Authorization header if your OpenSearch cluster requires authentication.
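As one possible case, assuming the cluster uses HTTP Basic authentication (it may use a different scheme), the header value is the word Basic followed by the Base64 encoding of user:password. The Python sketch below only computes that value; the helper name basic_auth_header and the credentials are placeholders.

```python
import base64


def basic_auth_header(user, password):
    """Return an Authorization header record for HTTP Basic authentication.

    Hypothetical helper; assumes the cluster accepts Basic authentication.
    """
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials; never hard-code real ones.
headers = basic_auth_header("admin", "admin")
```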

Export large result sets with search_after

For larger exports, prefer search_after with a stable sort. Return a request record from paginate to replace the request body for each next page. The next request keeps the same URL, method, and headers unless the request record changes them.

Keep only the response parser inside the from_http subpipeline. Operators such as unroll belong after from_http, so the pagination lambda receives the whole search response envelope.

from_http "https://localhost:9200/main/_search",
  body={
    size: 1000,
    query: {match_all: {}},
    sort: [{"@timestamp": "asc"}, {"_id": "asc"}],
  },
  paginate=(x => {
    body: {
      size: 1000,
      query: {match_all: {}},
      sort: [{"@timestamp": "asc"}, {"_id": "asc"}],
      search_after: x.hits.hits[-1].sort,
    },
  } if x.hits.hits != []) {
  read_json
}
unroll hits.hits
this = hits.hits._source

Add an Authorization header if your OpenSearch cluster requires authentication.
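The pagination decision made by the lambda can be restated as a plain function, which may help when reasoning about when the export stops. This Python sketch is illustrative only: the helper name next_search_after_body and the sample response are hypothetical, and the response is reduced to the fields that matter here.

```python
def next_search_after_body(response, base_body):
    """Return the body for the next page, or None when the export is done.

    Hypothetical helper mirroring the paginate lambda: it copies the
    sort values of the last hit into search_after.
    """
    hits = response.get("hits", {}).get("hits", [])
    if not hits:
        # An empty page means there is nothing left to fetch.
        return None
    return {**base_body, "search_after": hits[-1]["sort"]}


base_body = {
    "size": 1000,
    "query": {"match_all": {}},
    "sort": [{"@timestamp": "asc"}, {"_id": "asc"}],
}
# Made-up response envelope with two hits.
page = {"hits": {"hits": [
    {"_id": "a", "_source": {"msg": "first"}, "sort": [1711892548068, "a"]},
    {"_id": "b", "_source": {"msg": "second"}, "sort": [1711892548069, "b"]},
]}}
next_body = next_search_after_body(page, base_body)
last = next_search_after_body({"hits": {"hits": []}}, base_body)
```

The sort clause stays identical across pages; only search_after changes, which is why a stable, tie-breaking sort matters.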

Use the Scroll API when you need a server-side search context for a long-running export. Start with a search request that includes a scroll query parameter, then return a request record that switches the next URL to /_search/scroll and sends the latest _scroll_id in the body:

let $search = "https://localhost:9200/main/_search?scroll=10m"
let $scroll = "https://localhost:9200/_search/scroll"
from_http $search,
  body={
    size: 10000,
    query: {match_all: {}},
  },
  paginate=(x => {
    url: $scroll,
    body: {
      scroll: "10m",
      scroll_id: x._scroll_id,
    },
  } if x.hits.hits != []) {
  read_json
}
unroll hits.hits
this = hits.hits._source

The paginated request inherits the POST method from the initial request because the initial request has a body. It also keeps the configured headers. Close the scroll context when the export is complete, because OpenSearch keeps the search context alive until the scroll timeout expires.
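One way to close the context is a DELETE request to /_search/scroll/<scroll_id>, the form described in the OpenSearch Scroll API documentation. The Python sketch below only builds such a request and does not send it; the helper name build_clear_scroll_request, the base URL, and the scroll ID are placeholders.

```python
import urllib.parse
import urllib.request


def build_clear_scroll_request(base_url, scroll_id):
    """Build (but do not send) a DELETE request that closes a scroll context.

    Hypothetical helper; base_url and scroll_id are placeholders.
    """
    # Scroll IDs are opaque strings, so percent-encode them for the URL path.
    quoted = urllib.parse.quote(scroll_id, safe="")
    url = f"{base_url.rstrip('/')}/_search/scroll/{quoted}"
    return urllib.request.Request(url, method="DELETE")


request = build_clear_scroll_request("https://localhost:9200", "abc123==")
# Sending is left to the caller, for example urllib.request.urlopen(request),
# with whatever authentication headers the cluster requires.
```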

Send an example event to the main index:

from {event: "example"}
to_opensearch "1.2.3.4:9200", action="create", index="main"

Instead of treating the entire event as a document to be indexed by OpenSearch, you can designate a nested record as the document:

from {category: "qux", doc_id: "XXX", event: {foo: "bar"}}
to_opensearch "localhost:9200", id=doc_id, doc=event, action="update", index=category

The above example updates the document with ID XXX with the contents from the nested field event.

Tenzir can act as a drop-in replacement for OpenSearch by accepting data via a Bulk API endpoint. This allows you to point your Logstash or Beats instances to Tenzir instead.

accept_opensearch "0.0.0.0:9200", keep_actions=true
publish "opensearch"

This pipeline accepts data on port 9200 and publishes all received events on the opensearch topic for further processing by other pipelines.

Use accept_opensearch for new pipelines that receive bulk ingestion data.

Setting keep_actions=true causes action objects to remain in the stream, like this:

{create:{_index:"filebeat-8.17.3"}} // 👈 action object
{"@timestamp":2025-03-31T13:42:28.068Z,log:{offset:1,file:{path:"/mounted/logfile"}},message:"hello",input:{type:"log"},host:{name:"eb21"},agent:{id:"682cfcf4-f251-4576-abcb-6c8bcadfda08",name:"eb21",type:"filebeat",version:"8.17.3",ephemeral_id:"17f74f6e-36f0-4045-93e6-c549874716df"},ecs:{version:"8.0.0"}}
{create:{_index:"filebeat-8.17.3"}} // 👈 action object
{"@timestamp":2025-03-31T13:42:28.068Z,log:{offset:7,file:{path:"/mounted/logfile"}},message:"this",input:{type:"log"},host:{name:"eb21"},agent:{id:"682cfcf4-f251-4576-abcb-6c8bcadfda08",name:"eb21",type:"filebeat",version:"8.17.3",ephemeral_id:"17f74f6e-36f0-4045-93e6-c549874716df"},ecs:{version:"8.0.0"}}
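A downstream consumer can use the retained action objects to recover, for example, the target index of each document. The Python sketch below shows the pairing logic on simplified records. It assumes a strict alternation of action object and document, as in the output above; a delete action carries no document and would break that assumption. The helper name pair_actions_with_documents is hypothetical.

```python
def pair_actions_with_documents(records):
    """Pair each action object with the document that follows it.

    Illustration only: assumes a strict action/document alternation,
    as produced by create and index actions.
    """
    if len(records) % 2 != 0:
        raise ValueError("expected alternating action and document records")
    pairs = []
    for action, document in zip(records[0::2], records[1::2]):
        # An action object has a single key naming the operation.
        (operation, meta), = action.items()
        pairs.append((operation, meta.get("_index"), document))
    return pairs


# Simplified stand-ins for the events shown above.
stream = [
    {"create": {"_index": "filebeat-8.17.3"}},
    {"message": "hello", "log": {"offset": 1}},
    {"create": {"_index": "filebeat-8.17.3"}},
    {"message": "this", "log": {"offset": 7}},
]
pairs = pair_actions_with_documents(stream)
```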

Configure Filebeat as follows to ship data to Tenzir:

filebeat.yml
output.elasticsearch:
  hosts: ["localhost:9200"]

Set hosts to the endpoint of the Tenzir pipeline accepting data.

Configure Logstash with the opensearch output plugin to ship data to Tenzir:

pipeline.conf
output {
  opensearch {
    hosts => "https://localhost:9200"
  }
}

Set hosts to the endpoint of the Tenzir pipeline accepting data.
