Spark Scala Time Windows: window, session_window, and window_time
When you need to aggregate event-time data — page views per 5 minutes, average price every minute, user activity per session — Spark provides window, session_window, and window_time. These functions bucket timestamps into intervals so you can group and aggregate them with the regular DataFrame API. They work on both batch and streaming DataFrames; the examples below all use batch.
Tumbling Windows with window
A tumbling window is a fixed-size, non-overlapping bucket. Pass a timestamp column and a duration string, and window produces a struct column with start and end fields describing the bucket each row falls into. The start is inclusive and the end is exclusive.
def window(timeColumn: Column, windowDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
The duration strings accept formats like "5 minutes", "1 hour", or "30 seconds". Use the resulting struct column in a groupBy to aggregate rows per window:
// Assumes an active SparkSession named `spark` (as in spark-shell or a notebook).
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("2026-05-18 09:00:15", "click"),
  ("2026-05-18 09:04:42", "click"),
  ("2026-05-18 09:09:30", "purchase"),
  ("2026-05-18 09:11:05", "click"),
  ("2026-05-18 09:14:55", "click"),
  ("2026-05-18 09:22:10", "purchase"),
).toDF("event_time", "event_type")
  .withColumn("event_time", to_timestamp(col("event_time")))

val df2 = df
  .groupBy(window(col("event_time"), "5 minutes"))
  .count()
  .orderBy("window")

df2.show(false)
// +------------------------------------------+-----+
// |window                                    |count|
// +------------------------------------------+-----+
// |{2026-05-18 09:00:00, 2026-05-18 09:05:00}|2    |
// |{2026-05-18 09:05:00, 2026-05-18 09:10:00}|1    |
// |{2026-05-18 09:10:00, 2026-05-18 09:15:00}|2    |
// |{2026-05-18 09:20:00, 2026-05-18 09:25:00}|1    |
// +------------------------------------------+-----+
Notice that the 09:15 to 09:20 bucket is missing: there were no events in that range, and Spark only emits windows that contain data. Window boundaries are aligned to the Unix epoch (1970-01-01 00:00:00 UTC) by default, which is why the first bucket starts at 09:00:00 rather than at the time of the first row.
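The alignment rule can be reproduced with plain arithmetic. The following is a minimal sketch rather than Spark's internal code: bucketStart is a hypothetical helper that floors an instant to a multiple of the window size counted from the epoch, working in UTC and in whole seconds.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not part of Spark): floor a timestamp to the start of
// its epoch-aligned tumbling bucket. Works in whole seconds for simplicity.
def bucketStart(ts: Instant, size: Duration): Instant = {
  val t = ts.getEpochSecond
  // floorMod keeps the remainder non-negative, even for pre-1970 timestamps.
  Instant.ofEpochSecond(t - Math.floorMod(t, size.getSeconds))
}

val start = bucketStart(Instant.parse("2026-05-18T09:00:15Z"), Duration.ofMinutes(5))
println(start)                              // 2026-05-18T09:00:00Z
println(start.plus(Duration.ofMinutes(5)))  // 2026-05-18T09:05:00Z (exclusive end)
```

Because the grid is anchored at the epoch and not at the data, two jobs that use the same duration produce the same bucket boundaries regardless of which rows they see.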
Sliding Windows
A sliding window has a windowDuration and a shorter slideDuration. Each new window starts every slideDuration, so a single row can land in multiple overlapping windows. This is the right shape when you want a moving average or a rolling count.
val df = Seq(
  ("2026-05-18 09:00:00", 10.0),
  ("2026-05-18 09:02:00", 12.0),
  ("2026-05-18 09:05:00", 15.0),
  ("2026-05-18 09:07:00", 11.0),
  ("2026-05-18 09:09:00", 9.0),
  ("2026-05-18 09:12:00", 14.0),
).toDF("event_time", "price")
  .withColumn("event_time", to_timestamp(col("event_time")))

val df2 = df
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .agg(avg(col("price")).as("avg_price"))
  .orderBy("window")

df2.show(false)
// +------------------------------------------+---------+
// |window                                    |avg_price|
// +------------------------------------------+---------+
// |{2026-05-18 08:55:00, 2026-05-18 09:05:00}|11.0     |
// |{2026-05-18 09:00:00, 2026-05-18 09:10:00}|11.4     |
// |{2026-05-18 09:05:00, 2026-05-18 09:15:00}|12.25    |
// |{2026-05-18 09:10:00, 2026-05-18 09:20:00}|14.0     |
// +------------------------------------------+---------+
The 10-minute window slides forward every 5 minutes. The row at 09:02:00 falls into both the 08:55 to 09:05 window and the 09:00 to 09:10 window, which is why the same row contributes to two averages.
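To see which windows a given row belongs to, the overlap can be enumerated by hand. This is an illustrative sketch in plain Scala, not Spark's implementation: windowsFor is a hypothetical helper that assumes epoch-aligned windows and whole-second durations.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not part of Spark): list every epoch-aligned sliding
// window [start, end) that contains `ts`, working in whole seconds.
def windowsFor(ts: Instant, window: Duration, slide: Duration): List[(Instant, Instant)] = {
  val t = ts.getEpochSecond
  val w = window.getSeconds
  val s = slide.getSeconds
  // Latest window start at or before the timestamp.
  val lastStart = t - Math.floorMod(t, s)
  // Step back one slide at a time while the window still covers the timestamp.
  Iterator.iterate(lastStart)(_ - s)
    .takeWhile(start => start + w > t)
    .map(start => (Instant.ofEpochSecond(start), Instant.ofEpochSecond(start + w)))
    .toList
    .reverse
}

val overlapping = windowsFor(
  Instant.parse("2026-05-18T09:02:00Z"), Duration.ofMinutes(10), Duration.ofMinutes(5))

overlapping.foreach { case (start, end) => println(s"[$start, $end)") }
// [2026-05-18T08:55:00Z, 2026-05-18T09:05:00Z)
// [2026-05-18T09:00:00Z, 2026-05-18T09:10:00Z)
```

When the window is a whole multiple of the slide, each row lands in windowDuration / slideDuration windows (two here), so a small slide relative to the window multiplies the number of rows being aggregated.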
Shifting Window Boundaries with startTime
The third overload accepts a startTime offset, which shifts every window boundary by that amount. This is useful when your reporting intervals don't align with the epoch — for instance, hourly windows that start at 5 minutes past the hour.
val df = Seq(
  ("2026-05-18 09:00:00", "click"),
  ("2026-05-18 09:14:30", "click"),
  ("2026-05-18 09:30:00", "click"),
  ("2026-05-18 09:44:30", "click"),
).toDF("event_time", "event_type")
  .withColumn("event_time", to_timestamp(col("event_time")))

val df2 = df
  .groupBy(window(col("event_time"), "15 minutes", "15 minutes", "2 minutes"))
  .count()
  .orderBy("window")

df2.show(false)
// +------------------------------------------+-----+
// |window                                    |count|
// +------------------------------------------+-----+
// |{2026-05-18 08:47:00, 2026-05-18 09:02:00}|1    |
// |{2026-05-18 09:02:00, 2026-05-18 09:17:00}|1    |
// |{2026-05-18 09:17:00, 2026-05-18 09:32:00}|1    |
// |{2026-05-18 09:32:00, 2026-05-18 09:47:00}|1    |
// +------------------------------------------+-----+
The window boundaries now land on the 2-, 17-, 32-, and 47-minute marks instead of on the 15-minute grid. The Scala API has no overload that takes a startTime without a slideDuration, so for tumbling windows pass the same value for both durations.
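The hourly case mentioned earlier, with windows that start at 5 minutes past the hour, might look like the sketch below. The local SparkSession, the sample timestamps, and the UTC session time zone are assumptions made for this example. The time zone is pinned because the default grid is anchored to the epoch in UTC, which keeps the printed boundaries predictable.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for illustration; the session time zone is pinned to UTC
// (an assumption made here so the formatted boundaries are predictable).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("hourly-offset-windows")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()
import spark.implicits._

// Made-up sample events.
val events = Seq(
  "2026-05-18 09:03:00",
  "2026-05-18 09:05:00",
  "2026-05-18 09:59:00",
  "2026-05-18 10:04:59"
).toDF("event_time")
  .withColumn("event_time", to_timestamp(col("event_time")))

// One-hour tumbling windows whose boundaries sit at five past the hour.
val hourly = events
  .groupBy(window(col("event_time"), "1 hour", "1 hour", "5 minutes"))
  .count()
  .select(
    date_format(col("window.start"), "HH:mm").as("start"),
    date_format(col("window.end"), "HH:mm").as("end"),
    col("count"))
  .orderBy("start")

hourly.show(false)
// +-----+-----+-----+
// |start|end  |count|
// +-----+-----+-----+
// |08:05|09:05|1    |
// |09:05|10:05|3    |
// +-----+-----+-----+
```

The 09:03:00 event still belongs to the window that opened at 08:05, while the event at exactly 09:05:00 opens the next one, since window starts are inclusive.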
Session Windows with session_window
A session window is dynamic — instead of a fixed size, the window grows as long as consecutive events arrive within a gapDuration. When the gap is exceeded, the session closes and a new one starts on the next event. This matches how analytics tools model user sessions.
def session_window(timeColumn: Column, gapDuration: Column): Column
def session_window(timeColumn: Column, gapDuration: String): Column
The session_window function first appeared in version 3.2.0 and produces the same start/end struct shape as window. Group on a user identifier along with the session window to get per-user sessions:
val df = Seq(
  ("user_1", "2026-05-18 09:00:00"),
  ("user_1", "2026-05-18 09:02:30"),
  ("user_1", "2026-05-18 09:04:00"),
  ("user_1", "2026-05-18 09:20:00"),
  ("user_1", "2026-05-18 09:21:30"),
  ("user_2", "2026-05-18 09:01:00"),
  ("user_2", "2026-05-18 09:03:45"),
).toDF("user_id", "event_time")
  .withColumn("event_time", to_timestamp(col("event_time")))

val df2 = df
  .groupBy(col("user_id"), session_window(col("event_time"), "5 minutes"))
  .count()
  .orderBy("user_id", "session_window")

df2.show(false)
// +-------+------------------------------------------+-----+
// |user_id|session_window                            |count|
// +-------+------------------------------------------+-----+
// |user_1 |{2026-05-18 09:00:00, 2026-05-18 09:09:00}|3    |
// |user_1 |{2026-05-18 09:20:00, 2026-05-18 09:26:30}|2    |
// |user_2 |{2026-05-18 09:01:00, 2026-05-18 09:08:45}|2    |
// +-------+------------------------------------------+-----+
user_1 has two sessions because the gap between 09:04:00 and 09:20:00 exceeds the 5-minute threshold. Each session's end is the last event's timestamp plus the gap duration — that's why user_1's first session ends at 09:09:00 (09:04:00 + 5 minutes) rather than at the last event itself.
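The session rule can be modelled in a few lines of plain Scala. This is a simplified sketch for a single user with a constant gap, not Spark's implementation. The helper name sessionize is hypothetical, and treating an event that lands exactly on the session end as the start of a new session is an assumption of the sketch.

```scala
import java.time.{Duration, Instant}

// Hypothetical model of gap-based sessions (not Spark's implementation).
// Each session is (start, end, eventCount), where end = last event + gap.
def sessionize(events: Seq[Instant], gap: Duration): List[(Instant, Instant, Int)] =
  events.sortBy(_.toEpochMilli).foldLeft(List.empty[(Instant, Instant, Int)]) {
    // The event arrives before the open session expires: extend that session.
    case ((start, end, n) :: closed, t) if t.isBefore(end) =>
      (start, t.plus(gap), n + 1) :: closed
    // Otherwise the gap was exceeded (or this is the first event): open a new session.
    case (sessions, t) =>
      (t, t.plus(gap), 1) :: sessions
  }.reverse

val user1 = Seq("09:00:00", "09:02:30", "09:04:00", "09:20:00", "09:21:30")
  .map(t => Instant.parse(s"2026-05-18T${t}Z"))

sessionize(user1, Duration.ofMinutes(5)).foreach(println)
// (2026-05-18T09:00:00Z,2026-05-18T09:09:00Z,3)
// (2026-05-18T09:20:00Z,2026-05-18T09:26:30Z,2)
```

The two tuples match user_1's rows in the Spark output above: every event pushes the session end to its own timestamp plus the gap.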
The Column-typed overload is handy when the gap depends on per-row state, for example a longer gap for premium users: pass a derived column instead of a constant string.
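A per-row gap can be built with when and otherwise. The sketch below is illustrative only: the tier column, the sample rows, and the local UTC session are assumptions introduced for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dynamic-session-gap")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()
import spark.implicits._

// `tier` is a hypothetical column introduced for this example.
val events = Seq(
  ("user_1", "premium",  "2026-05-18 09:00:00"),
  ("user_1", "premium",  "2026-05-18 09:07:00"),
  ("user_2", "standard", "2026-05-18 09:00:00"),
  ("user_2", "standard", "2026-05-18 09:07:00")
).toDF("user_id", "tier", "event_time")
  .withColumn("event_time", to_timestamp(col("event_time")))

// Per-row gap: 10 minutes for premium users, 5 minutes for everyone else.
val gap = when(col("tier") === "premium", "10 minutes").otherwise("5 minutes")

val sessions = events
  .groupBy(col("user_id"), session_window(col("event_time"), gap))
  .count()
  .orderBy("user_id", "session_window")

sessions.show(false)
```

With this data, the 7-minute pause should keep user_1 in a single session (it is shorter than the 10-minute premium gap) and split user_2 into two sessions.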
Extracting the Window End with window_time
The window column is a struct, so you can pull out start and end with col("window.start") and col("window.end"). There's also a dedicated helper, window_time, which returns the event-time-equivalent end of the window (one microsecond before window.end).
def window_time(windowColumn: Column): Column
The window_time function first appeared in version 3.4.0. It's most useful in streaming pipelines where you want to chain a windowed aggregation into another time-based operation — the output column behaves like an event-time column and is recognized as such by Spark's watermark machinery.
val df = Seq(
  ("2026-05-18 09:00:00", 10.0),
  ("2026-05-18 09:03:00", 12.0),
  ("2026-05-18 09:06:00", 15.0),
  ("2026-05-18 09:11:00", 14.0),
).toDF("event_time", "price")
  .withColumn("event_time", to_timestamp(col("event_time")))

val df2 = df
  .groupBy(window(col("event_time"), "5 minutes"))
  .agg(avg(col("price")).as("avg_price"))
  .withColumn("window_end_time", window_time(col("window")))
  .orderBy("window")

df2.show(false)
// +------------------------------------------+---------+--------------------------+
// |window                                    |avg_price|window_end_time           |
// +------------------------------------------+---------+--------------------------+
// |{2026-05-18 09:00:00, 2026-05-18 09:05:00}|11.0     |2026-05-18 09:04:59.999999|
// |{2026-05-18 09:05:00, 2026-05-18 09:10:00}|15.0     |2026-05-18 09:09:59.999999|
// |{2026-05-18 09:10:00, 2026-05-18 09:15:00}|14.0     |2026-05-18 09:14:59.999999|
// +------------------------------------------+---------+--------------------------+
The one-microsecond offset matters because window ends are exclusive — a window [09:00, 09:05) covers events up to but not including 09:05:00, so the last representable event time inside the window is 09:04:59.999999.
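The same property is what makes chained aggregations line up. As a hedged sketch, assuming Spark 3.4 or later, a local UTC session, and made-up sample data, 5-minute counts can be rolled up into 10-minute windows by feeding window_time back into window:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("window-time-rollup")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()
import spark.implicits._

// Made-up sample events.
val events = Seq(
  "2026-05-18 09:00:00",
  "2026-05-18 09:03:00",
  "2026-05-18 09:06:00",
  "2026-05-18 09:11:00"
).toDF("event_time")
  .withColumn("event_time", to_timestamp(col("event_time")))

// Stage 1: event counts per 5-minute window.
val fiveMinute = events
  .groupBy(window(col("event_time"), "5 minutes"))
  .count()

// Stage 2: re-bucket the 5-minute results into 10-minute windows.
// window_time yields e.g. 09:09:59.999999, which stays inside [09:00, 09:10);
// window.end (09:10:00) would fall into the next bucket instead.
val tenMinute = fiveMinute
  .groupBy(window(window_time(col("window")), "10 minutes"))
  .agg(sum(col("count")).as("events"))
  .select(date_format(col("window.start"), "HH:mm").as("start"), col("events"))
  .orderBy("start")

tenMinute.show(false)
```

Under these assumptions the 09:00 window should hold three events and the 09:10 window one. Grouping on col("window.end") instead would push the 09:05 to 09:10 bucket into the 09:10 window.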
Related Functions
For row-level timestamp manipulation (rather than aggregation), see date_trunc, which rounds a timestamp down to the start of a time unit, and hour, minute, and second for extracting individual fields.