Flattening Rows in Spark Using the explode Function
Example 1.
This is a simple case of flattening rows and generating a cartesian product.
Source
+---+------+--------+--------+
| id|itemid|quantity|   price|
+---+------+--------+--------+
|  A| 1,2,3|   2,2,1|30,19,10|
|  B|   3,5|     5,8|   18,40|
+---+------+--------+--------+
Note: The source table's columns hold comma-delimited values stored as strings.
Target
+---+------+--------+-----+
| id|itemid|quantity|price|
+---+------+--------+-----+
|  A|     1|       2|   30|
|  A|     1|       2|   19|
|  A|     1|       2|   10|
|  A|     1|       2|   30|
|  A|     1|       2|   19|
|  A|     1|       2|   10|
|  A|     1|       1|   30|
|  A|     1|       1|   19|
|  A|     1|       1|   10|
|  A|     2|       2|   30|
|  A|     2|       2|   19|
|  A|     2|       2|   10|
|  A|     2|       2|   30|
|  A|     2|       2|   19|
|  A|     2|       2|   10|
|  A|     2|       1|   30|
|  A|     2|       1|   19|
|  A|     2|       1|   10|
|  A|     3|       2|   30|
|  A|     3|       2|   19|
+---+------+--------+-----+
Only the first 20 of the 35 rows are shown.
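To see where the row count of the target comes from, it can help to model the expected result without Spark. The following is a minimal plain-Scala sketch; the `Row4` case class and the `cartesian` helper are names made up here for illustration and are not part of the Spark API.

```scala
// Hypothetical model of one flattened row; not a Spark type.
final case class Row4(id: String, itemid: String, quantity: String, price: String)

// Split each delimited column and combine every value with every other value.
def cartesian(id: String, itemid: String, quantity: String, price: String): Seq[Row4] =
  for {
    i <- itemid.split(",").toSeq
    q <- quantity.split(",").toSeq
    p <- price.split(",").toSeq
  } yield Row4(id, i, q, p)

val rowsA = cartesian("A", "1,2,3", "2,2,1", "30,19,10") // 3 * 3 * 3 = 27 rows
val rowsB = cartesian("B", "3,5", "5,8", "18,40")        // 2 * 2 * 2 = 8 rows
```

Each source row fans out to the product of its three list lengths, so duplicate values such as the repeated quantity 2 in row A produce repeated rows in the target.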
First, let's create a DataFrame for the source table.
import spark.implicits._ // for toDF and $"..."; already in scope in spark-shell
val seq = Seq(
  ("A", "1,2,3", "2,2,1", "30,19,10"),
  ("B", "3,5", "5,8", "18,40")
)
val df = seq.toDF("id", "itemid", "quantity", "price")
df.show
+---+------+--------+--------+
| id|itemid|quantity|   price|
+---+------+--------+--------+
|  A| 1,2,3|   2,2,1|30,19,10|
|  B|   3,5|     5,8|   18,40|
+---+------+--------+--------+
The next step is to convert the delimited string columns to arrays.
Declare a UDF (user-defined function) as:
import org.apache.spark.sql.functions.{explode, udf} // needed outside spark-shell
val toArrayVal = udf((value: String) => value.split(","))
The UDF splits the string on "," and returns the parts as an array.
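One case the UDF body above does not cover is a null cell: calling `split` on a null string throws a `NullPointerException`. A small plain-Scala sketch of a null-tolerant variant of the same function body follows; the name `splitOrEmpty` is made up here, and returning an empty array for null is an assumption about the desired behaviour.

```scala
// Hypothetical helper: same split as the UDF body, but a null input
// becomes an empty array instead of throwing a NullPointerException.
def splitOrEmpty(value: String): Array[String] =
  Option(value).map(_.split(",")).getOrElse(Array.empty[String])

val parts   = splitOrEmpty("30,19,10") // three elements: "30", "19", "10"
val nothing = splitOrEmpty(null)       // empty array
```

A function like this could be wrapped with `udf` in the same way as `toArrayVal`. If a UDF is not required, Spark also ships a built-in `split` column function in `org.apache.spark.sql.functions` that takes a column and a regular-expression pattern.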
The next step is to replace each string column with its array version, because explode()
expects a column of array (or map) type.
val dfNew = df
  .withColumn("itemid", toArrayVal(df("itemid")))
  .withColumn("quantity", toArrayVal(df("quantity")))
  .withColumn("price", toArrayVal(df("price")))
dfNew.show
+---+---------+---------+------------+
| id|   itemid| quantity|       price|
+---+---------+---------+------------+
|  A|[1, 2, 3]|[2, 2, 1]|[30, 19, 10]|
|  B|   [3, 5]|   [5, 8]|    [18, 40]|
+---+---------+---------+------------+
The output of dfNew.show confirms that the delimited strings have been converted to arrays.
The final step is to flatten the rows using the explode function, one column at a time.
val dfFinal = dfNew
  .select($"id", explode($"itemid").as("itemid"), $"quantity", $"price")
  .select($"id", $"itemid", explode($"quantity").as("quantity"), $"price")
  .select($"id", $"itemid", $"quantity", explode($"price").as("price"))
dfFinal.show
+---+------+--------+-----+
| id|itemid|quantity|price|
+---+------+--------+-----+
|  A|     1|       2|   30|
|  A|     1|       2|   19|
|  A|     1|       2|   10|
|  A|     1|       2|   30|
|  A|     1|       2|   19|
|  A|     1|       2|   10|
|  A|     1|       1|   30|
|  A|     1|       1|   19|
|  A|     1|       1|   10|
|  A|     2|       2|   30|
|  A|     2|       2|   19|
|  A|     2|       2|   10|
|  A|     2|       2|   30|
|  A|     2|       2|   19|
|  A|     2|       2|   10|
|  A|     2|       1|   30|
|  A|     2|       1|   19|
|  A|     2|       1|   10|
|  A|     3|       2|   30|
|  A|     3|       2|   19|
+---+------+--------+-----+
only showing top 20 rows
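The three chained select calls above can be thought of as three flatMap steps, each of which multiplies the rows by the length of one array while copying the other columns. The sketch below models this in plain Scala with tuples standing in for rows; the `Wide` alias and the `step1` to `step3` names are illustrative only, and this is a mental model rather than how Spark executes explode internally.

```scala
// Model a row as (id, itemid values, quantity values, price values).
type Wide = (String, Seq[String], Seq[String], Seq[String])

val source: Seq[Wide] = Seq(
  ("A", Seq("1", "2", "3"), Seq("2", "2", "1"), Seq("30", "19", "10")),
  ("B", Seq("3", "5"), Seq("5", "8"), Seq("18", "40"))
)

// "explode" on itemid: one output row per itemid value, other columns copied.
val step1 = source.flatMap { case (id, items, qs, ps) => items.map(i => (id, i, qs, ps)) }
// "explode" on quantity.
val step2 = step1.flatMap { case (id, i, qs, ps) => qs.map(q => (id, i, q, ps)) }
// "explode" on price.
val step3 = step2.flatMap { case (id, i, q, ps) => ps.map(p => (id, i, q, p)) }

// Row counts after each step: 5, then 13, then 35.
val sizes = Seq(step1.size, step2.size, step3.size)
```

In this model the row count grows from 2 to 5, 13, and finally 35, which is why exploding several columns independently can become expensive on wide or long lists.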
Example 2
In this example we use the same source table, but pair the values by position instead of generating a cartesian product.
Source
id | itemid | quantity | price
-------------------------------------------------
A | 1,2,3 | 2,2,1 | 30,19,10
B | 3,5 | 5,8 | 18,40
-------------------------------------------------
Target
id | itemid | quantity | price
-------------------------------------------------
A | 1 | 2 | 30
A | 2 | 2 | 19
A | 3 | 1 | 10
B | 3 | 5 | 18
B | 5 | 8 | 40
-------------------------------------------------
To start, we reuse the first step from Example 1:
1. Create a DataFrame for the source table
Step 2 (transforming the delimited strings to arrays) is not applied separately here, because the new UDF does the split itself and is applied to the original string columns in df.
Create a new UDF using Scala's split and zip functions.
The zip method takes another collection as a parameter and merges its elements with the elements of the current collection, producing a new collection of pairs (Tuple2 elements) built from both collections.
val splitAndZip = udf((col1: String, col2: String, col3: String) => {
  // Split each column, pair the values by position, and flatten to 3-tuples
  col1.split(',').zip(col2.split(',')).zip(col3.split(',')).map { case ((a, b), c) => (a, b, c) }
})
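To make the UDF body easier to follow, here is the same split-and-zip logic applied to row A in plain Scala, one step at a time. The variable names are illustrative only; no Spark is needed to run it.

```scala
val itemids    = "1,2,3".split(',')
val quantities = "2,2,1".split(',')
val prices     = "30,19,10".split(',')

// zip pairs elements by position: ("1","2"), ("2","2"), ("3","1")
val pairs = itemids.zip(quantities)

// Zipping again nests the first pair, e.g. (("1","2"),"30"),
// so a map flattens each element into a plain 3-tuple.
val triples = pairs.zip(prices).map { case ((a, b), c) => (a, b, c) }

// zip stops at the shorter collection, so a ragged row loses values silently.
val ragged = "1,2,3".split(',').zip("2,2".split(','))
```

Because zip truncates to the shorter input, this approach assumes that the three columns of a row always contain the same number of values; if that is not guaranteed, a length check before zipping may be worth adding.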
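The final step is to apply the UDF, explode its result, and select the fields to get a DataFrame with the target structure. Each tuple returned by the UDF becomes a struct whose fields are named _1, _2 and _3.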
val dfFinal = df
  .withColumn("out", explode(splitAndZip($"itemid", $"quantity", $"price")))
  .select(
    $"id",
    $"out._1".as("itemid"),
    $"out._2".as("quantity"),
    $"out._3".as("price")
  )
dfFinal.show
+---+------+--------+-----+
| id|itemid|quantity|price|
+---+------+--------+-----+
|  A|     1|       2|   30|
|  A|     2|       2|   19|
|  A|     3|       1|   10|
|  B|     3|       5|   18|
|  B|     5|       8|   40|
+---+------+--------+-----+