OpenSearch is a search and observability suite for unstructured data. Tenzir can send events to OpenSearch and emulate an OpenSearch-compatible Bulk API endpoint.
When sending data to OpenSearch, Tenzir uses the Bulk API and batches as many
events as possible to maximize throughput, accumulating multiple events before
shipping them in a single API call. You can control batching behavior with the
max_content_length and buffer_timeout options.
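
The interplay of a size limit and a timeout can be sketched in a few lines of Python. This is only an illustration of the batching idea, not Tenzir's implementation: the class name `BulkBatcher` is hypothetical, the parameter names are borrowed from the options above, and the units (bytes and seconds) are assumptions made for the sketch.

```python
import json
import time


class BulkBatcher:
    """Accumulate events into Bulk API bodies, flushing on size or age."""

    def __init__(self, index, max_content_length, buffer_timeout,
                 clock=time.monotonic):
        self.index = index
        self.max_content_length = max_content_length  # assumed: bytes per body
        self.buffer_timeout = buffer_timeout          # assumed: seconds
        self.clock = clock
        self.entries = []
        self.size = 0
        self.first_event_at = None

    def add(self, event):
        """Buffer one event; return a finished body if the batch was full."""
        action = json.dumps({"create": {"_index": self.index}})
        entry = f"{action}\n{json.dumps(event)}\n".encode()
        flushed = None
        # Ship the current batch first if this entry would exceed the limit.
        if self.entries and self.size + len(entry) > self.max_content_length:
            flushed = self.flush()
        if not self.entries:
            self.first_event_at = self.clock()
        self.entries.append(entry)
        self.size += len(entry)
        return flushed

    def poll(self):
        """Return a body once the oldest buffered event is too old."""
        if self.entries and self.clock() - self.first_event_at >= self.buffer_timeout:
            return self.flush()
        return None

    def flush(self):
        """Return the buffered newline-delimited JSON body and reset."""
        body = b"".join(self.entries)
        self.entries, self.size, self.first_event_at = [], 0, None
        return body
```

A larger size limit means fewer, bigger requests; a shorter timeout bounds how long a lone event waits before it is shipped.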
Use to_opensearch to send events to OpenSearch Bulk API
endpoints.
For more details, see the documentation for the to_opensearch operator.
Tenzir can also present an OpenSearch-compatible REST API via the accept_opensearch operator.

Examples

Query documents from an OpenSearch index

Use from_http to query the
Search API. The Search API supports
GET and POST requests to /{index}/_search. When you provide a request
body, from_http uses post by default.

    from_http "https://localhost:9200/main/_search",
      body={
        size: 100,
        query: {match_all: {}},
        sort: [{"@timestamp": {order: "desc"}}],
      } {
      read_json
    }
    unroll hits.hits
    this = hits.hits._source

Add an Authorization header if your OpenSearch cluster requires
authentication.
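
For readers unfamiliar with the Search API response envelope, the following Python sketch mirrors what the two steps after from_http do: expand the hits.hits list and keep only each hit's _source. The function name `extract_sources` and the sample response are hypothetical and trimmed to the fields that matter here.

```python
def extract_sources(response):
    """Yield the _source document of every hit in a Search API response."""
    for hit in response.get("hits", {}).get("hits", []):
        yield hit["_source"]


# Trimmed, made-up response envelope for illustration only.
sample_response = {
    "took": 3,
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_index": "main", "_id": "1", "_source": {"message": "hello"}},
            {"_index": "main", "_id": "2", "_source": {"message": "world"}},
        ],
    },
}

documents = list(extract_sources(sample_response))
```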

Export large result sets with search_after

For larger exports, prefer search_after with a stable sort. Return a request
record from paginate to replace the request body for each next page. The next
request keeps the same URL, method, and headers unless the request record
changes them.
Keep only the response parser inside the from_http subpipeline.
Operators such as unroll belong after from_http, so
the pagination lambda receives the whole search response envelope.

    from_http "https://localhost:9200/main/_search",
      body={
        size: 1000,
        query: {match_all: {}},
        sort: [{"@timestamp": "asc"}, {"_id": "asc"}],
      },
      paginate=(x => {
        body: {
          size: 1000,
          query: {match_all: {}},
          sort: [{"@timestamp": "asc"}, {"_id": "asc"}],
          search_after: x.hits.hits[-1].sort,
        },
      } if x.hits.hits != []) {
      read_json
    }
    unroll hits.hits
    this = hits.hits._source

Add an Authorization header if your OpenSearch cluster requires
authentication.
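
The pagination rule itself is small enough to state as a plain function. The Python sketch below restates the same logic outside of Tenzir: take the sort values of the last hit as the search_after cursor, and stop when a page comes back empty. The names `BASE_BODY` and `next_request_body` are hypothetical.

```python
import copy

# Same query and stable sort for every page; only search_after changes.
BASE_BODY = {
    "size": 1000,
    "query": {"match_all": {}},
    "sort": [{"@timestamp": "asc"}, {"_id": "asc"}],
}


def next_request_body(response, base_body=BASE_BODY):
    """Return the body for the next page, or None when the export is done."""
    hits = response["hits"]["hits"]
    if not hits:
        return None  # An empty page ends the pagination.
    body = copy.deepcopy(base_body)
    # Each hit carries the sort values it was ordered by; the last hit's
    # values become the cursor for the next request.
    body["search_after"] = hits[-1]["sort"]
    return body
```

The sort must be identical across requests and should include a tiebreaker field, otherwise hits that share a sort value can be skipped or repeated between pages.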

Export large result sets with scroll

Use the Scroll API when you need a server-side
search context for a long-running export. Start with a search request that
includes a scroll query parameter, then return a request record that switches
the next URL to /_search/scroll and sends the latest _scroll_id in the body:

    let $search = "https://localhost:9200/main/_search?scroll=10m"
    let $scroll = "https://localhost:9200/_search/scroll"

    from_http $search,
      body={
        size: 10000,
        query: {match_all: {}},
      },
      paginate=(x => {
        url: $scroll,
        body: {
          scroll: "10m",
          scroll_id: x._scroll_id,
        },
      } if x.hits.hits != []) {
      read_json
    }
    unroll hits.hits
    this = hits.hits._source

The paginated request inherits the POST method from the initial request
because the initial request has a body. It also keeps the configured headers.
Close the scroll context when the export is complete, because OpenSearch
keeps the search context alive until the scroll timeout expires.
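
As a rough sketch of that cleanup step, the following Python builds the request that releases a scroll context. It assumes the OpenSearch Scroll API form DELETE /_search/scroll/<scroll_id>; the function name `clear_scroll_request` is hypothetical, and the request is only constructed here, not sent.

```python
import urllib.parse
import urllib.request


def clear_scroll_request(base_url, scroll_id):
    """Build a DELETE request that releases one scroll context."""
    # Scroll IDs may contain characters such as "=" or "/", so escape them
    # before placing the ID in the URL path.
    quoted = urllib.parse.quote(scroll_id, safe="")
    url = f"{base_url.rstrip('/')}/_search/scroll/{quoted}"
    return urllib.request.Request(url, method="DELETE")
```

To send it, pass the returned request to urllib.request.urlopen, adding an Authorization header first if your cluster requires authentication.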

Send events to an OpenSearch index

Send an example event to the main index:

    from {event: "example"}
    to_opensearch "1.2.3.4:9200", action="create", index="main"

Instead of treating the entire event as a document to be indexed by OpenSearch, you can designate a nested record as the document:

    from {category: "qux", doc_id: "XXX", event: {foo: "bar"}}
    to_opensearch "localhost:9200", id=doc_id, doc=event, action="update", index=category

The above example updates the document with ID XXX with the contents from the
nested field event.
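
To relate those arguments to the wire format, here is a Python sketch of the two newline-delimited JSON lines that the Bulk API expects for an update action. It illustrates the Bulk API shape only; that to_opensearch emits exactly these bytes is an assumption, and the function name `bulk_update_lines` is hypothetical.

```python
import json


def bulk_update_lines(index, doc_id, doc):
    """Return the action line and payload line for one Bulk API update."""
    # Line 1: the action object names the operation, target index, and ID.
    action = {"update": {"_index": index, "_id": doc_id}}
    # Line 2: for updates, the partial document is wrapped in a "doc" key.
    payload = {"doc": doc}
    return json.dumps(action) + "\n" + json.dumps(payload) + "\n"


# Values taken from the example event above.
body = bulk_update_lines("qux", "XXX", {"foo": "bar"})
```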

Accept data by emulating OpenSearch

Tenzir can act as a drop-in replacement for OpenSearch by accepting data via a Bulk API endpoint. This allows you to point your Logstash or Beats instances to Tenzir instead.

    accept_opensearch "0.0.0.0:9200", keep_actions=true
    publish "opensearch"

This pipeline accepts data on port 9200 and publishes all received events on
the opensearch topic for further processing by
other pipelines.
Use accept_opensearch for new pipelines that receive bulk
ingestion data.
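
For context on what a bulk endpoint receives: a Bulk API request body is newline-delimited JSON in which each action object is followed by its document, except for delete actions, which stand alone. The Python sketch below pairs them up. It is an illustration of the format, not a description of how accept_opensearch is implemented, and the names `ACTIONS_WITH_SOURCE` and `parse_bulk` are hypothetical.

```python
import json

# Bulk actions that are followed by a document line; "delete" is not.
ACTIONS_WITH_SOURCE = {"index", "create", "update"}


def parse_bulk(body):
    """Split a Bulk API body into (action, document) pairs."""
    lines = [json.loads(line) for line in body.splitlines() if line.strip()]
    pairs = []
    i = 0
    while i < len(lines):
        action = lines[i]
        name = next(iter(action))  # The single key names the operation.
        if name in ACTIONS_WITH_SOURCE:
            # A well-formed body always has a document line here.
            pairs.append((action, lines[i + 1]))
            i += 2
        else:
            pairs.append((action, None))
            i += 1
    return pairs


# Made-up request body for illustration only.
sample_body = (
    '{"create": {"_index": "filebeat-8.17.3"}}\n'
    '{"message": "hello"}\n'
    '{"delete": {"_index": "filebeat-8.17.3", "_id": "1"}}\n'
)
pairs = parse_bulk(sample_body)
```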
Setting keep_actions=true causes action objects to remain in the stream, like
this:

    {create:{_index:"filebeat-8.17.3"}} // 👈 action object
    {"@timestamp":2025-03-31T13:42:28.068Z,log:{offset:1,file:{path:"/mounted/logfile"}},message:"hello",input:{type:"log"},host:{name:"eb21"},agent:{id:"682cfcf4-f251-4576-abcb-6c8bcadfda08",name:"eb21",type:"filebeat",version:"8.17.3",ephemeral_id:"17f74f6e-36f0-4045-93e6-c549874716df"},ecs:{version:"8.0.0"}}
    {create:{_index:"filebeat-8.17.3"}} // 👈 action object
    {"@timestamp":2025-03-31T13:42:28.068Z,log:{offset:7,file:{path:"/mounted/logfile"}},message:"this",input:{type:"log"},host:{name:"eb21"},agent:{id:"682cfcf4-f251-4576-abcb-6c8bcadfda08",name:"eb21",type:"filebeat",version:"8.17.3",ephemeral_id:"17f74f6e-36f0-4045-93e6-c549874716df"},ecs:{version:"8.0.0"}}

Ship data via Filebeat

Configure Filebeat as follows to ship data to Tenzir:

    output.elasticsearch:
      hosts: ["localhost:9200"]

Set hosts to the endpoint of the Tenzir pipeline accepting data.
Ship data via Logstash

Configure Logstash with the
opensearch output plugin to ship data to Tenzir:

    output {
      opensearch {
        hosts => "https://localhost:9200"
      }
    }

Set hosts to the endpoint of the Tenzir pipeline accepting data.