As I’ve written before, we do a lot of our development here at Baynote in Hive, allowing us to leverage the power of our Hadoop cluster whilst insulating us from writing low-level map-reduce jobs. One question that comes up on the Hive mailing list from time to time is how to implement machine learning algorithms within Hive. Twitter told us how to do it using Pig, by putting the learning algorithm into a Pig storage function, and here I want to show how Hive can be used in a similar manner for both on-line and batch machine learning algorithms. This enables us to easily incorporate machine learning algorithms into our Hive-based workflow.

First, let’s recall how Hadoop processes map-reduce jobs.

The figure shows

  • A file containing the training data. On a default hdfs installation, the file is split into 64MB blocks.
  • Mappers. One for each block.
  • Reducers. The number of reducers depends on the details of the computation being performed.

Each mapper reads the data from the block of the file assigned to it, and sends its output to the appropriate reducers. The reducers combine the inputs they receive into the final output which is written back to an hdfs file.

So we have two places where we can put machine learning algorithms, the mappers and the reducers, and these are, respectively, the ideal places to put on-line and batch learning algorithms. If the mappers contain on-line learning algorithms then an ensemble of models is learned, one model for each split of the file containing the training data. (The number of models in the ensemble can be controlled only indirectly, by changing the hdfs block size.) The reducers then just collect up the models and write them out to disk.

If a batch learning algorithm is being used, this is placed in the reducers. The mappers accumulate the data from their block and pass it on to the reducers. The reducers accumulate all the data passed to them (this may cause memory problems if too many training examples are passed to a single reducer), run the batch learning algorithm and write out the learned model. An ensemble of models can be learned by having the mappers split the data based on, for example, a randomly generated key. In this way the number of models in the ensemble can be controlled precisely.

In summary – for an on-line learning algorithm, we have no memory issues, but only indirect control over the number of models in the ensemble. For a batch learning algorithm, we can easily control the number of models, but the training data for each model must fit in memory.

The question now is how to implement this in Hive. The key is to put the machine learning algorithm into a UDAF. The template for a Hive UDAF is:

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class UDAF_ML_Training extends UDAF {

    // State class used to store the aggregation results
    public static class State {

        public State(){
        }

. . .
    }

    public static class UDAF_ML_TrainingEvaluator implements UDAFEvaluator {

        // the state of the aggregation held by this evaluator
        private State state;

        // constructor
        public UDAF_ML_TrainingEvaluator(){
            super();
            init();
        }

        // reset the state of the aggregation
        public void init(){
            state = new State();
        }

        // process one row of the training data; called in the mappers,
        // with arguments matching the columns passed to the UDAF
        public boolean iterate(  ){
            return true;
        }

        // terminate a partial aggregation and return the state; called in each
        // mapper once it has processed the last row of its split
        public State terminatePartial(){
            return state;
        }

        // merge the results of a partial aggregation; called in the reducers,
        // once for each State passed on by terminatePartial()
        public boolean merge(State a){
            return true;
        }

        // terminate the aggregation and return the final result; called in the
        // reducers (ReturnType is a placeholder for the type of the learned model)
        public ReturnType terminate(){
            return new ReturnType();
        }
    }
}

The comments on these functions track which are called by the mapper and which by the reducer; keeping that in mind makes the flow of both kinds of learning algorithm easier to follow.

The UDAF is called using a Hive command of the form

> create temporary function ml_training as 'UDAF_ML_Training';
> create table ml_parameters as select ml_training(target, features)
as parameters from training_data_table;

On-Line Learning

Let’s first consider an on-line algorithm. In this case the State class would hold the parameters of the model. The Hadoop framework would spin up a mapper for each block of the hdfs file, and each mapper would have its own instance of the State variable, holding the parameters of the ensemble member learned from the data in that mapper’s block. Each call to the iterate() function would update these parameters as a row of the training data table is read. The iterate() function would be the place to put a Stochastic Gradient Descent algorithm, for example.
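For instance, iterate() for a simple linear model trained by SGD with squared loss might look roughly like the sketch below. The argument types, the learning rate, and the assumption that State holds a double[] field called weights are all illustrative choices, not part of the template above.

// A sketch of an on-line iterate() doing stochastic gradient descent for a linear
// model with squared loss. Assumes State has a field "double[] weights" and that
// the UDAF is called as ml_training(target, features).
public boolean iterate(Double target, List<Double> features) {
    if (target == null || features == null) {
        return true;                                  // skip malformed rows
    }
    if (state.weights == null) {
        state.weights = new double[features.size()]; // lazily size the weight vector
    }
    double prediction = 0.0;
    for (int i = 0; i < features.size(); i++) {
        prediction += state.weights[i] * features.get(i);
    }
    double error = target - prediction;
    final double learningRate = 0.01;                 // assumed, fixed step size
    for (int i = 0; i < features.size(); i++) {
        state.weights[i] += learningRate * error * features.get(i);   // SGD update
    }
    return true;
}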

When the last row of the training data for a particular mapper has been processed by iterate(), the terminatePartial() function is called in that mapper, which passes the State variable to the reducer. Within the reducer, each of these is passed to the merge() function – the reducer starts with an empty State variable, and the State variables passed in from the multiple mappers are collected up into it. For this reason the State variable is often a collection (for example an ArrayList<>) of model parameters, so that the parameters of the separate models coming from the different terminatePartial() calls can easily be merged together.

Finally, terminate() is called in the reducer. For an on-line learning algorithm, it simply returns the collection of learned models.
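As a concrete sketch of the reducer side, suppose the State class holds an ArrayList<double[]> called models, initialised to an empty list in its constructor, and that each mapper’s terminatePartial() has wrapped its learned weight vector into that list. These names are assumptions for illustration, since the template above leaves State unspecified. The merge() and terminate() methods might then look roughly like this:

// Reducer side for the on-line case. Assumes State has a field
// "ArrayList<double[]> models" initialised to an empty list.
public boolean merge(State a) {
    if (a != null) {
        state.models.addAll(a.models);   // collect the ensemble members from each mapper
    }
    return true;
}

// Return the whole ensemble as a list of weight vectors.
public ArrayList<ArrayList<Double>> terminate() {
    ArrayList<ArrayList<Double>> ensemble = new ArrayList<ArrayList<Double>>();
    for (double[] model : state.models) {
        ArrayList<Double> weights = new ArrayList<Double>();
        for (double w : model) {
            weights.add(w);
        }
        ensemble.add(weights);
    }
    return ensemble;
}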

Batch Learning

For a batch learning algorithm, the flow is a little different. In this case the State must have containers to store the training examples – the training examples must be collected up to be passed to the batch learning algorithm.

Calls to the iterate() function now just add the new training example to the collection stored in the mapper’s State variable. When the mapper has read the last training example in its split of the input file, the call to terminatePartial() returns the collected data in the State variable, and passes it to the reducer.
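A minimal sketch of the batch-mode State and iterate(), again with illustrative field and argument names rather than anything mandated by the template:

// Batch-mode State: just containers for the raw training examples.
public static class State {
    ArrayList<Double> targets = new ArrayList<Double>();
    ArrayList<ArrayList<Double>> features = new ArrayList<ArrayList<Double>>();
}

// iterate() only accumulates; no learning happens in the mappers.
public boolean iterate(Double target, List<Double> featureRow) {
    if (target == null || featureRow == null) {
        return true;                                        // skip malformed rows
    }
    state.targets.add(target);
    state.features.add(new ArrayList<Double>(featureRow));  // copy the row into the State
    return true;
}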

The merge() function merges together the data that has been collected up from each mapper into a single collection in the reducer. Clearly this may cause problems with heap space on the reducer. We can split the data over multiple reducers, and learn an ensemble of models with a Hive command of the form:

> create table training_data_table_with_key as
select cast(number_of_partitions*rand() as int) as key, * from training_data_table;
> set mapred.reduce.tasks=number_of_partitions;
> create table ml_parameters as
select key, ml_training(target, features) as parameters from training_data_table_with_key
group by key;

Now, the mappers divide up the training data according to the key, and the data for each value of the key is passed to a different reducer. The parameter number_of_partitions controls the number of models in the learned ensemble.

Finally, terminate() is called in the reducer. For a batch algorithm, this is where the learning algorithm is called, as it now has access to all of the data that has been accumulated. Once learning is complete, the model is returned and the UDAF terminates.
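Putting the pieces together, the reducer side for the batch case might look roughly like the sketch below, reusing the State fields assumed above. trainBatchModel() is a hypothetical stand-in for whatever batch learner you actually call, not a real library function.

// Reducer side for the batch case, reusing the State fields assumed above.
public boolean merge(State a) {
    if (a != null) {
        state.targets.addAll(a.targets);    // accumulate all the training data here;
        state.features.addAll(a.features);  // this is where heap space can become an issue
    }
    return true;
}

// All the data is now in one place, so run the batch learner and return its parameters.
// trainBatchModel() is a hypothetical placeholder for your own training routine.
public ArrayList<Double> terminate() {
    return trainBatchModel(state.targets, state.features);
}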

We have successfully used these approaches to integrate machine learning algorithms into our Hive-based workflow. Despite the limitations imposed by the framework that I’ve discussed, it provides a practical way to proceed. As always, however, the key to getting a useful model is the choice of features fed to the learning algorithm. More on that another time.