Comparing Two DataFrames to Find Added, Removed, and Changed Rows in Spark Scala

Diffing two DataFrames — yesterday's snapshot against today's, or the output of a pipeline against an expected baseline — is one of the most useful patterns in Spark Scala. This tutorial walks through three approaches: exceptAll for full-row diffs, anti-joins for keyed adds and removes, and a full_outer join that classifies every row as ADDED, REMOVED, CHANGED, or UNCHANGED.

The Two Snapshots

Throughout this tutorial we'll diff two snapshots of a tiny product catalog. The before snapshot is yesterday's data; after is today's. Between the two, one product was removed, one was added, and one had its price drop.

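// Assumes a SparkSession named `spark` is in scope, as in spark-shell.
import org.apache.spark.sql.functions.{col, lit, when}
import spark.implicits._

val before = Seq(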
  ("P001", "Wireless Mouse",      24.99),
  ("P002", "Mechanical Keyboard", 89.00),
  ("P003", "USB-C Hub",           34.50),
  ("P004", "Laptop Stand",        42.00),
).toDF("sku", "name", "price")

val after = Seq(
  ("P001", "Wireless Mouse",      24.99),
  ("P002", "Mechanical Keyboard", 79.00),
  ("P004", "Laptop Stand",        42.00),
  ("P005", "Webcam",              59.00),
).toDF("sku", "name", "price")

before.show(false)
// +----+-------------------+-----+
// |sku |name               |price|
// +----+-------------------+-----+
// |P001|Wireless Mouse     |24.99|
// |P002|Mechanical Keyboard|89.0 |
// |P003|USB-C Hub          |34.5 |
// |P004|Laptop Stand       |42.0 |
// +----+-------------------+-----+

after.show(false)
// +----+-------------------+-----+
// |sku |name               |price|
// +----+-------------------+-----+
// |P001|Wireless Mouse     |24.99|
// |P002|Mechanical Keyboard|79.0 |
// |P004|Laptop Stand       |42.0 |
// |P005|Webcam             |59.0 |
// +----+-------------------+-----+

The diff we want to find is:

  • P003 was removed
  • P005 was added
  • P002 changed (price 89.00 → 79.00)
  • P001 and P004 are unchanged

If you're new to creating small DataFrames inline like this, the toDF tutorial covers it in depth.

Approach 1: Full-Row Diff with exceptAll

The simplest diff doesn't care about keys — it just asks "which rows are in one DataFrame but not the other?" That's exactly what exceptAll does. It returns every row in the left DataFrame that doesn't appear in the right.

val onlyInAfter  = after.exceptAll(before)
val onlyInBefore = before.exceptAll(after)

onlyInAfter.show(false)
// +----+-------------------+-----+
// |sku |name               |price|
// +----+-------------------+-----+
// |P005|Webcam             |59.0 |
// |P002|Mechanical Keyboard|79.0 |
// +----+-------------------+-----+

onlyInBefore.show(false)
// +----+-------------------+-----+
// |sku |name               |price|
// +----+-------------------+-----+
// |P003|USB-C Hub          |34.5 |
// |P002|Mechanical Keyboard|89.0 |
// +----+-------------------+-----+

exceptAll compares whole rows. P002 shows up in both result sets — the old price row is in before but not after, and the new price row is in after but not before. That's accurate but not always what you want: a single logical "changed" record is split across two outputs, and you have to reconcile them yourself if you want to label it as a change rather than a remove-plus-add.

A few things worth knowing about exceptAll:

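  • Both DataFrames must have the same number of columns with compatible types. Columns are matched by position, not by name, so keep the column order identical on both sides. If the column counts or types don't line up, you'll get an AnalysisException.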
  • exceptAll preserves duplicates. If a row appears three times in before and once in after, before.exceptAll(after) includes it twice. There's also a plain except that deduplicates — use exceptAll when you care about row counts.
  • Nulls compare as equal here, which is different from how === behaves. exceptAll uses set-style equality, so a row with a null matches another row with the same null in the same column.
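
The duplicate and null behavior described above can be checked with a small sketch. The rows here are made up for illustration (they are not the catalog snapshots), and the session setup assumes a local run; in spark-shell, getOrCreate simply returns the existing session.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("exceptAll-demo")
  .getOrCreate()
import spark.implicits._

// Illustrative rows only. Option[Double] yields a nullable price column.
val left = Seq(
  ("P001", Option(24.99)),
  ("P001", Option(24.99)),
  ("P001", Option(24.99)),
  ("P009", Option.empty[Double])
).toDF("sku", "price")

val right = Seq(
  ("P001", Option(24.99)),
  ("P009", Option.empty[Double])
).toDF("sku", "price")

// Multiset difference: three copies on the left minus one on the right
// leaves two copies of P001. The P009 row cancels out because the two
// null prices are treated as equal.
val keepDuplicates = left.exceptAll(right)

// Set difference: every distinct left row also exists on the right,
// so nothing is left over.
val distinctOnly = left.except(right)

keepDuplicates.show(false)
distinctOnly.show(false)
```

If the counts of a row matter to you (for example, detecting accidental double-writes), exceptAll is the one that surfaces them.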

Use this approach when you don't have a stable primary key, or when you genuinely want a row-level diff (e.g. comparing pipeline output against a golden test fixture).

Approach 2: Added and Removed via Anti-Joins

When your data has a primary key (a sku, an id, a user_id), you usually want to talk about adds and removes in terms of that key, not whole rows. A row whose key disappeared is "removed", and a row whose key survived is not, whatever happened to its other columns. A left_anti join is the right tool: it returns rows from the left DataFrame that have no matching key in the right.

val added   = after.join(before, Seq("sku"), "left_anti")
val removed = before.join(after,  Seq("sku"), "left_anti")

added.show(false)
// +----+------+-----+
// |sku |name  |price|
// +----+------+-----+
// |P005|Webcam|59.0 |
// +----+------+-----+

removed.show(false)
// +----+---------+-----+
// |sku |name     |price|
// +----+---------+-----+
// |P003|USB-C Hub|34.5 |
// +----+---------+-----+

Notice that P002 is no longer in either result — its key exists in both snapshots, so it's neither added nor removed. The price change still needs to be detected separately, which is what the next section does.

Anti-joins are cheap and produce clean results: added only contains rows whose sku is new, and removed only contains rows whose sku is gone. The schema is whatever the left DataFrame's schema was — the right side is only used to filter the left.

Approach 3: Changed Rows via Inner Join

For rows whose key exists in both DataFrames, you join on the key and compare the non-key columns. The cleanest way to do this is to alias each DataFrame and reference columns through the alias.

val b = before.as("b")
val a = after.as("a")

val changed = b.join(a, Seq("sku"), "inner")
  .where(
    !(col("b.name")  <=> col("a.name")) ||
    !(col("b.price") <=> col("a.price"))
  )
  .select(
    col("sku"),
    col("b.name").as("name_before"),
    col("a.name").as("name_after"),
    col("b.price").as("price_before"),
    col("a.price").as("price_after"),
  )

changed.show(false)
// +----+-------------------+-------------------+------------+-----------+
// |sku |name_before        |name_after         |price_before|price_after|
// +----+-------------------+-------------------+------------+-----------+
// |P002|Mechanical Keyboard|Mechanical Keyboard|89.0        |79.0       |
// +----+-------------------+-------------------+------------+-----------+

There's a subtlety in the where clause that's easy to miss: it uses <=> (null-safe equality) instead of ===. If a value goes from null to "something" or vice versa, === returns null for that comparison and the row gets silently dropped from the result. <=> treats null <=> null as true and null <=> "x" as false, so changes that involve nulls are detected correctly. If you're not familiar with this gotcha, the null comparison tutorial covers it.
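
To see the difference in isolation, here is a minimal sketch that filters the same rows with =!= and with a negated <=>. The SKUs and names are invented for the demonstration, and the session setup assumes a local run.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("null-safe-demo")
  .getOrCreate()
import spark.implicits._

// Illustrative rows: (sku, name before, name after). None becomes null.
val pairs = Seq(
  ("P010", Option("Desk Lamp"),  Option("Desk Lamp")),   // unchanged
  ("P011", Option.empty[String], Option("Monitor Arm")), // null -> value
  ("P012", Option.empty[String], Option.empty[String])   // null on both sides
).toDF("sku", "name_before", "name_after")

// =!= evaluates to null whenever either side is null, and where() drops
// rows whose predicate is null, so the P011 change is missed.
val viaNotEquals = pairs.where(col("name_before") =!= col("name_after"))

// <=> always evaluates to true or false, so negating it is safe.
val viaNullSafe = pairs.where(!(col("name_before") <=> col("name_after")))

viaNotEquals.show(false) // no rows
viaNullSafe.show(false)  // only P011
```

P012 is correctly left out of both results: null on both sides is not a change.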

The output keeps both old and new values for every column you compared, which is exactly what you want for a change report — you can see what flipped without having to look back at the source data.

Putting It All Together: A Unified Diff

Most of the time you don't want three separate result DataFrames — you want one diff with a column that classifies each row. A full_outer join gives you rows from both sides; from there, withColumn plus a when/otherwise chain classifies each row based on which side has nulls and which non-key columns differ.

val joined = before.as("b")
  .join(after.as("a"), Seq("sku"), "full_outer")

val diff = joined
  .withColumn("change_type",
    when(col("b.name").isNull, lit("ADDED"))
    .when(col("a.name").isNull, lit("REMOVED"))
    .when(
      !(col("b.name")  <=> col("a.name")) ||
      !(col("b.price") <=> col("a.price")),
      lit("CHANGED"),
    )
    .otherwise(lit("UNCHANGED"))
  )
  .select(
    col("sku"),
    col("change_type"),
    col("b.name").as("name_before"),
    col("a.name").as("name_after"),
    col("b.price").as("price_before"),
    col("a.price").as("price_after"),
  )

diff.filter(col("change_type") =!= "UNCHANGED")
  .orderBy("sku")
  .show(false)
// +----+-----------+-------------------+-------------------+------------+-----------+
// |sku |change_type|name_before        |name_after         |price_before|price_after|
// +----+-----------+-------------------+-------------------+------------+-----------+
// |P002|CHANGED    |Mechanical Keyboard|Mechanical Keyboard|89.0        |79.0       |
// |P003|REMOVED    |USB-C Hub          |null               |34.5        |null       |
// |P005|ADDED      |null               |Webcam             |null        |59.0       |
// +----+-----------+-------------------+-------------------+------------+-----------+

The change_type chain reads top-to-bottom and the order matters:

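  1. If b.name is null, the row is treated as existing only in after → ADDED. The code uses name as a stand-in for "the whole before side is missing", which assumes name is never null in the source data.
  2. Otherwise, if a.name is null, the row is treated as existing only in before → REMOVED, under the same assumption.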
  3. Otherwise, if any non-key column differs (using <=> to handle nulls correctly), the row → CHANGED.
  4. Everything else → UNCHANGED.

In the example we filter out UNCHANGED rows for display, but in a real pipeline you might keep them: they're useful for sanity-checking that the diff includes every key that existed in either snapshot. Provided sku is unique within each snapshot, the row count after the full outer join equals the number of distinct keys across both snapshots, so comparing the two is a quick way to confirm that nothing was dropped or duplicated.
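
That row-count check might look like the sketch below. It also checks for repeated keys first, because a sku that appears twice in one snapshot fans out the join and inflates the count. The helper name duplicateKeyCount is hypothetical, and the snapshots are trimmed to two columns to keep the example short.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("diff-sanity-check")
  .getOrCreate()
import spark.implicits._

val before = Seq(("P001", 24.99), ("P002", 89.00), ("P003", 34.50)).toDF("sku", "price")
val after  = Seq(("P001", 24.99), ("P002", 79.00), ("P005", 59.00)).toDF("sku", "price")

// Hypothetical helper: how many sku values occur more than once in a snapshot.
def duplicateKeyCount(df: DataFrame): Long =
  df.groupBy("sku").count().where(col("count") > 1).count()

// Rows produced by the full outer join on the key.
val joinedCount = before.as("b")
  .join(after.as("a"), Seq("sku"), "full_outer")
  .count()

// Distinct keys across both snapshots.
val unionOfKeys = before.select("sku")
  .union(after.select("sku"))
  .distinct()
  .count()

println(s"duplicates in before: ${duplicateKeyCount(before)}")
println(s"duplicates in after:  ${duplicateKeyCount(after)}")
println(s"joined rows: $joinedCount, distinct keys: $unionOfKeys")
```

If the duplicate counts are zero and the last two numbers match, the diff has exactly one row per key.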

Scaling to Many Columns

The patterns above hard-code each non-key column name into the comparison. That's fine for a handful of columns; with thirty or fifty it gets unwieldy. The fix is to derive the comparison expression from the schema.

val keyCols = Seq("sku")
val compareCols = before.columns.filterNot(keyCols.contains)

val changedExpr = compareCols
  .map(c => !(col(s"b.$c") <=> col(s"a.$c")))
  .reduce(_ || _)

val diff = before.as("b")
  .join(after.as("a"), keyCols, "full_outer")
  .withColumn("change_type",
    when(col("b." + compareCols.head).isNull && col("a." + compareCols.head).isNotNull, lit("ADDED"))
    .when(col("a." + compareCols.head).isNull && col("b." + compareCols.head).isNotNull, lit("REMOVED"))
    .when(changedExpr, lit("CHANGED"))
    .otherwise(lit("UNCHANGED"))
  )

Two things changed. First, changedExpr is built by mapping every non-key column to a <=> comparison and OR-ing them all together with reduce. Adding or removing columns is now a schema concern, not a code change. Second, the ADDED/REMOVED checks pick one representative column to test for null. Any non-key column works as long as it is never null in the source data, because the full outer join nulls out a missing side as a block.

This pattern composes well with reading the column list from config, a contract file, or schema metadata. If a column is added on both sides, just include it in compareCols and the diff picks up the new comparison automatically.
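
One caveat with testing a representative column: if that column can be null in the source data, a null after the join no longer tells you which side is missing. A possible alternative, sketched here rather than taken from the code above, is to add a literal marker column to each side before joining and test the markers instead. The function name diffByKey and the marker names _in_before and _in_after are hypothetical, and the sample rows are altered so that name is null for two products.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

// Hypothetical helper, not part of Spark. Assumes both inputs share the
// same column names and that keyCols is unique within each input.
def diffByKey(before: DataFrame, after: DataFrame, keyCols: Seq[String]): DataFrame = {
  val compareCols = before.columns.filterNot(c => keyCols.contains(c)).toSeq
  require(compareCols.nonEmpty, "need at least one non-key column to compare")

  // The markers are non-null on every real row, so a null marker after the
  // join can only mean the row is missing on that side.
  val b = before.withColumn("_in_before", lit(true)).as("b")
  val a = after.withColumn("_in_after", lit(true)).as("a")

  val changedExpr = compareCols
    .map(c => !(col(s"b.$c") <=> col(s"a.$c")))
    .reduce(_ || _)

  val beforeAfterCols = compareCols.flatMap { c =>
    Seq(col(s"b.$c").as(s"${c}_before"), col(s"a.$c").as(s"${c}_after"))
  }

  b.join(a, keyCols, "full_outer")
    .withColumn("change_type",
      when(col("_in_before").isNull, lit("ADDED"))
        .when(col("_in_after").isNull, lit("REMOVED"))
        .when(changedExpr, lit("CHANGED"))
        .otherwise(lit("UNCHANGED")))
    .select(keyCols.map(col) ++ Seq(col("change_type")) ++ beforeAfterCols: _*)
}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("diff-by-key")
  .getOrCreate()
import spark.implicits._

// Illustrative data: P003 and P005 have a null name in the source.
val before = Seq(
  ("P001", Option("Wireless Mouse"),      24.99),
  ("P002", Option("Mechanical Keyboard"), 89.00),
  ("P003", Option.empty[String],          34.50)
).toDF("sku", "name", "price")

val after = Seq(
  ("P001", Option("Wireless Mouse"),      24.99),
  ("P002", Option("Mechanical Keyboard"), 79.00),
  ("P005", Option.empty[String],          59.00)
).toDF("sku", "name", "price")

val diff = diffByKey(before, after, Seq("sku"))
diff.orderBy("sku").show(false)
// change_type per sku: P001 UNCHANGED, P002 CHANGED, P003 REMOVED, P005 ADDED
```

With name as the representative column, P003 and P005 would fall through to the CHANGED branch, because name is null on both sides of the joined row. The markers avoid that at the cost of two extra columns during the join.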

Which Approach to Use

Situation                                                  Approach
---------------------------------------------------------  ----------------------------------------
No stable key, want raw row-level diff                     exceptAll
Comparing pipeline output to a golden fixture in tests     exceptAll
Have a key, only need to know what was added/removed       Anti-joins
Have a key, need to know what changed and how              Full outer join with change_type
Lots of columns, schema may evolve                         Full outer join with derived expressions

The full-outer-join-with-change_type pattern is the workhorse — once you have it set up, it answers every question about the diff in one query, and the change_type column is easy to filter, group by, or aggregate on. The other two approaches are simpler when you only need part of that information.

One last tip: when you're building a diff for ad-hoc investigation rather than production, groupBy("change_type").count() is a fast way to sanity-check the size of each bucket before drilling into the rows themselves.
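
As a quick illustration of that tip, the sketch below counts each bucket. To stay self-contained it builds a stand-in diff DataFrame by hand, with the classifications this tutorial's snapshots produce, instead of re-running the join.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("diff-buckets")
  .getOrCreate()
import spark.implicits._

// Stand-in for the unified diff: one row per sku with its classification.
val diff = Seq(
  ("P001", "UNCHANGED"),
  ("P002", "CHANGED"),
  ("P003", "REMOVED"),
  ("P004", "UNCHANGED"),
  ("P005", "ADDED")
).toDF("sku", "change_type")

// One row per bucket, with the number of keys that landed in it.
val bucketCounts = diff.groupBy("change_type").count()
bucketCounts.orderBy("change_type").show(false)

// Collected into a Map for easy assertions in a test or notebook.
val bucketSizes = bucketCounts.collect()
  .map(r => r.getString(0) -> r.getLong(1))
  .toMap
```

A bucket that is far larger than expected (for example, nearly everything landing in CHANGED) usually points at a type or formatting mismatch between the snapshots rather than real changes.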

Tutorial Details

Created: 2026-05-13 10:39:54 PM

Last Updated: 2026-05-13 10:39:54 PM