Comments¶
A PySpark DataFrame can be converted to a pandas DataFrame by calling the method `DataFrame.toPandas`, and a pandas DataFrame can be converted to a PySpark DataFrame by calling `SparkSession.createDataFrame`. Notice that when you call `DataFrame.toPandas` to convert a Spark DataFrame to a pandas DataFrame, the whole Spark DataFrame is collected to the driver machine! This means that you should only call `DataFrame.toPandas` when the Spark DataFrame is small enough to fit into the memory of the driver machine. When a Spark DataFrame is too big to be collected onto the driver machine and you'd like to manipulate it in Python (e.g., do model inference/prediction leveraging Python machine learning libraries), it is best to use a pandas UDF. When a pandas UDF is applied to a Spark DataFrame, it is applied to each partition of the Spark DataFrame independently: each partition is automatically converted to a pandas DataFrame, manipulated in Python, and then converted back to a Spark DataFrame. For more discussion of pandas UDFs, please refer to User-defined Functions (UDF) in PySpark.

Apache Arrow can be leveraged to convert between Spark DataFrames and pandas DataFrames without copying data. However, there are some restrictions on this, listed below.
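Before turning to those restrictions, here is a minimal sketch of the pandas UDF approach described above (Spark 3.0+ type-hint style); the column names and the stand-in "model" arithmetic are made up purely for illustration.

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("pandas_udf_sketch").getOrCreate()
df = spark.createDataFrame(
    pd.DataFrame({"x1": [1.0, 2.0, 3.0], "x2": [0.5, 1.5, 2.5]})
)

@pandas_udf("double")
def predict(x1: pd.Series, x2: pd.Series) -> pd.Series:
    # Each call receives a batch of rows as pandas Series.
    # Stand-in for a real model's predict() on the batch.
    return x1 * 2 + x2

df.withColumn("prediction", predict("x1", "x2")).show()
```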
- Ensure that `pandas==0.24.2` and `pyarrow==0.15.1` are installed.
- Set `spark.sql.execution.arrow.pyspark.enabled` to `true` to enable conversion using Apache Arrow (a configuration sketch follows this list).
- Since Spark 3, all Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType. Note that BinaryType has been supported by Arrow-based conversion since Spark 2.4, so if you need to work on BinaryType data leveraging Apache Arrow, the minimum Spark version you can use is Spark 2.4.
- Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out-of-memory exceptions, the size of the Arrow record batches can be adjusted by setting the configuration `spark.sql.execution.arrow.maxRecordsPerBatch` to an integer that determines the maximum number of rows per batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition is made into one or more record batches for processing.
- If you use Spark 2 with `pyarrow>=0.15.0`, you need to set the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`, which can be achieved by setting `spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1` and `spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1`.
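Below is a minimal sketch of enabling Arrow-based conversion and tuning the record-batch size when building a Spark session; the application name and the batch size of 5,000 are arbitrary choices for illustration.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("arrow_conversion")
    # Enable Arrow-based columnar data transfers between Spark and pandas.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Cap each Arrow record batch at 5,000 rows (tune this to your data).
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    .getOrCreate()
)

# toPandas now uses the Arrow-based conversion path.
pdf = spark.range(10).toPandas()
```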
Please refer to
[PySpark Usage Guide for Pandas with Apache Arrow](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)
for more details.
- Perhaps the most convenient way to create an ad hoc PySpark DataFrame is to first create a pandas DataFrame and then convert it to a PySpark DataFrame (using `SparkSession.createDataFrame`).
import pandas as pd
import findspark

findspark.init("/opt/spark")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

# Create (or get) a SparkSession with Hive support.
spark = SparkSession.builder.appName("PySpark_pandas").enableHiveSupport().getOrCreate()

# First, create a pandas DataFrame ...
df_p = pd.DataFrame(
    data=[
        ["Ben", 2, 30],
        ["Dan", 4, 25],
        ["Will", 1, 26],
    ],
    columns=["name", "id", "age"],
)
df_p

# ... and then convert it to a PySpark DataFrame.
df1 = spark.createDataFrame(df_p)
df1.show()
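The conversion in the other direction is just as simple. Continuing the example above, the sketch below brings `df1` back to the driver as a pandas DataFrame; remember that this collects all rows, so it is only appropriate when the data fits into the driver's memory.

```python
# Collect the whole Spark DataFrame to the driver as a pandas DataFrame.
df_back = df1.toPandas()
print(type(df_back))  # <class 'pandas.core.frame.DataFrame'>
print(df_back)
```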