Array Union Spark Scala Example
The array_union function in Spark Scala takes two arrays as input and returns a new array containing all unique elements from the input arrays, removing any duplicates. When either of the input arrays is null, the entire result is null.
Array Union Definition
The array_union function first appeared in Spark 2.4.0, and as of Spark 3.4.1 it is defined as:
def array_union(col1: Column, col2: Column): Column
To see array_union in action, let's define a simple DataFrame with two array columns:
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark is in scope, as in spark-shell

val df = Seq(
  (Some(Array("AAA", "BBB")), Some(Array("BBB", "CCC"))),
  (None, Some(Array("BBB", "CCC"))),
  (Some(Array("AAA", "BBB")), None)
).toDF("arr1", "arr2")
Now let's create a new column called 'result', defined as the array_union of columns arr1 and arr2.
val df2 = df.withColumn("result", array_union(col("arr1"), col("arr2")))
Let's pull this all together, with a few show() calls thrown in, to see how array_union works:
val df = Seq(
  (Some(Array("AAA", "BBB")), Some(Array("BBB", "CCC"))),
  (None, Some(Array("BBB", "CCC"))),
  (Some(Array("AAA", "BBB")), None)
).toDF("arr1", "arr2")
val df2 = df.withColumn("result", array_union(col("arr1"), col("arr2")))
df.show()
// +----------+-----------+
// | arr1| arr2|
// +----------+-----------+
// |[AAA, BBB]| [BBB, CCC]|
// | null| [BBB, CCC]|
// |[AAA, BBB]| null|
// +----------+-----------+
df2.show()
// +----------+-----------+---------------+
// | arr1| arr2| result|
// +----------+-----------+---------------+
// |[AAA, BBB]| [BBB, CCC]|[AAA, BBB, CCC]|
// | null| [BBB, CCC]| null|
// |[AAA, BBB]| null| null|
// +----------+-----------+---------------+
As you can see, when either of the arrays passed to array_union is null, the entire result is null. This is typical of many Spark functions. However, it is often desirable to have the non-null column returned instead. An easy pattern for handling this is Spark Scala's when/otherwise expression.
The approach is simple: when an array column is null, we replace it with an empty array so that the union behaves correctly.
val df2 = df
.withColumn("tmp_arr1", when(col("arr1").isNull, Array.empty[String]).otherwise(col("arr1")))
.withColumn("tmp_arr2", when(col("arr2").isNull, Array.empty[String]).otherwise(col("arr2")))
.withColumn("result", array_union(col("tmp_arr1"), col("tmp_arr2")))
.drop("tmp_arr1", "tmp_arr2")
df2.show()
// +----------+----------+---------------+
// | arr1| arr2| result|
// +----------+----------+---------------+
// |[AAA, BBB]|[BBB, CCC]|[AAA, BBB, CCC]|
// | null|[BBB, CCC]| [BBB, CCC]|
// |[AAA, BBB]| null| [AAA, BBB]|
// +----------+----------+---------------+
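If the temporary columns feel heavy-handed, a more compact variant (a sketch, not something taken from the Spark documentation itself) is to use coalesce to supply the empty-array fallback inline; it relies on the same Array-to-literal conversion that the when/otherwise version above already uses.
// Sketch: coalesce returns the first non-null value, so a null array column
// falls back to an empty array literal before the union.
val df3 = df.withColumn(
  "result",
  array_union(
    coalesce(col("arr1"), lit(Array.empty[String])),
    coalesce(col("arr2"), lit(Array.empty[String]))))
This should produce the same result column as the when/otherwise version, without the intermediate columns.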
Another common pattern is to union more than two arrays together. The array_union function only accepts two input columns, so one easy approach is to chain the calls together as follows:
val df = Seq(
  (Some(Array("AAA", "BBB")), Some(Array("BBB", "CCC")), Some(Array("DDD"))),
  (None, Some(Array("BBB", "CCC")), Some(Array("DDD", "EEE"))),
  (Some(Array("AAA", "BBB")), None, Some(Array("DDD", "EEE")))
).toDF("arr1", "arr2", "arr3")
val df2 = df
.withColumn("result", array_union(col("arr1"), array_union(col("arr2"), col("arr3"))))
df2.show()
// +----------+----------+----------+--------------------+
// | arr1| arr2| arr3| result|
// +----------+----------+----------+--------------------+
// |[AAA, BBB]|[BBB, CCC]| [DDD]|[AAA, BBB, CCC, DDD]|
// | null|[BBB, CCC]|[DDD, EEE]| null|
// |[AAA, BBB]| null|[DDD, EEE]| null|
// +----------+----------+----------+--------------------+
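Note that the chained version still returns null as soon as any input array is null. If that is not the desired behavior, the empty-array pattern from earlier can be combined with the chaining. Here is a sketch (the val name df3 is just for illustration) that folds any number of array columns together:
// Sketch: replace each null array with an empty array, then fold the
// columns together with array_union.
val df3 = df.withColumn(
  "result",
  Seq("arr1", "arr2", "arr3")
    .map(c => coalesce(col(c), lit(Array.empty[String])))
    .reduce((a, b) => array_union(a, b)))
With this version, the rows that previously produced null keep the unique elements of their non-null arrays instead.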
With these examples, one can see that the array_union Spark Scala function is a useful tool in the data engineering toolbox.