
Spark Scala Functions

The Spark SQL Functions API is a powerful tool provided by Apache Spark's Scala library. It offers many familiar functions for data processing, data manipulation, and transformations. Anyone who has experience with SQL will quickly understand many of the capabilities and how they work with DataFrames.

The spark scala functions library simplifies complex operations on DataFrames and integrates seamlessly with Spark SQL queries, making it ideal for processing structured or semi-structured data. The library covers use cases for data aggregation, filtering, mathematical computations, string manipulation, and other miscellaneous functions. It is a critical tool in a data engineer's Spark toolkit.

To use the Spark Scala functions library, it must first be imported:

import org.apache.spark.sql.functions._
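
For example, assuming a DataFrame `df` with illustrative `name` and `price` columns, the imported functions can be composed directly inside DataFrame transformations (a minimal sketch):

df.select(upper(col("name")).as("name_uc"), round(col("price"), 2).as("price_rounded"))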

Functions Reference

The functions can be grouped into the following categories:

Aggregate Functions

def approx_count_distinct(columnName: String, rsd: Double): Column
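
A usage sketch (the `user_id` column name is illustrative); `rsd` is the maximum allowed relative standard deviation of the estimate:

df.agg(approx_count_distinct("user_id", 0.05))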

Collection Functions

Date time Functions

Math Functions

  • def sqrt(e: Column): Column

  • def sqrt(colName: String): Column
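
    For example (a minimal sketch; the `area` column is illustrative):

    df.withColumn("side_length", sqrt(col("area")))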

Misc Functions

  • Returns the MD5 hash of the column as a 32-character hex string. See spark scala hash functions for more information.

    def md5(e: Column): Column

    df.withColumn("md5", md5(col("name")))
  • Returns the SHA-1 hash of the specified column as a 40-character hex string. See example spark scala hash functions.

    def sha1(e: Column): Column

    df.withColumn("sha1", sha1(col("name")))
  • Returns the SHA-2 family hash of the specified column as a hex string. The desired number of bits must be provided and must be one of 224, 256, 384, or 512. For more details check out example spark scala sha2 usage.

    def sha2(e: Column, numBits: Int): Column

    df.withColumn("sha2", sha2(col("name"), 512))
  • Calculates the cyclic redundancy check (CRC32) value of the column and returns it as a bigint. See spark scala crc function.

    def crc32(e: Column): Column

    df.withColumn("crc", crc32(col("name")))
  • Returns the hash of a list of columns using the MurmurHash3 algorithm. See spark scala generic hash function.

    def hash(cols: Column*): Column

    df.withColumn("hash", hash(col("A"), col("B")))
  • Returns a 64-bit hash of a list of columns computed with the xxHash algorithm. See spark scala hash functions.

    def xxhash64(cols: Column*): Column

    df.withColumn("xxhash64", xxhash64(col("A"), col("B")))
  • def assert_true(c: Column): Column

  • def raise_error(c: Column): Column
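
    A hedged sketch of these two (the `age` column is illustrative): `assert_true` returns null when the condition holds and fails the query otherwise, while `raise_error` throws an exception with the given message:

    df.select(assert_true(col("age") >= 0))
    df.withColumn("age", when(col("age") < 0, raise_error(lit("negative age"))).otherwise(col("age")))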

Non-aggregate Functions

  • The convenience `col` function allows you to reference a DataFrame's column by its string name.

    def col(colName: String): Column = Column(colName)

    df.select(col("my-column-name"))
  • The convenience `column` function allows you to reference a DataFrame's column by its string name. It is an alternative to `col`.

    def column(colName: String): Column = Column(colName)

    df.select(column("my-column-name"))
  • The `lit` function allows you to wrap an arbitrary or 'literal' value into a column to use within DataFrame transformation logic.

    def lit(literal: Any): Column

    df.withColumn("s1", lit("some literal value"))
      .withColumn("i1", lit(5))
  • def typedLit[T : TypeTag](literal: T): Column
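
    A usage sketch: unlike `lit`, `typedLit` can also wrap Scala collection types (the `scores` column name is illustrative):

    df.withColumn("scores", typedLit(Seq(1, 2, 3)))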

  • def array(cols: Column*): Column

  • def array(colName: String, colNames: String*): Column
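
    For example (the `a`, `b`, and `c` column names are illustrative):

    df.withColumn("arr", array(col("a"), col("b"), col("c")))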

  • Map can be used to create a MapType column from a list of columns. You must provide an even number of columns, and they are paired as key/value: the first column becomes the key for the second, the third for the fourth, and so on. For more details check out the create map from columns examples.

    def map(cols: Column*): Column

    df.withColumn("s", map(lit("id"), col("id"), lit("name"), col("name")))
  • def map_from_arrays(keys: Column, values: Column): Column
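
    A usage sketch (assuming `keys` and `values` are array columns of equal length):

    df.withColumn("m", map_from_arrays(col("keys"), col("values")))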

  • def broadcast[T](df: Dataset[T]): Dataset[T]
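
    A usage sketch: `broadcast` hints that the marked Dataset is small enough to be sent to every executor for a broadcast join (the `lookupDf` Dataset is illustrative):

    df.join(broadcast(lookupDf), Seq("id"))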

  • Returns the first non-null value from a list of columns. Returns null when all are null. Check out coalesce examples for more information.

    def coalesce(e: Column*): Column

    df.withColumn("allergen", coalesce(col("allergen"), lit("N/A")))
  • def input_file_name(): Column

    df.withColumn("input_file_name", input_file_name())
  • def isnan(e: Column): Column

  • def isnull(e: Column): Column

  • def monotonically_increasing_id(): Column

  • def nanvl(col1: Column, col2: Column): Column

  • def negate(e: Column): Column

  • def not(e: Column): Column

  • def rand(seed: Long): Column

  • def rand(): Column

  • def randn(seed: Long): Column

  • def randn(): Column

  • def spark_partition_id(): Column

  • def struct(cols: Column*): Column

  • def struct(colName: String, colNames: String*): Column
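
    For example (the `name` and `age` columns are illustrative):

    df.withColumn("person", struct(col("name"), col("age")))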

  • The `when` function provides fall-through conditionals that allow you to implement complex logic, as shown in the sketch below. See when/otherwise examples.

    def when(condition: Column, value: Any): Column

    when(col("gender").isNull, lit("U"))
  • def bitwise_not(e: Column): Column

  • def expr(expr: String): Column

  • def greatest(exprs: Column*): Column

  • def greatest(columnName: String, columnNames: String*): Column

  • def least(exprs: Column*): Column

  • def least(columnName: String, columnNames: String*): Column
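
    For example, `greatest` and `least` pick the row-wise maximum and minimum across columns, skipping null values (the column names are illustrative):

    df.withColumn("max_score", greatest(col("s1"), col("s2"), col("s3")))
    df.withColumn("min_score", least(col("s1"), col("s2"), col("s3")))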

For more details on the non-aggregate functions, see here.

Partition Transform Functions

Sorting Functions

  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame.

    def asc(columnName: String): Column

    df.sort(asc("name"), asc("age"))
  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame, but `null` values come first.

    def asc_nulls_first(columnName: String): Column

    df.sort(asc_nulls_first("name"))
  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame, but `null` values come last.

    def asc_nulls_last(columnName: String): Column

    df.sort(asc_nulls_last("name"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame.

    def desc(columnName: String): Column

    df.sort(desc("name"), desc("age"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame, but `null` values come first.

    def desc_nulls_first(columnName: String): Column

    df.sort(desc_nulls_first("name"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame, but `null` values come last.

    def desc_nulls_last(columnName: String): Column

    df.sort(desc_nulls_last("name"))

String Functions

  • def ascii(e: Column): Column

    df.withColumn("ascii", ascii(lit("abcdefg")))
  • def base64(e: Column): Column

  • Returns the bit length of a string.

    def bit_length(e: Column): Column

    df.withColumn("bl", bit_length(lit("happy days")))
  • def concat_ws(sep: String, exprs: Column*): Column
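
    For example, joining columns with a separator (the `first_name` and `last_name` columns are illustrative):

    df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))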

  • def decode(value: Column, charset: String): Column

  • def encode(value: Column, charset: String): Column

  • The `format_number` function converts a numerical column `x` to a string formatted with comma separators and `d` decimal places. For more details check out format_number examples.

    def format_number(x: Column, d: Int): Column

    df.withColumn("formatted", format_number(col("elevation"), 0))
  • def initcap(e: Column): Column

  • def instr(str: Column, substring: String): Column

  • def length(e: Column): Column

  • Lowercases a string column.

    def lower(e: Column): Column

    df.withColumn("lc", lower(lit("HELLO World")))
  • def levenshtein(l: Column, r: Column): Column

  • def locate(substr: String, str: Column): Column

  • def locate(substr: String, str: Column, pos: Int): Column

  • def lpad(str: Column, len: Int, pad: String): Column

  • def lpad(str: Column, len: Int, pad: Array[Byte]): Column

  • Left trim removes the spaces from the left side of a string column. For a more detailed overview check out spark scala ltrim string examples.

    def ltrim(e: Column): Column

    df.withColumn("ltrim", ltrim(col("c1")))
  • Left trim removes the specified characters in the `trimString` from the left side of a string column. For more details see spark scala ltrim string examples.

    def ltrim(e: Column, trimString: String): Column

    df.withColumn("ltrim", ltrim(col("c1"), " \n\t"))
  • def octet_length(e: Column): Column

  • def regexp_extract(e: Column, exp: String, groupIdx: Int): Column

  • def regexp_replace(e: Column, pattern: String, replacement: String): Column

  • def regexp_replace(e: Column, pattern: Column, replacement: Column): Column
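
    A sketch of the regexp functions (the `c1` column is illustrative): extract the first run of digits, then strip all non-digit characters:

    df.withColumn("digits", regexp_extract(col("c1"), "(\\d+)", 1))
      .withColumn("digits_only", regexp_replace(col("c1"), "[^0-9]", ""))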

  • def unbase64(e: Column): Column

  • def rpad(str: Column, len: Int, pad: String): Column

  • def rpad(str: Column, len: Int, pad: Array[Byte]): Column

  • def repeat(str: Column, n: Int): Column

  • Right trim removes the spaces from the right side of a string column. For a more detailed overview check out spark scala rtrim string examples.

    def rtrim(e: Column): Column

    df.withColumn("rtrim", rtrim(col("c1")))
  • Right trim removes the specified characters in the `trimString` from the right side of a string column. For more details see spark scala rtrim string examples.

    def rtrim(e: Column, trimString: String): Column

    df.withColumn("rtrim", rtrim(col("c1"), " \n\t"))
  • def soundex(e: Column): Column

  • def split(str: Column, pattern: String): Column

  • def split(str: Column, pattern: String, limit: Int): Column
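
    For example, splitting a string column on a regular expression pattern into an array column (the `csv` column is illustrative):

    df.withColumn("parts", split(col("csv"), ","))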

  • def substring(str: Column, pos: Int, len: Int): Column

  • def substring_index(str: Column, delim: String, count: Int): Column

  • def overlay(src: Column, replace: Column, pos: Column, len: Column): Column

  • def overlay(src: Column, replace: Column, pos: Column): Column

  • def sentences(string: Column, language: Column, country: Column): Column

  • def sentences(string: Column): Column

  • def translate(src: Column, matchingString: String, replaceString: String): Column

  • Trim removes the spaces from both ends (left and right) of a string column. For a more detailed overview check out spark scala trim string examples.

    def trim(e: Column): Column

    df.withColumn("trim", trim(col("c1")))
  • Trim removes the specified characters in the `trimString` from the ends of a string column. For more details see spark scala trim string examples.

    def trim(e: Column, trimString: String): Column

    df.withColumn("trim", trim(col("c1"), " \n\t"))
  • Uppercases a string column.

    def upper(e: Column): Column

    df.withColumn("up", upper(lit("hello World")))

UDF Functions

Window Functions

Example Details

Created: 2023-07-31 02:25:00 PM

Last Updated: 2023-09-11 01:54:13 PM