Window with orderBy¶
It is tricky!

If you provide an ORDER BY clause, the default frame becomes RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: https://stackoverflow.com/questions/52273186/pyspark-spark-window-function-first-last-issue

Avoid last; use first with a descending order by instead, which gives fewer surprises. Do NOT use order by if it is not necessary: it introduces an unnecessary sort and silently changes the default frame.
import findspark

findspark.init("/opt/spark")

import pandas as pd
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
cust_p = pd.DataFrame(
data=[
("Alice", "2016-05-01", 50.00, 1),
("Alice", "2016-05-01", 45.00, 2),
("Alice", "2016-05-02", 55.00, 3),
("Alice", "2016-05-02", 100.00, 4),
("Bob", "2016-05-01", 25.00, 5),
("Bob", "2016-05-01", 29.00, 6),
("Bob", "2016-05-02", 27.00, 7),
("Bob", "2016-05-02", 30.00, 8),
],
columns=("name", "date", "amount", "id"),
)
cust_p
cust = spark.createDataFrame(cust_p)
cust.show()
cust.orderBy("name", "date").show()
Create a temp view for testing Spark SQL (to compare its results with those of the PySpark DataFrame API).
cust.createOrReplaceTempView("customers")
max¶
max works as expected with over (partition by ...) when order by is not used: every row gets the partition-wide maximum.
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
max(col("amount")).over(Window.partitionBy("name", "date")).alias("max_amount"),
).orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date) as max_amount
from
customers
"""
).orderBy("name", "date").show()
The following results might surprise people. There is nothing wrong with the code; it is just that when order by is used, the default frame for window functions (max in this case) is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so each row only sees the rows up to itself.
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
max(col("amount"))
.over(Window.partitionBy("name", "date").orderBy("id"))
.alias("max_amount"),
).orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date order by id) as max_amount
from
customers
"""
).orderBy("name", "date").show()
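If you do need the order by but still want the partition-wide max, one fix is to widen the frame explicitly. A minimal sketch using rowsBetween with unbounded bounds:
w_full = (
    Window.partitionBy("name", "date")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    # the explicit frame covers the whole partition, restoring the expected max
    max(col("amount")).over(w_full).alias("max_amount"),
).orderBy("name", "date").show()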
rank¶
The window function rank requires an order by clause.
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
rank().over(Window.partitionBy("name", "date").orderBy("amount")).alias("rank"),
).orderBy("name", "date").show()
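Note that the SQL version below orders by amount descending while the DataFrame example above orders ascending, so the ranks come out reversed; the same asymmetry appears in the rank and dense_rank examples that follow.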
spark.sql(
"""
select
name,
date,
amount,
id,
rank() over (partition by name, date order by amount desc) as rank
from
customers
"""
).orderBy("name", "date").show()
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
rank().over(Window.partitionBy("name").orderBy("date")).alias("rank"),
).orderBy("name").show()
spark.sql(
"""
select
name,
date,
amount,
id,
rank() over (partition by name order by date desc) as rank
from
customers
"""
).orderBy("name").show()
dense_rank¶
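dense_rank is like rank except that ties do not create gaps: the ranks are consecutive.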
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
dense_rank().over(Window.partitionBy("name").orderBy("date")).alias("dense_rank"),
).orderBy("name").show()
spark.sql(
"""
select
name,
date,
amount,
id,
dense_rank() over (partition by name order by date desc) as dense_rank
from
customers
"""
).orderBy("name").show()
first¶
With order by id and the default frame, first returns the value from the row with the smallest id in each partition, which is stable and unsurprising.
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    first(col("amount")).over(Window.partitionBy("name", "date").orderBy("id")).alias("first_amount"),
).orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id) as first_amount
from
customers
"""
).orderBy("name", "date").show()
With the default frame (unbounded preceding to current row), last returns the current row's own value, which is rarely what you want:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    last(col("amount")).over(Window.partitionBy("name", "date").orderBy("id")).alias("last_amount"),
).orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
amount,
id,
last(amount) over (partition by name, date order by id) as last_amount
from
customers
"""
).orderBy("name", "date").show()
Instead of last, use first with a descending order by:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    first(col("amount"))
    .over(Window.partitionBy("name", "date").orderBy(col("id").desc()))
    .alias("last_amount"),
).orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id desc) as last_amount
from
customers
"""
).orderBy("name", "date").show()
partition by with group by¶
Avoid doing this! When group by is present, window functions are evaluated after the aggregation, so any column referenced inside the window (here amount and id) must be a grouping column or wrapped in an aggregate.
spark.sql(
"""
select
name,
date,
first(amount) over (partition by name, date order by id desc) as last_amount
from
customers
group by
name, date
"""
).orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
first(max(amount)) over (partition by name, date order by id desc) as last_amount
from
customers
group by
name, date
"""
).orderBy("name", "date").show()
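If you need both an aggregate and a window, a cleaner approach is to aggregate first and then apply the window over the aggregated result. A minimal sketch (the name_total column is illustrative, not from the original notes):
agg = cust.groupBy("name", "date").agg(max("amount").alias("max_amount"))
agg.select(
    col("name"),
    col("date"),
    col("max_amount"),
    # sum of the per-day maxima within each name
    sum(col("max_amount")).over(Window.partitionBy("name")).alias("name_total"),
).orderBy("name", "date").show()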
cust.orderBy("name", "date").show()
spark.sql(
"""
select
name,
date,
amount,
id,
row_number() over (partition by name, date order by id desc) as rownum
from
customers
"""
).orderBy("name", "date").show()
Filtering on rownum = 1 keeps only the row with the largest id in each (name, date) partition, a common dedup pattern:
spark.sql(
"""
select
*
from (
select
name,
date,
amount,
id,
row_number() over (partition by name, date order by id desc) as rownum
from
customers
) A
where
rownum = 1
"""
).orderBy("name", "date").show()
Define a window partition. It does not have to be associated with a table.
Window.partitionBy("col1", "col2")
The remaining examples use the Scala API, which needs these imports (spark.implicits._ enables toDF):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val customers = Seq(
    ("Alice", "2016-05-01", 50.00, 1),
    ("Alice", "2016-05-01", 45.00, 2),
    ("Alice", "2016-05-02", 55.00, 3),
    ("Alice", "2016-05-02", 100.00, 4),
    ("Bob", "2016-05-01", 25.00, 5),
    ("Bob", "2016-05-01", 29.00, 6),
    ("Bob", "2016-05-02", 27.00, 7),
    ("Bob", "2016-05-02", 30.00, 8)
).toDF("name", "date", "amount", "id")
customers.show
val winSpec = Window.partitionBy("name", "date")
customers.select(
$"name",
$"date",
$"amount",
$"id",
avg($"amount").over(winSpec).alias("avg_amt"),
max($"id").over(winSpec).alias("max_id")
).show
val winSpec = Window.partitionBy("name", "date")
customers.select(
$"name",
$"date",
$"amount",
$"id",
(avg($"amount").over(winSpec) + max($"id").over(winSpec) * 100).alias("new_column")
).show
customers.withColumn("avg",
avg($"amount").over(Window.partitionBy("name", "date"))
).show()
val customers = Seq(
("Alice", "2016-05-01", 50.00),
("Alice", "2016-05-03", 45.00),
("Alice", "2016-05-04", 55.00),
("Bob", "2016-05-01", 25.00),
("Bob", "2016-05-04", 29.00),
("Bob", "2016-05-06", 27.00)
).toDF("name", "date", "amountSpent")
customers.show
Moving Average¶
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)  // frame: previous row, current row, next row
customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec1)).show()
Cumulative Sum¶
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)  // Long.MinValue = unbounded preceding, 0 = current row
customers.withColumn("cumSum", sum(customers("amountSpent")).over(wSpec2)).show()
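In PySpark the same cumulative sum is usually written with the named bounds Window.unboundedPreceding and Window.currentRow. A sketch assuming a PySpark DataFrame customers_py with the same columns (the name is hypothetical):
wspec = (
    Window.partitionBy("name")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# customers_py is a hypothetical PySpark DataFrame with name/date/amountSpent columns
customers_py.withColumn("cumSum", sum("amountSpent").over(wspec)).show()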
Data from previous row¶
val wSpec3 = Window.partitionBy("name").orderBy("date")
customers.withColumn(
"prevAmountSpent", lag(customers("amountSpent"), 1).over(wSpec3)
).show()
row_number¶
val wSpec3 = Window.partitionBy("name").orderBy("date")
customers.withColumn("row_num",
row_number().over(wSpec3)
).show()
percentRank¶
ntile¶
first¶
last¶
lag¶
lead¶
cume_dist¶
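A minimal combined PySpark sketch of these functions, run against the cust DataFrame defined earlier (first, last, and lag are demonstrated elsewhere in these notes):
w = Window.partitionBy("name").orderBy("date")
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    percent_rank().over(w).alias("percent_rank"),  # relative rank in [0, 1]
    ntile(2).over(w).alias("ntile"),               # bucket number, 2 buckets per partition
    lead(col("amount"), 1).over(w).alias("lead"),  # amount from the next row
    cume_dist().over(w).alias("cume_dist"),        # fraction of rows <= current row
).orderBy("name", "date").show()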
val customers = Seq(
("Alice", "2016-05-01", 50.00, 1),
("Alice", "2016-05-01", 45.00, 2),
("Alice", "2016-05-02", 55.00, 3),
("Alice", "2016-05-02", 100.00, 4),
("Bob", "2016-05-01", 25.00, 5),
("Bob", "2016-05-01", 29.00, 6),
("Bob", "2016-05-02", 27.00,7 ),
("Bob", "2016-05-02", 30.00, 8)
).toDF("name", "date", "amount", "id")
customers.orderBy("name", "date").show
customers.createOrReplaceTempView("customers")
Comment¶
Do NOT use orderBy if the order does not matter!
val wSpec = Window.partitionBy("name", "date").orderBy("id")
customers.select(
$"name",
$"date",
$"amount",
$"id",
max($"amount").over(Window.partitionBy("name", "date")).alias("max_amount")
).orderBy("name", "date").show
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date) as max_amount
from
customers
"""
).orderBy("name", "date").show
customers.select(
$"name",
$"date",
$"amount",
$"id",
max($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("max_amount")
).orderBy("name", "date").show
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date order by id) as max_amount
from
customers
"""
).orderBy("name", "date").show
customers.select(
$"name",
$"date",
$"amount",
$"id",
first($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("first_amount")
).orderBy("name", "date").show
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id) as first_amount
from
customers
"""
).orderBy("name", "date").show
customers.select(
$"name",
$"date",
$"amount",
$"id",
last($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("last_amount")
).orderBy("name", "date").show
spark.sql(
"""
select
name,
date,
amount,
id,
last(amount) over (partition by name, date order by id) as last_amount
from
customers
"""
).orderBy("name", "date").show
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id desc) as last_amount
from
customers
"""
).orderBy("name", "date").show
val customers = Seq(
("Alice", "1", "2016-05-01", 50.00),
("Alice", "1", "2016-05-03", 45.00),
("Alice", "2", "2016-05-04", 55.00),
("Bob", "2", "2016-05-01", 25.00),
("Bob", "2", "2016-05-04", 29.00),
("Bob", "2", "2016-05-06", 27.00)
).toDF("name", "group", "date", "amountSpent")
customers.show
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val ws1 = Window.partitionBy("name").orderBy("date")
val ws2 = Window.partitionBy("group").orderBy("date")
customers.
withColumn("i", row_number().over(ws1)).
withColumn("j", row_number().over(ws2)).
show()