
Conversion Between PySpark DataFrames and pandas DataFrames


  1. A PySpark DataFrame can be converted to a pandas DataFrame by calling the method DataFrame.toPandas, and a pandas DataFrame can be converted to a PySpark DataFrame by calling SparkSession.createDataFrame. Notice that DataFrame.toPandas collects the whole Spark DataFrame to the driver machine! This means that you should only call DataFrame.toPandas when the Spark DataFrame is small enough to fit into the memory of the driver machine. When a Spark DataFrame is too big to be collected onto the driver machine and you'd like to manipulate it in Python (e.g., do model inference/prediction leveraging Python machine learning libraries), it is best to use a pandas UDF. A pandas UDF is applied to each partition of the Spark DataFrame independently: each partition is automatically converted to a pandas DataFrame, manipulated in Python, and then converted back into part of the resulting Spark DataFrame. For more discussions on pandas UDFs, please refer to User-defined Functions (UDF) in PySpark. A minimal sketch of this per-partition approach is given at the end of this post.

  2. Apache Arrow can be leveraged to convert between Spark DataFrames and pandas DataFrames much more efficiently, because the data is exchanged between the JVM and Python in Arrow's columnar format instead of being serialized row by row. However, there are some restrictions and required configurations (a configuration sketch follows this list).

    • Ensure that pandas>=0.24.2 and pyarrow>=0.15.1 are installed.
    • Set spark.sql.execution.arrow.pyspark.enabled to true to enable Arrow-based conversion.
    • Since Spark 3, all Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType. Note that BinaryType has been supported by Arrow-based conversion since Spark 2.4, so Spark 2.4 is the minimum version required if you need to work on BinaryType data leveraging Apache Arrow.
    • Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf spark.sql.execution.arrow.maxRecordsPerBatch to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into 1 or more record batches for processing.
      • If you use Spark 2 with pyarrow>=0.15.0, you need to set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 on both the driver and the executors, which can be done with the configurations spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 and spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1.
Please refer to [PySpark Usage Guide for Pandas with Apache Arrow](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html) for more details.
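
The snippet below is a minimal sketch of how the Arrow-related settings above might be applied when building a SparkSession on Spark 3; the app name and the batch size of 5,000 are illustrative assumptions, not recommendations.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("arrow_conversion_demo")
    # Enable Arrow-based conversion between Spark and pandas DataFrames.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Fall back to the non-Arrow path instead of failing on unsupported types.
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
    # Cap the number of rows per Arrow record batch to limit JVM memory usage.
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    # On Spark 2 with pyarrow>=0.15.0, additionally set
    # spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 and
    # spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 (see above).
    .getOrCreate()
)

# With Arrow enabled, toPandas and createDataFrame use the Arrow path when possible.
pdf = spark.range(10).toPandas()
sdf = spark.createDataFrame(pdf)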

  3. Perhaps the most convenient way to create an ad hoc PySpark DataFrame is to first create a pandas DataFrame and then convert it to a PySpark DataFrame using SparkSession.createDataFrame, as demonstrated below.
In [1]:
import pandas as pd
import findspark

# Locate the local Spark installation and make pyspark importable.
findspark.init("/opt/spark")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

# Create (or reuse) a SparkSession with Hive support enabled.
spark = SparkSession.builder.appName("PySpark_pandas").enableHiveSupport().getOrCreate()
In [6]:
df_p = pd.DataFrame(
    data=[
        ["Ben", 2, 30],
        ["Dan", 4, 25],
        ["Will", 1, 26],
    ],
    columns=["name", "id", "age"],
)
df_p
Out[6]:
   name  id  age
0   Ben   2   30
1   Dan   4   25
2  Will   1   26
In [9]:
df1 = spark.createDataFrame(df_p)
df1.show()
+----+---+---+
|name| id|age|
+----+---+---+
| Ben|  2| 30|
| Dan|  4| 25|
|Will|  1| 26|
+----+---+---+
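
To complete the round trip, the small Spark DataFrame can be collected back into a pandas DataFrame with DataFrame.toPandas; as a minimal sketch, this is only safe here because the data easily fits in the driver's memory.

# Collect the (tiny) Spark DataFrame back to the driver as a pandas DataFrame.
df_p2 = df1.toPandas()
print(df_p2)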

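For the per-partition processing mentioned in point 1, below is a minimal sketch using DataFrame.mapInPandas (available since Spark 3.0); the add_one function and the transformation it performs are illustrative assumptions standing in for real work such as model inference.

from typing import Iterator

import pandas as pd

# The function receives an iterator of pandas DataFrames, each holding a batch of
# rows from one partition, and must yield pandas DataFrames matching the schema.
def add_one(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in batches:
        pdf["age"] = pdf["age"] + 1  # any pandas/Python logic goes here
        yield pdf

# Apply the function partition by partition; the result is again a Spark DataFrame.
df1.mapInPandas(add_one, schema=df1.schema).show()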