In [1]:
import findspark
findspark.init("/opt/spark")
In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
In [3]:
file_flight = "../../home/media/data/flights14.csv"
Load Data in CSV Format
.load is a general method for reading data in different formats; you specify the format of the data via the .format method. The .csv (for both CSV and TSV; see the sketch below), .json, and .parquet readers are specializations of .load, so .format is optional when you use one of those specific loading functions. Note that CSV files are assumed to have no header by default. When writing, use .coalesce(1) or .repartition(1) if you want the output to go to a single file.
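Since the csv reader covers TSV as well, only the field separator needs to change. A minimal sketch, assuming a hypothetical tab-separated copy of the data at flights14.tsv:
In [ ]:
flights_tsv = (
    spark.read
    .option("sep", "\t")  # tab as the field separator ("delimiter" is an accepted alias)
    .option("header", "true")
    .csv("../../home/media/data/flights14.tsv")  # hypothetical TSV file
)
flights_tsv.show(5)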
Using load
In [6]:
flights = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load(file_flight)
)
flights.show(5)
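With header set to "true" the first line supplies the column names, but every column is still read as a string. If you want Spark to infer numeric types instead, enable inferSchema; a small sketch (the inference costs an extra pass over the data):
In [ ]:
flights_typed = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")  # infer column types at the cost of an extra scan
    .csv(file_flight)
)
flights_typed.printSchema()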
In [9]:
flights = spark.read.format("csv").load(file_flight)
flights.show(5)
Out[9]:
Using csv
In [10]:
flights = (
    spark.read
    .option("header", "true")         # false by default
    .option("mode", "DROPMALFORMED")  # silently drop rows that fail to parse
    .csv(file_flight)
)
flights.show(5)
Out[10]:
In [11]:
flights = spark.read.option("header", "true").csv(file_flight)
flights.show(5)
Out[11]:
In [12]:
flights = spark.read.csv(file_flight)
flights.show(5)
Out[12]:
In [14]:
flights = spark.sql("SELECT * FROM csv.`../../home/media/data/flights14.csv`")
flights.show()
Out[14]:
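Querying the file directly is handy for a quick look, but for repeated SQL it is usually more convenient to register the DataFrame as a temporary view first. A brief sketch (the view name flights is an arbitrary choice):
In [ ]:
flights.createOrReplaceTempView("flights")
spark.sql("SELECT * FROM flights LIMIT 5").show()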
In [ ]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

custom_schema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True),
])

pagecount = (
    spark.read.format("csv")
    .option("delimiter", " ")  # space-separated fields
    .option("quote", "")       # disable quoting
    .option("header", "true")
    .schema(custom_schema)
    .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
)
Write DataFrame to CSV
In [15]:
flights = (
    spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv(file_flight)
)
flights.write.option("header", "true").csv("f2.csv")
Out[15]:
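Keep in mind that f2.csv is written as a directory containing one part-*.csv file per partition, not as a single file. Reading the result back is a quick sanity check:
In [ ]:
spark.read.option("header", "true").csv("f2.csv").show(5)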
Misc Configurations
In [ ]:
# Parquet with different compression codecs
(df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "none")
    .save("/tmp/file_no_compression_parq"))
(df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "gzip")
    .save("/tmp/file_with_gzip_parq"))
(df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "snappy")
    .save("/tmp/file_with_snappy_parq"))

# ORC with different compression codecs
df.write.mode("overwrite").format("orc").option("compression", "none").save("/tmp/file_no_compression_orc")
df.write.mode("overwrite").format("orc").option("compression", "snappy").save("/tmp/file_with_snappy_orc")
df.write.mode("overwrite").format("orc").option("compression", "zlib").save("/tmp/file_with_zlib_orc")
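To see what each codec buys you, compare the on-disk size of the output directories. A small helper sketch for the local filesystem (the paths match the cells above; adapt for HDFS or DBFS):
In [ ]:
import os

def dir_size(path):
    """Total size in bytes of the files directly inside path."""
    return sum(
        os.path.getsize(os.path.join(path, name))
        for name in os.listdir(path)
        if os.path.isfile(os.path.join(path, name))
    )

for p in ["/tmp/file_no_compression_parq",
          "/tmp/file_with_gzip_parq",
          "/tmp/file_with_snappy_parq"]:
    print(p, dir_size(p))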
Output with a Single Header
https://stackoverflow.com/questions/38056152/merge-spark-output-csv-files-with-a-single-header
In [ ]:
(
    dataFrame
    .coalesce(1)  # collapse to a single partition without a full shuffle
    .write
    .format("csv")  # built-in csv source; "com.databricks.spark.csv" was the external Spark 1.x package
    .option("header", "true")
    .save(out)
)
In [ ]:
(
    dataFrame
    .repartition(1)  # collapse to a single partition via a full shuffle
    .write
    .format("csv")
    .option("header", "true")
    .save(out)
)
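Either way the header appears exactly once, because there is only one part file. coalesce(1) avoids a full shuffle while repartition(1) performs one, but in both cases the entire dataset is written by a single task, so this approach is only sensible for output that is small enough to handle on one executor.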