Happy is a framework for writing and running Hadoop jobs in Python 2.2 using Jython. It offers an easy way to write map-reduce programs for Hadoop and adds some useful features of its own. The current release supports Hadoop 0.17.2.
You may need to set the JAVA_HOME environment variable to the root of your Java installation.
Download Jython 2.2.1 and install it locally, following the Jython installation instructions.
Set the JYTHON_HOME environment variable to point to your Jython install directory.
Either download and install Hadoop 0.17.2 or use an existing local installation.
Set the HADOOP_HOME environment variable to the root of the Hadoop installation. By default, Happy uses $HADOOP_HOME/conf as the location of the configuration files for your installation. To have it use a different directory, set the HADOOP_CONF environment variable.
Add Jython to the classpath of your Hadoop cluster. This usually requires copying the Jython installation to a path where it can be accessed by all of the Hadoop processes for your cluster, editing $HADOOP_CONF/hadoop-env.sh to include the absolute path to jython.jar in the CLASSPATH variable, and restarting your cluster.
Download and unpack the Happy release.
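The environment variables from the steps above can be checked before anything is run. The following is a minimal sketch and not part of Happy: the helper names missing_variables and resolve_conf_dir are made up for illustration, and the code targets a current CPython interpreter so that it runs without Jython or a cluster. It mirrors the rule that HADOOP_CONF, when set, replaces the default of $HADOOP_HOME/conf.

```python
import os

# Variables the setup steps say must be set (JAVA_HOME is only sometimes needed).
REQUIRED = ("JYTHON_HOME", "HADOOP_HOME")


def missing_variables(env):
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED if not env.get(name)]


def resolve_conf_dir(env):
    """Return HADOOP_CONF if set, else the conf directory under HADOOP_HOME."""
    if env.get("HADOOP_CONF"):
        return env["HADOOP_CONF"]
    return env["HADOOP_HOME"].rstrip("/") + "/conf"


if __name__ == "__main__":
    missing = missing_variables(os.environ)
    if missing:
        print("Not set: " + ", ".join(missing))
    else:
        print("Hadoop configuration directory: " + resolve_conf_dir(os.environ))
```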
To run the Happy wordcount demo on a text file in your Hadoop DFS, go to the Happy release dir and run:
./bin/happy.sh ./examples/wordcount.py <input> <output>
Map-reduce jobs in Happy are defined by subclassing happy.HappyJob and implementing the map(records, task) and reduce(key, values, task) functions. You then create an instance of the class, set the job parameters (such as inputs and outputs), and call run().
When you call run(), Happy serializes your job instance and copies it and all accompanying libraries out to the Hadoop cluster. Then for each task in the Hadoop job, your job instance is de-serialized and map or reduce is called.
The task results are written out using a collector, but aggregate statistics and other roll-up information can be stored in the happy.results dictionary, which is returned from the run() call.
Jython modules and Java jar files that are being called by your code can be specified using the environment variable HAPPY_PATH. These are added to the Python path at startup, and are also automatically included when jobs are sent to Hadoop. The path is stored in happy.path and can be edited at runtime.
Below is the examples/wordcount.py script. This script takes a text file as input and outputs a count of all of the words in the file. It uses the Happy logging APIs and the Happy results dictionary.
import sys, happy, happy.log

happy.log.setLevel("debug")
log = happy.log.getLog("wordcount")

class WordCount(happy.HappyJob):
    def __init__(self, inputpath, outputpath):
        happy.HappyJob.__init__(self)
        self.inputpaths = inputpath
        self.outputpath = outputpath
        self.inputformat = "text"

    def map(self, records, task):
        for _, value in records:
            for word in value.split():
                task.collect(word, "1")

    def reduce(self, key, values, task):
        count = 0
        for _ in values:
            count += 1
        task.collect(key, str(count))
        log.debug(key + ":" + str(count))
        happy.results["words"] = happy.results.setdefault("words", 0) + count
        happy.results["unique"] = happy.results.setdefault("unique", 0) + 1

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print "Usage: <inputpath> <outputpath>"
        sys.exit(-1)
    wc = WordCount(sys.argv[1], sys.argv[2])
    results = wc.run()
    print str(sum(results["words"])) + " total words"
    print str(sum(results["unique"])) + " unique words"
def __init__(self, inputpath, outputpath):
    happy.HappyJob.__init__(self)
    self.inputpaths = inputpath
    self.outputpath = outputpath
    self.inputformat = "text"
The job parameters are set here. self.inputpaths can be a single path or a list of paths, and specifies the files and directories in the DFS to use for the job. self.outputpath specifies the output directory. self.inputformat = "text" specifies that the input files will be treated as text files, splitting records on newlines. The key is the byte offset of the line, and the value is the line of text.
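To make the shape of "text" records concrete, here is a small local sketch of how lines and their byte offsets pair up. It is an illustration only, not Happy's or Hadoop's reader: text_records is a hypothetical helper, UTF-8 input is assumed, and the code targets a current CPython interpreter rather than Jython.

```python
def text_records(data):
    """Yield (byte offset, line) pairs from bytes, splitting on newlines."""
    offset = 0
    for raw in data.splitlines(True):  # keep line endings so offsets stay exact
        yield offset, raw.rstrip(b"\r\n").decode("utf-8")
        offset += len(raw)


records = list(text_records(b"to be or\nnot to be\n"))
# records == [(0, "to be or"), (9, "not to be")]
```

The second key is 9 because the first line occupies eight bytes plus its newline.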
def map(self, records, task):
    for _, value in records:
        for word in value.split():
            task.collect(word, "1")
The map function takes an iterator over key, value tuples, and a task object that collects output. The function splits each string and then sends the key, value pair <word>, "1" to the reducer. The Hadoop cluster then sorts the output by the keys (the words) and groups together the values for processing by the reducer function.
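The map, sort, and group sequence described above can be tried locally without a cluster. This is a rough single-process sketch, assuming plain Python lists in place of Hadoop's distributed shuffle; wordcount_map and shuffle are hypothetical names, and the code targets a current CPython interpreter.

```python
from itertools import groupby
from operator import itemgetter


def wordcount_map(records):
    """Emit (word, "1") for every whitespace-separated word in each value."""
    for _, value in records:
        for word in value.split():
            yield word, "1"


def shuffle(pairs):
    """Sort map output by key and group the values that share a key."""
    ordered = sorted(pairs, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        yield key, [value for _, value in group]


records = [(0, "to be or"), (9, "not to be")]
grouped = list(shuffle(wordcount_map(records)))
# grouped == [("be", ["1", "1"]), ("not", ["1"]), ("or", ["1"]), ("to", ["1", "1"])]
```

Each (key, values) pair in grouped corresponds to one call of the reducer function.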
def reduce(self, key, values, task):
    count = 0
    for _ in values:
        count += 1
    task.collect(key, str(count))
    log.debug(key + ":" + str(count))
    happy.results["words"] = happy.results.setdefault("words", 0) + count
    happy.results["unique"] = happy.results.setdefault("unique", 0) + 1
The reduce function takes a key, an iterator over values, and a task object for collecting output. The function totals the number of values for each word and emits <word>, <count> tuples. The word count for each word is also written as a debug statement to the log, and the total and unique word counts are stored in the happy.results dictionary.
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print "Usage: <inputpath> <outputpath>"
        sys.exit(-1)
    wc = WordCount(sys.argv[1], sys.argv[2])
    results = wc.run()
    print str(sum(results["words"])) + " total words"
    print str(sum(results["unique"])) + " unique words"
The job invocation needs to be enclosed in a main block; otherwise it would be executed again on the cluster whenever the script is imported there. The job is dispatched by calling run(), which returns a results object that rolls up all of the happy.results dictionaries written on the cluster. In this case, the results dictionary contains a list of all the "words" values and a list of all the "unique" values written on the cluster, which is why the script sums each list before printing.
Job parameters are set as fields on your job instance and are detailed in HappyJob Parameters. Most of the parameters translate directly to standard Hadoop JobConf parameters, but if you're unhappy with these or want an additional level of customization, you can override the JobConf parameters using the HappyJob.jobargs dictionary.
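The override behaviour can be pictured as a dictionary merge in which jobargs entries win. This is only a sketch of the idea: how Happy applies jobargs internally is not described here, effective_jobconf is a hypothetical helper, and the two property names are standard Hadoop JobConf keys chosen as examples.

```python
def effective_jobconf(derived, jobargs):
    """Return the derived settings with every jobargs entry overriding them."""
    conf = dict(derived)   # copy so the derived settings stay untouched
    conf.update(jobargs)   # explicit overrides take precedence
    return conf


# Settings assumed to have been derived from ordinary job parameters.
derived = {"mapred.reduce.tasks": "1", "mapred.job.name": "wordcount"}
conf = effective_jobconf(derived, {"mapred.reduce.tasks": "4"})
# conf == {"mapred.reduce.tasks": "4", "mapred.job.name": "wordcount"}
```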
Happy allows result data to be sent from tasks executed on the cluster to the calling process through the happy.results dictionary. Any map or reduce task can write to happy.results using any key. All of the dictionaries are then combined and returned from HappyJob.run() as a single dictionary with a list of values for each key. Behind the scenes, the data files that are passed back are compressed, so a reasonably large amount of data can be returned quickly, but this won't work well if the results use up too much memory in the client process.
Valid file input formats are "text" (one value per line), "keyvalue" (one key-value pair per line, separated by a tab), "sequence" (a binary compressed sequence file), and "auto" (auto-detect sequence or keyvalue).
The default input format for Happy is "auto", which automatically detects whether the input is a tab-separated key-value text file or a sequence file. If the input is a text file, the keys and values are passed through as Strings. If the input is a sequence file of Text values, they are also passed through as Strings; otherwise the native objects are passed through.
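The roll-up into lists can be illustrated with a few lines of plain Python. This is a minimal sketch of the combining step as described above, not Happy's implementation; combine_results and the sample numbers are made up, and the code targets a current CPython interpreter.

```python
def combine_results(task_results):
    """Merge per-task dictionaries into one dictionary of lists of values."""
    combined = {}
    for result in task_results:
        for key, value in result.items():
            combined.setdefault(key, []).append(value)
    return combined


# Two reduce tasks, each having written its own totals to its results dictionary.
per_task = [{"words": 120, "unique": 40}, {"words": 95, "unique": 31}]
results = combine_results(per_task)
total_words = sum(results["words"])    # 215
unique_words = sum(results["unique"])  # 71
```

Because each key maps to one value per task that wrote it, the caller decides how to aggregate; wordcount simply sums.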
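For the "keyvalue" format, each line splits into a key and a value at a tab. The sketch below assumes the split happens at the first tab and that a line without a tab yields an empty value; parse_keyvalue is a hypothetical helper for local experiments, not a Happy function.

```python
def parse_keyvalue(line):
    """Split one line into (key, value) at the first tab."""
    key, _, value = line.rstrip("\n").partition("\t")
    return key, value


pairs = [parse_keyvalue(line) for line in ["hadoop\t3\n", "jython\t1\n"]]
# pairs == [("hadoop", "3"), ("jython", "1")]
```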
Hadoop will automatically handle compressed text files when the text or keyvalue inputformats are used as long as the input files have appropriate extensions. The supported formats and extensions are gzip (.gz), zlib (.deflate), and lzo (.lzo). Output compression of text and sequence files can be enabled by setting compressoutput=True. The codec can be selected by setting compressiontype to zlib, gzip, or lzo.
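Since decompression is chosen by file extension, it can help to check input names before submitting a job. The lookup below is a small sketch built only from the extensions listed above; codec_for is a hypothetical helper and plays no part in how Hadoop itself selects a codec.

```python
# Extensions and codec names as listed in the paragraph above.
CODECS = {".gz": "gzip", ".deflate": "zlib", ".lzo": "lzo"}


def codec_for(path):
    """Return the codec implied by the file extension, or None for plain text."""
    for extension, codec in CODECS.items():
        if path.endswith(extension):
            return codec
    return None


names = ["logs/day1.gz", "logs/day2.deflate", "logs/day3.txt"]
codecs = [codec_for(name) for name in names]
# codecs == ["gzip", "zlib", None]
```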
Sequence files are Hadoop's binary file format for storing and compressing sequential key-value data. You can recognize a sequence file because its first three characters are SEQ, followed by binary data. Sequence files store the names of the Java classes used to serialize the keys and values (most often these are of type Text) and the codec used for compression. They are a fast and efficient way to store data that you're using for map-reduce jobs.
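The SEQ marker makes it easy to check a local copy of a file by hand. The following sketch tests only those first three bytes; both function names are hypothetical, and the check works on local files, not on paths inside the DFS.

```python
def looks_like_sequence_file(head):
    """Return True if the leading bytes carry the SEQ marker."""
    return head[:3] == b"SEQ"


def is_sequence_file(path):
    """Read the first three bytes of a local file and test them."""
    with open(path, "rb") as handle:
        return looks_like_sequence_file(handle.read(3))


sequence_head = b"SEQ\x06\x19org.apache.hadoop.io.Text"
text_head = b"hadoop\t3\n"
```

Here looks_like_sequence_file(sequence_head) is true and looks_like_sequence_file(text_head) is false.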
Enable sequence file compression by setting compressoutput=True. Sequence file compression can be set to BLOCK (default) or RECORD using the sequencetype parameter. Block compression allows sequence files to be split on a block boundary, and record compression allows sequence files to be split at any record. Block compression is significantly faster and more efficient than record compression.
The happy.dfs module provides collectors that can be used in addition to the task collector. These are useful if you want to sort your output data into multiple directories, or want to store a large amount of data as a side effect of your job. Partitioned collectors are collectors where the filename is automatically created based on the current task id.
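The point of deriving the filename from the task id is that parallel tasks never write to the same file. The sketch below shows that idea only: partition_path is a hypothetical helper, and the part-NNNNN pattern is borrowed from Hadoop's usual output naming as an assumption; the names happy.dfs actually generates may differ.

```python
def partition_path(directory, task_id):
    """Build a per-task output path so parallel tasks never share a file."""
    return "%s/part-%05d" % (directory.rstrip("/"), task_id)


paths = [partition_path("/output/errors/", task_id) for task_id in (0, 1, 12)]
# paths == ["/output/errors/part-00000",
#           "/output/errors/part-00001",
#           "/output/errors/part-00012"]
```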
Happy includes fast APIs for encoding and decoding JSON data to native Python data structures. This is a convenient way to sort and serialize data in a portable and inspectable form.
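The round trip between JSON text and native data structures looks like the sketch below. The names of Happy's own JSON functions are not given here, so the example uses the standard json module of a current CPython interpreter as a stand-in; under Jython the calls would go through Happy's API instead.

```python
import json

record = {"word": "hadoop", "count": 3, "files": ["a.txt", "b.txt"]}

# Encode to a single line of text, suitable as a map-reduce value.
encoded = json.dumps(record, sort_keys=True)

# Decode back into dictionaries, lists, strings, and numbers.
decoded = json.loads(encoded)
```

Sorting the keys keeps the encoded form stable, which makes output files easier to compare and inspect.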
These are job parameters that can be set on happy.HappyJob.
At minimum, a job class needs a map(records, task) function to run. A reduce(key, values, task) function is required if HappyJob.reducetasks is greater than 0. Other functions that can be defined for the job are:
The task object passed into the map and reduce functions is used to output data and get information about the current task.
task.getInputPath() Returns the input path from which the current records are being read. This is useful if you're reading from multiple different files and want to have different code run depending on the input.
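Branching on the input path might look like the following sketch. FakeTask is a hypothetical stand-in that imitates only the collect and getInputPath calls mentioned in this document, so the map function can be exercised locally; the paths and the "user"/"event" labels are invented for illustration.

```python
class FakeTask(object):
    """Stand-in for the Happy task object, for local experiments only."""

    def __init__(self, input_path):
        self._input_path = input_path
        self.collected = []

    def getInputPath(self):
        return self._input_path

    def collect(self, key, value):
        self.collected.append((key, value))


def tagging_map(records, task):
    """Tag each value with a source label chosen from the input path."""
    if "/users/" in task.getInputPath():
        source = "user"
    else:
        source = "event"
    for key, value in records:
        task.collect(key, source + ":" + value)


task = FakeTask("/data/users/part-00000")
tagging_map([("42", "alice")], task)
# task.collected == [("42", "user:alice")]
```

Tagging values by source in the mapper lets a single reducer tell apart records that arrive under the same key from different inputs.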
The Happy logging module integrates with Hadoop's built-in logging support, which uses Log4J and the Apache Commons Logging adapters. The log objects used in this module are all instances of the Apache Log API. An example usage can be seen in the wordcount example.
Functions for accessing the Hadoop DFS and the local filesystem.