Hadoop has, for good reasons, become the platform of choice for big data processing. It's open source, it's being developed rapidly, it runs on commodity hardware with no per-node licensing fees, and it has an active community and a steadily growing body of knowledge and experience built around it. All of which makes adopting Hadoop an easy sell, both for start-ups and for enterprise users who wish to migrate away from legacy database systems. No one has ever been fired for running Hadoop.

But it also pays, as the article referenced above discusses, to think back to where Hadoop came from and the scale of data processing it was optimized for. Hadoop was Yahoo's implementation of Google's distributed file system and map-reduce API. Both Yahoo and Google process data at the terabyte or even petabyte scale, which requires distributing jobs over thousands or even tens of thousands of processing nodes.

Why Small-Big-Data Can Be Beneficial

Most of us, though, are not working at that scale. Yes, in aggregate, the data we process here at Baynote falls into the category of big data, but many of the jobs we run, particularly those for small to medium-sized customers, work on data that will easily fit into RAM on a reasonably sized machine.

This has some useful consequences. In terms of machine learning algorithms, it means that we can use in-memory algorithms rather than distributed streaming algorithms. This is useful because many algorithms do not (yet) exist in distributed streaming form. It also allows iterative algorithms to be used without the overhead of starting up a map-reduce job for each iteration.

Nice though it is to leverage these in-memory algorithms, it is even better to be able to integrate them into the same workflow that performs all of our other analyses and model building. This way we keep all the data on HDFS, which both ensures there is only one store for all our data and reduces the complexity of the platform code that collects and pre-processes it. You can see why we prefer to integrate these in-memory, iterative algorithms into Hive scripts.

In an earlier blog post I wrote about doing this in a Hive UDAF. This works, but it has a number of limitations, mostly to do with memory. If the data is coming from a number of mappers, the map-reduce framework decides the order in which the output from the different mappers is merged together. This means that, in the worst case, you have half your data merged in one place and half in another, and when those two halves are merged you need memory for 1.5 times your data, because data coming into a UDAF has to be deep-copied. The other problem is that the JVM running the UDAF is also running Hadoop, which eats up a lot of the memory assigned to that JVM.

An alternative approach is to use Hive streaming. This lets you send data from Hive to the standard input of an external program and collect that program's standard output back into a Hive table. The approach comes with overhead: the table entries have to be converted to strings to be streamed to the external program (which must then parse the input), and the output of the program comes back to Hive as strings (which can be parsed on read into simple types, but not complex types). The advantages, however, are many: the external program can be written in a memory-efficient language; it is a standalone program and therefore easier to debug than a UDAF; and it can return as many rows as it wants. For example, if we are running cross-validation over a set of regularization parameters, it can return a row for each one. And if the external program is doing a lot of computation, the overhead of parsing the strings may well be negligible.
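
To make the streaming contract concrete, here is a minimal sketch of what such an external program might look like. It is illustrative only: it assumes Hive's default streaming serialization (one row per line, columns separated by tabs) and the nrows, target, features column layout used in the query later in this post, and it elides the actual learning code.

#!/usr/bin/perl
# Sketch of a reducer-side streaming program (illustrative, not production code).
# Hive streams one row per line with tab-separated columns; anything printed
# to stdout becomes rows of the output table, again tab-separated.
use strict;
use warnings;

my @examples;
while (my $line = <STDIN>) {
    chomp $line;
    # Assumed column layout, matching the query below: nrows, target, features.
    my ($nrows, $target, $features) = split /\t/, $line, 3;
    push @examples, [$target, $features];   # hold the whole dataset in memory
}

# ... run the in-memory, iterative learning algorithm over @examples here ...

# Emit one result row per model; columns must match the output table (A, B, C, D).
# The values here are placeholders for whatever the algorithm actually produces.
print join("\t", "model-id", "chosen-parameters", 0.0, "diagnostics"), "\n";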

To use Hive streaming in this way is, in a sense, "abusing Hadoop." It means giving up the inherent parallelism of processing data distributed across HDFS. But when it's useful, it's very useful. Hive streaming is described here, along with an important note: even though the steps are called MAP and REDUCE, the query planner will not necessarily run them in the map and reduce phases unless you are a little bit careful.

Putting Hive Streaming into Practice: The Details

1. Add the executable files to the distributed cache. Because the map phase does nothing, the first is a very simple Perl script that just echoes its input back (a minimal sketch follows the add file commands below). The second executable is the machine learning algorithm.

add file /path/to/identity.pl;
add file /path/to/iterative/machine/learning/algorithm_executable;
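
For reference, the identity script could be as simple as the following sketch (using the identity.pl name assumed above), which copies every streamed row from standard input back to standard output unchanged:

#!/usr/bin/perl
# identity.pl: echo every streamed row back to Hive unchanged.
use strict;
use warnings;

while (my $line = <STDIN>) {
    print $line;
}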

2. Set up the table that is going to hold the output. Remember that the machine learning algorithm returns its output on standard out, and Hive will only parse this into simple types.

drop table if exists output_from_iterative_ml_algorithm;
create table output_from_iterative_ml_algorithm
       (A string, B string, C double, D string);

3. Run the job. The first phase is the no-op mapper. The second phase is the machine learning reducer. Note the distribute by line: by distributing on a constant value (in this case the number of rows of training data, a useful input to the machine learning algorithm because it lets the algorithm avoid memory reallocation and copying) and setting the number of reduce tasks to 1, we force all of the input into a single copy of the machine learning executable.

set mapred.reduce.tasks=1;
from (
       from training_data_table
       MAP nrows, target, features
          USING 'identity.pl'
          as nrows, target, features
          distribute by nrows
       ) map_output
insert overwrite table output_from_iterative_ml_algorithm
REDUCE nrows, target, features
       USING 'algorithm_executable'
       as A, B, C, D
;
set mapred.reduce.tasks=-1;

And there you have it: an in-memory machine learning algorithm integrated into a Hive workflow.

What Is the Best Way to Go?

It has been our experience that sometimes the UDAF approach works best, sometimes the Hive streaming approach works best, and sometimes you just have to bite the bullet and build something fully distributed.