Aggregation Without Grouping¶
You can aggregate over all rows of a DataFrame without grouping: just use aggregation functions in select without a groupBy, which closely mirrors SQL syntax. The aggregation functions every and any are available since Spark 3.0; in earlier versions they can be emulated with other aggregation functions such as sum and count. Both column expressions and column names work in aggregation functions.
!/opt/pyenv/versions/3.7.9/bin/python -m pip install pandas pyarrow findspark
import pandas as pd
import findspark

# Spark 3.0+ is required for the every/any SQL aggregate functions below.
findspark.init("/opt/spark-3.0.1-bin-hadoop3.2/")

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("aggregation").enableHiveSupport().getOrCreate()
df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Tu", 1, False, 0),
            ("Ben", "Tu", 3, False, 0),
            ("Ken", "Xu", 6, False, 0),
            ("Ken", "Xu", 9, False, 0),
        ),
        columns=("fname", "lname", "score", "x", "y"),
    )
)
df.show()
Casting a numeric column to boolean maps 0 to false and any non-zero value to true; casting back to int then yields 0/1.
df.select(col("score").cast("boolean").cast("int")).show()
None is correctly recognized as null when used in when(expr, val).otherwise(None).
df.select(when(col("score") >= 3, 1)).show()
df.select(when(col("score") >= 3, 1).otherwise(None)).show()
any¶
any is available since Spark 3.0, and only as a SQL function (there is no corresponding DataFrame API function). It works on boolean columns only.
df.createOrReplaceTempView("df")
spark.sql("select any(x) from df").show()
spark.sql("select any(x) from df where score > 2").show()
spark.sql("select any(y) from df").show()
df.select(
    count("fname").alias("num_first_name"),
    count("lname").alias("num_last_name"),
    sum("score").alias("sum_score"),
).show()
Aggregation Using groupBy¶
You can use positional aliases in GROUP BY in Spark SQL.
df.show()
df.groupBy("lname").agg(sum("y")).show()
df.createOrReplaceTempView("people")
spark.sql("select fname, count(*) as n from people group by 1").show
sum¶
sum ignores null values. When all values in a group are null, sum returns null.
import org.apache.spark.sql.functions._

val df = Seq(
    ("2017-01-01", 1L),
    ("2017-01-01", 10L),
    ("2017-02-01", 2L),
    ("2017-02-01", 22L)
).toDF("date", "value").
    withColumn("value", when($"value" > 20, null).otherwise($"value"))
df.show
df.groupBy("date").agg(sum($"value").alias("s")).show
When all values are null, sum returns null.
import org.apache.spark.sql.functions._
val df = spark.read.json("../data/people.json").
withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
Specify an alias for the column after aggregation.
df.groupBy("is_null").agg(sum("age").alias("sage")).show
Group By Multiple Columns¶
df.groupBy("fname", "lname").sum().show
agg¶
agg computes one or more aggregations on the grouped data in a single pass.
import org.apache.spark.sql.functions._

val df = spark.read.json("../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
df.groupBy("is_null").agg(sum("age").alias("sage")).show
df.groupBy("is_null").agg(sum("age").alias("sage"), count("*").alias("cnt")).show
Collection¶
collect_list¶
collect_list collects all values of a column within each group into an array, keeping duplicates.
import org.apache.spark.sql.functions._

val df = Seq(
    ("Ben", 1),
    ("Ben", 2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df.show
val df2 = df.groupBy("name").agg(
    collect_list("score").alias("scores")
)
df2.show
df2.printSchema
collect_set¶
collect_set is similar to collect_list but removes duplicates. Note that df_copy below contains the row ("Ben", 1) twice.
val df_copy = Seq(
    ("Ben", 1),
    ("Ben", 1),
    ("Ben", 2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df_copy.show
val df3 = df_copy.groupBy("name").agg(collect_list("score").alias("scores"))
df3.show()
val df4 = df_copy.groupBy("name").agg(collect_set("score").alias("scores"))
df4.show
First/Last¶
first¶
last¶
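first and last return the first/last value of a column within each group. Neither is deterministic unless the underlying data is ordered; both take an optional ignorenulls argument. A minimal PySpark sketch, reusing the df created at the beginning of this post:
from pyspark.sql.functions import first, last

df.groupBy("lname").agg(
    first("score").alias("first_score"),
    last("score", ignorenulls=True).alias("last_score"),
).show()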
Grouping¶
grouping¶
grouping_id¶
val df = spark.read.json("../data/people.json")
df.show
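grouping indicates whether a column in a cube/rollup/GROUPING SETS result is aggregated (1) or part of the grouping (0), and grouping_id returns the corresponding bit vector; both are valid only together with cube, rollup, or GROUPING SETS. A minimal PySpark sketch, reusing the df with fname/lname created at the beginning of this post:
from pyspark.sql.functions import count, grouping, grouping_id

df.cube("fname", "lname").agg(
    grouping("fname").alias("g_fname"),
    grouping("lname").alias("g_lname"),
    grouping_id().alias("gid"),
    count("*").alias("cnt"),
).show()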
Count¶
count¶
import org.apache.spark.sql.functions._
val df = spark.read.json("../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
df.groupBy("is_null").count().show()
df.groupBy("is_null").agg(count("*").as("total")).show
df.groupBy("is_null").agg(count(when($"name" === "Andy", 1).otherwise(null))).show
df.groupBy("is_null").agg(sum(when($"name" === "Andy", 1).otherwise(0))).show
df.groupBy("is_null").agg(count("*").alias("total")).show
countDistinct¶
df.groupBy("is_null").agg(countDistinct("is_null").alias("total")).show
approx_count_distinct¶
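approx_count_distinct estimates the number of distinct values with the HyperLogLog++ algorithm, which is much cheaper than an exact countDistinct on large data; the optional rsd argument is the maximum allowed relative standard deviation. A minimal sketch on the df from the beginning of this post:
from pyspark.sql.functions import approx_count_distinct

df.groupBy("fname").agg(
    approx_count_distinct("score", rsd=0.01).alias("approx_distinct_scores")
).show()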
Sum¶
sum¶
sumDistinct¶
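sum is demonstrated in the groupBy examples above. sumDistinct (renamed sum_distinct in Spark 3.2) sums each distinct value only once. A minimal sketch on the df from the beginning of this post; Ben has y values 1, 1, 0, 0, so sum gives 2 while sumDistinct gives 1:
from pyspark.sql.functions import sum, sumDistinct

df.groupBy("fname").agg(
    sum("y").alias("sum_y"),
    sumDistinct("y").alias("sum_distinct_y"),
).show()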
Extreme Values¶
max¶
min¶
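max and min return the largest and smallest non-null value in each group (null if all values are null). A minimal sketch:
from pyspark.sql.functions import max, min

df.groupBy("fname").agg(
    max("score").alias("max_score"),
    min("score").alias("min_score"),
).show()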
Mean/Average¶
avg¶
How does avg behave on null values? Like sum, it ignores them: the average is computed over the non-null values only, and it returns null when all values in a group are null.
mean¶
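mean is simply an alias of avg. The sketch below verifies the null behavior described above by nulling out non-positive scores (the column name score_or_null is just for illustration): for Ben, avg over the non-null values 1 and 3 gives 2.0, while the mean over all four scores is 1.0.
from pyspark.sql.functions import avg, col, mean, when

# when without otherwise yields null for non-matching rows,
# so non-positive scores become null in score_or_null.
df.withColumn(
    "score_or_null", when(col("score") > 0, col("score"))
).groupBy("fname").agg(
    avg("score_or_null").alias("avg_ignoring_null"),
    mean("score").alias("mean_all_scores"),
).show()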
Standard Deviation¶
stddev¶
stddev_pop¶
stddev_samp¶
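stddev is an alias of stddev_samp, the sample standard deviation (dividing by n - 1); stddev_pop is the population standard deviation (dividing by n). A minimal sketch:
from pyspark.sql.functions import stddev, stddev_pop, stddev_samp

df.groupBy("fname").agg(
    stddev("score").alias("sd"),
    stddev_samp("score").alias("sd_samp"),
    stddev_pop("score").alias("sd_pop"),
).show()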
Variance¶
var_pop¶
var_samp¶
variance¶
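Similarly, variance is an alias of var_samp (the unbiased sample variance), and var_pop is the population variance. A minimal sketch:
from pyspark.sql.functions import var_pop, var_samp, variance

df.groupBy("fname").agg(
    variance("score").alias("var"),
    var_samp("score").alias("var_samp"),
    var_pop("score").alias("var_pop"),
).show()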
Correlation & Covariance¶
corr¶
covar_pop¶
covar_samp¶
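corr computes the Pearson correlation coefficient between two columns; covar_samp and covar_pop compute the sample and population covariance. A minimal sketch on the df from the beginning of this post:
from pyspark.sql.functions import corr, covar_pop, covar_samp

df.groupBy("fname").agg(
    corr("score", "y").alias("corr_score_y"),
    covar_samp("score", "y").alias("covar_samp_score_y"),
    covar_pop("score", "y").alias("covar_pop_score_y"),
).show()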
Skewness & Kurtosis¶
skewness¶
kurtosis¶
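skewness and kurtosis compute the third and fourth standardized moments of a column within each group. A minimal sketch:
from pyspark.sql.functions import kurtosis, skewness

df.groupBy("fname").agg(
    skewness("score").alias("skew_score"),
    kurtosis("score").alias("kurt_score"),
).show()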