AQE (Adaptive Query Execution)
To enable AQE,
you have to set spark.sql.adaptive.enabled
to true
(using --conf spark.sql.adaptive.enabled=true
in spark-submit
or using spark.conf.set("spark.sql.adaptive.enabled", "true")
in Spark/PySpark code).
Note that AQE is enabled by default
since Spark 3.2.0.
Pandas UDFs
Pandas UDFs are user-defined functions
that Spark executes using Apache Arrow
to transfer data to pandas,
which allows vectorized operations on the data.
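To make "vectorized" concrete:
a regular Python UDF is called once per row,
while a Pandas UDF body receives a whole batch of column values as a pandas.Series and operates on it at once.
The plain-pandas sketch below needs no Spark session;
the function names are hypothetical and the Series merely stands in for one Arrow batch.

```python
import pandas as pd

# Row-at-a-time style (like a regular Python UDF): one call per value.
def double_row(x: int) -> int:
    return x * 2

# Vectorized style (like a Pandas UDF body): one call per batch,
# and the multiplication runs over the whole Series.
def double_batch(s: pd.Series) -> pd.Series:
    return s * 2

batch = pd.Series([1, 2, 3])  # stands in for one batch of a long column

row_result = pd.Series([double_row(x) for x in batch])
vec_result = double_batch(batch)

print(row_result.tolist(), vec_result.tolist())
```

Both produce the same values; the vectorized form avoids a Python-level call per row, which is where Pandas UDFs gain their speed.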
A Pandas UDF is defined using pandas_udf
as a decorator or to wrap the function,
and no additional configuration is required.
In general, a Pandas UDF behaves like a regular PySpark function API.
Pandas UDFs were introduced in Spark 2.3.
However,
the API has been greatly simplified and made more Pythonic in Spark 3.0.
Before Spark 3.0,
Pandas UDFs used to be defined with PandasUDFType.
From Spark 3.0 with Python 3.6+,
you can also use Python type hints.
Using Python type hints is preferred,
and PandasUDFType will be deprecated in a future release.
Note that the type hint should use pandas.Series
in all cases
except one:
pandas.DataFrame should be used
as the input or output type hint instead
when the corresponding input or output column is of StructType.
The following example shows a Pandas UDF
which takes a long column, a string column, and a struct column, and outputs a struct column.
It requires the function to specify the type hints of pandas.Series and pandas.DataFrame as below:
```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # s1: long column, s2: string column,
    # s3: struct column, passed in as a pandas.DataFrame (one column per field).
    s3["col2"] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>",
)

df.printSchema()
# root
#  |-- long_col: long (nullable = true)
#  |-- string_col: string (nullable = true)
#  |-- struct_col: struct (nullable = true)
#  |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# root
#  |-- func(long_col, string_col, struct_col): struct (nullable = true)
#  |    |-- col1: string (nullable = true)
#  |    |-- col2: long (nullable = true)
```
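Because the body of func is plain pandas code,
its per-batch logic can be checked without Spark.
The sketch below copies that body into an undecorated function (hypothetical name func_body)
and calls it on pandas objects shaped like the single row of df,
assuming, as the type hints state, that the struct column arrives as a DataFrame with one column per field.

```python
import pandas as pd

# Same logic as func, without the @pandas_udf decorator.
def func_body(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3["col2"] = s1 + s2.str.len()
    return s3

# One batch mirroring df's single row.
s1 = pd.Series([1])                                 # long_col
s2 = pd.Series(["a string"])                        # string_col
s3 = pd.DataFrame({"col1": ["a nested string"]})    # struct_col

out = func_body(s1, s2, s3)
print(out)
```

Here col2 is 1 plus the length of "a string" (8), that is 9.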
References