Spark Flattening Row using explode function

Example 1.


 This a simple case of row flattening and generating cartesian product.

Source
+---+------+---------+-----------+
| id |itemid|quntity   |   price      |
+---+------+---------+-----------+
|  A| 1,2,3 |    2,2,1  | 30,19,10  |
|  B|   3,5  |      5,8   |    18,40    |
+---+------+---------+------------+
Note : Source table contains columns with value as String delimited by ",";

Target
+---+------+---------+---------+
| id|itemid|quntity    |  price   |
+---+------+---------+---------+
|  A|     1|        2|       30|
|  A|     1|        2|       19|
|  A|     1|        2|       10|
|  A|     1|        2|       30|
|  A|     1|        2|       19|
|  A|     1|        2|       10|
|  A|     1|        1|       30|
|  A|     1|        1|       19|
|  A|     1|        1|       10|
|  A|     2|        2|       30|
|  A|     2|        2|       19|
|  A|     2|        2|       10|
|  A|     2|        2|       30|
|  A|     2|        2|       19|
|  A|     2|        2|       10|
|  A|     2|        1|       30|
|  A|     2|        1|       19|
|  A|     2|        1|       10|
|  A|     3|        2|       30|
|  A|     3|        2|       19|
+---+------+---------+---------+


First lets create a dataframe for the source table  


val seq = Seq(
  ("A","1,2,3","2,2,1","30,19,10"),
  ("B","3,5","5,8","18,40")
)

val df =  seq.toDF("id","itemid","quantity","price");

df.show

+---+------+---------+-----------+
| id |itemid|quntity   |   price      |
+---+------+---------+-----------+
|  A| 1,2,3 |    2,2,1  | 30,19,10  |
|  B|   3,5  |      5,8   |    18,40    |
+---+------+---------+------------+


Next step is to convert delimited column string values to array

decare a UDF(user defined function)  as 

val toArrayVal = udf((value : String) => value.split(","))

udf  is splitting string on ","  and converting to array

Next step is to update the df column to store  array instead of string because explode()
take column with stored type  as array

val  dfNew = 
df.withColumn("itemid",toArrayVal(df("itemid")))
.withColumn("quantity", toArrayVal(df("quantity")))
.withColumn("price", toArrayVal(df("price")))

dfNew.show

+---+---------+---------+------------+
| id|   itemid| quntity   |  price        |
+---+---------+---------+------------+
|  A|[1, 2, 3]|[2, 2, 1]|[30, 19, 10]|
|  B|   [3, 5]|   [5, 8]|    [18, 40]|
+---+---------+---------+------------+

Output of dfNew.show  shows that string delimited values are transformed to array

Final step is to flatten the rows using explode function

val dfFinal = 
dfNew .select($"id",explode($"itemid").as("itemid"),$"quantity",$"price")
.select($"id",$"itemid",explode($"quantity").as("quantity"),$"price")
.select($"id",$"itemid",$"quantity",explode($"price").as("price"))

dfFinal.show

+---+------+---------+---------+
| id|itemid|quntity    |  price   |
+---+------+---------+---------+
|  A|     1|        2|       30|
|  A|     1|        2|       19|
|  A|     1|        2|       10|
|  A|     1|        2|       30|
|  A|     1|        2|       19|
|  A|     1|        2|       10|
|  A|     1|        1|       30|
|  A|     1|        1|       19|
|  A|     1|        1|       10|
|  A|     2|        2|       30|
|  A|     2|        2|       19|
|  A|     2|        2|       10|
|  A|     2|        2|       30|
|  A|     2|        2|       19|
|  A|     2|        2|       10|
|  A|     2|        1|       30|
|  A|     2|        1|       19|
|  A|     2|        1|       10|
|  A|     3|        2|       30|
|  A|     3|        2|       19|
+---+------+---------+---------+




Example 2

  
In this example we have same source table

source
id   |   itemid   |   quantity  |  price
-------------------------------------------------
 A   |    1,2,3   |   2,2,1       |  30,19,10
 B   |    3,5     |   5,8         |  18,40

target
id   |   itemid   |   quantity | price
-------------------------------------------------
 A   |     1      |      2        |     30
 A   |     2      |      2        |     19
 A   |     3      |      1        |     10
 B   |     3      |      5        |     18
 B   |     5      |      8        |     40


to start with  we will use first two  steps from example 1
  1. Create dataframe for source table
  2. Tranform delimited string value to array

Create a new udf  using  split and zip function  in scala

The zip method takes another collection as parameter and will merge its elements with the elements of the current collection to create a new collection consisting of pairs or Tuple2 elements from both collections.


val splitAndZip = udf((col1:String,col2:String,col3:String) => {
  col1.split(',').zip(col2.split(',')).zip(col3.split(',')).map{case ((a,b),c) => (a,b,c)}
})

Final step is to apply udf and select on dataframe to get final dataframe with target structure

val dfFinal = 
   df .withColumn("out",explode(splitAndZip($"itemid",$"quantity",$"price")))
  .select(
    $"id",
    $"out._1".as("itemid"),
    $"out._2".as("quantity"),
    $"out._3".as("price")
  )
  

dfFinal.show


+---+------+--------+-----+
| id|itemid|quantity|price|
+---+------+--------+-----+
|  A|     1|       2|   30     |
|  A|     2|       2|   19     |
|  A|     3|       1|   10     |
|  B|     3|       5|   18     |
|  B|     5|       8|   40     |
+---+------+--------+-----+




Comments