Using the toDF Spark Implicit for Easier Testing

When testing your data engineering ETL pipelines, it helps to be able to quickly create simple DataFrames containing the data scenarios you are transforming. Likewise, when you hit an unexpected problem in production, quickly creating a test case that covers the new situation is highly beneficial. Thankfully, the Spark Scala toDF function found in the implicits library can assist with this.

First you will need to import the implicits from the Spark session instance. The Spark session is typically already available within a notebook environment. However, in a compiled jar project you will need to pay particular attention to this, because the import must come from a stable SparkSession value such as a val or lazy val.

A common technique to easily access a spark session within your jar classes is to lazily load it via a trait.

For example consider a file that defines the trait as follows:

package sparkingscala

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

  lazy val spark: SparkSession = {
    val s = SparkSession.builder().master("local").appName("sparkingscala").getOrCreate()
    
    // Configure the Session Here
    s.sparkContext.setLogLevel("WARN")
    
    s
  }
  
}

Now when you define a class or object that is used as part of your ETL program, you extend it with the trait. You can then successfully import the Spark implicits from the lazily loaded session:

package sparkingscala

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object Demo extends SparkSessionWrapper {
  import spark.implicits._

  def someDataETLTransform()(df: DataFrame): DataFrame = {
    // do transform logic here
    df
  }
}

Within your test cases this will look something like the following, using the utest library:

package sparkingscala

import utest._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object SomeTest extends TestSuite with SparkSessionWrapper {
  import spark.implicits._

  val tests = Tests {
    "awesome test case" - {
      // Set up your test DataFrames using toDF

      // Run your transformation logic

      // Assert your new dataframe is as you expect
    }
  }
}
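As a minimal sketch of how the three commented steps might be filled in, the test below builds an input with toDF, runs a transformation through Dataset.transform, and compares the collected rows against an expected DataFrame. The withSalutationLength function and the column names are hypothetical, made up for illustration, and the code assumes the SparkSessionWrapper trait shown earlier lives in the same package.

```scala
package sparkingscala

import utest._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length}

object SalutationLengthTest extends TestSuite with SparkSessionWrapper {
  import spark.implicits._

  // Hypothetical transformation under test, written in the same curried
  // style as someDataETLTransform so it can be passed to Dataset.transform
  def withSalutationLength()(df: DataFrame): DataFrame =
    df.withColumn("salutation_length", length(col("salutation")))

  val tests = Tests {
    "adds the salutation length" - {
      // Set up the input with toDF
      val input = Seq("Hi", "Hello").toDF("salutation")

      // Run the transformation logic
      val actual = input.transform(withSalutationLength())

      // Build the expected result with toDF as well, then compare
      val expected = Seq(("Hi", 2), ("Hello", 5))
        .toDF("salutation", "salutation_length")

      assert(actual.columns.toSeq == expected.columns.toSeq)
      assert(actual.collect().toSeq == expected.collect().toSeq)
    }
  }
}
```

Collecting both sides is only reasonable for small test DataFrames like these; it compares row values and ordering, not nullability flags in the schema.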

Now that spark.implicits is set up, we can start using the implicit toDF to build some test DataFrames.

val df = Seq(
  "Hello, world!",
  "Greetings, Universe!",
  "Salutations, Earthlings!",
).toDF()

println("-------------- Schema --------------")
df.printSchema()

// root
//  |-- value: string (nullable = true)

println("-------------- Data --------------")
df.show(false)

// +------------------------+
// |value                   |
// +------------------------+
// |Hello, world!           |
// |Greetings, Universe!    |
// |Salutations, Earthlings!|
// +------------------------+

As you can see, by leveraging a Scala sequence we can quickly create DataFrames within our code. However, this is just a simple DataFrame with a single column, which Spark names value by default. We can add more columns by creating a sequence of Scala tuples.

val df = Seq(
  ("Hello, world!", "Farewell"),
  ("Good Morning!", "Good Night!"),
  ("Welcome", "Thank you"),
).toDF()

println("-------------- Schema --------------")
df.printSchema()

// root
//  |-- _1: string (nullable = true)
//  |-- _2: string (nullable = true)

println("-------------- Data --------------")
df.show(false)

// +-------------+-----------+
// |_1           |_2         |
// +-------------+-----------+
// |Hello, world!|Farewell   |
// |Good Morning!|Good Night!|
// |Welcome      |Thank you  |
// +-------------+-----------+

In the examples above you can see in the printed schemas that the columns have automatically generated names. We can specify the column names by passing them as arguments to the toDF implicit function.

val df = Seq(
  ("Hello, world!", "Farewell"),
  ("Good Morning!", "Good Night!"),
  ("Hi", "Bye"),
).toDF("salutation", "farewell")

println("-------------- Schema --------------")
df.printSchema()

// root
//  |-- salutation: string (nullable = true)
//  |-- farewell: string (nullable = true)

println("-------------- Data --------------")
df.show(false)

// +-------------+-----------+
// |salutation   |farewell   |
// +-------------+-----------+
// |Hello, world!|Farewell   |
// |Good Morning!|Good Night!|
// |Hi           |Bye        |
// +-------------+-----------+

So far we have only created columns that are string types. We can create different typed columns by providing different types within the tuples.

val df = Seq(
  ("Hello, world!", "Good Night Sweet Prince!", 10),
  ("Good Morning!", "Good Night!", 5),
  ("Hi", "Bye", 4),
).toDF("salutation", "farewell", "rating")

println("-------------- Schema --------------")
df.printSchema()

// root
//  |-- salutation: string (nullable = true)
//  |-- farewell: string (nullable = true)
//  |-- rating: integer (nullable = false)

println("-------------- Data --------------")
df.show(false)

// +-------------+------------------------+------+
// |salutation   |farewell                |rating|
// +-------------+------------------------+------+
// |Hello, world!|Good Night Sweet Prince!|10    |
// |Good Morning!|Good Night!             |5     |
// |Hi           |Bye                     |4     |
// +-------------+------------------------+------+
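In a test you may prefer to assert on the schema rather than read the printSchema output by eye. The sketch below is one way to do that, assuming a local SparkSession; the score and formal columns are hypothetical additions used only to show a Double and a Boolean alongside the String and Int columns from the example above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, StringType}

object TypedColumnsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("sparkingscala").getOrCreate()
    import spark.implicits._

    // "score" and "formal" are made-up columns for illustration
    val df = Seq(
      ("Hello, world!", 10, 4.5, true),
      ("Hi", 4, 3.0, false)
    ).toDF("salutation", "rating", "score", "formal")

    // Each Scala type in the tuple maps to a Spark SQL type
    val schema = df.schema
    assert(schema("salutation").dataType == StringType)
    assert(schema("rating").dataType == IntegerType)
    assert(schema("score").dataType == DoubleType)
    assert(schema("formal").dataType == BooleanType)

    // Scala primitives cannot hold null, so Spark marks them non-nullable,
    // while String columns are nullable
    assert(!schema("rating").nullable)
    assert(schema("salutation").nullable)

    spark.stop()
  }
}
```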

If you are building up your test DataFrames and encounter an error like this:

[ENCODER_NOT_FOUND] Not found an encoder of the type Any to Spark SQL internal representation. Consider to change the input type to one of supported at...

then a column most likely mixes a plain value such as 10 with null. Scala then infers the column type as Any, which Spark cannot encode. So how do you create nullable columns? We can leverage Scala's Option type, with its Some and None cases, or null to help us here:

val df = Seq(
  ("Hello, world!", "Good Night Sweet Prince!", Some(10)),
  ("Good Morning!", "Good Night!", null),
  ("Hi", "Bye", Some(4)),
).toDF("salutation", "farewell", "rating")

println("-------------- Schema --------------")
df.printSchema()

// root
//  |-- salutation: string (nullable = true)
//  |-- farewell: string (nullable = true)
//  |-- rating: integer (nullable = true)

println("-------------- Data --------------")
df.show(false)

// +-------------+------------------------+------+
// |salutation   |farewell                |rating|
// +-------------+------------------------+------+
// |Hello, world!|Good Night Sweet Prince!|10    |
// |Good Morning!|Good Night!             |null  |
// |Hi           |Bye                     |4     |
// +-------------+------------------------+------+
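The encoder error comes down to which element type Scala infers for the tuples before Spark ever sees them. The following sketch uses plain Scala with no Spark dependency to make that inference visible; secondFieldClass is a hypothetical helper written for this illustration, not part of Spark.

```scala
import scala.reflect.ClassTag

object NullableColumnTypes {
  // Hypothetical helper: reports the class Scala inferred for the second tuple field
  def secondFieldClass[A](rows: Seq[(String, A)])(implicit tag: ClassTag[A]): Class[_] =
    tag.runtimeClass

  // A plain Int next to null: the only common supertype is Any,
  // which is the type named in the ENCODER_NOT_FOUND message
  val mixed = Seq(("Hi", 4), ("Bye", null))

  // Some next to None: the common supertype is Option[Int]
  val optional = Seq(("Hi", Some(4)), ("Bye", None))

  // Some next to null, as in the example above: the field stays Some[Int],
  // which is still a kind of Option
  val someOrNull = Seq(("Hi", Some(4)), ("Bye", null))

  def main(args: Array[String]): Unit = {
    assert(secondFieldClass(mixed) == classOf[Object]) // Any erases to java.lang.Object
    assert(secondFieldClass(optional) == classOf[Option[_]])
    assert(secondFieldClass(someOrNull) == classOf[Some[_]])
  }
}
```

Using None instead of null for the missing value, as the array example does, keeps the inferred type as Option and avoids relying on null inside a Scala collection.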

Here is a more complex example using arrays:

val df = Seq(
  (Some(Array("AAA", "BBB")), Some(Array("BBB", "CCC"))),
  (None,                      Some(Array("BBB", "CCC"))),
  (Some(Array("AAA", "BBB")), None),
).toDF("arr1", "arr2")

println("-------------- Schema --------------")
df.printSchema()

// root
//  |-- arr1: array (nullable = true)
//  |    |-- element: string (containsNull = true)
//  |-- arr2: array (nullable = true)
//  |    |-- element: string (containsNull = true)

println("-------------- Data --------------")
df.show(false)

// +----------+----------+
// |arr1      |arr2      |
// +----------+----------+
// |[AAA, BBB]|[BBB, CCC]|
// |null      |[BBB, CCC]|
// |[AAA, BBB]|null      |
// +----------+----------+
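When asserting on array columns in a test, comparing Scala Array values directly is unreliable because Array equality is by reference. A hedged sketch of one alternative, assuming a local SparkSession, is to read the collected values back as Seq with Row.getSeq and to check missing arrays with Row.isNullAt:

```scala
import org.apache.spark.sql.SparkSession

object ArrayColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("sparkingscala").getOrCreate()
    import spark.implicits._

    // Same data as the array example above
    val df = Seq(
      (Some(Array("AAA", "BBB")), Some(Array("BBB", "CCC"))),
      (None,                      Some(Array("BBB", "CCC"))),
      (Some(Array("AAA", "BBB")), None)
    ).toDF("arr1", "arr2")

    val rows = df.collect()

    // getSeq returns a Seq, which compares element by element
    assert(rows(0).getSeq[String](0) == Seq("AAA", "BBB"))
    assert(rows(0).getSeq[String](1) == Seq("BBB", "CCC"))

    // None in the input becomes a null cell in the DataFrame
    assert(rows(1).isNullAt(0))
    assert(rows(2).isNullAt(1))

    spark.stop()
  }
}
```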

Testing your ETL pipelines is a best practice for any Spark Scala data engineer. Thankfully, the toDF implicit function makes it much easier to create robust test cases for your test suite.

Tutorial Details

Created: 2023-08-08 01:12:12 AM

Last Updated: 2023-08-08 01:12:12 AM