
Spark Scala First and Last

first and last are aggregate functions that return the first or last value in a group, which is only meaningful when the rows are ordered. They're most useful as window functions paired with an orderBy clause, where "first" and "last" actually mean something specific. first_value and last_value are SQL synonyms for the same functions, and any_value returns an arbitrary value when you don't care which one you get.

Using first and last as window functions

Both functions take a column (by name or as a Column) and have an ignoreNulls overload:

def first(columnName: String): Column
def first(e: Column): Column
def first(columnName: String, ignoreNulls: Boolean): Column
def first(e: Column, ignoreNulls: Boolean): Column

def last(columnName: String): Column
def last(e: Column): Column
def last(columnName: String, ignoreNulls: Boolean): Column
def last(e: Column, ignoreNulls: Boolean): Column

Without an ordering, these functions are non-deterministic: the result depends on the order in which rows happen to reach the aggregate, and that order can change from run to run (after a shuffle, for example). To get meaningful results, use them as window functions over an ordered window. A common pattern is finding the opening and closing price for each ticker across a date range:

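import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}
// Assumes spark-shell, where a SparkSession named spark is already in scope
import spark.implicits._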

val df = Seq(
  ("AAPL", "2026-01-01", 185.50),
  ("AAPL", "2026-01-02", 188.20),
  ("AAPL", "2026-01-03", 190.10),
  ("AAPL", "2026-01-04", 187.80),
  ("AAPL", "2026-01-05", 192.40),
  ("GOOG", "2026-01-01", 140.10),
  ("GOOG", "2026-01-02", 142.50),
  ("GOOG", "2026-01-03", 141.80),
  ("GOOG", "2026-01-04", 144.20),
  ("GOOG", "2026-01-05", 145.00),
).toDF("ticker", "date", "price")

val window = Window
  .partitionBy("ticker")
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df2 = df
  .withColumn("first_price", first(col("price")).over(window))
  .withColumn("last_price", last(col("price")).over(window))

df2.show(false)
// +------+----------+-----+-----------+----------+
// |ticker|date      |price|first_price|last_price|
// +------+----------+-----+-----------+----------+
// |AAPL  |2026-01-01|185.5|185.5      |192.4     |
// |AAPL  |2026-01-02|188.2|185.5      |192.4     |
// |AAPL  |2026-01-03|190.1|185.5      |192.4     |
// |AAPL  |2026-01-04|187.8|185.5      |192.4     |
// |AAPL  |2026-01-05|192.4|185.5      |192.4     |
// |GOOG  |2026-01-01|140.1|140.1      |145.0     |
// |GOOG  |2026-01-02|142.5|140.1      |145.0     |
// |GOOG  |2026-01-03|141.8|140.1      |145.0     |
// |GOOG  |2026-01-04|144.2|140.1      |145.0     |
// |GOOG  |2026-01-05|145.0|140.1      |145.0     |
// +------+----------+-----+-----------+----------+

The rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) frame is important. When a window has an orderBy but no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. That frame always starts at the top of the partition, so first still works, but it ends at the current row, so last returns the current row's value (or the value of its last tie, if several rows share the same date). Setting the frame to span the entire partition makes first and last return the actual earliest and latest values in each group.
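The effect of the default frame is easy to check. The sketch below (spark-shell script style; the SparkSession setup and the trimmed ticker data are illustrative assumptions) computes last once over the default frame and once over the full-partition frame. Because every date is unique here, last_default_frame should simply echo each row's own price.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

// getOrCreate() reuses an existing session (e.g. in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[*]").appName("default-frame-demo").getOrCreate()
import spark.implicits._

val prices = Seq(
  ("AAPL", "2026-01-01", 185.50),
  ("AAPL", "2026-01-02", 188.20),
  ("AAPL", "2026-01-03", 190.10),
  ("GOOG", "2026-01-01", 140.10),
  ("GOOG", "2026-01-02", 142.50)
).toDF("ticker", "date", "price")

// orderBy without rowsBetween: Spark applies RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
val defaultFrame = Window.partitionBy("ticker").orderBy("date")
// Same partitioning and ordering, but the frame spans the whole partition
val fullFrame = defaultFrame.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val compared = prices
  .withColumn("last_default_frame", last(col("price")).over(defaultFrame))
  .withColumn("last_full_frame", last(col("price")).over(fullFrame))
  .orderBy("ticker", "date")

compared.show(false)

// Pull the results back as (ticker, price, last_default_frame, last_full_frame) tuples
val rows = compared
  .select("ticker", "price", "last_default_frame", "last_full_frame")
  .as[(String, Double, Double, Double)]
  .collect()
```

Swapping last for first in this sketch would give identical columns for both frames, since both frames begin at the first row of the partition.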

Skipping nulls with ignoreNulls

The ignoreNulls overload tells the function to skip null values: first returns the first non-null value in the frame, and last returns the last one. Without it, first and last happily return null if that's what sits at the boundary of the window:

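import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}
// Assumes spark-shell, where a SparkSession named spark is already in scope
import spark.implicits._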

val df = Seq(
  ("AAPL", "2026-01-01", None),
  ("AAPL", "2026-01-02", Some(188.20)),
  ("AAPL", "2026-01-03", Some(190.10)),
  ("AAPL", "2026-01-04", Some(187.80)),
  ("AAPL", "2026-01-05", None),
  ("GOOG", "2026-01-01", None),
  ("GOOG", "2026-01-02", Some(142.50)),
  ("GOOG", "2026-01-03", Some(141.80)),
  ("GOOG", "2026-01-04", None),
  ("GOOG", "2026-01-05", Some(145.00)),
).toDF("ticker", "date", "price")

val window = Window
  .partitionBy("ticker")
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df2 = df
  .withColumn("first_default", first(col("price")).over(window))
  .withColumn("first_ignore_nulls", first(col("price"), true).over(window))
  .withColumn("last_default", last(col("price")).over(window))
  .withColumn("last_ignore_nulls", last(col("price"), true).over(window))

df2.show(false)
// +------+----------+-----+-------------+------------------+------------+-----------------+
// |ticker|date      |price|first_default|first_ignore_nulls|last_default|last_ignore_nulls|
// +------+----------+-----+-------------+------------------+------------+-----------------+
// |AAPL  |2026-01-01|null |null         |188.2             |null        |187.8            |
// |AAPL  |2026-01-02|188.2|null         |188.2             |null        |187.8            |
// |AAPL  |2026-01-03|190.1|null         |188.2             |null        |187.8            |
// |AAPL  |2026-01-04|187.8|null         |188.2             |null        |187.8            |
// |AAPL  |2026-01-05|null |null         |188.2             |null        |187.8            |
// |GOOG  |2026-01-01|null |null         |142.5             |145.0       |145.0            |
// |GOOG  |2026-01-02|142.5|null         |142.5             |145.0       |145.0            |
// |GOOG  |2026-01-03|141.8|null         |142.5             |145.0       |145.0            |
// |GOOG  |2026-01-04|null |null         |142.5             |145.0       |145.0            |
// |GOOG  |2026-01-05|145.0|null         |142.5             |145.0       |145.0            |
// +------+----------+-----+-------------+------------------+------------+-----------------+

For AAPL, the first and last rows both have null prices, so first_default and last_default are both null. With ignoreNulls = true, Spark skips inward to find 188.2 as the first non-null value and 187.8 as the last. GOOG has a non-null value at the end of its partition, so last_default and last_ignore_nulls agree there. The same flag is what makes forward and backward filling of sparse time-series data work, once the frame ends (forward fill) or starts (backward fill) at the current row.
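A minimal sketch of both fills, assuming the same sparse AAPL/GOOG data as above and a local SparkSession. Forward fill takes the last non-null value in a frame that ends at the current row; backward fill takes the first non-null value in a frame that starts there. The column names price_ffill and price_bfill are just illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}

// getOrCreate() reuses an existing session (e.g. in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[*]").appName("fill-gaps-demo").getOrCreate()
import spark.implicits._

val sparse = Seq[(String, String, Option[Double])](
  ("AAPL", "2026-01-01", None),
  ("AAPL", "2026-01-02", Some(188.20)),
  ("AAPL", "2026-01-03", Some(190.10)),
  ("AAPL", "2026-01-04", Some(187.80)),
  ("AAPL", "2026-01-05", None),
  ("GOOG", "2026-01-01", None),
  ("GOOG", "2026-01-02", Some(142.50)),
  ("GOOG", "2026-01-03", Some(141.80)),
  ("GOOG", "2026-01-04", None),
  ("GOOG", "2026-01-05", Some(145.00))
).toDF("ticker", "date", "price")

val byDate = Window.partitionBy("ticker").orderBy("date")
// Forward fill: everything from the start of the partition up to this row
val ffillFrame = byDate.rowsBetween(Window.unboundedPreceding, Window.currentRow)
// Backward fill: everything from this row to the end of the partition
val bfillFrame = byDate.rowsBetween(Window.currentRow, Window.unboundedFollowing)

val filled = sparse
  // second argument is ignoreNulls = true
  .withColumn("price_ffill", last(col("price"), true).over(ffillFrame))
  .withColumn("price_bfill", first(col("price"), true).over(bfillFrame))
  .orderBy("ticker", "date")

filled.show(false)

// (ticker, date, price_ffill, price_bfill); fills can still be null at the edges
val rows = filled
  .select("ticker", "date", "price_ffill", "price_bfill")
  .as[(String, String, Option[Double], Option[Double])]
  .collect()
```

Rows before the first observed price (both tickers on 2026-01-01) stay null after the forward fill, and rows after the last observed price (AAPL on 2026-01-05) stay null after the backward fill.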

first_value and last_value via expr()

first_value and last_value are SQL aliases for first and last and behave identically. Older Spark releases don't expose them in org.apache.spark.sql.functions (Scala wrappers were added in Spark 3.5.0), so expr() is the portable way to call them:

The first_value and last_value functions first appeared in version 2.0.0.

first_value(expr[, isIgnoreNull]) — via expr()

last_value(expr[, isIgnoreNull]) — via expr()

You'd reach for these when porting SQL code or when the SQL names read more naturally in your pipeline. The window mechanics are the same — explicitly set the frame to span the partition:

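import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.expr
// Assumes spark-shell, where a SparkSession named spark is already in scope
import spark.implicits._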

val df = Seq(
  ("Alice", "2026-01-01", 100),
  ("Alice", "2026-01-02", 150),
  ("Alice", "2026-01-03", 200),
  ("Bob",   "2026-01-01", 80),
  ("Bob",   "2026-01-02", 90),
  ("Bob",   "2026-01-03", 110),
).toDF("user", "date", "score")

val window = Window
  .partitionBy("user")
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df2 = df
  .withColumn("first_score", expr("first_value(score)").over(window))
  .withColumn("last_score", expr("last_value(score)").over(window))

df2.show(false)
// +-----+----------+-----+-----------+----------+
// |user |date      |score|first_score|last_score|
// +-----+----------+-----+-----------+----------+
// |Alice|2026-01-01|100  |100        |200       |
// |Alice|2026-01-02|150  |100        |200       |
// |Alice|2026-01-03|200  |100        |200       |
// |Bob  |2026-01-01|80   |80         |110       |
// |Bob  |2026-01-02|90   |80         |110       |
// |Bob  |2026-01-03|110  |80         |110       |
// +-----+----------+-----+-----------+----------+
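When porting SQL, it can be simpler to keep the query as SQL. Here is a sketch of the same window written as a Spark SQL query over a temporary view, this time passing the optional isIgnoreNull argument as true so gaps in the data are skipped. The view name scores, the column names, and the null-containing data are assumptions for illustration; the frame clause still has to be spelled out.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses an existing session (e.g. in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[*]").appName("first-value-sql-demo").getOrCreate()
import spark.implicits._

// Hypothetical scores: Alice's first score and Bob's last score are missing
val scores = Seq[(String, String, Option[Int])](
  ("Alice", "2026-01-01", None),
  ("Alice", "2026-01-02", Some(150)),
  ("Alice", "2026-01-03", Some(200)),
  ("Bob",   "2026-01-01", Some(80)),
  ("Bob",   "2026-01-02", Some(90)),
  ("Bob",   "2026-01-03", None)
).toDF("player", "game_date", "score")

scores.createOrReplaceTempView("scores")

// isIgnoreNull = true skips nulls, like the ignoreNulls overload in the Column API
val result = spark.sql("""
  SELECT player, game_date, score,
         first_value(score, true) OVER (PARTITION BY player ORDER BY game_date
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_score,
         last_value(score, true) OVER (PARTITION BY player ORDER BY game_date
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_score
  FROM scores
  ORDER BY player, game_date
""")

result.show(false)

// One (first_score, last_score) pair per player
val summary = result
  .select("player", "first_score", "last_score")
  .distinct()
  .as[(String, Int, Int)]
  .collect()
  .map { case (player, firstScore, lastScore) => player -> (firstScore, lastScore) }
  .toMap
```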

any_value when you don't care which one

any_value returns some value from each group, and Spark doesn't guarantee which one. Its purpose is to state explicitly that you've accepted an arbitrary pick, rather than relying on the implicit behavior of first without ordering. The usual use case is selecting a column that's constant within each group: every row in a group has the same value, so any one of them is the right answer.

The any_value function first appeared in version 3.4.0.

any_value(expr[, isIgnoreNull]) — via expr()
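
import org.apache.spark.sql.functions.{count, expr}
// Assumes spark-shell, where a SparkSession named spark is already in scope
import spark.implicits._

val df = Seq(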
  ("Alice", "Engineering", "Building A"),
  ("Bob",   "Engineering", "Building A"),
  ("Carol", "Engineering", "Building A"),
  ("Dave",  "Sales",       "Building B"),
  ("Eve",   "Sales",       "Building B"),
  ("Frank", "Marketing",   "Building C"),
).toDF("employee", "department", "building")

val df2 = df
  .groupBy("department")
  .agg(
    count("*").as("headcount"),
    expr("any_value(building)").as("building"),
  )
  .orderBy("department")

df2.show(false)
// +-----------+---------+----------+
// |department |headcount|building  |
// +-----------+---------+----------+
// |Engineering|3        |Building A|
// |Marketing  |1        |Building C|
// |Sales      |2        |Building B|
// +-----------+---------+----------+

Because every employee in a department works in the same building, picking "any value" is safe. Without any_value, you'd need to wrap building in min or max (or add it to the groupBy) just to satisfy Spark's requirement that non-grouped columns be aggregated. any_value makes the intent explicit and reads cleanly.
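The optional isIgnoreNull argument matters when the "constant" column has gaps. In this sketch (hypothetical data; Spark 3.4.0 or later, since that's when any_value appeared), some employees have no building recorded. Passing true restricts the pick to non-null values, so each department still gets its building.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, expr}

// getOrCreate() reuses an existing session (e.g. in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[*]").appName("any-value-demo").getOrCreate()
import spark.implicits._

// Hypothetical data: some buildings are missing, but the known ones agree within a department
val staff = Seq[(String, String, Option[String])](
  ("Alice", "Engineering", None),
  ("Bob",   "Engineering", Some("Building A")),
  ("Carol", "Engineering", Some("Building A")),
  ("Dave",  "Sales",       Some("Building B")),
  ("Eve",   "Sales",       None)
).toDF("employee", "department", "building")

val byDept = staff
  .groupBy("department")
  .agg(
    count("*").as("headcount"),
    // isIgnoreNull = true: only non-null buildings are candidates for the pick
    expr("any_value(building, true)").as("building")
  )
  .orderBy("department")

byDept.show(false)

val buildings = byDept
  .select("department", "building")
  .as[(String, String)]
  .collect()
  .toMap
```

Without the true argument, Engineering could come back as null, because Alice's row is as valid a pick as any other.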

To pull a column's value from the row with the smallest or largest value of another column, see min_by and max_by. They're the right tool when you want "the employee with the highest salary" rather than just "the first or last employee in date order". For the smallest or largest value of a column itself, see min and max.
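As a sketch of the difference, min_by and max_by can produce the opening and closing price per ticker from a plain groupBy, with no window frame to get wrong. The rows below are deliberately listed out of date order to show that the result is driven by the date column, not by row order. This assumes a local SparkSession and calls both functions through expr() (they're available in Spark SQL since 3.0.0).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// getOrCreate() reuses an existing session (e.g. in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[*]").appName("min-max-by-demo").getOrCreate()
import spark.implicits._

// Rows intentionally not sorted by date
val prices = Seq(
  ("AAPL", "2026-01-01", 185.50),
  ("AAPL", "2026-01-05", 192.40),
  ("AAPL", "2026-01-03", 190.10),
  ("GOOG", "2026-01-03", 141.80),
  ("GOOG", "2026-01-01", 140.10),
  ("GOOG", "2026-01-05", 145.00)
).toDF("ticker", "date", "price")

// price from the row with the earliest (min_by) or latest (max_by) date in each group
val openClose = prices
  .groupBy("ticker")
  .agg(
    expr("min_by(price, date)").as("open_price"),
    expr("max_by(price, date)").as("close_price")
  )
  .orderBy("ticker")

openClose.show(false)

val byTicker = openClose
  .as[(String, Double, Double)]
  .collect()
  .map { case (ticker, open, close) => ticker -> (open, close) }
  .toMap
```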

Example Details

Created: 2026-06-11 10:28:44 PM

Last Updated: 2026-06-11 10:28:44 PM