In [1]:
import findspark
findspark.init("/opt/spark")
In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
In [3]:
file_flight = "../../home/media/data/flights14.csv"
Load Data in CSV Format¶
.load is a general method for reading data in different formats; you have to specify the format via the .format method. .csv (for both CSV and TSV), .json, and .parquet are specializations of .load, so .format is optional when you use one of these specific loading functions. There is no header by default.
Use .coalesce(1) or .repartition(1) if you want to write to only one file (see Output with a Single Header below).
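As a minimal sketch of how the generic and specialized loaders relate (the JSON path here is hypothetical), the two reads below produce the same DataFrame:
# Generic loader: the format is named explicitly.
df1 = spark.read.format("json").load("data/events.json")

# Specialized loader: .json is shorthand for the line above.
df2 = spark.read.json("data/events.json")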
Using load¶
In [6]:
flights = (
    spark.read.format("csv")
    .option("header", "true")         # header row present; false by default
    .option("mode", "DROPMALFORMED")  # drop rows that fail to parse
    .load(file_flight)
)
flights.show(5)
In [9]:
flights = (
    spark.read
    .format("csv")
    .load(file_flight)  # no header option: columns default to _c0, _c1, ...
)
flights.show(5)
Out[9]:
Using csv¶
In [10]:
flights = (
    spark.read
    .option("header", "true")         # false by default
    .option("mode", "DROPMALFORMED")
    .csv(file_flight)                 # .format("csv") is implied by .csv
)
flights.show(5)
Out[10]:
In [11]:
flights = spark.read.option("header", "true").csv(file_flight)
flights.show(5)
Out[11]:
In [12]:
flights = spark.read.csv(file_flight)
flights.show(5)
Out[12]:
In [14]:
flights = spark.sql("SELECT * FROM csv.`../../home/media/data/flights14.csv`")
flights.show()
Out[14]:
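The same SQL-on-files syntax works for other formats by swapping the format prefix; a sketch, assuming a Parquet file exists at the hypothetical path /tmp/events.parquet:
# Query a Parquet file in place, without registering a table first.
events = spark.sql("SELECT * FROM parquet.`/tmp/events.parquet`")
events.show(5)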
In [ ]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

custom_schema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True),
])

pagecount = (
    spark.read.format("csv")
    .option("delimiter", " ")  # fields are space-separated
    .option("quote", "")       # disable quote handling
    .option("header", "true")
    .schema(custom_schema)     # use the declared schema instead of inferring one
    .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
)
Write DataFrame to CSV¶
In [15]:
flights = (
    spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv(file_flight)
)
# Note: this writes a directory named f2.csv containing part files, not a single file.
flights.write.option("header", "true").csv("f2.csv")
Out[15]:
Misc Configurations¶
In [ ]:
(df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "none")
    .save("/tmp/file_no_compression_parq"))
(df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "gzip")
    .save("/tmp/file_with_gzip_parq"))
(df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "snappy")
    .save("/tmp/file_with_snappy_parq"))

df.write.mode("overwrite").format("orc").option("compression", "none").save("/tmp/file_no_compression_orc")
df.write.mode("overwrite").format("orc").option("compression", "snappy").save("/tmp/file_with_snappy_orc")
df.write.mode("overwrite").format("orc").option("compression", "zlib").save("/tmp/file_with_zlib_orc")
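The codec can also be set once per session instead of per write; a sketch assuming Spark 2.x+ configuration keys:
# Session-wide default codecs; a per-write .option("compression", ...) still overrides these.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.conf.set("spark.sql.orc.compression.codec", "zlib")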
Output with a Single Header¶
https://stackoverflow.com/questions/38056152/merge-spark-output-csv-files-with-a-single-header
In [ ]:
(dataFrame
    .coalesce(1)                         # collapse to one partition without a full shuffle
    .write
    .format("com.databricks.spark.csv")  # on Spark 2.x+, format("csv") works as well
    .option("header", "true")
    .save(out))                          # out: destination directory path
In [ ]:
(dataFrame
    .repartition(1)                      # one partition, via a full shuffle
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save(out))
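Either way the output directory contains a single part file with one header. The trade-off: coalesce(1) avoids a shuffle but can force the whole preceding stage onto a single task, while repartition(1) adds a shuffle and lets the upstream computation keep its parallelism; for large inputs, funneling everything through one partition is a deliberate choice either way.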