Happy is a framework that allows Hadoop jobs to be written and run in Python 2.2 using Jython. It is an easy way to write map-reduce programs for Hadoop, and includes some new useful features as well. The current release supports Hadoop 0.17.2.

Quickstart

  1. You may need to set the JAVA_HOME environment variable to point to your Java installation.

  2. Download Jython 2.2.1 and install it locally, following the Jython installation instructions.

  3. Set the JYTHON_HOME environment variable to point to your Jython install directory.

  4. Either download and install Hadoop 0.17.2 or use an existing local installation.

  5. Set the HADOOP_HOME environment variable to the root of the Hadoop installation. By default, Happy uses $HADOOP_HOME/conf as the location of the configuration files for your installation. To have it use a different directory, set the HADOOP_CONF environment variable.

  6. Add Jython to the classpath of your Hadoop cluster. This usually requires copying the Jython installation to a path where it can be accessed by all of the Hadoop processes for your cluster, editing $HADOOP_CONF/hadoop-env.sh to include the absolute path to jython.jar in the CLASSPATH variable, and restarting your cluster.

  7. Download and unpack the Happy release.

  8. To run the Happy wordcount demo on a text file in your Hadoop DFS, go to the Happy release dir and run:

    ./bin/happy.sh ./examples/wordcount.py <input> <output>
    

Happy Overview

Map-reduce jobs in Happy are defined by sub-classing happy.HappyJob and implementing a map(records, task) and reduce(key, values, task) function. Then you create an instance of the class, set the job parameters (such as inputs and outputs) and call run().

When you call run(), Happy serializes your job instance and copies it and all accompanying libraries out to the Hadoop cluster. Then for each task in the Hadoop job, your job instance is de-serialized and map or reduce is called.

The task results are written out using a collector, but aggregate statistics and other roll-up information can be stored in the happy.results dictionary, which is returned from the run() call.

Jython modules and Java jar files that are being called by your code can be specified using the environment variable HAPPY_PATH. These are added to the Python path at startup, and are also automatically included when jobs are sent to Hadoop. The path is stored in happy.path and can be edited at runtime.

WordCount Example

Below is the examples/wordcount.py script. This script takes a text file as input and outputs a count of all of the words in the file. It uses the Happy logging APIs and the Happy results dictionary.

import sys, happy, happy.log

happy.log.setLevel("debug")
log = happy.log.getLog("wordcount")

class WordCount(happy.HappyJob):
    def __init__(self, inputpath, outputpath):
        happy.HappyJob.__init__(self)
        self.inputpaths = inputpath
        self.outputpath = outputpath
        self.inputformat = "text"

    def map(self, records, task):
        for _, value in records:
            for word in value.split():
                task.collect(word, "1")

    def reduce(self, key, values, task):
        count = 0
        for _ in values: count += 1
        task.collect(key, str(count))
        log.debug(key + ":" + str(count))
        happy.results["words"] = happy.results.setdefault("words", 0) + count
        happy.results["unique"] = happy.results.setdefault("unique", 0) + 1

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print "Usage: <inputpath> <outputpath>"
        sys.exit(-1)
    wc = WordCount(sys.argv[1], sys.argv[2])
    results = wc.run()
    print str(sum(results["words"])) + " total words"
    print str(sum(results["unique"])) + " unique words"

Constructor

def __init__(self, inputpath, outputpath):
    happy.HappyJob.__init__(self)
    self.inputpaths = inputpath
    self.outputpath = outputpath
    self.inputformat = "text"

The job parameters are set here. self.inputpaths can be a single path or a list of paths, and specifies the files and directories in the DFS to use for the job. self.outputpath specifies the output directory. self.inputformat = "text" specifies that the input files will be treated as text files, splitting records on newlines. The key is the byte offset of the line, and the value is the line of text.

Map Function

def map(self, records, task):
    for _, value in records:
        for word in value.split():
            task.collect(word, "1")

The map function takes an iterator over key, value tuples, and a task object that collects output. The function splits each string and then sends the key, value pair <word>, "1" to the reducer. The Hadoop cluster then sorts the output by the keys (the words) and groups together the values for processing by the reducer function.
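
The sort-and-group step between map and reduce can be imitated locally. The following is a minimal sketch in plain Python 3, not Happy code: it does not import happy, run_local is a hypothetical helper, and everything is kept in memory, whereas Hadoop performs the sort and grouping across the cluster. The functions return pairs instead of calling task.collect so that the sketch runs without a cluster.

```python
from itertools import groupby
from operator import itemgetter


def map_words(records):
    """Yield (word, "1") for every word, like the map function above."""
    for _, value in records:
        for word in value.split():
            yield word, "1"


def count_values(key, values):
    """Count the values for one key, like the word count reducer."""
    count = 0
    for _ in values:
        count += 1
    return key, str(count)


def run_local(lines):
    """Hypothetical in-memory stand-in for the map, sort, group, reduce flow."""
    # The "text" input format supplies (byte offset, line) records. The
    # offsets here are placeholders because the mapper ignores the key.
    records = [(0, line) for line in lines]
    pairs = sorted(map_words(records), key=itemgetter(0))  # sort by key
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):   # group by key
        output.append(count_values(key, (value for _, value in group)))
    return output


counts = run_local(["the quick fox", "the lazy dog"])
```

Each reduce call sees one key together with all of the values emitted for it, which is why the reducer only has to count the values.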

Reduce Function

def reduce(self, key, values, task):
    count = 0
    for _ in values: count += 1
    task.collect(key, str(count))
    log.debug(key + ":" + str(count))
    happy.results["words"] = happy.results.setdefault("words", 0) + count
    happy.results["unique"] = happy.results.setdefault("unique", 0) + 1

The reduce function takes a key, an iterator over values, and a task object for collecting output. The function totals the number of values for each word and emits <word>, <count> tuples. The word count for each word is also written as a debug statement to the log, and the total and unique word counts are stored in the happy.results dictionary.

Main Function

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print "Usage: <inputpath> <outputpath>"
        sys.exit(-1)
    wc = WordCount(sys.argv[1], sys.argv[2])
    results = wc.run()
    print str(sum(results["words"])) + " total words"
    print str(sum(results["unique"])) + " unique words"

The job invocation needs to be enclosed in a main block, or else it will be executed on the cluster when the script is imported there. The job is dispatched by calling run(), which returns a results object that rolls up all of the happy.results dictionaries written on the cluster. In this case, the results dictionary contains a list of all the "words" values and a list of all the "unique" values that were written on the cluster, which is why the script sums them.

Happy Notes

Job Parameters

Job parameters are set as fields on your job instance and are detailed in HappyJob Parameters. Most of the parameters translate to standard Hadoop JobConf parameters. If you need an additional level of customization, you can set or override JobConf parameters directly using the HappyJob.jobargs dictionary.

Happy Path

Jython modules and Java jar files that are being called by your code can be specified using the environment variable HAPPY_PATH. These are added to the Python path at startup, and are also automatically included when jobs are sent to Hadoop. The path is stored in happy.path and can be edited at runtime.

Results Object

Happy allows result data to be sent from tasks executed on the cluster to the calling process through the happy.results dictionary. Any map or reduce task can write to happy.results using any key. The per-task dictionaries are then combined and returned from HappyJob.run() as a single dictionary with a list of values for each key. Behind the scenes, the data files that are passed back are compressed, so a reasonably large amount of data can be returned quickly, but this won't work well if the results use too much memory in the client process.
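
The roll-up described above can be pictured with a small sketch. This is plain Python 3 rather than Happy code: merge_results is a hypothetical helper, the per-task numbers are invented for illustration, and the order of the values in each list is an assumption.

```python
def merge_results(task_results):
    """Combine per-task dictionaries into one dictionary of lists."""
    merged = {}
    for results in task_results:
        for key, value in results.items():
            merged.setdefault(key, []).append(value)
    return merged


# Hypothetical values written to happy.results by three reduce tasks.
task_results = [
    {"words": 120, "unique": 40},
    {"words": 95, "unique": 31},
    {"words": 210, "unique": 77},
]

merged = merge_results(task_results)
total_words = sum(merged["words"])
total_unique = sum(merged["unique"])
```

Because each key maps to a list with one entry per task that wrote it, the client has to aggregate the list itself, as the word count script does with sum().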

Input Format

Valid file input formats are "text" (one value per line), "keyvalue" (one key-value pair per line, separated by a tab), "sequence" (a binary compressed sequencefile), and "auto" (auto-detect sequence or keyvalue).

The default input format for Happy is "auto", which automatically detects whether the input is a tab-separated key-value text file or a sequence file. If the input is a text file, the keys and values are passed through as Strings. If the input is a sequence file of Text values, they are also passed through as Strings; otherwise the native objects are passed through.
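
To make the difference between the "text" and "keyvalue" record shapes concrete, here is a rough sketch in plain Python 3. It is not how Hadoop reads input: text_records and keyvalue_records are hypothetical helpers that work on an in-memory byte string, and the handling of a line without a tab (empty value) is an assumption.

```python
def text_records(data):
    """Yield (byte offset, line) pairs, as described for the "text" format."""
    offset = 0
    for raw in data.splitlines(True):  # keep line endings to track offsets
        yield offset, raw.rstrip(b"\r\n").decode("utf-8")
        offset += len(raw)


def keyvalue_records(data):
    """Yield (key, value) pairs split on the first tab of each line."""
    for raw in data.splitlines():
        key, _, value = raw.decode("utf-8").partition("\t")
        yield key, value


data = b"apple\t3\nbanana\t7\n"
as_text = list(text_records(data))
as_keyvalue = list(keyvalue_records(data))
```

The same bytes produce offset-keyed lines under "text" and parsed pairs under "keyvalue", so a map function written for one format will generally need adjusting for the other.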

Compression

Hadoop will automatically handle compressed text files when the text or keyvalue inputformats are used as long as the input files have appropriate extensions. The supported formats and extensions are gzip (.gz), zlib (.deflate), and lzo (.lzo). Output compression of text and sequence files can be enabled by setting compressoutput=True. The codec can be selected by setting compressiontype to zlib, gzip, or lzo.
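
The extension convention can be sketched as a simple lookup. This is plain Python 3 for illustration only: CODEC_BY_EXTENSION and codec_for are hypothetical names, not part of Happy or Hadoop, and the gzip round trip uses the Python standard library on a local temporary file.

```python
import gzip
import os
import shutil
import tempfile

# Extensions and codec names as listed in the paragraph above.
CODEC_BY_EXTENSION = {".gz": "gzip", ".deflate": "zlib", ".lzo": "lzo"}


def codec_for(path):
    """Return the codec name implied by the file extension, or None."""
    return CODEC_BY_EXTENSION.get(os.path.splitext(path)[1])


# Write and read back a small gzip-compressed text file.
workdir = tempfile.mkdtemp()
path = os.path.join(workdir, "words.txt.gz")
with gzip.open(path, "wt") as handle:
    handle.write("the quick fox\n")
with gzip.open(path, "rt") as handle:
    first_line = handle.readline()
detected = codec_for(path)
shutil.rmtree(workdir)
```

A file without one of the listed extensions yields None here, which corresponds to the input being read as uncompressed text.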

Sequence Files

Sequence Files are Hadoop's binary file format for storing and compressing sequential key-value data. You can recognize a sequence file because its first three characters are SEQ, followed by binary data. Sequence files store the Java classes for serializing the keys and values (most often these are type Text) and the codec used for compression. They are a fast and efficient way to store data that you're using for map-reduce jobs.
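
The SEQ check is easy to try on a local copy of a file. The sketch below is plain Python 3: looks_like_sequence_file is a hypothetical helper, the sample bytes are made up rather than a real sequence file, and this is not a description of how the "auto" input format is implemented.

```python
import os
import tempfile


def looks_like_sequence_file(path):
    """Return True if the file starts with the three bytes b"SEQ"."""
    with open(path, "rb") as handle:
        return handle.read(3) == b"SEQ"


def write_temp(data):
    """Write data to a new temporary file and return its path."""
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as handle:
        handle.write(data)
    return path


# Made-up contents: a SEQ header followed by filler bytes, and a text file.
seq_path = write_temp(b"SEQ" + b"\x00" * 16)
text_path = write_temp(b"apple\t3\n")

is_seq = looks_like_sequence_file(seq_path)
is_text_seq = looks_like_sequence_file(text_path)

os.remove(seq_path)
os.remove(text_path)
```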

Enable sequence file compression by setting compressoutput=True. Sequence file compression can be set to BLOCK (default) or RECORD using the sequencetype parameter. Block compression allows sequence files to be split on a block boundary, and record compression allows sequence files to be split at any record. Block compression is significantly faster and more efficient than record compression.
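
The space advantage of compressing many records together can be seen with a small experiment. This sketch uses the zlib module from the Python 3 standard library on invented records. It does not read or write the sequence file format, and it says nothing about speed or splitting, only about compressed size.

```python
import zlib

# One thousand small, similar records (invented for illustration).
records = [("key%05d\tvalue%05d\n" % (i, i)).encode("ascii") for i in range(1000)]

raw_size = sum(len(record) for record in records)

# Record-style: every record is compressed on its own.
record_size = sum(len(zlib.compress(record)) for record in records)

# Block-style: many records are compressed together as one block.
block_size = len(zlib.compress(b"".join(records)))
```

Compressing each record separately cannot exploit the repetition between records and pays a fixed overhead per record, so block_size comes out smaller than record_size on data like this.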

Alternative Collectors

The happy.dfs module allows for alternative collectors other than the task collector. These are useful if you want to sort your output data into multiple directories, or want to store a large amount of data as a side effect of your job. Partitioned collectors are collectors where the filename is automatically created based on the current task id.

JSON APIs

Happy includes fast APIs for encoding and decoding JSON data to native Python data structures. This is a convenient way to sort and serialize data in a portable and inspectable form.
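
As an illustration of the idea, the sketch below stores a structured value as JSON in the value position of a tab-separated line. It uses the json module from the Python 3 standard library as a stand-in so that it runs without Happy. Under Happy the corresponding calls would be happy.json.encode and happy.json.decode, and the record contents here are invented.

```python
import json

record = {"word": "hadoop", "count": 3, "files": ["a.txt", "b.txt"]}

# Stand-in for happy.json.encode: a single-line JSON string.
encoded = json.dumps(record, sort_keys=True)

# The encoded value fits into one "keyvalue"-style line.
line = "hadoop\t" + encoded

# Split the line back into key and value, then decode the value.
key, _, value = line.partition("\t")
decoded = json.loads(value)  # stand-in for happy.json.decode
```

Because the encoded string contains no raw tabs or newlines, it can travel through text-based key-value files and still be inspected by eye.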

Happy APIs

HappyJob Parameters

These are job parameters that can be set on happy.HappyJob.

jobname
A name for the job.
inputpaths
REQUIRED - A single input path or list of input paths in the DFS.
outputpath
REQUIRED - The output path in the DFS.
inputformat
The file input format: text (one value per line), keyvalue (one key-value pair per line, separated by a tab), sequence (a binary compressed sequencefile), or auto (auto-detect sequence or keyvalue). The default is auto.
outputformat
The file output format, either "text" (one key-value pair per line, separated by a tab), or "sequence" (a binary compressed sequencefile). The default is "text".
maptasks
The number of map tasks to run.
reducetasks
The number of reduce tasks to run. Set to 0 if you want to skip the reduce step.
localjob
Set to True if the job should run locally, pulling data from the DFS. Good for debugging, but be sure that you don't use a file that is too large.
compressoutput
Set to True to compress text and sequence file output. False by default.
compressiontype
Selects a compression codec for output. Valid values are gzip, zlib, and lzo (default).
sequencetype
Selects a sequence file compression mode. Valid values are RECORD and BLOCK (default).
jobargs
Overrides and/or sets any hadoop job configuration parameters. Values should be entered as a dictionary of key/value pairs, where the key is the parameter name and the value is the value the parameter should be set to.

HappyJob Methods

At minimum, a job class needs a map(records, task) function to run. A reduce(key, values, task) function is required if HappyJob.reducetasks is greater than 0. Other functions that can be defined for the job are:

mapconfig()
If this function is defined, it is called before map is called.
mapclose()
If this function is defined, it is called after all map calls for the current task are done.
reduceconfig()
If this function is defined, it is called before reduce is called.
reduceclose()
If this function is defined, it is called after all reduce calls for the current task are done.
combineconfig()
If this function is defined, it is called before combine is called.
combine(key, values, task)
If this function is defined, it is called during the combine step. Map outputs local to the current box can be partially reduced before they are sorted and sent to the reducer.
combineclose()
If this function is defined, it is called after all combine calls for the current task are done.
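
The effect of a combine step can be sketched without a cluster. The code below is plain Python 3, not Happy code: the functions return pairs instead of calling task.collect, group is a hypothetical helper, and the map output is invented. One point worth noting is that a reducer which counts its values, like the word count reducer shown earlier, would undercount once values have been pre-combined, so this sketch sums the values instead.

```python
from collections import defaultdict


def combine(key, values):
    """Partially reduce the map output of one machine by summing counts."""
    return key, str(sum(int(value) for value in values))


def reduce_sum(key, values):
    """Sum the (possibly pre-combined) counts instead of counting values."""
    return key, str(sum(int(value) for value in values))


def group(pairs):
    """Group values by key, standing in for the sort and group step."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


# Hypothetical map output on two machines.
box_a = [("the", "1"), ("the", "1"), ("fox", "1")]
box_b = [("the", "1"), ("dog", "1")]

# Combine locally on each machine before anything is sent to the reducer.
combined = []
for box in (box_a, box_b):
    for key, values in group(box).items():
        combined.append(combine(key, values))

final = dict(reduce_sum(key, values) for key, values in group(combined).items())
```

Five map outputs shrink to four combined pairs here. On real data with many repeated keys the reduction in data sent to the reducers is usually much larger.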

task Object

The task object passed into the map and reduce functions is used to output data and get information about the current task.

task.collect(key, value)
Collects the key and value as output.

task.getInputPath()
Returns the input path from which the current records are being read. This is useful if you're reading from multiple different files and want to have different code run depending on the input.

task.progress()
Reports progress back to the TaskTracker. Use this if you have a task that is going to take a very long time to complete.
task.setStatus(status)
Reports a status message back to the TaskTracker. Use this to change the message displayed on a task.

happy module

getJobConf()
Retrieves a Hadoop JobConf that is valid for the current task.
path
A list of files and directories that will be included with the current job and copied to the cluster. This is set from the HAPPY_PATH environment variable.
job
This is only set if the current context is in a server task. job.getJobConf() returns a Hadoop JobConf <http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/mapred/JobConf.html> that is valid for the current task. job.getTaskPartition() returns the current task partition for this context.
results
This is a dictionary of task results that can be set during a map or reduce task, and is passed back to the client process.

happy.log module

The Happy logging module integrates with Hadoop's built-in logging support, which uses Log4J and the Apache Commons Logging adapters. The log objects used in this module are all instances of the Apache Log API. An example usage can be seen in the wordcount example.

log
The default log object, with name "happy.task".
getLog(name)
Gets a named log instance. The name is prefixed with "happy".
setLevel(level)
Sets the happy logging level. Level names are, in order, "trace", "debug", "info", "warn", "error", and "fatal".
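
The ordering of the level names matters because a threshold hides everything below it. The sketch below is plain Python 3 and does not use happy.log: is_enabled is a hypothetical helper, and it assumes the usual Log4J convention that a message is shown when its level is at or above the configured level.

```python
# Level names in the order given above, from least to most severe.
LEVELS = ["trace", "debug", "info", "warn", "error", "fatal"]


def is_enabled(message_level, threshold):
    """Return True if a message at message_level passes the threshold."""
    return LEVELS.index(message_level) >= LEVELS.index(threshold)


# With the level set to "debug", as in the word count example,
# everything except "trace" messages is visible.
visible = [level for level in LEVELS if is_enabled(level, "debug")]
```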

happy.json module

encode(o)
Encodes a Python dict, list, string, or other basic type to a JSON string.
decode(s)
Decodes a JSON string to a Python object.

happy.dfs module

Functions for accessing the Hadoop DFS and the local filesystem.

getFileSystem(fs="dfs")
Returns a Hadoop FileSystem object. Valid types are "dfs" (for the default filesystem) and "local".
read(path)
Returns a Python file-like object to read the specified DFS file or path. If a directory is given as a parameter, the returned object transparently iterates over all of the files in the directory.
write(path)
Returns a Python file-like object to write to the specified DFS file. DFS currently doesn't support appends, so you can only create new files using this method. Be sure to close the file or there will be write errors.
delete(path)
Deletes a DFS file or directory.
copyToLocal(path, localpath)
Copies a DFS file or directory to a local file.
copyFromLocal(localpath, path)
Copies a local file to a DFS file.
rename(src, dst)
Renames a file or path.
merge(path, dst)
Merges files in a specified DFS directory to a specified DFS file.
createCollector(path, fs="dfs", type="text", key="text", value="text", compressiontype=None, sequencetype="BLOCK")
Creates an output collector which collects key value pairs at the specified path. Optional parameters are fs which can be dfs (default) for the HDFS filesystem or local for the local filesystem, type which can be text (default) or sequence, and additional parameters for configuring compression in a sequence file.
createPartitionedCollector(path, fs="dfs", type="text", key="text", value="text", compressiontype=None, sequencetype="BLOCK")
Creates an automatically partitioned output collector in the specified directory. The file is named based on the current task partition of the map or reduce task. Optional parameters are fs which can be dfs (default) for the HDFS filesystem or local for the local filesystem, type which can be text (default) or sequence, and additional parameters for configuring compression in a sequence file.
readSequenceFile(path, fs="dfs")
Opens a sequence file for reading, and returns an iterator over the (key, value) tuples.
getTaskPartition()
Returns an integer indicating which task partition is currently executing. This number will correspond to the map or reduce task number visible in the Hadoop job tracker. It returns -1 if not currently in a task.
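
The directory behaviour described for read(path) can be pictured with a local-filesystem sketch. This is plain Python 3 and does not call happy.dfs: read_lines is a hypothetical helper, the part file names and contents are invented, and reading files in name order while skipping subdirectories is an assumption rather than documented behaviour.

```python
import os
import shutil
import tempfile


def read_lines(path):
    """Yield lines from one file, or from every file in a directory."""
    if os.path.isdir(path):
        for name in sorted(os.listdir(path)):
            child = os.path.join(path, name)
            if os.path.isfile(child):
                for line in read_lines(child):
                    yield line
    else:
        with open(path) as handle:
            for line in handle:
                yield line


# Build a directory that resembles job output with two part files.
outdir = tempfile.mkdtemp()
for name, text in [("part-00000", "dog\t1\n"), ("part-00001", "the\t2\n")]:
    with open(os.path.join(outdir, name), "w") as handle:
        handle.write(text)

lines = list(read_lines(outdir))
shutil.rmtree(outdir)
```

Reading a job's output directory this way means the caller does not need to know how many reduce tasks wrote files into it.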