Visualizing Airline Delays with Spark
In Chapter 3, we explored using Hadoop Streaming and MapReduce to compute the average flight delay per airport using the US Department of Transportation’s on-time flight dataset. This kind of computation—parsing a CSV file and performing an aggregate computation—is an extremely common use case of Hadoop, particularly as CSV data is easily exported from relational databases. This dataset, which records all US domestic flight departure and arrival times along with their delays, is also interesting because while a single month is easily computed upon, the entire dataset would benefit from distributed computation due to its size.
In this example, we'll use Spark to perform an aggregation of this dataset, in particular determining which airlines were the most delayed in April 2014. We will specifically look at the slightly more advanced (and Pythonic) techniques we can use due to the increased flexibility of the Spark Python API. Moreover, we will show how central the driver program is to the computation by pulling the results back and displaying a visualization on the driver machine using matplotlib (a task that would take two steps using traditional MapReduce).
In order to get a feel for how Spark applications are structured, and to see the template described in the previous section in action, we will first inspect a 10,000-foot view of the complete structure of the program with the details snipped out:
## Imports
import csv
import matplotlib.pyplot as plt

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields = ['date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance']
Flight = namedtuple('Flight', fields)

## Closure functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    pass

def split(line):
    """
    Operator function for splitting a line with csv module
    """
    pass

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    pass

## Main functionality
def main(sc):
    """
    Describe the transformations and actions used on the dataset, then plot
    the visualization on the output using matplotlib.
    """
    pass

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc = SparkContext(conf=conf)

    # Execute main functionality
    main(sc)
This snippet of code, while long, provides a good overview of the structure of an actual Spark program. The imports show the usual mixture of standard library tools as well as a third-party library, matplotlib. As with Hadoop Streaming, any third-party code that is not part of the standard library must be either pre-installed on the cluster or shipped with the job. For code that need only be executed on the driver and not in the executors (e.g., matplotlib), you can use a try/except block and capture ImportErrors.
Note

As with Hadoop Streaming, any third-party Python dependencies that are not part of the Python standard library must be pre-installed on each node in the cluster. However, unlike Hadoop Streaming, the fact that there are two contexts, the driver context and the executor context, means that some heavyweight libraries (particularly visualization libraries) can be installed only on the driver machine, so long as they are not used in a closure passed to a Spark operation that will execute on the cluster. To prevent errors, wrap imports in a try/except block and capture ImportErrors.
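The guard described above can look like the following sketch (shown in Python 3 syntax; assigning plt = None on failure is one possible convention, not part of the original listing):

```python
# Driver-only dependency: executors that lack matplotlib can still import
# this module, because the ImportError is captured rather than propagated.
try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None  # plotting disabled; only the driver needs matplotlib
```

Code that actually plots should then check that plt is not None before using it, so the closure functions shipped to executors never touch the missing library.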
The application then defines some data that is configurable, including the date and time format for parsing datetime strings and the application name. A specialized namedtuple data structure is also created in order to create lightweight and accessible parsed rows from the input data. This information should be available to all executors, but is lightweight enough to not require a broadcast variable. Next, the processing functions, parse, split, and plot, are defined, as well as a main function that uses the SparkContext to define the actions and transformations on the airline dataset. Finally, the if __name__ == "__main__" block configures Spark and executes the main function.

With this high-level overview complete, let's dive deeper into the specifics of the code, starting with the main function that defines the primary Spark operations and the analytical methodology:
## Main functionality
def main(sc):
    """
    Describe the transformations and actions used on the dataset, then plot
    the visualization on the output using matplotlib.
    """
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays = flights.map(lambda f: (airline_lookup.value[f.airline],
                                    add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays = delays.reduceByKey(add).collect()
    delays = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])

    # Show a bar chart of the delays
    plot(delays)
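Before examining each operation in detail, the aggregation that main describes can be sketched in plain Python with no cluster at all. The airline lookup and flight records below are hypothetical stand-ins, and a dictionary fold plays the role of reduceByKey:

```python
from operator import add, itemgetter

# Hypothetical broadcast value and parsed records: (code, dep_delay, arv_delay)
airline_lookup = {'WN': 'Southwest Airlines Co.', 'DL': 'Delta Air Lines Inc.'}
flights = [('WN', 5.0, 12.0), ('DL', -3.0, -5.0), ('WN', 20.0, 18.0)]

# map: emit (airline name, dep_delay + arv_delay) pairs
pairs = [(airline_lookup[code], add(dep, arv)) for code, dep, arv in flights]

# reduceByKey(add): sum the values that share a key
totals = {}
for name, delay in pairs:
    totals[name] = totals.get(name, 0) + delay

delays = sorted(totals.items(), key=itemgetter(1))
print(delays)  # [('Delta Air Lines Inc.', -8.0), ('Southwest Airlines Co.', 55.0)]
```

The same map, reduce-by-key, and sort steps appear in main; Spark simply distributes the map and reduce phases across the cluster before collecting the small result back to the driver.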
Our first job is to load our two data sources from disk: first, a lookup table of airline codes to airline names, and second, the flight instances dataset. The dataset airlines.csv is a small jump table that allows us to join airline codes with the full airline name. Because this dataset is small enough, we don't have to perform a distributed join of two RDDs; instead, we store this information as a Python dictionary and broadcast it to every node in the cluster using sc.broadcast, which transforms the local Python dictionary into a broadcast variable.

The creation of this broadcast variable proceeds as follows. First, an RDD is created from the text file on the local disk called airlines.csv (note the relative path). Creation of the RDD is required because this data could be coming from a Hadoop data source, in which case it would be specified with a URI to the location (e.g., hdfs:// for HDFS data or s3:// for S3). Note that if this file were simply on the local machine, loading it into an RDD would not be necessary. The split function is then mapped to every element in the dataset, as discussed momentarily. Finally, the collect action is applied to the RDD, which brings the data back from the cluster to the driver as a Python list. Because the collect action was applied, when this line of code executes, a job is sent to the cluster to load the RDD, split it, and then return the results to the driver program:

def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
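To see why the csv module is worth the overhead relative to a bare line.split(','), consider a field containing an embedded comma. This sketch uses Python 3 syntax (io.StringIO and next(reader) replace the Python 2 idioms in the listing), and the sample line is invented:

```python
import csv
from io import StringIO  # Python 3 home of StringIO

def split(line):
    """Operator function for splitting a line with csv module."""
    reader = csv.reader(StringIO(line))
    return next(reader)  # Python 3 spelling of reader.next()

# An invented row whose second field contains an embedded comma
line = '2014-04-01,"Delta, Inc.",1045'
parsed = split(line)
print(parsed)  # ['2014-04-01', 'Delta, Inc.', '1045']

# A naive line.split(',') would wrongly cut the quoted field in two
assert parsed != line.split(',')
```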
The split function parses each line of text using the csv module by creating a file-like object from the line of text using StringIO, which is then passed into csv.reader. Because there is only a single line of text, we can simply return reader.next(). While this method of CSV parsing may seem pretty heavyweight, it allows us to more easily deal with delimiters, escaping, and other nuances of CSV processing. For larger datasets, a similar methodology is applied to entire files using sc.wholeTextFiles to process many CSV files that are split into blocks of 128 MB each (e.g., the block size and replication on HDFS):

def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
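The conversions that parse performs can be exercised locally on a single invented row (Python 3 syntax; the values are hypothetical, but the column order matches the fields constant):

```python
import csv
from collections import namedtuple
from datetime import datetime
from io import StringIO

DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields = ['date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance']
Flight = namedtuple('Flight', fields)

def parse(row):
    """Parses a row and returns a named tuple."""
    row[0] = datetime.strptime(row[0], DATE_FMT).date()   # date column
    row[5] = datetime.strptime(row[5], TIME_FMT).time()   # departure time
    row[6] = float(row[6])                                # departure delay
    row[7] = datetime.strptime(row[7], TIME_FMT).time()   # arrival time
    row[8] = float(row[8])                                # arrival delay
    row[9] = float(row[9])                                # air time
    row[10] = float(row[10])                              # distance
    return Flight(*row[:11])

# A single hypothetical flight record
line = '2014-04-01,WN,1,LAX,JFK,0900,5.0,1700,12.0,300.0,2475.0'
flight = parse(next(csv.reader(StringIO(line))))
print(flight.date, flight.airline, flight.dep_delay)  # 2014-04-01 WN 5.0
```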
Next, the main function loads the much larger flights.csv, which needs to be computed upon in a parallel fashion using an RDD. After splitting the CSV rows, we map the parse function to each row, which converts dates and times to Python dates and times, and casts floating-point numbers appropriately. The output of this function is a namedtuple called Flight that was defined in the module constants section of the application. Named tuples are lightweight data structures that contain record information such that data can be accessed by name, for example flight.date, rather than by position (e.g., row[0]). Like normal Python tuples, they are immutable, so they are safe to use in processing applications because the data can't be modified. Additionally, they are much more memory and processing efficient than dictionaries, and as a result they provide a noticeable benefit in big data applications like Spark, where memory is at a premium.
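The access-by-name, immutability, and footprint claims above can be checked directly; this small sketch uses invented values and a trimmed-down two-field record:

```python
import sys
from collections import namedtuple

Flight = namedtuple('Flight', ['airline', 'dep_delay'])
f = Flight('WN', 12.0)

# Fields are readable by name or by position
assert f.airline == f[0] == 'WN'

# Like plain tuples, named tuples are immutable
try:
    f.airline = 'AA'
except AttributeError:
    pass  # assignment is rejected

# And an instance is lighter than the equivalent dictionary
assert sys.getsizeof(f) < sys.getsizeof({'airline': 'WN', 'dep_delay': 12.0})
```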
With an RDD of Flight objects in hand, the final transformation is to map an anonymous function that transforms the RDD into a series of key/value pairs, where the key is the name of the airline and the value is the sum of the arrival and departure delays. At this point, besides the creation of the airlines dictionary, no execution has been performed on the collection. However, once we begin to sum the per-airline delays with reduceByKey and the add operator and collect the results, the job is executed across the cluster and brought back to the driver program. The plot function then visualizes the collected results with matplotlib:

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate("%0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate("%0.0f min" % min, xy=(10, idx+0.5), va='center')

    # Set the ticks
    ticks = plt.yticks([idx+0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # Minimize chart junk
    plt.grid(axis='x', color='white', linestyle='-')
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
At this point, the cluster computation is complete, and we proceed in a sequential fashion on the driver program. The delays are sorted by delay magnitude in the memory of the client program. Note that this is possible for the same reason that we created the airlines lookup table as a broadcast variable: the number of airlines is small, and it is more efficient to sort in memory. However, if this RDD were extremely large, a distributed sort using `sortByKey` could be used. Finally, instead of writing the results to disk, the output is printed to the console. If this dataset were big, the `take` action might be used to fetch the first n items rather than printing the entire dataset, or `saveAsTextFile` could be used to write the data back to our local disk or to HDFS.
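The driver-side pattern (collect the reduced pairs, sort them in memory, then print or keep only the top few) can be sketched with plain Python lists. The airline totals below are made up purely for illustration; on a large RDD the distributed equivalents `sortByKey`, `take`, and `saveAsTextFile` would be used instead:

```python
from operator import itemgetter

# Hypothetical (airline, total minutes delayed) pairs, as returned by collect()
delays = [
    ('Delta Air Lines', 45012.0),
    ('Hawaiian Airlines', -1903.0),
    ('Southwest Airlines', 62389.0),
]

# Sort ascending by total delay, exactly as the driver program does
delays = sorted(delays, key=itemgetter(1))

# Keep only the n worst offenders rather than printing the entire dataset
worst = sorted(delays, key=itemgetter(1), reverse=True)[:2]

# Print the results from the driver, most-early airline first
for name, minutes in delays:
    print("%0.0f minutes delayed\t%s" % (minutes, name))
```

The same `itemgetter(1)` key function works both here and in the driver program, because in both cases the data is a simple sequence of two-tuples.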
Finally, because we have the data available in the driver, we can visualize the results using matplotlib as follows:
```python
def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes = [d[1] for d in delays]
    index = list(xrange(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min + 1, idx + 0.5),
                         va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx + 0.5),
                         va='center')

    # Set the ticks
    ticks = plt.yticks([idx + 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # Minimize chart junk
    plt.grid(axis='x', color='white', linestyle='-')
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
```
Hopefully this example illustrates the interplay of the cluster and the driver program (sending out for analytics, then bringing results back to the driver), as well as the role of Python code in a Spark application. To run this code (presuming that you have a directory called ontime with the two CSV files in the same directory), use the `spark-submit` command as follows:

```
hostname $ spark-submit app.py
```
Because we hardcoded the master as `local[*]` in the configuration, this command creates a Spark job with as many processes as are available on the localhost. It will then begin executing the transformations and actions specified in the `main` function with the local `SparkContext`:

```python
## Main functionality
def main(sc):
    """
    Describe the transformations and actions used on the dataset, then plot
    the visualization on the output using matplotlib.
    """
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays = flights.map(lambda f: (airline_lookup.value[f.airline],
                                    add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays = delays.reduceByKey(add).collect()
    delays = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])

    # Show a bar chart of the delays
    plot(delays)
```

First it loads the airlines jump table as an RDD, collects it, and broadcasts it to all processes; then it loads the flight data RDD and processes it to compute the total delays in a parallel fashion. Once the output from the `collect` action is returned to the driver, we can visualize the result using matplotlib
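The `main` function depends on the `split` and `parse` closures defined near the top of the program, which are elided from this excerpt. A minimal sketch consistent with the module constants and `Flight` named tuple might look like the following; the exact field conversions are assumptions, and it is written in version-neutral Python rather than the Python 2 of the original listing:

```python
import csv
from collections import namedtuple
from datetime import datetime

DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields = ['date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance']
Flight = namedtuple('Flight', fields)

def split(line):
    """Operator function for splitting one line of CSV with the csv module."""
    reader = csv.reader([line])
    return next(reader)

def parse(row):
    """Parses a split row into a Flight named tuple, converting types."""
    row[0] = datetime.strptime(row[0], DATE_FMT).date()   # flight date
    row[5] = datetime.strptime(row[5], TIME_FMT).time()   # departure time
    row[6] = float(row[6])                                # departure delay
    row[7] = datetime.strptime(row[7], TIME_FMT).time()   # arrival time
    row[8] = float(row[8])                                # arrival delay
    row[9] = float(row[9])                                # air time
    row[10] = float(row[10])                              # distance
    return Flight(*row)
```

With closures like these in place, `flights.map(split).map(parse)` yields `Flight` tuples whose `dep_delay` and `arv_delay` fields are floats, ready for `add` in the delay computation.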
, as shown in Figure 4-4. The final result shows that the total delays (in minutes) for April 2014 range from arriving early, if you're flying Hawaiian or Alaska Airlines, to an aggregate total delay for most of the big airlines. The novelty here is not in the visualization of the analysis, but in the one-step process of submitting a parallel-executing job and, in a reasonable amount of user time, displaying a result. Consequently, applications like these, which deliver on-demand analyses directly to users for immediate insights, are becoming increasingly common.