Skip to content

Groups streaming events into event-time windows and runs a subpipeline for each window.

window size=duration, on=expr, [every=duration, tolerance=duration, idle_timeout=duration] {}

The window operator splits an event stream into bounded time ranges. Each window has its own subpipeline. Events are streamed directly into the subpipeline rather than buffered and replayed when the window closes, so a subpipeline that only filters or transforms forwards events as they arrive. The subpipeline either emits events—which are forwarded as the operator’s output—or ends with a sink, in which case window itself becomes a sink. The subpipeline must not produce bytes.

Unlike every, which reruns a subpipeline on a wall-clock schedule, window operates on event time: it assigns each event to windows by the timestamp that on evaluates to. Use summarize without window to aggregate the complete input, or summarize with options={frequency: ...} for processing-time periodic emission when you don’t need event-time windows or late-event handling.

window creates fixed windows of width size. These include tumbling windows (non-overlapping) and hopping windows (overlapping). Fixed windows use left-closed, right-open intervals: [start, end). An event whose timestamp equals the window end belongs to the next window. Window boundaries are aligned to the Unix epoch.

window has no built-in partition key. To maintain independent windows per entity—per user, host, or source IP address—wrap window in an outer group operator. The grouping key is then available inside the subpipeline as $group. Each key advances its own event-time clock, so sparse keys may close their windows late, or not until the input ends, unless idle_timeout is set.

window processes events in stream order and uses the timestamp that on evaluates to for window assignment. The current time is the largest timestamp observed so far, which only ever moves forward.

A window closes once the clock reaches its end plus tolerance. The window’s subpipeline then closes and its output is streamed out. Because closing is driven by observed timestamps, the most recent windows stay open until a later event arrives. Two mechanisms bound this wait:

  • idle_timeout force-closes an open window after that much wall-clock time passes without a new event.
  • When the input ends, all open windows are closed.

Output is streamed directly out of each window. Windows are usually closed in window-time order, but window makes no ordering guarantees about its output.

Events are processed in stream order, independent of how they are grouped into batches: an event is late when an earlier event already advanced the clock to the close point of its window. Late events—those for a window that has already closed—are dropped with a warning. With overlapping windows an event is still delivered to whichever of its target windows are open; it is only reported as dropped when all of its target windows have already closed. The operator does not emit empty windows.

The length of a fixed window.

If every is omitted, window starts one window every size, creating non-overlapping tumbling windows.

The event-time expression; it must evaluate to a timestamp. window uses this value as the event time of each event.

Use on=now() to window by processing time, i.e. by wall-clock arrival.

Events for which on evaluates to null, or to a value that is not a timestamp, are dropped with a warning.

The distance between the start times of fixed windows. Defaults to size.

If every < size, windows overlap and an event can belong to multiple windows. Use this form for sliding-style detections that should update frequently. An event that falls into several overlapping windows is copied to each participating window’s subpipeline. Each overlapping window runs its own subpipeline, so a non-aggregating subpipeline duplicates its pass-through output once per window.

every must not be greater than size.

The amount of out-of-order event time to wait for before closing a window. It applies to event time only. Defaults to 0s: a window closes as soon as the event-time clock reaches the window end. Increase it to tolerate out-of-order data.

window tracks the largest timestamp observed so far and closes a window once that value reaches the window end plus tolerance. Events that arrive after their window closed are dropped with a warning.

The maximum wall-clock time a window may stay open without receiving a new event before it is force-closed and emitted. This makes window well suited to low-volume streams: results arrive promptly even when the next event is far off, instead of waiting for it or for the end of the input. Defaults to infinite: windows then close only via tolerance or at the end of the input.

The subpipeline to run for each window. Inside the subpipeline, $window is a record with the current window’s boundaries:

FieldTypeDescription
$window.starttimeThe inclusive window start.
$window.endtimeThe exclusive window end.

The operator does not add these fields automatically; assign them explicitly when you want them in the output.

Count events by severity in one-hour tumbling windows. Grouping by severity inside summarize keeps a single global hourly clock.

from_kafka "security-events"
this = message.parse_json()
ts = ts.time()
window size=1h, on=ts, tolerance=5min {
summarize severity, events=count()
start = $window.start
end = $window.end
}

Detect brute-force logins with a hopping window

Section titled “Detect brute-force logins with a hopping window”

Detect many failed logins for the same user from the same source IP address in a 10-minute window that advances every minute. The outer group gives each user/IP pair its own window clock.

from_kafka "auth-events"
this = message.parse_json()
ts = ts.time()
where action == "login" and outcome == "failure"
group {user: user, src_ip: src_ip} {
window size=10min, every=1min, on=ts, tolerance=2min, idle_timeout=5min {
summarize failures=count(), target_hosts=distinct(host)
user = $group.user
src_ip = $group.src_ip
start = $window.start
end = $window.end
}
}
where failures >= 20

Detect a single source IP address that fails authentication for many distinct users within 15 minutes.

from_kafka "auth-events"
this = message.parse_json()
ts = ts.time()
where action == "login" and outcome == "failure"
group src_ip {
window size=15min, every=1min, on=ts, tolerance=2min, idle_timeout=5min {
summarize attempts=count(), users=count_distinct(user)
src_ip = $group
start = $window.start
end = $window.end
}
}
where users >= 25 and attempts >= 50

Send alerts directly from the window subpipeline

Section titled “Send alerts directly from the window subpipeline”

The subpipeline can end with a sink. This form makes window a sink and sends detections as soon as each window closes.

from_kafka "auth-events"
this = message.parse_json()
ts = ts.time()
where action == "login" and outcome == "failure"
group {user: user, src_ip: src_ip} {
window size=10min, every=1min, on=ts, tolerance=2min, idle_timeout=5min {
summarize failures=count()
user = $group.user
src_ip = $group.src_ip
where failures >= 20
to_http "https://example.org/security-alerts" {
write_json
}
}
}

Last updated: