SQL Pipe Syntax in Spark 4.0: Writing More Readable Queries

Spark 4.0 added the |> pipe operator to SQL (SPARK-49555), letting you chain transformations top-to-bottom instead of nesting subqueries. The mental model is the one Scala devs already use for DataFrame method chains — df.filter(...).groupBy(...).agg(...) — applied directly to SQL.

For the broader release picture, see What's New in Spark 4.0 for Scala Developers.


The Problem It Solves

Vanilla SQL forces you to write clauses in an order that has almost nothing to do with the order the engine evaluates them. You write SELECT first, but it is logically evaluated near the end: after FROM, JOIN, WHERE, GROUP BY, and HAVING, with only ORDER BY and LIMIT still to come. Anyone who has written a query and then realized they need a filter on a computed aggregate has felt this. The fix is a subquery, a CTE, or a HAVING clause: three different mechanisms for what is really one idea, "now do another step."

DataFrame chains in Scala don't have this problem because the operators read in the order they execute:

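import org.apache.spark.sql.functions.sum
import spark.implicits._  // enables the $"col" syntax; assumes a SparkSession named spark

spark.table("transactions")
  .filter($"amount" > 0)             // drop non-positive amounts
  .groupBy($"category")
  .agg(sum($"amount").as("total"))   // one total per category
  .filter($"total" > 1000)           // filter on the aggregate
  .orderBy($"total".desc)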

Pipe syntax brings the same top-to-bottom flow to SQL. Each |> is a stage. The input of each stage is the output of the previous one. There are no subqueries, no HAVING, no inverted ordering.

spark.sql("""
  FROM transactions
  |> WHERE amount > 0
  |> AGGREGATE SUM(amount) AS total GROUP BY category
  |> WHERE total > 1000
  |> ORDER BY total DESC
""")

Same query, same plan, dramatically easier to read top to bottom.
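
To check the "same query" claim on your own setup, a minimal sketch along these lines runs the classic form and the pipe form against a toy table and compares the rows. The sample data, the object name PipeVsClassic, and the local[1] master are assumptions made for illustration, and the sketch needs Spark 4.0 or later on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object PipeVsClassic {
  // Runs the classic and the pipe form of the same query on made-up data
  // and returns both result sets as (category, total) pairs.
  def compare(): (Seq[(String, Double)], Seq[(String, Double)]) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("pipe-vs-classic")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows, not taken from the article.
    Seq(("books", 700.0), ("books", 600.0), ("games", 400.0),
        ("games", -50.0), ("toys", 1500.0))
      .toDF("category", "amount")
      .createOrReplaceTempView("transactions")

    val classic = spark.sql("""
      SELECT category, SUM(amount) AS total
      FROM transactions
      WHERE amount > 0
      GROUP BY category
      HAVING SUM(amount) > 1000
      ORDER BY total DESC
    """)

    val piped = spark.sql("""
      FROM transactions
      |> WHERE amount > 0
      |> AGGREGATE SUM(amount) AS total GROUP BY category
      |> WHERE total > 1000
      |> ORDER BY total DESC
    """)

    val result = (
      classic.as[(String, Double)].collect().toSeq,
      piped.as[(String, Double)].collect().toSeq
    )
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit = {
    val (classicRows, pipedRows) = compare()
    println(s"classic: $classicRows")
    println(s"piped:   $pipedRows")
  }
}
```

Calling explain() on each DataFrame before collecting is a quick way to compare the plans as well as the rows.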


The Canonical Starting Point: FROM

Pipe syntax queries start with a FROM <table> clause that returns the whole table unmodified. This is now a valid standalone query in Spark 4.0 — FROM customers returns all rows of customers, equivalent to SELECT * FROM customers or TABLE customers. From there you append operators with |>.

spark.sql("FROM customers")
// equivalent to:
spark.sql("SELECT * FROM customers")

You can also pipe operators onto any existing SQL query — pipe syntax interleaves with classic syntax, so you can migrate one query at a time without touching the others.
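
As a rough illustration of that incremental migration, the sketch below leaves a classic SELECT untouched and appends two pipe stages to it. The customers columns, the sample rows, and the object name PipeOntoClassic are hypothetical, and Spark 4.0 or later is assumed.

```scala
import org.apache.spark.sql.SparkSession

object PipeOntoClassic {
  // Returns the names produced by a classic query with pipe stages appended.
  def names(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("pipe-onto-classic")
      .getOrCreate()
    import spark.implicits._

    // Made-up customers table for illustration only.
    Seq((1, "Ada", "DE"), (2, "Grace", "US"), (3, "Linus", "FI"))
      .toDF("id", "name", "country")
      .createOrReplaceTempView("customers")

    // The first three lines are an ordinary classic query;
    // the |> stages are appended without rewriting it.
    val result = spark.sql("""
      SELECT id, name, country
      FROM customers
      WHERE country <> 'US'
      |> SELECT name
      |> ORDER BY name
    """).as[String].collect().toSeq

    spark.stop()
    result
  }

  def main(args: Array[String]): Unit =
    println(names())
}
```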


The Operator Set

The pipe vocabulary covers the same transformations the DataFrame API does. Here's the mapping a Scala developer can hold in their head:

Pipe operator                     DataFrame equivalent
SELECT                            .select(...)
EXTEND                            .withColumn(...)
SET                               .withColumn(name, expr) (replacing an existing column)
DROP                              .drop(...)
WHERE                             .filter(...)
AGGREGATE ... GROUP BY            .groupBy(...).agg(...)
JOIN                              .join(...)
ORDER BY                          .orderBy(...)
LIMIT / OFFSET                    .limit(...) / .offset(...)
UNION ALL / INTERSECT / EXCEPT    .union(...) / .intersect(...) / .except(...)
PIVOT / UNPIVOT                   .groupBy(...).pivot(...) / stack via selectExpr
TABLESAMPLE                       .sample(...)
AS                                .alias(...)

Operators can appear in any order, any number of times. There is no HAVING, no QUALIFY, no separate placement rules — a WHERE after an AGGREGATE does what HAVING does, and a WHERE after a window-function SELECT does what QUALIFY does.
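
The QUALIFY-style case can be sketched as follows: compute a window function in |> SELECT, then filter on it with a plain |> WHERE. The table contents and the object name TopPerCategory are assumptions for illustration; the amounts are distinct within each category so the result does not depend on tie-breaking.

```scala
import org.apache.spark.sql.SparkSession

object TopPerCategory {
  // Returns (category, amount) for the largest transaction in each category.
  def topRows(): Set[(String, Double)] = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("pipe-window-filter")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows.
    Seq((1, "books", 700.0), (2, "books", 600.0),
        (3, "games", 400.0), (1, "games", 250.0))
      .toDF("user_id", "category", "amount")
      .createOrReplaceTempView("transactions")

    val rows = spark.sql("""
      FROM transactions
      |> SELECT category, amount,
                ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) AS rn
      |> WHERE rn = 1
      |> DROP rn
    """).as[(String, Double)].collect().toSet

    spark.stop()
    rows
  }

  def main(args: Array[String]): Unit =
    println(topRows())
}
```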


SELECT vs EXTEND vs SET

These three are easy to confuse but each maps cleanly to a DataFrame idiom.

SELECT evaluates expressions and replaces the row shape entirely, just like .select(...):

spark.sql("""
  FROM transactions
  |> SELECT user_id, amount * 0.9 AS discounted
""")

EXTEND appends new columns while keeping the existing ones, just like .withColumn(...):

spark.sql("""
  FROM transactions
  |> EXTEND amount * 0.9 AS discounted
""")
// All original columns + discounted

SET replaces a named column in place, similar to .withColumn("name", newExpr) where name already exists:

spark.sql("""
  FROM transactions
  |> SET amount = amount * 0.9
""")

If you've written DataFrame code and reached for .withColumn versus .select based on whether you wanted to keep the other columns, that same intuition applies here: EXTEND to add, SELECT to project down, SET to overwrite.

One catch: aggregate functions are not allowed in |> SELECT. Use |> AGGREGATE instead. This is a deliberate split: the regular-SQL SELECT overloads both projection and aggregation, switching behavior when a GROUP BY clause or an aggregate function is present, and pipe syntax separates those into two operators.
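
A small sketch of how this rule surfaces in practice, assuming Spark 4.0 behavior: the same SUM is attempted once through |> SELECT and once through |> AGGREGATE, and each attempt is wrapped in Try so the outcome can be inspected without relying on a particular exception type. The sample data and the object name SelectVsAggregate are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object SelectVsAggregate {
  // Returns (did |> SELECT with SUM succeed, did |> AGGREGATE with SUM succeed).
  def outcomes(): (Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("pipe-select-vs-aggregate")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows.
    Seq(("books", 700.0), ("games", 400.0))
      .toDF("category", "amount")
      .createOrReplaceTempView("transactions")

    // collect() forces parsing, analysis, and execution,
    // so a rejected query fails inside the Try.
    val viaSelect = Try(
      spark.sql("FROM transactions |> SELECT SUM(amount) AS total").collect())
    val viaAggregate = Try(
      spark.sql("FROM transactions |> AGGREGATE SUM(amount) AS total").collect())

    spark.stop()
    (viaSelect.isSuccess, viaAggregate.isSuccess)
  }

  def main(args: Array[String]): Unit = {
    val (selectOk, aggregateOk) = outcomes()
    println(s"|> SELECT SUM(...) accepted:    $selectOk")
    println(s"|> AGGREGATE SUM(...) accepted: $aggregateOk")
  }
}
```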


AGGREGATE: One Operator for Group-By and Reduce

In classic SQL, GROUP BY is a clause tacked onto SELECT. In pipe syntax it's part of the AGGREGATE operator.

Full-table aggregation:

spark.sql("""
  FROM transactions
  |> AGGREGATE COUNT(*) AS rows, SUM(amount) AS total
""")
// One row: (rows, total)

Group-by aggregation:

spark.sql("""
  FROM transactions
  |> AGGREGATE SUM(amount) AS total GROUP BY category
""")
// One row per category, columns: (category, total)

The output column order is grouping columns first, then aggregates — the same convention as Dataset.groupBy(...).agg(...). You can pass ordinal references to GROUP BY (GROUP BY 1, 2) to group by the first and second input columns, which is a small but real ergonomic improvement when columns get renamed in upstream stages.
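
The column-order convention is easy to confirm by inspecting the result schema. The sketch below lists two aggregates before the GROUP BY and reads back the output column names; the data and the object name AggregateColumnOrder are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object AggregateColumnOrder {
  // Returns the output column names of a grouped |> AGGREGATE.
  def columns(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("pipe-aggregate-columns")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows.
    Seq(("books", 700.0), ("books", 600.0), ("games", 400.0))
      .toDF("category", "amount")
      .createOrReplaceTempView("transactions")

    // Aggregates are written first, but the grouping column leads the output.
    val cols = spark.sql("""
      FROM transactions
      |> AGGREGATE SUM(amount) AS total, COUNT(*) AS n GROUP BY category
    """).columns.toSeq

    spark.stop()
    cols
  }

  def main(args: Array[String]): Unit =
    println(columns().mkString(", "))
}
```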

The WHERE vs HAVING distinction also disappears. Filtering after an AGGREGATE is just another |> WHERE:

spark.sql("""
  FROM transactions
  |> AGGREGATE SUM(amount) AS total GROUP BY category
  |> WHERE total > 1000
""")


JOIN: The AS Alias Trick

Joins work on the piped input table as the left side. The right side is the table or subquery you write after JOIN. The pattern that comes up constantly is aliasing the piped input so you can disambiguate columns in the ON clause — that's what the standalone |> AS operator is for.

spark.sql("""
  FROM transactions
  |> AS t
  |> LEFT JOIN customers AS c ON t.user_id = c.id
  |> SELECT c.name, t.amount
""")

All the usual join modifiers work: LEFT, RIGHT, FULL, CROSS, SEMI, ANTI, NATURAL, LATERAL, plus USING(col) as an alternative to ON.
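
As one example of a modifier beyond LEFT, here is a sketch of an anti join that keeps only transactions with no matching customer. It reuses the |> AS aliasing pattern from above; the table contents and the object name UnmatchedTransactions are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object UnmatchedTransactions {
  // Returns user_ids that appear in transactions but not in customers.
  def orphanUserIds(): Seq[Int] = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("pipe-anti-join")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data: user 9 has no customer record.
    Seq((1, 700.0), (2, 400.0), (9, 50.0))
      .toDF("user_id", "amount")
      .createOrReplaceTempView("transactions")
    Seq((1, "Ada"), (2, "Grace"))
      .toDF("id", "name")
      .createOrReplaceTempView("customers")

    // An anti join returns only left-side rows without a match,
    // so only the transactions columns are available afterwards.
    val ids = spark.sql("""
      FROM transactions
      |> AS t
      |> LEFT ANTI JOIN customers AS c ON t.user_id = c.id
      |> SELECT user_id
    """).as[Int].collect().toSeq

    spark.stop()
    ids
  }

  def main(args: Array[String]): Unit =
    println(orphanUserIds())
}
```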


A Realistic Example: TPC-H Query 13

Here's a textbook case. TPC-H Query 13 counts the distribution of order counts per customer — a two-level aggregation that classic SQL handles with a nested subquery.

// Classic SQL — read inside-out
spark.sql("""
  SELECT c_count, COUNT(*) AS custdist
  FROM (
    SELECT c_custkey, COUNT(o_orderkey) AS c_count
    FROM customer
    LEFT OUTER JOIN orders
      ON c_custkey = o_custkey
      AND o_comment NOT LIKE '%unusual%packages%'
    GROUP BY c_custkey
  ) AS c_orders
  GROUP BY c_count
  ORDER BY custdist DESC, c_count DESC
""")

// Pipe syntax — read top-to-bottom
spark.sql("""
  FROM customer
  |> LEFT OUTER JOIN orders
       ON c_custkey = o_custkey
       AND o_comment NOT LIKE '%unusual%packages%'
  |> AGGREGATE COUNT(o_orderkey) AS c_count GROUP BY c_custkey
  |> AGGREGATE COUNT(*) AS custdist GROUP BY c_count
  |> ORDER BY custdist DESC, c_count DESC
""")

The second version reads as a recipe: start with customers, left-join orders, count orders per customer, count customers per order-count, sort. The first version requires you to find the innermost SELECT, parse it, then walk outward. Same plan, same performance, dramatically different mental load.


When You'd Actually Use This from Scala

The DataFrame API already gives you fluent, top-to-bottom transformations. So when does pipe syntax earn its keep in a Scala codebase?

SQL strings you already maintain. If you have a body of spark.sql("...") calls — embedded reports, migration scripts, queries pasted from analysts — pipe syntax is a drop-in readability upgrade. You don't have to rewrite them as DataFrames.

Hybrid teams. When the same query needs to be readable by SQL-first data analysts and Scala-first engineers, pipe syntax narrows the gap. The pipeline shape matches what the engineer would write in DataFrames; the syntax matches what the analyst writes in SQL.

Complex queries with multiple aggregation stages. Two-level group-bys, window functions followed by filters, joins that feed into aggregates — these are where classic SQL nesting hurts most and pipe syntax pays off most.

SQL files in version control. A pipe-syntax .sql file diffs cleanly when you add a stage. Inserting a filter step is a single new line, not a re-indent of an entire subquery.

If your codebase is mostly DataFrames already, you probably won't reach for pipe syntax much. But for the SQL you do write, it's a clear win.
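
One Scala-specific detail is worth a side note when embedding these queries in triple-quoted strings. It comes from how the standard library works, not from the Spark documentation: stripMargin uses | as its default margin character, so it removes the first character of any |> that begins a line. The sketch below shows the effect and one possible workaround using a custom margin character; the object name StripMarginPitfall is made up, and the code needs no Spark dependency.

```scala
object StripMarginPitfall {
  // With the default margin character, the leading '|' of each "|>" is
  // consumed as the margin, leaving a bare ">" at the start of the line.
  val broken: String =
    """FROM transactions
      |> WHERE amount > 0
      |> ORDER BY amount DESC""".stripMargin

  // Choosing a different margin character keeps the pipe operator intact.
  val intact: String =
    """FROM transactions
      #|> WHERE amount > 0
      #|> ORDER BY amount DESC""".stripMargin('#')

  def main(args: Array[String]): Unit = {
    println(broken)
    println("---")
    println(intact)
  }
}
```

Leaving stripMargin off entirely, as the examples in this article do, avoids the issue as well, since the SQL parser ignores the extra indentation.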


Limitations and Things to Know

  • Aggregates are not allowed in |> SELECT. Use |> AGGREGATE instead. This catches people coming from classic SQL.
  • Spark 4.0 minimum. Pipe syntax is gated on Spark 4.0 (SPARK-49555). If you're still on Spark 3.x, this is one more reason to plan the upgrade.
  • Window functions go in |> SELECT, not a separate operator. Use a window-function expression with OVER inside SELECT, then |> WHERE for what would have been QUALIFY.
  • The full operator list is documented at spark.apache.org/docs/4.0.0/sql-pipe-syntax.html. It's a short, readable page worth bookmarking.
  • It's fully interoperable with classic SQL. You can pipe onto any classic query, use either syntax inside a subquery, and mix them freely across statements in the same session. There's no global "pipe mode" to enable.


The Bottom Line

Pipe syntax doesn't add expressive power. Every pipe query has an exact classic-SQL equivalent. What it adds is readability for multi-stage transformations — the same readability you get from chaining DataFrame methods in Scala. If you're a Scala developer who occasionally writes complex SQL strings, this is the cleanest quality-of-life improvement in Spark 4.0's SQL surface. Try it on the next query that would otherwise need three nested subqueries.

For the full syntax reference, see the Spark 4.0 SQL pipe syntax documentation and the Databricks announcement post that introduced the feature.

Article Details

Created: 2026-06-06

Last Updated: 2026-06-06 10:43:13 PM