Tips and Traps¶
If you use PySpark instead of Spark/Scala, pandas UDFs are a great alternative to the (sometimes complicated) collection functions discussed here. With pandas UDFs, each partition of a Spark DataFrame is converted to a pandas DataFrame without copying the underlying data; you can then transform the pandas DataFrames, and the results are converted back into partitions of a Spark DataFrame.
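For example, here is a minimal sketch using DataFrame.mapInPandas (Spark 3.0+); the DataFrame df and the column name col1 are assumptions for illustration only.
from typing import Iterator
import pandas as pd
def add_one(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Each batch of a partition arrives as a pandas DataFrame.
    for pdf in batches:
        pdf["col1"] = pdf["col1"] + 1
        yield pdf
# Assuming df is a Spark DataFrame with an integer column "col1":
# df.mapInPandas(add_one, schema=df.schema).show()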
When converting a pandas DataFrame to a Spark DataFrame,
- a column of list is converted to a column of ArrayType
- a column of tuple is converted to a column of StructType
- a column of dict is converted to a column of MapType
Comments¶
There are multiple ways (vanilla strings, JSON strings, StructType and ArrayType) to represent complex data types in Spark DataFrames. Notice that a tuple is converted to a StructType and an array is converted to an ArrayType. Starting from Spark 2.4, ArrayType is the more convenient representation when the elements share the same type, thanks to its rich set of array functions.
Vanilla String¶
- string, substring, regexp_extract, locate, left, concat_ws
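A quick sketch of a few of these, assuming the SparkSession spark created in the setup code below:
df_s = spark.createDataFrame([("how are you",)], ["s"])
df_s.select(
    substring("s", 1, 3).alias("first_3"),  # "how"
    regexp_extract("s", r"(\w+)$", 1).alias("last_word"),  # "you"
    locate("are", "s").alias("pos_are"),  # 1-based position: 5
    concat_ws("-", "s", "s").alias("joined"),
).show()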
JSON String¶
- json_tuple
- get_json_object
- from_json
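A short sketch of the JSON helpers, again assuming the spark session from the setup below:
df_j = spark.createDataFrame([('{"x": 1, "y": 2}',)], ["j"])
df_j.select(json_tuple("j", "x", "y").alias("x", "y")).show()
df_j.select(get_json_object("j", "$.y").alias("y")).show()
df_j.select(from_json("j", "x INT, y INT").alias("s")).show()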
StructType¶
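- struct (build a StructType column from existing columns; expand its fields with col1.*)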
ArrayType¶
- array
- element_at
- array_min, array_max, array_join, array_intersect, array_except, array_distinct, array_contains, array_position, array_remove, array_repeat, array_sort, array_union, arrays_overlap, arrays_zip
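A brief sketch of a few of the array functions (Spark 2.4+), assuming the spark session from the setup below:
df_a = spark.createDataFrame([([3, 1, 2, 2],)], ["arr"])
df_a.select(
    array_max("arr").alias("max"),  # 3
    array_sort("arr").alias("sorted"),  # [1, 2, 2, 3]
    array_distinct("arr").alias("distinct"),  # [3, 1, 2]
    array_contains("arr", 2).alias("has_2"),  # true
).show()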
from typing import List, Tuple
import pandas as pd
from pathlib import Path
import findspark
# Locate the Spark 3.x installation under /opt and make pyspark importable.
findspark.init(str(next(Path("/opt").glob("spark-3*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import (
IntegerType,
StringType,
StructType,
StructField,
ArrayType,
)
spark = (
SparkSession.builder.appName("PySpark_Collection_Func")
.enableHiveSupport()
.getOrCreate()
)
Python Types to DataType in PySpark¶
A column of list is converted to a column of ArrayType.
df = spark.createDataFrame(
pd.DataFrame(
data=[([1, 2], "how"), ([2, 3], "are"), ([3, 4], "you")],
columns=("col1", "col2"),
)
)
df.show()
df.schema
A column of tuple is converted to a column of StructType.
df = spark.createDataFrame(
pd.DataFrame(
data=[((1, 2), "how"), ((2, 3), "are"), ((3, 4), "you")],
columns=("col1", "col2"),
)
)
df.show()
df.schema
A column of dict is converted to a column of MapType.
df = spark.createDataFrame(
pd.DataFrame(
data=[
({"x": 1, "y": 2}, "how"),
({"x": 2, "y": 3}, "are"),
({"x": 3, "y": 4}, "you"),
],
columns=("col1", "col2"),
)
)
df.show()
df.schema
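You can also build a MapType column from existing columns using create_map, as the examples below show.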
df = spark.createDataFrame(
pd.DataFrame(
data=[(1, 2, "how"), (2, 3, "are"), (3, 4, "you")],
columns=("col1", "col2", "col3"),
)
)
df.show()
df.select(create_map("col3", "col1").alias("map")).show()
df.select(create_map([df.col3, df.col1]).alias("map")).show()
explode¶
spark.sql(
"""
select
split("how are you", " ") as words
"""
).show()
spark.sql(
"""
select
explode(split("how are you", " ")) as words
"""
).show()
split¶
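split converts a delimited string column into a column of ArrayType (it is also what feeds explode above). A minimal DataFrame-API sketch:
spark.createDataFrame([("how are you",)], ["s"]).select(
    split("s", " ").alias("words")
).show(truncate=False)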
struct¶
df = spark.createDataFrame(
pd.DataFrame(
data=[(1, 2, "how"), (2, 3, "are"), (3, 4, "you")],
columns=("col1", "col2", "col3"),
)
)
df.show()
df.select(struct("col1", "col2").alias("struct")).show()
df.select(struct([df.col1, df.col2]).alias("struct")).show()
df.select(struct("col1", "col2").alias("struct")).schema
Work with StructType¶
Notice that a tuple is converted to a StructType in Spark DataFrames.
df = spark.createDataFrame(
pd.DataFrame(
data=[((1, 2), "how"), ((2, 3), "are"), ((3, 4), "you")],
columns=("col1", "col2"),
)
)
df.show()
Split all elements of a StructType into different columns.
df.select("col1.*").show
Extract elements from StructTypes by position and rename the columns.
df.select(
    col("col1._1").alias("v1"),
    col("col1._2").alias("v2"),
).show()
Work with ArrayType¶
Notice that an array is converted to an ArrayType in Spark DataFrames. Note: most of the array functions (element_at, etc.) require Spark 2.4.0+.
df = spark.createDataFrame(
    pd.DataFrame(
        data=[([1, 2], "how"), ([2, 3], "are"), ([3, 4], "you")],
        columns=("col1", "col2"),
    )
)
df.show()
df.select(
    element_at("col1", 1).alias("v1"),
    element_at("col1", 2).alias("v2"),
).show()
ArrayType¶
df = spark.createDataFrame(
pd.DataFrame(
data=[([1, 2], "how", 1), ([2, 3], "are", 2), ([3, 4], "you", 3)],
columns=["col1", "col2", "col3"],
)
)
df.show()
df.select(element_at(col("col1"), 1).alias("word")).show()
df.select(element_at("col1", 1).alias("word")).show()
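A UDF returning a Python list produces a column of ArrayType; the element type is declared in the udf decorator.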
@udf(ArrayType(IntegerType()))
def my_udf(x: int) -> List[int]:
return [x, 1]
df1 = df.select(my_udf("col3").alias("f1"))
df1.show()
df1.schema
df1.select(element_at("f1", 1).alias("v1"), element_at("f1", 2).alias("v2")).show()
StructType¶
df2 = spark.createDataFrame(
pd.DataFrame(
data=[((1, 2), "how", 1), ((2, 3), "are", 2), ((3, 4), "you", 3)],
columns=["col1", "col2", "col3"],
)
)
df2.show()
df2.schema
df2.select("col1.*").show()
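Similarly, a UDF returning a Python tuple produces a column of StructType, with the fields declared via StructField in the decorator.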
@udf(
StructType(
[
StructField("_1", IntegerType(), nullable=True),
StructField("_2", IntegerType(), nullable=True),
]
)
)
def my_udf2(x: int) -> Tuple[int, int]:
return (x, 1)
df3 = df.select(my_udf2("col3").alias("f1"))
df3.show()
df3.schema
df3.select("f1.*").show()
df3.select(col("f1._1").alias("v1"), col("f1._2").alias("v2")).show()
References¶
https://sparkbyexamples.com/spark/spark-dataframe-map-maptype-column/
https://sparkbyexamples.com/spark/spark-how-to-convert-structtype-to-a-maptype/
https://mungingdata.com/apache-spark/maptype-columns/
https://docs.databricks.com/_static/notebooks/transform-complex-data-types-scala.html
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/functions.html
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html