
Spark Scala Functions

The Spark SQL Functions API is a powerful tool provided by Apache Spark's Scala library. It offers many familiar functions used in data processing, manipulation, and transformation. Anyone who has experience with SQL will quickly understand many of the capabilities and how they work with DataFrames.

The Spark Scala functions library simplifies complex operations on DataFrames and seamlessly integrates with Spark SQL queries, making it ideal for processing structured or semi-structured data. The library covers use cases for data aggregation, filtering, mathematical computations, string manipulation, and other miscellaneous functions. It is a critical tool in a data engineer's Spark toolkit.

To use the Spark Scala functions library, it must first be imported:

import org.apache.spark.sql.functions._

Functions Reference

The functions can be grouped into the following categories:

Aggregate Functions

  • Returns the approximate number of distinct items in a column. rsd is the maximum relative standard deviation allowed in the estimate.

    def approx_count_distinct(columnName: String, rsd: Double): Column
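A minimal, self-contained sketch (the sample data, column name, and local session are illustrative; at this tiny cardinality the HyperLogLog++ estimate is exact):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("acd-example").getOrCreate()
import spark.implicits._

// Three distinct letters; rsd = 0.05 caps the relative standard deviation of the estimate
val df = Seq("a", "b", "b", "c", "c", "c").toDF("letter")
val approx = df.agg(approx_count_distinct("letter", 0.05).as("uniques")).first().getLong(0)
```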

Collection Functions

Date and Time Functions

  • Returns the date that is the given number of months after the start date, clamping to the last day when the day-of-month doesn't exist in the target month. See detailed examples.

    def add_months(startDate: Column, numMonths: Int): Column
    def add_months(startDate: Column, numMonths: Column): Column

    add_months(col("event_date"), 1)
  • Alias of current_date. SQL function — use via expr(). See detailed examples.

    curdate()

    df.withColumn("today", expr("curdate()"))
  • Returns the current date at the start of query evaluation as a date column. See detailed examples.

    def current_date(): Column

    df.withColumn("today", current_date())
  • Returns the current timestamp at the start of query evaluation as a timestamp column. See detailed examples.

    def current_timestamp(): Column

    df.withColumn("now", current_timestamp())
  • Returns the current session local timezone. SQL function — use via expr(). See detailed examples.

    current_timezone()

    df.withColumn("tz", expr("current_timezone()"))
  • Returns the date that is the given number of days after the start date. See detailed examples.

    def date_add(start: Column, days: Int): Column
    def date_add(start: Column, days: Column): Column

    date_add(col("event_date"), 7)
  • Spark 3.4.0 SQL alias for datediff. SQL-only — call via expr(). See detailed examples.

    date_diff(endDate, startDate)

    df.withColumn("days", expr("date_diff(shipped_on, ordered_on)"))
  • Formats a date or timestamp column as a string using a pattern like yyyy-MM-dd or MMMM d, yyyy. See detailed examples.

    def date_format(dateExpr: Column, format: String): Column

    date_format(col("event_date"), "MM/dd/yyyy")
  • Extract a part (year, month, day, hour, etc.) from a date, timestamp, or interval. SQL-only — call via expr(). See detailed examples.

    date_part(field, source)

    df.withColumn("year", expr("date_part('YEAR', event_time)"))
  • Returns the date that is the given number of days before the start date. See detailed examples.

    def date_sub(start: Column, days: Int): Column
    def date_sub(start: Column, days: Column): Column

    date_sub(col("event_date"), 7)
  • Returns timestamp truncated to the unit specified by the format. See detailed examples.

    def date_trunc(format: String, timestamp: Column): Column

    date_trunc("month", col("event_ts"))
  • Spark 3.4.0 alias for date_add. SQL-only — call via expr(). See detailed examples.

    dateadd(start_date, num_days)

    df.withColumn("plus_7_days", expr("dateadd(event_date, 7)"))
  • Returns the number of days between two date/timestamp/string columns (end minus start). See detailed examples.

    def datediff(end: Column, start: Column): Column

    datediff(col("shipped_on"), col("ordered_on"))
  • Spark 3.4.0 alias for date_part. SQL-only — call via expr(). See detailed examples.

    datepart(field, source)

    df.withColumn("quarter", expr("datepart('QUARTER', event_date)"))
  • Returns the day of month of the date/timestamp. SQL alias for dayofmonth — use via expr(). See detailed examples.

    day(date)

    df.withColumn("day", expr("day(event_date)"))
  • Extracts the day of the month as an integer from a given date/timestamp/string. See detailed examples.

    def dayofmonth(e: Column): Column

    df.withColumn("day", dayofmonth(col("event_date")))
  • Extracts the day of the week as an integer from a given date/timestamp/string. Sunday = 1 through Saturday = 7. See detailed examples.

    def dayofweek(e: Column): Column

    df.withColumn("dow", dayofweek(col("event_date")))
  • Extracts the day of the year as an integer from a given date/timestamp/string. See detailed examples.

    def dayofyear(e: Column): Column

    df.withColumn("doy", dayofyear(col("event_date")))
  • SQL-standard syntax for pulling a field out of a date, timestamp, or interval. SQL-only — call via expr(). See detailed examples.

    extract(field FROM source)

    df.withColumn("year", expr("extract(YEAR FROM event_time)"))
  • Extracts the hours as an integer from a given date/timestamp/string. See detailed examples.

    def hour(e: Column): Column

    df.withColumn("hour", hour(col("event_time")))
  • Returns the current timestamp without time zone at the start of query evaluation as a timestamp_ntz column. See detailed examples.

    def localtimestamp(): Column

    df.withColumn("now_ntz", localtimestamp())
  • Extracts the minutes as an integer from a given date/timestamp/string. See detailed examples.

    def minute(e: Column): Column

    df.withColumn("minute", minute(col("event_time")))
  • Extracts the month as an integer from a given date/timestamp/string. See detailed examples.

    def month(e: Column): Column

    df.withColumn("month", month(col("event_date")))
  • Alias of current_timestamp. SQL function — use via expr(). See detailed examples.

    now()

    df.withColumn("now", expr("now()"))
  • Extracts the quarter as an integer (1 through 4) from a given date/timestamp/string. See detailed examples.

    def quarter(e: Column): Column

    df.withColumn("quarter", quarter(col("event_date")))
  • Extracts the seconds as an integer from a given date/timestamp/string. See detailed examples.

    def second(e: Column): Column

    df.withColumn("second", second(col("event_time")))
  • Returns date truncated to the unit specified by the format. See detailed examples.

    def trunc(date: Column, format: String): Column

    trunc(col("event_date"), "month")
  • Returns the day of the week for date/timestamp (0 = Monday through 6 = Sunday). SQL function — use via expr(). See detailed examples.

    weekday(date)

    df.withColumn("weekday", expr("weekday(event_date)"))
  • Extracts the ISO 8601 week number as an integer from a given date/timestamp/string. See detailed examples.

    def weekofyear(e: Column): Column

    df.withColumn("week", weekofyear(col("event_date")))
  • Extracts the year as an integer from a given date/timestamp/string. See detailed examples.

    def year(e: Column): Column

    df.withColumn("year", year(col("event_date")))

Math Functions

  • Computes the square root of the specified column.

    def sqrt(e: Column): Column
    def sqrt(colName: String): Column
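A minimal runnable sketch (sample data and column name are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("sqrt-example").getOrCreate()
import spark.implicits._

// Perfect squares, so the results are exact doubles
val df = Seq(4.0, 9.0, 16.0).toDF("n")
val roots = df.select(sqrt(col("n"))).as[Double].collect()
```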

Misc Functions

  • Returns the MD5 hash of the column as a 32-character hex string. See spark scala hash functions for more information.

    def md5(e: Column): Column

    df.withColumn("md5", md5(col("name")))
  • Returns the SHA-1 hash of the specified column as a 40-character hex string. See example spark scala hash functions.

    def sha1(e: Column): Column

    df.withColumn("sha1", sha1(col("name")))
  • Returns the SHA-2 hash of the specified column as a hex string. The number of desired bits must be provided and be one of 224, 256, 384, or 512. For more details check out example spark scala sha2 usage.

    def sha2(e: Column, numBits: Int): Column

    df.withColumn("sha2", sha2(col("name"), 512))
  • Calculates the CRC32 (cyclic redundancy check) value of the column. See spark scala crc function.

    def crc32(e: Column): Column

    df.withColumn("crc", crc32(col("name")))
  • Returns the hash of a list of columns using the Murmur3 hashing algorithm. See spark scala generic hash function.

    def hash(cols: Column*): Column

    df.withColumn("hash", hash(col("A"), col("B")))
  • Returns the 64-bit hash of a list of columns, computed with the xxHash64 algorithm. See spark scala hash functions.

    def xxhash64(cols: Column*): Column

    df.withColumn("xxhash64", xxhash64(col("A"), col("B")))
  • Throws an exception with the provided error message when the column expression is evaluated.

    def raise_error(c: Column): Column
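A hedged sketch of raise_error used as a row-level guard (the sample data and the "negative quantity" invariant are invented for illustration; the exception surfaces at action time):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("raise-error-example").getOrCreate()
import spark.implicits._

// Fail the job when a row violates an invariant
val df = Seq(-1, 5).toDF("qty")
val checked = df.withColumn("qty_ok",
  when(col("qty") < 0, raise_error(lit("negative quantity"))).otherwise(col("qty")))

// Evaluating the invalid row throws when the DataFrame is materialized
val failed = try { checked.collect(); false } catch { case _: Exception => true }
```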

Non-aggregate Functions

  • The convenience `col` function allows you to reference a DataFrame's column by its string name.

    def col(colName: String): Column = Column(colName)

    df.select(col("my-column-name"))
  • The convenience `column` function allows you to reference a DataFrame's column by its string name. It is an alternative to `col`.

    def column(colName: String): Column = Column(colName)

    df.select(column("my-column-name"))
  • The `lit` function allows you to wrap an arbitrary or 'literal' value into a column to use within DataFrame transformation logic.

    def lit(literal: Any): Column

    df.withColumn("s1", lit("some literal value"))
      .withColumn("i1", lit(5))
  • def typedlit[T : TypeTag](literal: T): Column
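Unlike `lit`, `typedlit` (the lowercase Spark 3.2+ alias of `typedLit`) preserves the full Scala type of complex literals such as Seq and Map. A minimal sketch with made-up data, using the `typedLit` spelling available in all recent versions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("typedlit-example").getOrCreate()
import spark.implicits._

// Attach a constant array column to every row
val df = Seq(1, 2).toDF("id")
val withTags = df.withColumn("tags", typedLit(Seq("a", "b")))
val firstTags = withTags.select("tags").as[Seq[String]].first()
```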

  • def array(cols: Column*): Column
    def array(colName: String, colNames: String*): Column
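A minimal sketch of `array` combining two columns into an ArrayType column (sample data is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("array-example").getOrCreate()
import spark.implicits._

// Collapse two numeric columns into a single array column
val df = Seq((1, 2)).toDF("a", "b")
val ab = df.select(array(col("a"), col("b")).as("ab")).as[Seq[Int]].first()
```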

  • Map can be used to create a MapType column from a list of columns. You must provide an even number of columns; they are read as alternating key/value pairs (k1, v1, k2, v2, ...). For more details check out the create map from columns examples.

    def map(cols: Column*): Column

    df.withColumn("s", map(lit("id"), col("id"), lit("name"), col("name")))
  • def map_from_arrays(keys: Column, values: Column): Column
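`map_from_arrays` zips a keys array column with a values array column into a MapType column. A minimal sketch with invented data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("mfa-example").getOrCreate()
import spark.implicits._

// Pair up the two arrays element-by-element into a map
val df = Seq((Seq("x", "y"), Seq(1, 2))).toDF("keys", "values")
val m = df.select(map_from_arrays(col("keys"), col("values")).as("m"))
  .as[Map[String, Int]].first()
```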

  • def broadcast[T](df: Dataset[T]): Dataset[T]
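`broadcast` marks a small Dataset as a broadcast-join candidate so the optimizer ships it whole to every executor instead of shuffling both sides. A sketch with made-up fact/dimension tables:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("broadcast-example").getOrCreate()
import spark.implicits._

val facts = Seq((1, 10.0), (2, 20.0), (2, 5.0)).toDF("id", "amount")
val dims = Seq((1, "bronze"), (2, "silver")).toDF("id", "tier")

// Hint that dims (the small side) should be broadcast
val joined = facts.join(broadcast(dims), "id")
val n = joined.count()
```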

  • Returns the first non-null value from a list of columns. Returns null when all are null. Check out coalesce examples for more information.

    def coalesce(e: Column*): Column

    df.withColumn("allergen", coalesce(col("allergen"), lit("N/A")))
  • def isnan(e: Column): Column

  • def isnull(e: Column): Column

  • def monotonically_increasing_id(): Column
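A sketch of `monotonically_increasing_id`; the generated ids are unique and increasing within each partition but not consecutive, so assert only uniqueness, never specific values:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("mono-id-example").getOrCreate()
import spark.implicits._

// Tag each row with a unique 64-bit id
val df = spark.range(4).toDF("n").withColumn("row_id", monotonically_increasing_id())
val ids = df.select("row_id").as[Long].collect()
```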

  • def nanvl(col1: Column, col2: Column): Column
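The `isnan`, `isnull`, and `nanvl` signatures above can be sketched together; note that `nanvl` replaces NaN but leaves null untouched (the sensor data is invented for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("nan-null-example").getOrCreate()
import spark.implicits._

val df = Seq(("s1", Some(Double.NaN)), ("s2", Some(1.5)), ("s3", None: Option[Double]))
  .toDF("sensor", "reading")

val flags = df.select(
  isnan(col("reading")).as("is_nan"),     // true only for NaN
  isnull(col("reading")).as("is_null"),   // true only for null
  nanvl(col("reading"), lit(0.0)).as("nan_to_zero")) // NaN -> 0.0, null stays null
val rows = flags.collect()
```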

  • def negate(e: Column): Column

  • def not(e: Column): Column

  • def rand(seed: Long): Column
    def rand(): Column

  • def randn(seed: Long): Column
    def randn(): Column
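A sketch of the seeded `rand`/`randn` generators: seeded runs are reproducible, `rand` is uniform in [0, 1), and `randn` draws from the standard normal distribution:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("rand-example").getOrCreate()
import spark.implicits._

val df = spark.range(100)
  .withColumn("u", rand(42))    // uniform in [0, 1)
  .withColumn("g", randn(42))   // standard normal
val us = df.select("u").as[Double].collect()
```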

  • def spark_partition_id(): Column

  • def struct(cols: Column*): Column
    def struct(colName: String, colNames: String*): Column
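`struct` nests columns into a single StructType column whose fields remain addressable with dot notation. A minimal sketch with invented data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("struct-example").getOrCreate()
import spark.implicits._

val df = Seq(("ada", 36)).toDF("name", "age")
val nested = df.select(struct(col("name"), col("age")).as("person"))

// Nested fields can be selected with dot notation
val personName = nested.select("person.name").as[String].first()
```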

  • The `when` function provides fall through conditionals that allow you to implement complex logic. See when/otherwise examples.

    def when(condition: Column, value: Any): Column

    when(col("gender").isNull, lit("U"))
  • def bitwise_not(e: Column): Column

  • def expr(expr: String): Column
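`expr` parses a SQL expression string into a Column, which is also how the SQL-only functions elsewhere on this page are invoked from Scala. A minimal sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("expr-example").getOrCreate()
import spark.implicits._

val df = Seq((2, 3)).toDF("a", "b")

// Any valid SQL expression over the DataFrame's columns works here
val total = df.select(expr("a + b").as("total")).as[Int].first()
```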

  • def greatest(exprs: Column*): Column
    def greatest(columnName: String, columnNames: String*): Column

  • def least(exprs: Column*): Column
    def least(columnName: String, columnNames: String*): Column
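`greatest` and `least` compare values across columns within each row, skipping nulls unless every value is null. A minimal sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("greatest-example").getOrCreate()
import spark.implicits._

val df = Seq((1, 7, 4)).toDF("a", "b", "c")
val row = df.select(
  greatest(col("a"), col("b"), col("c")).as("hi"),
  least(col("a"), col("b"), col("c")).as("lo")).first()
```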

For more details on the non-aggregate functions, see the detailed examples.

Partition Transform Functions

Sorting Functions

  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame.

    def asc(columnName: String): Column

    df.sort(asc("name"), asc("age"))
  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame, but `null` values come first.

    def asc_nulls_first(columnName: String): Column

    df.sort(asc_nulls_first("name"))
  • Convenience function to apply an ascending sort order on a column in a sort expression on a DataFrame, but `null` values come last.

    def asc_nulls_last(columnName: String): Column

    df.sort(asc_nulls_last("name"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame.

    def desc(columnName: String): Column

    df.sort(desc("name"), desc("age"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame, but `null` values come first.

    def desc_nulls_first(columnName: String): Column

    df.sort(desc_nulls_first("name"))
  • Convenience function to apply a descending sort order on a column in a sort expression on a DataFrame, but `null` values come last.

    def desc_nulls_last(columnName: String): Column

    df.sort(desc_nulls_last("name"))

String Functions

  • Returns the numeric code point of the first character in a string. Use with chr/char (SQL) to convert back. See detailed examples.

    def ascii(e: Column): Column

    df.withColumn("ascii", ascii(lit("abcdefg")))
  • Computes the Base64 encoding of a binary or string column. See detailed examples.

    def base64(e: Column): Column

  • Returns the binary string representation of a long integer column. See detailed examples.

    def bin(e: Column): Column

    df.withColumn("binary", bin(col("value")))
  • Returns the bit length of a string (bytes × 8). For Unicode strings this differs from character count. See detailed examples.

    def bit_length(e: Column): Column

    df.withColumn("bl", bit_length(lit("happy days")))
  • Trim characters from both sides of a string. SQL-standard equivalent of trim, available via expr(). See detailed examples.

    def btrim(str): Column — via expr()
    def btrim(str, trimStr): Column — via expr()

    df.withColumn("trimmed", expr("btrim(city)"))
    df.withColumn("trimmed", expr("btrim(city, '-')"))
    
  • Concatenates multiple string columns with a separator, treating nulls as empty strings. See detailed examples.

    def concat_ws(sep: String, exprs: Column*): Column

    df.withColumn("path", concat_ws("/", col("dir"), col("file")))
  • Column method. Returns true if the string column contains the given substring. See detailed examples.

    def contains(other: Any): Column

    df.withColumn("has_foam", col("description").contains("foam"))
  • Converts a number string from one base to another (bases 2 through 36). See detailed examples.

    def conv(num: Column, fromBase: Int, toBase: Int): Column

    df.withColumn("hex", conv(col("decimal"), 10, 16))
  • Converts a binary column to a string using the specified character set. See detailed examples.

    def decode(value: Column, charset: String): Column

  • Returns the n-th value from a list of column expressions (1-based). SQL function — call via expr(). See detailed examples.

    elt(n, input1, input2, ...): Column — via expr()

    df.withColumn("picked", expr("elt(idx, col1, col2, col3)"))
  • Converts a string column to binary using the specified character set. See detailed examples.

    def encode(value: Column, charset: String): Column
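The `encode` and `decode` signatures above form a lossless round trip for a valid charset. A minimal sketch (sample text is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("encode-example").getOrCreate()
import spark.implicits._

// String -> UTF-8 bytes -> string again
val df = Seq("data engineering").toDF("word")
val roundTrip = df.select(decode(encode(col("word"), "UTF-8"), "UTF-8").as("w"))
  .as[String].first()
```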

  • Column method. Returns true if the string column ends with the given suffix. See detailed examples.

    def endsWith(literal: String): Column

    df.withColumn("is_example", col("url").endsWith("/examples/"))
  • Returns the 1-based position of a string within a comma-delimited list. Returns 0 if not found. SQL function — use via expr(). See detailed examples.

    find_in_set(str, strArray) - Returns the index (1-based) of str in the comma-delimited strArray

    df.withColumn("pos", expr("find_in_set('admin', roles)"))
  • The `format_number` function converts a numerical column `x` to a string formatted with comma separators and `d` decimal places. For more details check out format_number examples.

    def format_number(x: Column, d: Int): Column

    df.withColumn("formatted", format_number(col("elevation"), 0))
  • Formats column values into a string using a printf-style pattern. See detailed examples.

    def format_string(format: String, arguments: Column*): Column

    df.withColumn("label", format_string("%s: $%.2f", col("product"), col("price")))
  • Converts an integer or string column to its hexadecimal representation. See detailed examples.

    def hex(column: Column): Column

    df.withColumn("hex_value", hex(col("value")))
  • Converts a string to title case — capitalizes the first letter of each word, lowercases the rest. See detailed examples.

    def initcap(e: Column): Column

    df.withColumn("title_case", initcap(col("name")))
  • Returns the 1-based position of a substring within a string column, or 0 if not found. See detailed examples.

    def instr(str: Column, substring: String): Column

    df.withColumn("at_pos", instr(col("email"), "@"))
  • Case-insensitive SQL LIKE pattern matching. See detailed examples.

    def ilike(literal: String): Column

    df.withColumn("match", col("city").ilike("new york%"))
  • SQL alias for lower. Available via expr(). See lower and upper examples.

    lcase(str) - SQL alias for lower

    df.withColumn("lc", expr("lcase(name)"))
  • SQL LIKE pattern matching using % and _ wildcards. See detailed examples.

    def like(literal: String): Column

    df.withColumn("match", col("email").like("%@example.com"))
  • Returns the character length of a string column. For Unicode strings, counts visible characters rather than bytes. See detailed examples.

    def length(e: Column): Column

    df.withColumn("char_count", length(col("city")))
  • Converts a string column to lowercase. See lower and upper examples.

    def lower(e: Column): Column

    df.withColumn("lc", lower(lit("HELLO World")))
  • Computes the Levenshtein distance between two string columns — the minimum number of single-character edits needed to transform one into the other. See detailed examples.

    def levenshtein(l: Column, r: Column): Column

    df.withColumn("distance", levenshtein(col("name1"), col("name2")))
  • Returns the 1-based position of a substring within a string column (arguments reversed vs instr). See detailed examples.

    def locate(substr: String, str: Column): Column
    def locate(substr: String, str: Column, pos: Int): Column

    df.withColumn("pos", locate("@", col("email")))
    df.withColumn("second_dash", locate("-", col("date_str"), 6))
    
  • Left-pads a string column to a target length with a specified padding string. See detailed examples.

    def lpad(str: Column, len: Int, pad: String): Column
    def lpad(str: Column, len: Int, pad: Array[Byte]): Column

    df.withColumn("padded", lpad(col("number").cast("string"), 6, "0"))
  • Removes characters from the left side of a string column. The 1-arg form trims spaces; the 2-arg form trims the characters in trimString. See detailed examples.

    def ltrim(e: Column): Column
    def ltrim(e: Column, trimString: String): Column

    df.withColumn("ltrim", ltrim(col("c1")))
    df.withColumn("ltrim", ltrim(col("c1"), " \n\t"))
    
  • Replaces uppercase letters, lowercase letters, and digits with masking characters, preserving the structure of the original value. See detailed examples.

    def mask(input, upperChar, lowerChar, digitChar, otherChar): Column — via expr()

    df.withColumn("masked", expr("mask(email)"))
  • Returns the byte length of a string (UTF-8 encoded). For Unicode strings this differs from character count. See detailed examples.

    def octet_length(e: Column): Column

    df.withColumn("byte_count", octet_length(col("word")))
  • Counts the non-overlapping matches of a regex in a string column. Available as a SQL function via expr(). See detailed examples.

    regexp_count(str, regexp): Column — via expr()

    df.withColumn("ip_count", expr("regexp_count(log_line, '\\d+\\.\\d+\\.\\d+\\.\\d+')"))
  • Extracts the specified regex capture group from a string column; returns an empty string when the regex does not match.

    def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
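A minimal sketch of `regexp_extract` pulling capture group 1 out of a string (the code format is invented for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("re-extract-example").getOrCreate()
import spark.implicits._

// Capture group 1 is the run of digits between the dashes
val df = Seq("order-12345-us").toDF("code")
val orderId = df.select(regexp_extract(col("code"), "order-(\\d+)-", 1).as("id"))
  .as[String].first()
```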

  • Returns an array of all matches of a regex in a string column. Available as a SQL function via expr(). See detailed examples.

    regexp_extract_all(str, regexp[, idx]): Column — via expr()

    df.withColumn("emails", expr("regexp_extract_all(message, '[A-Za-z0-9._]+@[A-Za-z0-9.]+', 0)"))
  • Returns the 1-based position of the first regex match in a string, or 0 if no match. Available as a SQL function via expr(). See detailed examples.

    regexp_instr(str, regexp[, idx]): Column — via expr()

    df.withColumn("first_digit_pos", expr("regexp_instr(note, '\\d+')"))
  • Returns true when a string matches a regex, false otherwise. Available as a SQL function via expr(). See detailed examples.

    regexp_like(str, regexp): Column — via expr()

    df.withColumn("is_email", expr("regexp_like(contact, '^[A-Za-z0-9._]+@[A-Za-z0-9.]+$')"))
  • Replaces every substring that matches the regex pattern with the replacement value.

    def regexp_replace(e: Column, pattern: String, replacement: String): Column
    def regexp_replace(e: Column, pattern: Column, replacement: Column): Column
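A minimal sketch of `regexp_replace` (the phone-number cleanup is an invented use case):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("re-replace-example").getOrCreate()
import spark.implicits._

// Strip every non-digit character from a phone number
val df = Seq("(555) 867-5309").toDF("phone")
val digits = df.select(regexp_replace(col("phone"), "[^0-9]", "").as("d"))
  .as[String].first()
```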

  • Returns the first substring of str that matches the regex, or null if no match. Available as a SQL function via expr(). See detailed examples.

    regexp_substr(str, regexp): Column — via expr()

    df.withColumn("phone", expr("regexp_substr(message, '\\d{3}-\\d{3}-\\d{4}')"))
  • Replace all occurrences of a substring with another string. Available as a SQL function via expr(). See detailed examples.

    def replace(str, search, replacement): Column — via expr()

    df.withColumn("new_email", expr("replace(email, 'old.com', 'new.com')"))
  • Decodes a Base64-encoded string column back to binary. See detailed examples.

    def unbase64(e: Column): Column
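A sketch of `unbase64` undoing `base64`; the result is binary, so it is cast back to string (sample text is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("unbase64-example").getOrCreate()
import spark.implicits._

// base64 then unbase64 is a lossless round trip
val df = Seq("spark").toDF("word")
val roundTrip = df.select(unbase64(base64(col("word"))).cast("string").as("w"))
  .as[String].first()
```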

  • Decodes a hex-encoded string back to binary. See detailed examples.

    def unhex(column: Column): Column

    df.withColumn("decoded", unhex(col("hex_string")))
  • Right-pads a string column to a target length with a specified padding string. See detailed examples.

    def rpad(str: Column, len: Int, pad: String): Column
    def rpad(str: Column, len: Int, pad: Array[Byte]): Column

    df.withColumn("padded", rpad(col("name"), 12, "."))
  • Duplicates a string column a specified number of times. See detailed examples.

    def repeat(str: Column, n: Int): Column

    df.withColumn("repeated", repeat(col("word"), 3))
  • Reverses the character order of a string column, or the element order of an array column. See detailed examples.

    def reverse(e: Column): Column

    df.withColumn("reversed", reverse(col("word")))
  • SQL RLIKE pattern matching using a full Java regular expression. See detailed examples.

    def rlike(literal: String): Column

    df.withColumn("is_phone", col("phone").rlike("""\d{3}-\d{3}-\d{4}"""))
  • Removes characters from the right side of a string column. The 1-arg form trims spaces; the 2-arg form trims the characters in trimString. See detailed examples.

    def rtrim(e: Column): Column
    def rtrim(e: Column, trimString: String): Column

    df.withColumn("rtrim", rtrim(col("c1")))
    df.withColumn("rtrim", rtrim(col("c1"), " \n\t"))
    
  • Returns the soundex code of a string — a four-character phonetic encoding for fuzzy name matching. See detailed examples.

    def soundex(e: Column): Column

    df.withColumn("soundex_code", soundex(col("name")))
  • Split a string on a delimiter or regex pattern into an array. See detailed examples.

    def split(str: Column, pattern: String): Column
    def split(str: Column, pattern: String, limit: Int): Column

    df.withColumn("result", split(col("tags"), ","))
    df.withColumn("result", split(col("tags"), ",", 2))
    
  • Column method. Returns true if the string column starts with the given prefix. See detailed examples.

    def startsWith(literal: String): Column

    df.withColumn("is_https", col("url").startsWith("https://"))
  • Extract characters from a string by position. See detailed examples.

    def substring(str: Column, pos: Int, len: Int): Column

    df.withColumn("result", substring(col("phone"), 1, 3))
  • Extract a substring before or after the Nth occurrence of a delimiter. See detailed examples.

    def substring_index(str: Column, delim: String, count: Int): Column

    df.withColumn("result", substring_index(col("ip"), ".", 2))
  • Replace a portion of a string at a given position with a replacement string. See detailed examples.

    def overlay(src: Column, replace: Column, pos: Column, len: Column): Column
    def overlay(src: Column, replace: Column, pos: Column): Column
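A minimal sketch of the 3-argument `overlay`, which replaces as many characters as the replacement string is long, starting at `pos` (the account-masking use case is invented):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("overlay-example").getOrCreate()
import spark.implicits._

// Replace characters 3 through 6 of the account number with a mask
val df = Seq("1234567890").toDF("acct")
val masked = df.select(overlay(col("acct"), lit("XXXX"), lit(3)).as("m"))
  .as[String].first()
```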

  • Extracts a part of a URL — HOST, PATH, QUERY, PROTOCOL, etc. SQL function — use via expr(). See detailed examples.

    parse_url(url, partToExtract): Column — via expr()

    df.withColumn("host", expr("parse_url(url, 'HOST')"))
  • Extracts a specific query-string parameter from a URL. SQL function — use via expr(). See detailed examples.

    parse_url(url, partToExtract, key): Column — via expr()

    df.withColumn("q", expr("parse_url(url, 'QUERY', 'q')"))
  • Split text into an array of sentences, each containing an array of words. See detailed examples.

    def sentences(string: Column, language: Column, country: Column): Column
    def sentences(string: Column): Column
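A sketch of the single-argument `sentences` overload; the result is an array of sentences, each an array of words with punctuation removed (the sample text follows the canonical Spark example):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("sentences-example").getOrCreate()
import spark.implicits._

// Two sentences in, two word-arrays out
val df = Seq("Hi there! Good morning.").toDF("text")
val parsed = df.select(sentences(col("text")).as("s")).as[Seq[Seq[String]]].first()
```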

  • Replace or delete characters in a string column using a positional character mapping. See detailed examples.

    def translate(src: Column, matchingString: String, replaceString: String): Column

    df.withColumn("normalized", translate(col("sku"), "-/", "  "))
  • Removes characters from both ends of a string column. The 1-arg form trims spaces; the 2-arg form trims the characters in trimString. See detailed examples.

    def trim(e: Column): Column
    def trim(e: Column, trimString: String): Column

    df.withColumn("trim", trim(col("c1")))
    df.withColumn("trim", trim(col("c1"), " \n\t"))
    
  • SQL alias for upper. Available via expr(). See lower and upper examples.

    ucase(str) - SQL alias for upper

    df.withColumn("uc", expr("ucase(name)"))
  • Converts a string column to uppercase. See lower and upper examples.

    def upper(e: Column): Column

    df.withColumn("up", upper(lit("hello World")))
  • Decodes a string from application/x-www-form-urlencoded format. SQL function — use via expr(). See detailed examples.

    url_decode(str): Column — via expr()

    df.withColumn("decoded", expr("url_decode(input)"))
  • Encodes a string to application/x-www-form-urlencoded format, safe for use in URLs. SQL function — use via expr(). See detailed examples.

    url_encode(str): Column — via expr()

    df.withColumn("encoded", expr("url_encode(input)"))

UDF Functions

Window Functions

Example Details

Created: 2023-07-31 02:25:00 PM

Last Updated: 2023-09-11 01:54:13 PM