-
Notifications
You must be signed in to change notification settings - Fork 646
Expand file tree
/
Copy pathSparkSchemaDemo.py
More file actions
78 lines (64 loc) · 2.87 KB
/
SparkSchemaDemo.py
File metadata and controls
78 lines (64 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
from lib.logger import Log4j
if __name__ == "__main__":
    # Demo script: read the same flight data from CSV, JSON and Parquet,
    # supplying the schema explicitly (StructType for CSV, DDL string for
    # JSON) or relying on the schema stored in the file (Parquet), then
    # show a few rows and log each resulting schema.
    spark = (
        SparkSession.builder
        .master("local[3]")
        .appName("SparkSchemaDemo")
        .getOrCreate()
    )

    # Everything after session creation runs under try/finally so the
    # session is always stopped, even when a read or show() raises
    # (the CSV read below is configured with mode=FAILFAST).
    try:
        logger = Log4j(spark)

        # Programmatic schema definition, used for the CSV read.
        flightSchemaStruct = StructType([
            StructField("FL_DATE", DateType()),
            StructField("OP_CARRIER", StringType()),
            StructField("OP_CARRIER_FL_NUM", IntegerType()),
            StructField("ORIGIN", StringType()),
            StructField("ORIGIN_CITY_NAME", StringType()),
            StructField("DEST", StringType()),
            StructField("DEST_CITY_NAME", StringType()),
            StructField("CRS_DEP_TIME", IntegerType()),
            StructField("DEP_TIME", IntegerType()),
            StructField("WHEELS_ON", IntegerType()),
            StructField("TAXI_IN", IntegerType()),
            StructField("CRS_ARR_TIME", IntegerType()),
            StructField("ARR_TIME", IntegerType()),
            StructField("CANCELLED", IntegerType()),
            StructField("DISTANCE", IntegerType()),
        ])

        # The same columns expressed as a DDL string, used for the JSON read.
        flightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING,
ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT,
WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

        # CSV: an explicit schema is used instead of
        # .option("inferSchema", "true"), because the inferred schema gives
        # an incorrect type for FL_DATE (string).
        flightTimeCsvDF = (
            spark.read
            .format("csv")
            .option("header", "true")
            .schema(flightSchemaStruct)
            .option("mode", "FAILFAST")
            .option("dateFormat", "M/d/y")
            .load("data/flight*.csv")
        )
        flightTimeCsvDF.show(5)
        logger.info("CSV Schema:" + flightTimeCsvDF.schema.simpleString())

        # JSON: reading without a schema also gives an incorrect type for
        # FL_DATE (string), so the DDL schema is supplied explicitly.
        flightTimeJsonDF = (
            spark.read
            .format("json")
            .schema(flightSchemaDDL)
            .option("dateFormat", "M/d/y")
            .load("data/flight*.json")
        )
        flightTimeJsonDF.show(5)
        logger.info("JSON Schema:" + flightTimeJsonDF.schema.simpleString())

        # Parquet files include the schema, so no need to specify it.
        # There is a PyCharm plugin named "Avro and Parquet Viewer" you can
        # install to view the parquet file schema/data.
        flightTimeParquetDF = (
            spark.read
            .format("parquet")
            .load("data/flight*.parquet")
        )
        flightTimeParquetDF.show(5)
        logger.info("Parquet Schema:" + flightTimeParquetDF.schema.simpleString())
    finally:
        spark.stop()