Aggregation Without Grouping¶
You can aggregate all values in columns of a DataFrame by using aggregation functions in `select` without `groupBy`, which is very similar to SQL syntax. The aggregation functions `any` and `every` are available since Spark 3.0; in earlier versions they can be achieved using other aggregation functions such as `sum` and `count` (see the sketch after the `any` examples below). You can use both column expressions and column names in aggregation functions.
!/opt/pyenv/versions/3.7.9/bin/python -m pip install pandas pyarrow findspark
# Spark 2.3.1 setup (for the pre-3.0 examples).
import pandas as pd
import findspark

findspark.init("/opt/spark-2.3.1-bin-hadoop2.7/")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Case/When").enableHiveSupport().getOrCreate()
# Spark 3.0.1 setup (required for the any/every aggregation functions).
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.1-bin-hadoop3.2/")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Case/When").enableHiveSupport().getOrCreate()
df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Tu", 1, False, 0),
            ("Ben", "Tu", 3, False, 0),
            ("Ken", "Xu", 6, False, 0),
            ("Ken", "Xu", 9, False, 0),
        ),
        columns=("fname", "lname", "score", "x", "y"),
    )
)
df.show()
df.select(col("score").cast("boolean").cast("int")).show()
`None` is correctly recognized as `null` when used in `when(expr, val).otherwise(None)`.
df.select(when(col("score") >= 3, 1)).show()
df.select(when(col("score") >= 3, 1).otherwise(None)).show()
any¶
Available only as a SQL function (not as a DataFrame API function) since Spark 3.0. It works on boolean columns only.
df.createOrReplaceTempView("df")
spark.sql("select any(x) from df").show()
spark.sql("select any(x) from df where score > 2").show()
spark.sql("select any(y) from df").show()
df.select(
    count("fname").alias("num_first_name"),
    count("lname").alias("num_last_name"),
    sum("score").alias("sum_score"),
).show()
Aggregation Using groupBy¶
You can use a positional alias in GROUP BY in Spark SQL!
df.show()
df.groupBy("lname").agg(sum("y")).show()
df.createOrReplaceTempView("people")
spark.sql("select fname, count(*) as n from people group by 1").show
sum¶
`sum` ignores `null`. When all values are `null`, `sum` returns `null`.

`sum` ignores `null`:
val df = Seq(
    ("2017-01-01", 1L),
    ("2017-01-01", 10L),
    ("2017-02-01", 2L),
    ("2017-02-01", 22L)
).toDF("date", "value").
    withColumn("value", when($"value" > 20, null).otherwise($"value"))
df.show
df.groupBy("date").agg(sum($"value").alias("s")).show
When all values are `null`, `sum` returns `null`:
import org.apache.spark.sql.functions._
val df = spark.read.json("../data/people.json").
withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
Specify an alias for the column after aggregation.
df.groupBy("is_null").agg(sum("age").alias("sage")).show
Group By Multiple Columns¶
df.groupBy("fname", "lname").sum().show
val df = spark.read.json("../../data/people.json")
df.show
agg¶
import org.apache.spark.sql.functions._
val df = spark.read.json("../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
df.groupBy("is_null").agg(sum("age").alias("sage")).show
df.groupBy("is_null").agg(sum("age").alias("sage"), count("*").alias("cnt")).show
Collection¶
collect_list¶
import org.apache.spark.sql.functions._
val df = Seq(
    ("Ben", 1),
    ("Ben", 2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df.show
val df2 = df.groupBy("name").agg(
    collect_list("score").alias("scores")
)
df2.show
df2.printSchema
collect_set¶
val df_copy = Seq(
    ("Ben", 1),
    ("Ben", 1),
    ("Ben", 2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df_copy.show
val df3 = df_copy.groupBy("name").agg(collect_list("score").alias("scores"))
df3.show()
val df4 = df_copy.groupBy("name").agg(collect_set("score").alias("scores"))
df4.show
First/Last¶
first¶
last¶
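`first` and `last` return the first/last value of a column within each group. A minimal sketch with illustrative data (note that without an explicit ordering, which value is "first" or "last" in a group is non-deterministic; both functions also accept an `ignoreNulls` flag):

import org.apache.spark.sql.functions._

val scores = Seq(("Ben", 1), ("Ben", 3), ("Ken", 9)).toDF("name", "score")
scores.groupBy("name").agg(
    first("score").alias("first_score"),
    last("score").alias("last_score")
).show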
Grouping¶
grouping¶
grouping_id¶
val df = spark.read.json("../data/people.json")
df.show
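`grouping` and `grouping_id` are meant to be used together with `cube` or `rollup`: `grouping(col)` returns 1 when `col` has been aggregated away in a subtotal row (and 0 otherwise), and `grouping_id()` combines those bits for all cube/rollup columns into a single integer. A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val dfg = Seq(("Ben", "Du", 1), ("Ken", "Xu", 9)).toDF("fname", "lname", "score")
dfg.cube("fname", "lname").agg(
    sum("score").alias("s"),
    grouping("fname").alias("g_fname"), // 1 in rows where fname is aggregated away
    grouping_id().alias("gid") // bit vector over all cube columns
).show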
Count¶
count¶
import org.apache.spark.sql.functions._
val df = spark.read.json("../../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
df.groupBy("is_null").count().show()
df.groupBy("is_null").agg(count("*").as("total")).show
df.groupBy("is_null").agg(count(when($"name" === "Andy", 1).otherwise(null))).show
df.groupBy("is_null").agg(sum(when($"name" === "Andy", 1).otherwise(0))).show
df.groupBy("is_null").agg(count("*").alias("total")).show
countDistinct¶
df.groupBy("is_null").agg(countDistinct("is_null").alias("total")).show
approx_count_distinct¶
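`approx_count_distinct` estimates the number of distinct values with the HyperLogLog++ algorithm, which is much cheaper than an exact `countDistinct` on large data; the optional second argument is the maximum relative standard deviation allowed. A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val dfc = Seq(("Ben", 1), ("Ben", 2), ("Ken", 2)).toDF("name", "score")
dfc.agg(
    countDistinct("score").alias("exact"),
    approx_count_distinct("score", 0.05).alias("approx") // at most 5% relative error
).show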
Sum¶
sum¶
sumDistinct¶
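`sumDistinct` sums each distinct value once. A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val nums = Seq(1, 1, 2, 3).toDF("x")
nums.agg(
    sum("x").alias("sum_x"), // 7
    sumDistinct("x").alias("sum_distinct_x") // 6: the duplicated 1 is counted once
).show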
Extreme Values¶
max¶
min¶
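`max` and `min` return the largest/smallest value of a column within each group, ignoring nulls. A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val scores = Seq(("Ben", 1), ("Ben", 3), ("Ken", 9)).toDF("name", "score")
scores.groupBy("name").agg(
    max("score").alias("max_score"),
    min("score").alias("min_score")
).show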
Mean/Average¶
avg¶
How does average behave on null values?
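Like `sum`, `avg` ignores `null`: the mean is computed over the non-null values only, and when all values are `null`, `avg` returns `null`. A minimal sketch with illustrative data (`mean` is an alias for `avg`):

import org.apache.spark.sql.functions._

// avg over (1, null, 3) is 2.0, not 4/3: the null is ignored.
Seq(Some(1), None, Some(3)).toDF("x").agg(avg("x")).show
// When all values are null, avg returns null.
Seq[Option[Int]](None, None).toDF("x").agg(avg("x")).show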
mean¶
Standard Deviation¶
stddev¶
stddev_pop¶
stddev_samp¶
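`stddev` is an alias for `stddev_samp`, the sample standard deviation (denominator n - 1); `stddev_pop` is the population standard deviation (denominator n). A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val xs = Seq(1.0, 2.0, 3.0, 4.0).toDF("x")
xs.agg(
    stddev("x").alias("stddev"), // same as stddev_samp
    stddev_samp("x").alias("stddev_samp"),
    stddev_pop("x").alias("stddev_pop")
).show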
Variance¶
var_pop¶
var_samp¶
variance¶
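Analogously, `variance` is an alias for `var_samp`, the sample variance; `var_pop` is the population variance. A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val xs = Seq(1.0, 2.0, 3.0, 4.0).toDF("x")
xs.agg(
    variance("x").alias("variance"), // same as var_samp
    var_samp("x").alias("var_samp"),
    var_pop("x").alias("var_pop")
).show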
Correlation & Covariance¶
corr¶
covar_pop¶
covar_samp¶
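`corr` computes the Pearson correlation coefficient of two columns; `covar_samp` and `covar_pop` compute the sample and population covariance. A minimal sketch with illustrative data:

import org.apache.spark.sql.functions._

val xy = Seq((1.0, 2.0), (2.0, 4.0), (3.0, 5.0)).toDF("x", "y")
xy.agg(
    corr("x", "y").alias("corr"),
    covar_samp("x", "y").alias("covar_samp"),
    covar_pop("x", "y").alias("covar_pop")
).show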
Skewness & Kurtosis¶
skewness¶
kurtosis¶
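A minimal sketch of `skewness` and `kurtosis` with illustrative data:

import org.apache.spark.sql.functions._

val xs = Seq(1.0, 2.0, 2.0, 3.0, 10.0).toDF("x")
xs.agg(
    skewness("x").alias("skewness"),
    kurtosis("x").alias("kurtosis")
).show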