Spark 3.0 — New Functions in a Nutshell

Arun Jijo
Published in Javarevisited
8 min read · Jun 14, 2020

The Apache Spark community recently released a preview of Spark 3.0, which brings many significant new features to a framework that already has a wide range of enterprise users and developers in this era of big data and data science.

In the new release, the Spark community has ported some functions from Spark SQL to the programmatic Scala API (org.apache.spark.sql.functions). This encourages developers to use these functions directly in their DataFrame transformations, rather than switching to SQL mode, or creating a view and calling them through SQL expressions or the callUDF function.

The community has also introduced some new data transformation functions and partition transform functions, which will be extremely useful when working with Spark’s new DataFrameWriterV2 to write data out to external storage.

Some of the new functions in Spark 3 were already part of earlier versions of Databricks Spark. So, if you’ve worked in the Databricks cloud, you may find some of these functions familiar.

Throughout this article, let’s walk through Spark’s new functions, available both in Spark SQL and in the Scala API for DataFrame operations, as well as the functions that were ported from Spark SQL to the Scala API for programmatic access.

Functions Introduced in Spark 3.0 in Spark SQL and for DataFrame transformations

from_csv

Like from_json, this function parses a column which has CSV strings and converts it into Struct type. If the CSV strings are not parsable, it will return null.

Example:

This function requires a Struct schema and a map of options that indicate how to parse the CSV strings. The options are the same as those of the CSV data source.

// Assumes a spark-shell session, where `spark` is already defined
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val studentInfo = "1,Jerin,CSE" :: "2,Jerlin,ECE" :: "3,Arun,CSE" :: Nil
val schema = new StructType()
.add("Id", IntegerType)
.add("Name", StringType)
.add("Dept", StringType)
val options = Map("delimiter" -> ",")
val studentDF = studentInfo.toDF("Student_Info")
.withColumn("csv_struct", from_csv('Student_Info, schema, options))
studentDF.show()

to_csv

Converts a Struct type column into a CSV string.

Example:

Along with the Struct type column, this function also accepts an optional options parameter which indicates how to convert the Struct column into CSV string.

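// to_csv takes a java.util.Map of options, hence the asJava conversion
import scala.collection.JavaConverters._

studentDF
.withColumn("csv_string", to_csv($"csv_struct", Map.empty[String, String].asJava))
.show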

schema_of_csv

Infers the schema of a CSV string and returns the schema in DDL format.

Example:

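This function needs a CSV string literal (rather than a regular data column) and an optional parameter which holds the options for parsing the CSV string. Passing a plain string to schema_of_csv is treated as the CSV content itself, not as a column name.

studentDF
.withColumn("schema", schema_of_csv("1,Jerin,CSE"))
.show(false)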

forall

Applies the given predicate to every element in the array and returns true only if the predicate holds for all of them; otherwise it returns false.

Example:

To check if all the elements in the given Array column are even.

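val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF("int_array")
// Use === so the comparison is a Column expression; Scala's == would compare the objects and always yield false
df.withColumn("flag", forall($"int_array", x => x % 2 === 0))
.show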

transform

Return a new array after applying the function to all the elements in the array.

Example:

Add 1 to every element in the array.

val df = Seq((Seq(2,4,6)),(Seq(5,10,3))).toDF("num_array")
df
.withColumn("num_array",transform($"num_array",x=>x+1))
.show

overlay

Replaces part of the column’s content with the given replacement, starting at the specified position and spanning the optionally specified length.

Example:

Change greetings addressed to specific persons into the traditional “Hello World”.

Here we replace the person names with “World”. The names start at position 7, and since we want to remove the complete name before inserting the replacement, the number of positions to remove must be greater than or equal to the length of the longest name in the column.

So we pass “World” as the replacement, 7 as the starting position, and 12 as the number of positions to remove from that starting position. The length is optional; if it is not specified, the function overwrites only as many characters as the replacement contains.

Overlay accepts columns of StringType, TimestampType, IntegerType, etc., but the returned column is always of StringType, irrespective of the input column type.

val greetingMsg = "Hello Arun" :: "Hello Mohit Chawla" :: "Hello Shaurya" :: Nil
val greetingDF = greetingMsg.toDF("greet_msg")
greetingDF.withColumn("greet_msg", overlay($"greet_msg", lit("World"), lit(7), lit(12)))
.show

split

Splits the string expression around matches of the given regex. The limit parameter controls how many times the regex is applied.

If the limit is less than or equal to zero, the regex is applied as many times as possible and the resulting array contains every possible split.

If the limit is greater than zero, the resulting array contains at most limit elements, and the last element holds the remainder of the string.

Example:

Split the given string expression into at most two parts on the regex, i.e. the string delimiter.

val num = "one~two~three"::"four~five"::Nil
val numDF = num.toDF("numbers")
numDF
.withColumn("numbers",split($"numbers","~",2))
.show

Split the same string expression into as many parts as the delimiter allows.

numDF
.withColumn("numbers",split($"numbers","~",0))
.show

map_entries

Converts the key-value pairs of a Map into an unordered array of entries.

Example:

To get all the keys and values of a Map in an array.

val df = Seq(Map(1->"x",2->"y")).toDF("key_values")
df.withColumn("key_value_array",map_entries($"key_values"))
.show

map_zip_with

Merge two Maps according to the keys into a single one using a function.

Example:

To calculate total sales done by employees across departments and get the total sales for a particular employee in a single map by passing a function that will sum up the total sales from two different Map columns based on the key.

val df = Seq((Map("EID_1"->10000,"EID_2"->25000),
Map("EID_1"->1000,"EID_2"->2500)))
.toDF("emp_sales_dept1","emp_sales_dept2")

df
.withColumn("total_emp_sales",map_zip_with($"emp_sales_dept1",$"emp_sales_dept2",(k,v1,v2)=>(v1+v2)))
.show

map_filter

Returns new key-value pairs that contain only the Map values which satisfy the given predicate function.

Example:

Keep only the employees whose sales value is higher than 20000.

val df = Seq(Map("EID_1"->10000,"EID_2"->25000))
.toDF("emp_sales")

df
.withColumn("filtered_sales",map_filter($"emp_sales",(k,v)=>(v>20000)))
.show

transform_values

Manipulates the values of all the elements in a Map column according to the given function.

Example:

To calculate the employee salary by giving each employee an increment of 5000

val df = Seq(Map("EID_1"->10000,"EID_2"->25000))
.toDF("emp_salary")

df
.withColumn("emp_salary",transform_values($"emp_salary",(k,v)=>(v+5000)))
.show

transform_keys

Manipulates the keys of all the elements in a Map column according to the given function.

Example:

To add the company name “XYZ” to the employee id.

val df = Seq(Map("EID_1" -> 10000, "EID_2" -> 25000))
.toDF("employees")
df
.withColumn("employees", transform_keys($"employees", (k, v) => concat(k,lit("_XYZ"))))
.show

xxhash64

Computes the hash code of the given column contents using the 64-bit xxHash algorithm and returns the result as a long.
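As a minimal sketch of how this might look in a DataFrame transformation, the snippet below hashes a string column with xxhash64. The local SparkSession, the sample names, and the column names name and name_hash are assumptions made for illustration; in spark-shell the session already exists.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.xxhash64
import org.apache.spark.sql.types.LongType

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("xxhash64-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data
val names = Seq("Jerin", "Jerlin", "Arun").toDF("name")

// xxhash64 accepts one or more columns and yields a single long column
val hashed = names.withColumn("name_hash", xxhash64($"name"))
hashed.show(false)
```

Since the function accepts several columns at once, it can also serve as a compact fingerprint of a whole row.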

Functions ported from Spark SQL to Scala API in Spark 3.0 for DataFrame transformations

Most Spark SQL functions are also available in the Scala API, so they can be used directly as part of DataFrame operations. A handful of functions, however, are not available programmatically. To use them, you have to enter Spark SQL mode and call them as part of a SQL expression, or go through Spark’s callUDF function. As a function’s popularity and usage grow, it tends to be ported to the programmatic Spark API in a newer release. The following functions were ported from earlier versions of Spark SQL to the Scala API (org.apache.spark.sql.functions):

date_sub

Subtracts days from date, timestamp and string data types. If the data type is string, it must be in a format that can be cast to a date, such as “yyyy-MM-dd” or “yyyy-MM-dd HH:mm:ss.SSSS”.

Example:

Subtract one day from eventDateTime.

If the number of days to subtract is negative, this function adds that many days to the date instead.

import java.sql.Timestamp

var df = Seq(
(1, Timestamp.valueOf("2020-01-01 23:00:01")),
(2, Timestamp.valueOf("2020-01-02 12:40:32")),
(3, Timestamp.valueOf("2020-01-03 09:54:00")),
(4, Timestamp.valueOf("2020-01-04 10:12:43"))
)
.toDF("typeId","eventDateTime")

df.withColumn("Adjusted_Date",date_sub($"eventDateTime",1)).show()

date_add

Same as date_sub, but adds days to the given date.

Example:

Add “1 day” to eventDateTime

var df = Seq(
(1, Timestamp.valueOf("2020-01-01 23:00:01")),
(2, Timestamp.valueOf("2020-01-02 12:40:32")),
(3, Timestamp.valueOf("2020-01-03 09:54:00")),
(4, Timestamp.valueOf("2020-01-04 10:12:43"))
)
.toDF("Id","eventDateTime")
df
.withColumn("Adjusted Date",date_add($"eventDateTime",1))
.show()

add_months

Like date_add and date_sub, this function adds months to a date.

To subtract months, pass the number of months as a negative value, since there is no separate function for subtracting months.

Example:

Add and subtract one month from eventDateTime.

var df = Seq(
(1, Timestamp.valueOf("2020-01-01 23:00:01")),
(2, Timestamp.valueOf("2020-01-02 12:40:32")),
(3, Timestamp.valueOf("2020-01-03 09:54:00")),
(4, Timestamp.valueOf("2020-01-04 10:12:43"))
).toDF("typeId","eventDateTime")
// Add one month
df
.withColumn("Adjusted Date",add_months($"eventDateTime",1))
.show()
// Subtract one month
df
.withColumn("Adjusted Date",add_months($"eventDateTime",-1))
.show()

zip_with

Merges the left and right arrays element by element by applying a function.

Ideally both arrays have the same length. If one of the arrays is shorter than the other, nulls are appended to it to match the length of the longer array before the function is applied.

Example:

Add the corresponding elements of two arrays and merge them into one.

val df = Seq((Seq(2,4,6),Seq(5,10,3)))
.toDF("array_1","array_2")

df
.withColumn("merged_array",zip_with($"array_1",$"array_2",(x,y)=>(x+y)))
.show
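The null padding for arrays of different lengths can be sketched as follows. This is only an illustration under assumptions: the local SparkSession, the sample arrays, and the choice to treat a missing element as 0 through coalesce are not part of the original example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, zip_with}

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("zip-with-sketch").getOrCreate()
import spark.implicits._

// array_2 is one element shorter, so its missing third element is seen as null
val uneven = Seq((Seq(1, 2, 3), Seq(10, 20))).toDF("array_1", "array_2")

val merged = uneven.withColumn(
  "merged_array",
  // coalesce swaps the padded nulls for 0 before the addition
  zip_with($"array_1", $"array_2", (x, y) => coalesce(x, lit(0)) + coalesce(y, lit(0)))
)
merged.show(false)

// Collected result: Seq(11, 22, 3)
val mergedValues = merged.select("merged_array").head().getSeq[Int](0)
```

Without the coalesce calls, the third element would be null, because adding null to a number yields null.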

exists

Applies the predicate to all the elements in the array and checks whether at least one of them satisfies it.

Example:

To check if at least one element in the array is even.

val df= Seq(Seq(2,4,6),Seq(5,10,3)).toDF("num_array")
df.withColumn("flag",exists($"num_array", x =>lit(x%2===0)))
.show

filter

Applies the given predicate to all the elements in the array and keeps only the elements for which the predicate holds true.

Example:

Keep only the even elements of the array.

val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF("num_array")
df.withColumn("even_array",filter($"num_array", x =>lit(x%2===0)))
.show

aggregate

Reduces the given array, together with an initial value/state, to a single value using the given function, and then applies the optional finish function to convert the reduced value to another state/value.

Example:

Add the value of the constant column to the sum of the array and multiply the result by 2.

val df = Seq((Seq(2,4,6),3),(Seq(5,10,3),8))
.toDF("num_array","constant")
df.withColumn("reduced_array",aggregate($"num_array", $"constant",(x,y)=>x+y,x => x*2))
.show

Functions introduced in Spark 3.0 for Spark SQL mode

Following are the new SQL functions, which you can take advantage of only in Spark SQL mode.

acosh

Find the inverse of hyperbolic cosine of a given expression.

asinh

Find the inverse of hyperbolic sine of a given expression.

atanh

Find the inverse of hyperbolic tangent of a given expression.
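A quick way to try the three inverse hyperbolic functions is a single SQL query. The sketch below assumes a local SparkSession and uses inputs whose results are mathematically zero; the column aliases are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("inverse-hyperbolic-sketch").getOrCreate()

// acosh(1), asinh(0) and atanh(0) are all mathematically 0
val inverseHyperbolic = spark.sql(
  "SELECT acosh(1) AS acosh_1, asinh(0) AS asinh_0, atanh(0) AS atanh_0"
)
inverseHyperbolic.show()

val hyperbolicRow = inverseHyperbolic.head()
```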

bit_and, bit_or and bit_xor

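Aggregate functions that compute the bit-wise AND, OR and XOR of all the values in a column.

bit_count

Returns the number of bits that are set to 1 in the binary representation of the given expression.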
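The bit-wise functions can be tried on a small inline table, as in the sketch below. The local SparkSession, the values 3 and 5, and the aliases are assumptions chosen so that the results are easy to verify by hand.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("bitwise-sketch").getOrCreate()

// 3 = 011 and 5 = 101 in binary: AND = 001 (1), OR = 111 (7), XOR = 110 (6)
val bitAgg = spark.sql(
  """SELECT bit_and(col) AS and_all, bit_or(col) AS or_all, bit_xor(col) AS xor_all
    |FROM VALUES (3), (5) AS tab(col)""".stripMargin
).head()

// 7 = 111 in binary, so three bits are set
val bitCount = spark.sql("SELECT bit_count(7) AS ones").head()

println(s"and=${bitAgg.get(0)} or=${bitAgg.get(1)} xor=${bitAgg.get(2)} ones=${bitCount.get(0)}")
```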

bool_and and bool_or

bool_and validates that all the values of the expression are true, while bool_or validates that at least one of them is true.
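As a small sketch, both aggregates can be run over an inline table of booleans. The local SparkSession, the table alias results and the column name passed are hypothetical names used only for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("bool-agg-sketch").getOrCreate()

// One false value is enough to make bool_and false, and one true value makes bool_or true
val boolAgg = spark.sql(
  """SELECT bool_and(passed) AS all_passed, bool_or(passed) AS any_passed
    |FROM VALUES (true), (false), (true) AS results(passed)""".stripMargin
).head()

println(s"all_passed=${boolAgg.getBoolean(0)} any_passed=${boolAgg.getBoolean(1)}")
```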

count_if

Returns the number of true values in a column.

Example:

Find the number of even values in the given column.

var df = Seq((1),(2),(4)).toDF("num")

df.createOrReplaceTempView("table")
spark.sql("select count_if(num %2==0) from table").show

date_part

Extracts a part of a date/timestamp, such as the hour or minutes.
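A minimal sketch of date_part, assuming a local SparkSession and a made-up timestamp literal; the field name is passed as a string and the aliases are chosen only for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("date-part-sketch").getOrCreate()

// Pull the year and the hour out of the same timestamp literal
val parts = spark.sql(
  """SELECT date_part('YEAR', TIMESTAMP '2019-08-12 01:00:00.123456') AS event_year,
    |       date_part('HOUR', TIMESTAMP '2019-08-12 01:00:00.123456') AS event_hour""".stripMargin
).head()

// event_year is 2019 and event_hour is 1
println(s"year=${parts.get(0)} hour=${parts.get(1)}")
```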

div

Performs integral division: divides one expression or column by another expression/column and discards the fractional part of the result.
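The difference between div and the regular division operator can be sketched with one query. The local SparkSession and the aliases quotient and ratio are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("div-sketch").getOrCreate()

// `div` keeps only the integral part, whereas `/` returns a fractional result
val division = spark.sql("SELECT 7 div 2 AS quotient, 7 / 2 AS ratio").head()

// quotient is 3 and ratio is 3.5
println(s"quotient=${division.get(0)} ratio=${division.get(1)}")
```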

every and some

every returns true if the given expression evaluates to true for all the column values, while some returns true if it evaluates to true for at least one value.

make_date, make_interval and make_timestamp

To construct date, timestamp and specific intervals.

Example:

SELECT make_timestamp(2020, 1, 7, 7, 30, 45.887)

max_by and min_by

Compares two columns and returns the value of left column which is associated with the maximum/minimum value of right column

Example:

var df = Seq((1,1),(2,1),(4,3)).toDF("x","y")

df.createOrReplaceTempView("table")
spark.sql("select max_by(x,y) from table").show

typeof

Returns the data type of the column values
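A short sketch of typeof applied to a few literals, assuming a local SparkSession; the aliases are invented for the example.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("typeof-sketch").getOrCreate()

// typeof reports the data type of each expression as a string
val types = spark.sql(
  "SELECT typeof(1) AS t_int, typeof('spark') AS t_str, typeof(array(1)) AS t_arr"
).head()

// Yields int, string and array<int>
println(s"${types.getString(0)}, ${types.getString(1)}, ${types.getString(2)}")
```

This can be handy for checking what type an expression resolves to without inspecting the full DataFrame schema.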

version

Returns the Spark version along with its git revision.

justify_days, justify_hours and justify_interval

The newly introduced justify functions are used to adjust the time interval.

Example:

To represent 30 days as month,

SELECT justify_days(interval '30 day')

Partition Transform functions

From Spark 3.0 onwards, there are some new functions that help with partitioning data, which I’ll cover in a separate article.

Overall, we’ve walked through the data transformation and analytical functions that are part of Spark from 3.0 onwards. I hope this guide helps you understand these new functions. They should speed up Spark development work and help in building solid and efficient Spark pipelines.

If you have any queries, send them to me on Twitter.

Arun Jijo

Data engineer at DataKare Solutions who gained expertise at Apache Nifi, Kafka, Spark and passionate in Java.