Why Your Spark Scala Join Doubled Your Row Count

You join two DataFrames on what you thought was a primary key, expect the same number of rows you started with, and instead get more. This isn't a bug in Spark — it's how SQL joins are defined. Every row on the left gets matched to every matching row on the right, and if either side has duplicate keys, the output multiplies.

The Surprising Doubling

Here's a setup that looks completely innocent. You have an orders DataFrame and a customers DataFrame, and you want to attach customer names to each order.

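// toDF needs the session implicits; assumes a SparkSession named spark, as in spark-shell.
import spark.implicits._

val orders = Seq(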
  ("O001", "C100", 49.99),
  ("O002", "C101", 24.50),
  ("O003", "C100", 89.00),
  ("O004", "C102", 12.00),
).toDF("order_id", "customer_id", "amount")

val customers = Seq(
  ("C100", "Alice",   "2024-01-15"),
  ("C100", "Alice",   "2024-07-01"),
  ("C101", "Bob",     "2024-02-10"),
  ("C102", "Charlie", "2024-03-05"),
).toDF("customer_id", "name", "as_of_date")

println(s"orders row count:    ${orders.count()}")
println(s"customers row count: ${customers.count()}")

val joined = orders.join(customers, Seq("customer_id"), "inner")
println(s"joined row count:    ${joined.count()}")

joined.orderBy("order_id", "as_of_date").show(false)
// orders row count:    4
// customers row count: 4
// joined row count:    6
// +-----------+--------+------+-------+----------+
// |customer_id|order_id|amount|name   |as_of_date|
// +-----------+--------+------+-------+----------+
// |C100       |O001    |49.99 |Alice  |2024-01-15|
// |C100       |O001    |49.99 |Alice  |2024-07-01|
// |C101       |O002    |24.5  |Bob    |2024-02-10|
// |C100       |O003    |89.0  |Alice  |2024-01-15|
// |C100       |O003    |89.0  |Alice  |2024-07-01|
// |C102       |O004    |12.0  |Charlie|2024-03-05|
// +-----------+--------+------+-------+----------+

There are 4 orders. The joined result has 6 rows. Orders O001 and O003 each appear twice — once with as_of_date 2024-01-15 and once with 2024-07-01. If you were computing total revenue from this join, you'd double-count Alice's spending.
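
To make the double-counting concrete, here is a small plain-Scala sketch (no Spark needed) that sums the amount column from the four original orders and from the six joined rows shown above. BigDecimal is used only to keep the arithmetic exact, and the variable names are illustrative rather than part of the original example.

```scala
// Amounts from the original orders DataFrame, one per order.
val orderAmounts = Seq("49.99", "24.50", "89.00", "12.00").map(BigDecimal(_))

// Amounts as they appear in the joined output: O001 and O003 occur twice.
val joinedAmounts =
  Seq("49.99", "49.99", "24.50", "89.00", "89.00", "12.00").map(BigDecimal(_))

val trueRevenue     = orderAmounts.sum   // 175.49
val inflatedRevenue = joinedAmounts.sum  // 314.48

println(s"true revenue:     $trueRevenue")
println(s"inflated revenue: $inflatedRevenue")
println(s"overcount:        ${inflatedRevenue - trueRevenue}") // 138.99, Alice's two orders counted again
```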

The cause is right there in the customers DataFrame: C100 appears twice. The customer_id looks like a primary key, but it isn't. There are two rows for Alice with different as_of_date values, presumably because somebody updated her record and kept the history. An inner join with two matching rows on the right produces two output rows for every order with that customer.

Why This Happens

An SQL join, by definition, returns every combination of left rows and right rows where the join condition is true. If a left row has n matching rows on the right, it appears n times in the output. Spark isn't doing anything clever or wrong — it's doing exactly what the join definition requires.

The trap is that DataFrames have no notion of a primary key. Nothing about customer_id declares it unique, and Spark doesn't enforce uniqueness anywhere. Spark can't warn you because, from its point of view, you might want the multiplication: that's what a many-to-many join is for. Whether a join can multiply your left-side rows comes down to whether the key is unique on the right side, and that's a property of your data, not your code.

There are two common shapes this takes in practice:

  1. A "primary key" that isn't. A lookup table that secretly contains history — slowly-changing-dimension rows, soft-deleted rows, multiple versions per entity. The key is unique per logical entity, but the table has multiple rows per entity.
  2. Many-to-many fan-out. Both sides have multiple rows per key, and the join produces the cross product of every pair. We'll see this at the end.
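
The join definition above can be modelled in a few lines of plain Scala collections, independent of Spark. This is a sketch of the semantics only, not of how Spark executes a join. The data mirrors the earlier example, trimmed to the key columns, and all names are illustrative.

```scala
// (order_id, customer_id)
val orders = Seq(("O001", "C100"), ("O002", "C101"), ("O003", "C100"), ("O004", "C102"))
// (customer_id, as_of_date)
val customers = Seq(
  ("C100", "2024-01-15"), ("C100", "2024-07-01"),
  ("C101", "2024-02-10"), ("C102", "2024-03-05")
)

// Inner join by definition: every (left, right) pair whose keys are equal.
val joined = for {
  (orderId, orderKey) <- orders
  (customerKey, asOf) <- customers
  if orderKey == customerKey
} yield (orderId, customerKey, asOf)

// The same count, predicted without building the join:
// each left row contributes as many output rows as it has matches on the right.
val matchesPerKey = customers.groupBy(_._1).map { case (key, rows) => key -> rows.size }
val predictedRows = orders.map { case (_, key) => matchesPerKey.getOrElse(key, 0) }.sum

println(s"joined rows:    ${joined.size}") // 6
println(s"predicted rows: $predictedRows") // 6
```

The two C100 orders each find two matches (4 rows), and the C101 and C102 orders find one each, which gives the 6 rows seen earlier.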

Diagnosing: Find the Duplicates

Before you change any code, confirm the cause. Group by the join key on the side you suspect, count occurrences, and filter to keys that appear more than once:

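import org.apache.spark.sql.functions.col

// Count rows per join key and keep only keys that occur more than once.
val dupes = customers.groupBy("customer_id")
  .count()
  .filter(col("count") > 1)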

dupes.show(false)
// +-----------+-----+
// |customer_id|count|
// +-----------+-----+
// |C100       |2    |
// +-----------+-----+

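If this result is empty, the side you checked is clean, so run the same check against the other side of the join (here, orders). Keep the direction of the join in mind: repeated customer_id values in orders are expected, since one customer can place many orders. It's repeats on the side you treat as a lookup that multiply rows. If both sides have duplicate keys, you've got a many-to-many situation and you need to decide whether the multiplication is what you actually want.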
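
The per-side check can be wrapped into a small helper that names the shape of a join before you run it. This is a sketch under assumptions: maxRowsPerKey and joinShape are hypothetical helpers, not Spark APIs, and the local SparkSession is only there so the snippet runs on its own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit, max}

// Local session for a standalone run; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("join-shape").getOrCreate()
import spark.implicits._

// Hypothetical helper: the largest number of rows sharing one value of `key` (0 for an empty DataFrame).
def maxRowsPerKey(df: DataFrame, key: String): Long =
  df.groupBy(key).count()
    .agg(coalesce(max(col("count")), lit(0L)))
    .head().getLong(0)

// Hypothetical helper: describe the join shape from each side's key multiplicity.
def joinShape(left: DataFrame, right: DataFrame, key: String): String =
  (maxRowsPerKey(left, key) > 1, maxRowsPerKey(right, key) > 1) match {
    case (false, false) => "one-to-one"
    case (true, false)  => "many-to-one"
    case (false, true)  => "one-to-many"
    case (true, true)   => "many-to-many"
  }

val orders = Seq(("O001", "C100"), ("O002", "C101"), ("O003", "C100"), ("O004", "C102"))
  .toDF("order_id", "customer_id")
val customers = Seq(("C100", "2024-01-15"), ("C100", "2024-07-01"), ("C101", "2024-02-10"), ("C102", "2024-03-05"))
  .toDF("customer_id", "as_of_date")
val dedupedCustomers = Seq(("C100", "2024-07-01"), ("C101", "2024-02-10"), ("C102", "2024-03-05"))
  .toDF("customer_id", "as_of_date")

val shapeBefore = joinShape(orders, customers, "customer_id")        // many-to-many
val shapeAfter  = joinShape(orders, dedupedCustomers, "customer_id") // many-to-one
println(s"with history rows: $shapeBefore")
println(s"one row per key:   $shapeAfter")
```

With the history rows in place, the example join is many-to-many on customer_id even though it was meant to be many-to-one. Once the lookup side has one row per key, the shape is what the author of the join had in mind.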

A useful sanity check before any join: compare customers.count() to customers.select("customer_id").distinct().count(). If they don't match, the join key isn't unique. The check costs an extra pass over the key column plus a shuffle, which is usually small next to the join itself, and it's worth doing as a guard in production pipelines on any table you treat as a lookup.
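
As a guard, the count comparison might look like the sketch below. requireUniqueKey is a hypothetical helper name, and failing with require (an IllegalArgumentException) is one possible choice; a real pipeline might log or route to a quarantine table instead. The local SparkSession is only there so the snippet runs standalone.

```scala
import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}

// Local session for a standalone run; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("unique-key-guard").getOrCreate()
import spark.implicits._

// Hypothetical guard: throws IllegalArgumentException when `key` is not unique in `df`.
// Note that several null keys also count as a violation here, since distinct keeps a single null.
def requireUniqueKey(df: DataFrame, key: String): Unit = {
  val total        = df.count()
  val distinctKeys = df.select(key).distinct().count()
  require(total == distinctKeys, s"'$key' is not unique: $total rows but $distinctKeys distinct keys")
}

val customers = Seq(
  ("C100", "Alice", "2024-01-15"),
  ("C100", "Alice", "2024-07-01"),
  ("C101", "Bob", "2024-02-10"),
  ("C102", "Charlie", "2024-03-05")
).toDF("customer_id", "name", "as_of_date")

// The example lookup table fails the guard because C100 appears twice.
val guardResult = Try(requireUniqueKey(customers, "customer_id"))
println(s"guard failed: ${guardResult.isFailure}") // true
```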

The Fix: Reduce the Right Side to One Row Per Key

Once you know the duplicate keys are an artifact of history, the fix is to pick the row you want and throw the rest away before joining. A row_number() window over the partition key lets you keep "the latest", "the first", or any other rule:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val latest = Window
  .partitionBy("customer_id")
  .orderBy(col("as_of_date").desc)

val currentCustomers = customers
  .withColumn("rn", row_number().over(latest))
  .filter(col("rn") === 1)
  .drop("rn")

currentCustomers.orderBy("customer_id").show(false)
// +-----------+-------+----------+
// |customer_id|name   |as_of_date|
// +-----------+-------+----------+
// |C100       |Alice  |2024-07-01|
// |C101       |Bob    |2024-02-10|
// |C102       |Charlie|2024-03-05|
// +-----------+-------+----------+

val joined = orders.join(currentCustomers, Seq("customer_id"), "inner")
println(s"joined row count: ${joined.count()}")
joined.orderBy("order_id").show(false)
// joined row count: 4
// +-----------+--------+------+-------+----------+
// |customer_id|order_id|amount|name   |as_of_date|
// +-----------+--------+------+-------+----------+
// |C100       |O001    |49.99 |Alice  |2024-07-01|
// |C101       |O002    |24.5  |Bob    |2024-02-10|
// |C100       |O003    |89.0  |Alice  |2024-07-01|
// |C102       |O004    |12.0  |Charlie|2024-03-05|
// +-----------+--------+------+-------+----------+

currentCustomers has exactly one row per customer_id — the most recent one — and the join now returns the 4 rows we expected from the start.

A note on dropDuplicates(Seq("customer_id")): it does collapse the table to one row per key, but it doesn't let you choose which row to keep. The row that survives is whichever one Spark sees first, which depends on partitioning and is effectively non-deterministic across runs. Reach for dropDuplicates only when the duplicates are byte-for-byte identical. When the duplicates differ — different addresses, different timestamps, different statuses — use row_number() so the choice is deterministic and explicit.
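
One caveat is worth a sketch: row_number() is only as deterministic as its ordering. If two rows for the same key tie on as_of_date, which of them gets rn = 1 is again up to Spark. Adding a tie-breaker to the window ordering closes that gap. The version column below is hypothetical, invented for illustration; in practice any column that makes the ordering total within a key would do.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Local session for a standalone run; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("tie-breaker").getOrCreate()
import spark.implicits._

// Both C100 rows share the same as_of_date; `version` is a hypothetical tie-breaker column.
val customers = Seq(
  ("C100", "Alice",    "2024-07-01", 1),
  ("C100", "Alice A.", "2024-07-01", 2),
  ("C101", "Bob",      "2024-02-10", 1)
).toDF("customer_id", "name", "as_of_date", "version")

// Order by date first, then by version, so no two rows within a key can tie.
val latest = Window
  .partitionBy("customer_id")
  .orderBy(col("as_of_date").desc, col("version").desc)

val currentCustomers = customers
  .withColumn("rn", row_number().over(latest))
  .filter(col("rn") === 1)
  .drop("rn")

// C100 resolves to the version 2 row on every run.
val c100Name = currentCustomers.filter(col("customer_id") === "C100").head().getAs[String]("name")
println(s"C100 resolves to: $c100Name") // Alice A.
```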

The Other Common Shape: Many-to-Many Fan-Out

The second way a join blows up your row count is when both sides have multiple rows per key. The join then returns the cross product of all matching pairs:

val orders = Seq(
  ("O001", "C100", 49.99),
  ("O002", "C100", 18.25),
).toDF("order_id", "customer_id", "amount")

val tickets = Seq(
  ("T001", "C100", "shipping"),
  ("T002", "C100", "billing"),
).toDF("ticket_id", "customer_id", "issue")

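val joined = orders.join(tickets, Seq("customer_id"), "inner")

println(s"orders:  ${orders.count()}")
println(s"tickets: ${tickets.count()}")
println(s"joined:  ${joined.count()}")
joined.orderBy("order_id", "ticket_id").show(false)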
// orders:  2
// tickets: 2
// joined:  4
// +-----------+--------+------+---------+--------+
// |customer_id|order_id|amount|ticket_id|issue   |
// +-----------+--------+------+---------+--------+
// |C100       |O001    |49.99 |T001     |shipping|
// |C100       |O001    |49.99 |T002     |billing |
// |C100       |O002    |18.25 |T001     |shipping|
// |C100       |O002    |18.25 |T002     |billing |
// +-----------+--------+------+---------+--------+

Two orders for C100, two tickets for C100, four rows out. The output is correct as a join — every order really is associated with every ticket via the shared customer — but it's almost never what you actually want. If you were summing amount after this join, you'd get (49.99 + 18.25) × 2 = 136.48 instead of the actual 68.24.

The fix here isn't dedup — both orders and both tickets are real, distinct entities. The fix is to not join them directly. If you need order-level data with ticket counts, aggregate tickets to one row per customer first (tickets.groupBy("customer_id").count()) and then join. The general rule: if you're joining two fact tables on a shared dimension, aggregate one of them down to one row per join key before the join.
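
The aggregate-then-join approach might look like the sketch below, using the same orders and tickets data. Two choices here are assumptions rather than part of the original example: the left join (so orders whose customer has no tickets survive) and the coalesce to 0 for those orders. The local SparkSession is only there so the snippet runs standalone.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, count, lit, sum}

// Local session for a standalone run; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("aggregate-then-join").getOrCreate()
import spark.implicits._

val orders = Seq(
  ("O001", "C100", 49.99),
  ("O002", "C100", 18.25)
).toDF("order_id", "customer_id", "amount")

val tickets = Seq(
  ("T001", "C100", "shipping"),
  ("T002", "C100", "billing")
).toDF("ticket_id", "customer_id", "issue")

// Collapse tickets to one row per customer before joining.
val ticketCounts = tickets
  .groupBy("customer_id")
  .agg(count(lit(1)).as("ticket_count"))

// Left join keeps every order; customers without tickets get a count of 0.
val ordersWithTickets = orders
  .join(ticketCounts, Seq("customer_id"), "left")
  .withColumn("ticket_count", coalesce(col("ticket_count"), lit(0L)))

val rowCount    = ordersWithTickets.count()
val totalAmount = ordersWithTickets.agg(sum("amount")).head().getDouble(0)

println(s"rows after join: $rowCount")       // 2, one per order
println(f"total amount:    $totalAmount%.2f") // 68.24
```

Because ticketCounts has exactly one row per customer_id, the join cannot add rows to orders, and the sum of amount stays at the true total.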

A Checklist for Joins That Blow Up

When a join result is bigger than expected, work through this:

  1. Confirm the doubling is real. Compare joined.count() to left.count(). If they're equal, you have a different problem.
  2. Find the side with duplicates. Run df.groupBy(joinKey).count().filter(col("count") > 1) on each side.
  3. Decide what to do with them. If the duplicates are byte-identical, dropDuplicates(Seq(joinKey)) is fine. If they differ, use a row_number() window and pick the row you actually want.
  4. If both sides have legitimate duplicates, you're not looking at a bug — you're looking at a many-to-many join. Aggregate one side first.

For a related pattern that uses joins to do something useful with two snapshots, see comparing two DataFrames to find added, removed, and changed rows — it uses anti-joins and full outer joins on a guaranteed-unique key, which is exactly the kind of join that won't blow up. And if your join condition involves nulls, how === and =!= handle nulls is worth reading first, since null-on-null comparisons don't behave the way most people expect.

The rule of thumb: joins don't preserve row counts unless the join key is unique on the side you're joining to. Treat that as something you verify, not something you assume.

Tutorial Details

Created: 2026-05-27 10:23:50 PM

Last Updated: 2026-05-27 10:23:50 PM