Apache Spark is a fast, general-purpose cluster computing system. The latest version can be downloaded from http://spark.apache.org/downloads.html. In this post, we will perform some basic data manipulations using Spark and Python.
Using Spark Python Shell
The first thing a Spark program should do is create a SparkContext object, which tells Spark how to access a cluster. When you use the Python shell, a context variable named “sc” is created automatically.
To start a Python Spark shell, run the following inside your Spark directory:
./bin/pyspark --master local[*]
Using Python version 2.7.3 (default, Mar 13 2014 11:03:55)
SparkContext available as sc.
>>>
Resilient Distributed Datasets (RDDs)
Resilient distributed datasets (RDDs) are fault-tolerant collections of elements that can be operated on in parallel. The easiest way to create an in-memory RDD is to call the parallelize function as follows:
>>> numbers = [x for x in xrange(100)]
>>> dist_numbers = sc.parallelize(numbers)
>>> print dist_numbers
ParallelCollectionRDD at parallelize at PythonRDD.scala:364
RDDs support two types of operations:
- transformations: they create a new dataset from an existing one
- actions: they run a computation on the dataset and return a value to the driver program
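One practical consequence of this split is that Spark evaluates transformations lazily: no work is done until an action asks for a result. The sketch below is only a plain-Python analogy built on a generator, not Spark code, and the names `lazy_map` and `calls` are made up for illustration. It shows the same behaviour in miniature.

```python
calls = []  # records every element the mapping function actually touches


def lazy_map(func, items):
    """Generator-based stand-in for a transformation: describes the work, does none yet."""
    for item in items:
        calls.append(item)
        yield func(item)


# "Transformation": nothing is computed when this line runs.
squares = lazy_map(lambda x: x * x, range(5))
calls_before_action = len(calls)

# "Action": consuming the generator finally triggers the computation.
result = list(squares)
calls_after_action = len(calls)

print(calls_before_action)  # 0
print(result)               # [0, 1, 4, 9, 16]
print(calls_after_action)   # 5
```

In Spark the same idea means that a chain of filter and map calls costs nothing by itself; the cluster only starts working once an action such as take or count is invoked.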
Here are some examples using the list of numbers created previously. Note that filter and map are transformations, while take is an action:
>>> # getting an rdd with odd numbers
>>> odd_rdd = dist_numbers.filter(lambda x: x % 2 != 0)
>>> # calculating factorial for any number on the list
>>> import math
>>> fact_rdd = dist_numbers.map(lambda x: math.factorial(x))
>>> # get first 10 elements of the list
>>> dist_numbers.take(10)
The main benefit is that all of these operations are distributed across your cluster (or across your local cores when the shell is started with local[*]).
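If you want to check what those calls should return, the same logic can be reproduced in plain Python without a cluster. This is only a local sketch for comparison, with illustrative variable names; on an RDD you would call collect() or take() to bring the results back to the driver.

```python
import math

numbers = list(range(100))

# Equivalent of dist_numbers.filter(lambda x: x % 2 != 0)
odd_numbers = [x for x in numbers if x % 2 != 0]

# Equivalent of dist_numbers.map(lambda x: math.factorial(x))
factorials = [math.factorial(x) for x in numbers]

# Equivalent of dist_numbers.take(10)
first_ten = numbers[:10]

print(odd_numbers[:5])  # [1, 3, 5, 7, 9]
print(factorials[:5])   # [1, 1, 2, 6, 24]
print(first_ten)        # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```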
Now, let’s look at a more advanced example by using multiple operations.
First, we will read a local file called “LICENSE”, then count the number of occurrences of each word, and finally get the 10 most frequent words. To do that, we need to perform the following steps:
- Read the file.
- Split every line into separate words.
- Apply flatMap so that the words from all the lines end up in a single list.
- Apply map to pair every word with the number 1 (effectively counting 1 each time).
- Reduce by key to sum the 1s for each word.
- Swap key and value so that we can sort by the number of appearances.
- Sort in descending order and take the first 10 results.
The resulting Python program looks like this:
>>> # loading local file
>>> dist_file = sc.textFile("LICENSE")
>>> dist_file.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[1], x[0])).sortByKey(False).take(10)
[(279, u'#'), (253, u'the'), (142, u'of'), (118, u'or'), (114, u'and'), (111, u'to'), (102, u'OR'), (85, u'OF'), (80, u'in'), (55, u'this')]
The first 10 words are not what we were looking for: they are all comment markers or very common short words. We can improve our calculation by only keeping the words longer than 3 characters:
>>> dist_file.flatMap(lambda x: x.split()).filter(lambda x: len(x) > 3).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[1], x[0])).sortByKey(False).take(10)
[(55, u'this'), (43, u'that'), (40, u'License'), (33, u'conditions'), (31, u'with'), (28, u'following'), (27, u'copyright'), (26, u'Python'), (24, u'========================================================================'), (23, u'without')]
As you can see, we’ve identified a set of words which is more palatable.
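To see what each step of the pipeline produces, here is a plain-Python sketch of the same sequence applied to a tiny in-memory sample. The sample lines are invented for illustration and stand in for the contents of the LICENSE file; no Spark is involved, so this only mirrors the logic and not the distribution.

```python
from collections import defaultdict

# Hypothetical sample standing in for the lines of the LICENSE file.
lines = [
    "the license of the software",
    "the software is provided as is",
]

# flatMap: split every line and flatten the results into one list of words.
words = [word for line in lines for word in line.split()]

# map: pair every word with the number 1.
pairs = [(word, 1) for word in words]

# reduceByKey: sum the 1s for each distinct word.
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one

# map + sortByKey(False): swap to (count, word) and sort in descending order.
# Ties are broken here by the word itself; Spark's sortByKey only orders by the key.
by_count = sorted(((count, word) for word, count in counts.items()), reverse=True)

top_three = by_count[:3]
print(top_three)  # [(3, 'the'), (2, 'software'), (2, 'is')]
```

Swapping the pair before sorting is what makes the count act as the key, which is exactly why the Spark version needs the extra map before sortByKey.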
A more complex example
We will write a Python script named “weather.py” which will analyse a file from the US National Weather Service. To get the initial data, extract the file named “200705hourly.txt” from the following compressed file: http://cdo.ncdc.noaa.gov/qclcd_ascii/QCLCD200705.zip, and place it in /tmp, which is where the script looks for it. As its name indicates, that file contains hourly weather statistics for May 2007. We are going to compile that data for the whole month using RDDs.
The program is as follows:
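from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "Spark Demo")

# columns kept for every reading; positions are looked up by name in the header row
COLUMNS = ["Time", "Visibility", "DryBulbFarenheit", "DryBulbCelsius", "WindSpeed"]


def extract_values(x, index):
    """
    Return (Date, (Time, Visibility, DryBulbFarenheit, DryBulbCelsius, WindSpeed))
    for one split line x; index maps a column name to its position.
    """
    return (
        x[index["Date"]],
        tuple(x[index[name]] for name in COLUMNS),
    )


def process_files():
    # reading in the input file
    data = sc.textFile('/tmp/200705hourly.txt')

    # getting the column names
    header = data.first()
    keys = header.split(',')
    index = dict((name, position) for position, name in enumerate(keys))

    # splitting the lines but ignoring the header and any empty line
    values_rdd = data.filter(lambda x: x and x != header).map(lambda x: x.split(','))

    # persisting our set to avoid extra computation
    values_rdd.persist(StorageLevel.MEMORY_ONLY)

    mapped_values_rdd = values_rdd.map(lambda x: extract_values(x, index))
    # groupByKey is a transformation: nothing is computed until an action is called on the result
    grouped_rdd = mapped_values_rdd.groupByKey()
    return grouped_rdd


if __name__ == "__main__":
    process_files()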
We can execute this script with the command below:
bin/spark-submit --master local[*] weather.py
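The script above ends with groupByKey and leaves the actual aggregation open. As one possible next step, the sketch below shows in plain Python how grouped readings could be compiled into a daily average temperature. The sample rows, the reduced three-column layout, the "M" marker for a missing reading and the helper names `to_float` and `daily_average` are all assumptions made for illustration; in Spark, a similar function could be applied to the grouped RDD, for example with mapValues.

```python
from collections import defaultdict

# Hypothetical rows in a reduced layout: Date, Time, DryBulbCelsius.
# The real file has many more columns; these values are made up.
header = "Date,Time,DryBulbCelsius"
rows = [
    "20070501,0100,10.0",
    "20070501,1300,20.0",
    "20070502,0100,12.0",
    "20070502,1300,M",  # assumed marker for a missing reading
]

keys = header.split(",")
date_idx = keys.index("Date")
temp_idx = keys.index("DryBulbCelsius")


def to_float(text):
    """Return the reading as a float, or None when it is not numeric."""
    try:
        return float(text)
    except ValueError:
        return None


# Equivalent of groupByKey: collect every valid temperature under its date.
grouped = defaultdict(list)
for row in rows:
    fields = row.split(",")
    value = to_float(fields[temp_idx])
    if value is not None:
        grouped[fields[date_idx]].append(value)


def daily_average(values):
    """Average of a non-empty list of readings."""
    return sum(values) / len(values)


averages = {date: daily_average(values) for date, values in grouped.items()}
print(sorted(averages.items()))  # [('20070501', 15.0), ('20070502', 12.0)]
```

Dropping non-numeric readings before grouping keeps the averaging function trivial; whether that is the right policy for the real data set is a choice to make once you have inspected the file.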
Spark is a great tool and, as we’ve now seen, quite easy to get started with. It also has many more features which we will not cover in this post, such as machine learning algorithms, cluster deployment, streaming and graph analysis. All these features can be accessed programmatically not only from Python, but also from Java and Scala if you’re more familiar with those.