The select Function in Spark DataFrame

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder().
    appName("Spark SQL basic example").
    getOrCreate()
spark
spark = org.apache.spark.sql.SparkSession@749b1121
In [2]:
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

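The input file is presumably the people.json example that ships with the Spark distribution (examples/src/main/resources/people.json), with one JSON object per line:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
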
Select Using String Names

In [12]:
df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

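The string overload of select accepts additional names as varargs, so several columns can be selected by name in a single call. A minimal sketch:

// select multiple columns by their string names
df.select("name", "age").show()
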
Select Using the $ Sign (Syntactic Sugar)

In [13]:
df.select($"name", $"age" + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

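Note that the $ interpolator, which turns $"name" into a Column, comes from the SparkSession implicits. Notebook kernels typically import them automatically; in a standalone application you would need the import yourself:

// bring the $"..." syntax (and other conversions) into scope
import spark.implicits._

df.select($"name", $"age" + 1).show()
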
In [16]:
df.select($"age" > 21).show()
+----------+
|(age > 21)|
+----------+
|      null|
|      true|
|     false|
+----------+

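A comparison like this is just another Column, so it can be given a readable name with alias; a minimal sketch (the name is_adult is my own):

// alias the boolean expression so the output column has a meaningful name
df.select($"name", ($"age" > 21).alias("is_adult")).show()
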
Renaming Columns When Selecting

In [6]:
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

In [8]:
df.select($"age".alias("year")).show
+----+
|year|
+----+
|null|
|  30|
|  19|
+----+

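Two equivalent alternatives: as is a synonym for alias, and withColumnRenamed renames a column while keeping all remaining columns. A minimal sketch:

// same effect as alias
df.select($"age".as("year")).show

// renames age in place and keeps the name column
df.withColumnRenamed("age", "year").show
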
Star in Select

Notice that * can be used to select all columns from a table.

In [2]:
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df = [age: bigint, name: string]
Out[2]:
[age: bigint, name: string]
In [3]:
df.select("*").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

However, a qualified star such as df.* does not work out of the box, because the DataFrame is not registered under the name df. There are several alternatives, shown below.

In [4]:
df.select("df.*").show
Out[4]:
Name: org.apache.spark.sql.AnalysisException
Message: cannot resolve 'df.*' give input columns '';
StackTrace:   at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:278)
  ... (rest of the stack trace omitted)
In [5]:
df.select($"*").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

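A qualified star does work once the DataFrame actually carries the matching alias; a minimal sketch:

// register the alias "df" first, then the qualified star resolves
df.alias("df").select("df.*").show
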
A Column Alias Defined in Select CANNOT Be Referenced in the Same Select

If you define a column alias in select, you cannot refer to it later in the same select clause, which is inconvenient. The example below fails for exactly this reason; a workaround follows it.

In [2]:
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df = [age: bigint, name: string]
Out[2]:
[age: bigint, name: string]
In [4]:
import org.apache.spark.sql.functions._
df.select(
    $"age",
    lower($"name").alias("name_lower"),
    upper($"name_lower").alias("name_upper")
).show
Out[4]:
Name: org.apache.spark.sql.AnalysisException
Message: cannot resolve '`name_lower`' given input columns: [age, name];;
'Project [age#8L, lower(name#9) AS name_lower#19, upper('name_lower) AS name_upper#20]
+- Relation[age#8L,name#9] json

StackTrace: 'Project [age#8L, lower(name#9) AS name_lower#19, upper('name_lower) AS name_upper#20]
+- Relation[age#8L,name#9] json
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  ... (rest of the stack trace omitted)

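The usual workaround is to chain a second select (or use withColumn) so that the alias exists before it is referenced; a minimal sketch:

import org.apache.spark.sql.functions._

// the first select creates name_lower; the second can then refer to it
df.select(
    $"age",
    lower($"name").alias("name_lower")
).select(
    $"age",
    $"name_lower",
    upper($"name_lower").alias("name_upper")
).show
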
Reorder Columns

In [22]:
import org.apache.spark.sql.functions._

val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df = [age: bigint, name: string]
Out[22]:
[age: bigint, name: string]
In [5]:
df.columns.sorted(Ordering[String].reverse)
Out[5]:
[name, age]

Comment

Dynamically generated column names (a plain sequence of strings) cannot be expanded directly into select; you have to convert them to Column objects first, as below.

In [26]:
df.select(
    df.columns.sorted(Ordering[String].reverse).map(c => col(c)): _*
).show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

In [23]:
df.select(df.columns.sorted(Ordering[String].reverse): _*)
Out[23]:
Name: Compile Error
Message: <console>:33: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
       df.select(df.columns.sorted(Ordering[String].reverse): _*)
                                                            ^

StackTrace: 

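The compile error comes from the String overload of select, whose signature is select(col: String, cols: String*): the first name is an ordinary parameter, so a whole sequence cannot be expanded into the call. Besides mapping the names to Column objects as above, passing the head and tail separately is a common workaround:

// split the sequence to fit select(col: String, cols: String*)
val cols = df.columns.sorted(Ordering[String].reverse)
df.select(cols.head, cols.tail: _*).show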