
Spark Scala Functions

The Spark SQL functions API is a powerful tool provided by Apache Spark's Scala library. It offers many familiar functions for data processing, manipulation, and transformation. Anyone with SQL experience will quickly recognize many of its capabilities and how they work with DataFrames.

The Spark Scala functions library simplifies complex operations on DataFrames and integrates seamlessly with Spark SQL queries, making it ideal for processing structured or semi-structured data. The library covers data aggregation, filtering, mathematical computations, string manipulation, and other miscellaneous functions. It is a critical tool in a data engineer's Spark toolkit.

To use the Spark Scala functions library, it must first be imported:

import org.apache.spark.sql.functions._
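Once imported, each function returns a `Column` expression that composes directly with DataFrame operations such as `select`, `withColumn`, `filter`, and `agg`. A minimal, self-contained sketch (the local session, column names, and sample data below are illustrative assumptions, not part of the reference):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for demonstration only; in a real job the session
// is usually provided by the runtime environment.
val spark = SparkSession.builder().master("local[1]").appName("functions-demo").getOrCreate()
import spark.implicits._

val df = Seq(("ada", 36), ("bob", 41)).toDF("name", "age")

// upper() builds a Column expression; filter() accepts a Column predicate.
val result = df
  .withColumn("name_uc", upper(col("name")))
  .filter(col("age") > 40)
  .select("name_uc").as[String].collect()
```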

Functions Reference

The functions can be grouped into the following categories:

Aggregate Functions

def approx_count_distinct(columnName: String, rsd: Double): Column
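The signature above is listed without an example, so here is a hedged sketch: `approx_count_distinct` estimates the number of distinct values (using HyperLogLog++), where `rsd` is the maximum allowed relative standard deviation of the estimate. The session setup and sample data are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("agg-demo").getOrCreate()
import spark.implicits._

// Three distinct values; with a 5% rsd the estimate is exact at this scale.
val df = Seq("a", "b", "a", "c").toDF("user_id")
val approx = df.agg(approx_count_distinct("user_id", 0.05)).first().getLong(0)
```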

Collection Functions

Date time Functions

Math Functions

  • def sqrt(e: Column): Column

  • def sqrt(colName: String): Column
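Both `sqrt` overloads compute the square root of a numeric column; one accepts a `Column`, the other a column name. A minimal sketch (session and sample data are illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("math-demo").getOrCreate()
import spark.implicits._

// sqrt returns a new Column holding the square root of each value.
val roots = Seq(4.0, 9.0).toDF("area")
  .withColumn("root", sqrt(col("area")))
  .select("root").as[Double].collect()
```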

Misc Functions

  • Returns the MD5 hash of the column as a 32-character hex string. See spark scala hash functions for more information.

    def md5(e: Column): Column

    df.withColumn("md5", md5(col("name")))
  • Returns the SHA-1 hash of the specified column as a 40-character hex string. See example spark scala hash functions.

    def sha1(e: Column): Column

    df.withColumn("sha1", sha1(col("name")))
  • Returns the SHA-2 hash of the specified column as a hex string. The number of desired bits must be provided and be one of 224, 256, 384, or 512. For more details check out example spark scala sha2 usage.

    def sha2(e: Column, numBits: Int): Column

    df.withColumn("sha2", sha2(col("name"), 512))
  • Calculates the CRC32 (cyclic redundancy check) value of the column. See spark scala crc function.

    def crc32(e: Column): Column

    df.withColumn("crc", crc32(col("name")))
  • Returns the hash of a list of columns computed with the MurmurHash3 algorithm. See spark scala generic hash function.

    def hash(cols: Column*): Column

    df.withColumn("hash", hash(col("A"), col("B")))
  • Returns the 64-bit hash of a list of columns computed with the xxHash algorithm. See spark scala hash functions.

    def xxhash64(cols: Column*): Column

    df.withColumn("xxhash64", xxhash64(col("A"), col("B")))
  • def assert_true(c: Column): Column

  • def raise_error(c: Column): Column
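The last two signatures, `assert_true` and `raise_error`, are listed without descriptions. A hedged sketch of `assert_true`: it evaluates to null when the condition holds and raises a runtime error when it does not (the session and sample data below are illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("misc-demo").getOrCreate()
import spark.implicits._

// All rows satisfy the condition, so assert_true yields a null for each row
// rather than raising an error.
val checked = Seq(1, 2).toDF("age")
  .select(assert_true(col("age") >= 0)).collect()
```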

Non-aggregate Functions


  • The convenience `col` function allows you to reference a DataFrame's column by its string name.

    def col(colName: String): Column = Column(colName)

    df.select(col("my-column-name"))
  • The convenience `column` function allows you to reference a DataFrame's column by its string name. This is an alternative to `col`.

    def column(colName: String): Column = Column(colName)

    df.select(column("my-column-name"))
  • The `lit` function allows you to wrap an arbitrary or 'literal' value into a column to use within DataFrame transformation logic.

    def lit(literal: Any): Column

    df.withColumn("s1", lit("some literal value"))
      .withColumn("i1", lit(5))
  • def typedLit[T : TypeTag](literal: T): Column

  • def array(cols: Column*): Column

  • def array(colName: String, colNames: String*): Column

  • `map` can be used to create a MapType column from a list of columns. You must provide an even number of columns, which are interpreted as alternating key/value pairs: `k1 -> v1, k2 -> v2, ...`. For more details check out the create map from columns examples.

    def map(cols: Column*): Column

    df.withColumn("s", map(lit("id"), col("id"), lit("name"), col("name")))
  • def map_from_arrays(keys: Column, values: Column): Column

  • def broadcast[T](df: Dataset[T]): Dataset[T]

  • Returns the first non-null value from a list of columns. Returns null when all are null. Check out coalesce examples for more information.

    def coalesce(e: Column*): Column

    df.withColumn("allergen", coalesce(col("allergen"), lit("N/A")))
  • def input_file_name(): Column

    df.withColumn("input_file_name", input_file_name())
  • def isnan(e: Column): Column

  • def isnull(e: Column): Column

  • def monotonically_increasing_id(): Column

  • def nanvl(col1: Column, col2: Column): Column

  • def negate(e: Column): Column

  • def not(e: Column): Column

  • def rand(seed: Long): Column

  • def rand(): Column

  • def randn(seed: Long): Column

  • def randn(): Column

  • def spark_partition_id(): Column

  • def struct(cols: Column*): Column

  • def struct(colName: String, colNames: String*): Column

  • The `when` function provides fall through conditionals that allow you to implement complex logic. See when/otherwise examples.

    def when(condition: Column, value: Any): Column

    when(col("gender").isNull, lit("U"))
  • def bitwise_not(e: Column): Column

  • def expr(expr: String): Column

  • def greatest(exprs: Column*): Column

  • def greatest(columnName: String, columnNames: String*): Column

  • def least(exprs: Column*): Column

  • def least(columnName: String, columnNames: String*): Column
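Several of the signatures above appear without examples. The sketch below exercises a handful of them in one place; the local session, column names, and sample data are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("nonagg-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, 5.0, "Ada"), (2, Double.NaN, "Bob")).toDF("id", "score", "name")

// typedLit: wrap a Scala collection as a literal column.
val withTags = df.withColumn("tags", typedLit(Seq("x", "y")))

// array / struct: combine existing columns into composite columns.
val combined = df
  .withColumn("pair", array(col("id"), col("id")))
  .withColumn("person", struct(col("name"), col("id")))

// map_from_arrays: build a MapType column from parallel key/value arrays.
val mapped = df.withColumn("m", map_from_arrays(array(lit("id")), array(col("id"))))

// isnan: flag NaN values (distinct from null, which isnull detects).
val nanCount = df.filter(isnan(col("score"))).count()

// monotonically_increasing_id: unique (but not consecutive) 64-bit ids.
val withIds = df.withColumn("row_id", monotonically_increasing_id())

// greatest: row-wise maximum across the given columns/literals.
val g = df.select(greatest(col("id"), lit(100))).as[Int].first()

// broadcast: hint the optimizer to broadcast a small table in a join.
val labels = Seq((1, "one")).toDF("id", "label")
val joined = df.join(broadcast(labels), "id")
```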

For more details on the non-aggregate functions, see here.

Partition Transform Functions

Sorting Functions

  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame.

    def asc(columnName: String): Column

    df.sort(asc("name"), asc("age"))
  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame, but `null` values come first.

    def asc_nulls_first(columnName: String): Column

    df.sort(asc_nulls_first("name"))
  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame, but `null` values come last.

    def asc_nulls_last(columnName: String): Column

    df.sort(asc_nulls_last("name"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame.

    def desc(columnName: String): Column

    df.sort(desc("name"), desc("age"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame, but `null` values come first.

    def desc_nulls_first(columnName: String): Column

    df.sort(desc_nulls_first("name"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame, but `null` values come last.

    def desc_nulls_last(columnName: String): Column

    df.sort(desc_nulls_last("name"))

String Functions

  • Returns the numeric code point of the first character in a string. Use with chr/char (SQL) to convert back. See detailed examples.

    def ascii(e: Column): Column

    df.withColumn("ascii", ascii(lit("abcdefg")))
  • Computes the Base64 encoding of a binary or string column. See detailed examples.

    def base64(e: Column): Column

  • Returns the bit length of a string (bytes × 8). For Unicode strings this differs from character count. See detailed examples.

    def bit_length(e: Column): Column

    df.withColumn("bl", bit_length(lit("happy days")))
  • def concat_ws(sep: String, exprs: Column*): Column

  • Column method. Returns true if the string column contains the given substring. See detailed examples.

    def contains(other: Any): Column

    df.withColumn("has_foam", col("description").contains("foam"))
  • def decode(value: Column, charset: String): Column

  • def encode(value: Column, charset: String): Column

  • Column method. Returns true if the string column ends with the given suffix. See detailed examples.

    def endsWith(literal: String): Column

    df.withColumn("is_example", col("url").endsWith("/examples/"))
  • The `format_number` function converts a numerical column `x` to a string formatted with comma separators and `d` decimal places. For more details check out format_number examples.

    def format_number(x: Column, d: Int): Column

    df.withColumn("formatted", format_number(col("elevation"), 0))
  • Converts a string to title case — capitalizes the first letter of each word, lowercases the rest. See detailed examples.

    def initcap(e: Column): Column

    df.withColumn("title_case", initcap(col("name")))
  • Returns the 1-based position of a substring within a string column, or 0 if not found. See detailed examples.

    def instr(str: Column, substring: String): Column

    df.withColumn("at_pos", instr(col("email"), "@"))
  • Column method. Case-insensitive SQL LIKE pattern matching. See detailed examples.

    def ilike(literal: String): Column

    df.withColumn("match", col("city").ilike("new york%"))
  • SQL alias for lower. Available via expr(). See lower and upper examples.

    lcase(str) - SQL alias for lower

    df.withColumn("lc", expr("lcase(name)"))
  • Column method. SQL LIKE pattern matching using % and _ wildcards. See detailed examples.

    def like(literal: String): Column

    df.withColumn("match", col("email").like("%@example.com"))
  • Returns the character length of a string column. For Unicode strings, counts visible characters rather than bytes. See detailed examples.

    def length(e: Column): Column

    df.withColumn("char_count", length(col("city")))
  • Converts a string column to lowercase. See lower and upper examples.

    def lower(e: Column): Column

    df.withColumn("lc", lower(lit("HELLO World")))
  • def levenshtein(l: Column, r: Column): Column

  • Returns the 1-based position of a substring within a string column (arguments reversed vs instr). See detailed examples.

    def locate(substr: String, str: Column): Column

    df.withColumn("pos", locate("@", col("email")))
  • def locate(substr: String, str: Column, pos: Int): Column

    df.withColumn("second_dash", locate("-", col("date_str"), 6))
  • Left-pads a string column to a target length with a specified padding string. See detailed examples.

    def lpad(str: Column, len: Int, pad: String): Column

    df.withColumn("padded", lpad(col("number").cast("string"), 6, "0"))
  • def lpad(str: Column, len: Int, pad: Array[Byte]): Column

  • Left trim removes the spaces from the left side of a string column. For a more detailed overview, check out spark scala ltrim string examples.

    def ltrim(e: Column): Column

    df.withColumn("ltrim", ltrim(col("c1")))
  • Left trim removes the specified characters in the `trimString` from the left side of a string column. For more details see spark scala ltrim string examples.

    def ltrim(e: Column, trimString: String): Column

    df.withColumn("ltrim", ltrim(col("c1"), " \n\t"))
  • Returns the byte length of a string (UTF-8 encoded). For Unicode strings this differs from character count. See detailed examples.

    def octet_length(e: Column): Column

    df.withColumn("byte_count", octet_length(col("word")))
  • def regexp_extract(e: Column, exp: String, groupIdx: Int): Column

  • def regexp_replace(e: Column, pattern: String, replacement: String): Column

  • def regexp_replace(e: Column, pattern: Column, replacement: Column): Column

  • Replace all occurrences of a substring with another string. Available as a SQL function via expr(). See detailed examples.

    replace(str, search, replacement) - SQL function, available via expr()

    df.withColumn("new_email", expr("replace(email, 'old.com', 'new.com')"))
  • Decodes a Base64-encoded string column back to binary. See detailed examples.

    def unbase64(e: Column): Column

  • Right-pads a string column to a target length with a specified padding string. See detailed examples.

    def rpad(str: Column, len: Int, pad: String): Column

    df.withColumn("padded", rpad(col("name"), 12, "."))
  • def rpad(str: Column, len: Int, pad: Array[Byte]): Column

  • Duplicates a string column a specified number of times. See detailed examples.

    def repeat(str: Column, n: Int): Column

    df.withColumn("repeated", repeat(col("word"), 3))
  • Reverses the character order of a string column, or the element order of an array column. See detailed examples.

    def reverse(e: Column): Column

    df.withColumn("reversed", reverse(col("word")))
  • Column method. SQL RLIKE pattern matching using a full Java regular expression. See detailed examples.

    def rlike(literal: String): Column

    df.withColumn("is_phone", col("phone").rlike("""\d{3}-\d{3}-\d{4}"""))
  • Right trim removes the spaces from the right side of a string column. For a more detailed overview, check out spark scala rtrim string examples.

    def rtrim(e: Column): Column

    df.withColumn("rtrim", rtrim(col("c1")))
  • Right trim removes the specified characters in the `trimString` from the right side of a string column. For more details see spark scala rtrim string examples.

    def rtrim(e: Column, trimString: String): Column

    df.withColumn("rtrim", rtrim(col("c1"), " \n\t"))
  • def soundex(e: Column): Column

  • Split a string on a delimiter or regex pattern into an array. See detailed examples.

    def split(str: Column, pattern: String): Column

    df.withColumn("result", split(col("tags"), ","))
  • def split(str: Column, pattern: String, limit: Int): Column

    df.withColumn("result", split(col("tags"), ",", 2))
  • Column method. Returns true if the string column starts with the given prefix. See detailed examples.

    def startsWith(literal: String): Column

    df.withColumn("is_https", col("url").startsWith("https://"))
  • Extract characters from a string by position. See detailed examples.

    def substring(str: Column, pos: Int, len: Int): Column

    df.withColumn("result", substring(col("phone"), 1, 3))
  • Extract a substring before or after the Nth occurrence of a delimiter. See detailed examples.

    def substring_index(str: Column, delim: String, count: Int): Column

    df.withColumn("result", substring_index(col("ip"), ".", 2))
  • Replace a portion of a string at a given position with a replacement string. See detailed examples.

    def overlay(src: Column, replace: Column, pos: Column, len: Column): Column

  • def overlay(src: Column, replace: Column, pos: Column): Column

  • def sentences(string: Column, language: Column, country: Column): Column

  • def sentences(string: Column): Column

  • Replace or delete characters in a string column using a positional character mapping. See detailed examples.

    def translate(src: Column, matchingString: String, replaceString: String): Column

    df.withColumn("normalized", translate(col("sku"), "-/", "  "))
  • Trim removes the spaces from both ends (left and right) of a string column. For a more detailed overview, check out spark scala trim string examples.

    def trim(e: Column): Column

    df.withColumn("trim", trim(col("c1")))
  • Trim removes the specified characters in the `trimString` from the ends of a string column. For more details see spark scala trim string examples.

    def trim(e: Column, trimString: String): Column

    df.withColumn("trim", trim(col("c1"), " \n\t"))
  • SQL alias for upper. Available via expr(). See lower and upper examples.

    ucase(str) - SQL alias for upper

    df.withColumn("uc", expr("ucase(name)"))
  • Converts a string column to uppercase. See lower and upper examples.

    def upper(e: Column): Column

    df.withColumn("up", upper(lit("hello World")))
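A few of the string signatures above (`concat_ws`, `levenshtein`, `regexp_extract`, `regexp_replace`, `soundex`, `encode`/`decode`) are listed without examples. A combined sketch; the local session, column names, and sample data are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("string-demo").getOrCreate()
import spark.implicits._

val df = Seq(("John", "Doe", "order-1234")).toDF("first", "last", "order")

// concat_ws: join columns with a separator, skipping nulls.
val full = df.select(concat_ws(" ", col("first"), col("last"))).as[String].first()

// levenshtein: edit distance between two string columns.
val dist = Seq(("kitten", "sitting")).toDF("l", "r")
  .select(levenshtein(col("l"), col("r"))).as[Int].first()

// regexp_extract: pull out a numbered capture group.
val orderId = df.select(regexp_extract(col("order"), "order-(\\d+)", 1)).as[String].first()

// regexp_replace: substitute every match of a pattern.
val masked = df.select(regexp_replace(col("order"), "\\d", "#")).as[String].first()

// soundex: phonetic code, useful for fuzzy name matching.
val code = df.select(soundex(col("first"))).as[String].first()

// encode / decode: round-trip a string column through binary using a charset.
val roundTrip = df.select(decode(encode(col("first"), "UTF-8"), "UTF-8")).as[String].first()
```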

UDF Functions

Window Functions

Example Details

Created: 2023-07-31 02:25:00 PM

Last Updated: 2023-09-11 01:54:13 PM