Robin Morris

(How to convert an array<struct<target:bigint,quantity:int,price:float>> into an array<struct<target:bigint,quantity:int,price:float,externalid:string>>)

If you’ve been reading my blog posts over the last few months, you’ll have noticed that they’ve focused more on issues on the engineering side of Baynote and less on technical details. This blog post is an exception. It’s going to get very technical very quickly. Sorry.

The back-end component of our platform is being built on top of a Hadoop cluster, and we do a lot of our model building and analytics using Hive. This insulates us from having to write low-level map-reduce jobs, and makes our algorithm development more efficient. We find it to be an extremely useful tool, but sometimes lacking in documentation.

One of the data items that we capture from our customers is the details of the purchases that their website users make. The tags (see Why Amazon and Netflix Have it Easy) send back the quantity purchased, the item’s price, and the customer’s identifier for the item. We translate the customer identifier into a Baynote-internal identifier, and store the purchase in our Hive table as an array<struct<target:bigint,quantity:int,price:float>> where target is the internal identifier.

Now it turns out that sometimes customers put new items on their web sites before we get the new catalog that tells us about them. In these cases, when creating the entries in the Hive table, we have no mapping from the customer’s identifier to the internal Baynote identifier. So we decided to change the schema in the table to array<struct<target:bigint,quantity:int,price:float,externalid:string>>, so that if we have an externalid with no current target, we could perform the lookup later.

So we have a change in the table schema, and our data is necessarily split over two tables: one from before the change, and one from after. It’s inconvenient to have the basic data that all models and analytics are based on split over two tables, but it’s not a major problem, because Hive supports views, and it’s easy to create a view that uses a UNION ALL to merge the two – but only if you can translate the old schema into the new one.
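As a sketch – with hypothetical table and column names – the view we’d like looks something like this; the catch is that a UNION ALL requires both branches to produce the same schema:

```sql
-- Hypothetical table and column names.
-- This fails as written, because the two SELECTs produce different schemas:
-- purchases_old holds array<struct<target:bigint,quantity:int,price:float>>
-- while purchases_new holds
-- array<struct<target:bigint,quantity:int,price:float,externalid:string>>.
CREATE VIEW all_purchases AS
SELECT visit_id, purchase_details FROM purchases_old
UNION ALL
SELECT visit_id, purchase_details FROM purchases_new;
```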

So, how do you convert an array<struct<target:bigint,quantity:int,price:float>> into an array<struct<target:bigint,quantity:int,price:float,externalid:string>>? Unfortunately, HiveQL has limited support for creating structs and arrays, so it’s time to turn to a UDF.

Hive supports two types of UDF: simple UDFs [https://cwiki.apache.org/Hive/hiveplugins.html] and GenericUDFs. Simple UDFs are, well, simple: easy to write, but limited in the types of data they can accept – in particular, they don’t deal with structs. GenericUDFs are the Swiss Army knife of UDFs, but the documentation for them is very scant, especially when it comes to complex inputs. There’s a tutorial for writing UDAFs at https://cwiki.apache.org/Hive/genericudafcasestudy.html, and a post describing the development of a SerDe (which uses many of the same techniques as a GenericUDF when it comes to accessing the data) at http://www.congiu.com/a-json-readwrite-serde-for-hive/. But, as evidenced by the unanswered Stack Overflow question http://stackoverflow.com/questions/10895933/how-to-handle-nested-struct-in-hive-udf, there’s a lack of readily available information on UDFs that handle complex data structures.

Here’s what goes into a GenericUDF that takes an array of structs as input and outputs an array of a different type of struct.

Before we start, a few notes:

  1. A UDF gets access to the data passed to it via ObjectInspectors, so the ObjectInspector concept is very important.
  2. Many of the functions that are used in GenericUDFs, in particular the various subclasses of ObjectInspector, return Objects. This means that
    1. the compiler is of very little help when it comes to type-checking during development
    2. it’s sometimes hard to find out what the actual type that’s returned really is. The hadoop task logs can be very useful here, as we’ll see later.

All user-defined GenericUDFs inherit from the GenericUDF class, and so must implement the following three methods:

  1. public ObjectInspector initialize(ObjectInspector[] arguments)
  2. public Object evaluate(DeferredObject[] arguments)
  3. public String getDisplayString(String[] children)

initialize() is called the first time the UDF is invoked. It is typically used to do four things, only the first two of which are obvious.

  1. Verify that the input is of the type expected (in our case array<struct<target:bigint,quantity:int,price:float>> )
  2. Set up and return an ObjectInspector for the type of the output of the UDF. For a UDF that deals with struct<>s this is where the names of the elements of the structs in the output are defined.
  3. Store in global variables the ObjectInspectors for the elements of the input
  4. Set up the storage variable for the output.

Step 3 isn’t strictly necessary, as the ObjectInspectors could be set up in the call to evaluate(), but setting them up in the call to initialize() means that this is only done once.

evaluate() gets passed the input, does whatever it wants with it, and returns the output. The input is accessed using the ObjectInspectors that were saved into global variables during the call to initialize(). How to store and return the output is also important: the signature says the function returns an Object, so we need to know what sorts of Object Hive will not object to when it gets them. What matters for this particular UDF is how array<> and struct<> are represented so that they can be passed back to Hive.

  1. array<> is represented by java ArrayList<>
  2. struct<> is represented by java Object[], and only the values are stored in the Object array (the names are defined in the ObjectInspector for the output that was set up in initialize() ). The elements must be stored in the same order as the fields were added to the output ObjectInspector.

Other than that, use Hadoop’s Writable and Text classes rather than Java primitives.
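To make those two representation rules concrete, here’s a tiny plain-Java sketch (no Hive dependencies; the values are hypothetical) of the shape evaluate() hands back: an ArrayList for the array<>, containing one Object[] of field values per struct<>, in the order the fields were added to the output ObjectInspector:

```java
import java.util.ArrayList;

// Plain-Java sketch of the return shape Hive expects from evaluate().
// In the real UDF the four values would be LongWritable, IntWritable,
// FloatWritable and Text rather than boxed primitives and String.
public class ReturnShapeSketch {
    public static ArrayList<Object[]> buildOutput() {
        ArrayList<Object[]> ret = new ArrayList<>();   // becomes the array<>
        // One purchase struct<>: target, quantity, price, externalid,
        // in exactly the order the output ObjectInspector declared them
        ret.add(new Object[] { 42L, 2, 9.99f, "" });
        return ret;
    }

    public static void main(String[] args) {
        ArrayList<Object[]> out = buildOutput();
        // prints "1 struct(s), 4 fields each"
        System.out.println(out.size() + " struct(s), "
                + out.get(0).length + " fields each");
    }
}
```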

getDisplayString() returns the string that is shown when EXPLAIN is used on a query that calls the UDF.

So, that said, here’s the code, to which I’ve added a lot of comments explaining what’s going on and how the code fits with what I’ve written above.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.lazy.LazyLong;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.hive.serde2.lazy.LazyFloat;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
 
import java.util.ArrayList;
 
public class GenericUDFAddExternalIdToPurchaseDetails extends GenericUDF {
 
     // the return variable. Java Object[] become hive struct<>. Java ArrayList<> become hive array<>.
     // The return variable only holds the values that are in the struct<>. The field names
     // are defined in the ObjectInspector that is returned by the initialize() method.
     private ArrayList ret;
 
     // Global variables that inspect the input.
     // These are set up during the initialize() call, and are then used during the
     // calls to evaluate()
     //
     // ObjectInspector for the list (input array<>)
     // ObjectInspector for the struct<>
     // ObjectInspectors for the elements of the struct<>, target, quantity and price
     private ListObjectInspector loi;
     private StructObjectInspector soi;
     private ObjectInspector toi, qoi, poi;
 
     @Override
     // This is what we do in the initialize() method:
     // Verify that the input is of the type expected
     // Set up the ObjectInspectors for the input in global variables
     // Initialize the output ArrayList
     // Return the ObjectInspector for the output
     public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
 
          // Verify the input is of the required type.
          // Set the global variables (the various ObjectInspectors) while we're doing this
 
          // Exactly one input argument
          if( arguments.length != 1 )
               throw new UDFArgumentLengthException("AddExternalIdToPurchaseDetails() accepts exactly one argument.");
 
          // Is the input an array<>?
          if( arguments[0].getCategory() != ObjectInspector.Category.LIST )
               throw new UDFArgumentTypeException(0,"The single argument to AddExternalIdToPurchaseDetails should be "
                                                     + "array<struct<target:bigint,quantity:int,price:float>>"
                                                     + " but " + arguments[0].getTypeName() + " is found");
 
          // Is the input an array<struct<>>
          // Get the object inspector for the list(array) elements; this should be a StructObjectInspector
          // Then check that the struct has the correct fields.
          // Also, store the ObjectInspectors for use later in the evaluate() method
          loi = ((ListObjectInspector)arguments[0]);
          soi = ((StructObjectInspector)loi.getListElementObjectInspector());
 
          // Are there the correct number of fields?
          if( soi.getAllStructFieldRefs().size() != 3 )
               throw new UDFArgumentTypeException(0,"Incorrect number of fields in the struct. "
                                                     + "The single argument to AddExternalIdToPurchaseDetails should be "
                                                     + "array<struct<target:bigint,quantity:int,price:float>>"
                                                     + " but " + arguments[0].getTypeName() + " is found");
 
          // Are the fields the ones we want?
          StructField target = soi.getStructFieldRef("target");
          StructField quantity = soi.getStructFieldRef("quantity");
          StructField price = soi.getStructFieldRef("price");
 
          if( target==null )
               throw new UDFArgumentTypeException(0,"No \"target\" field in input structure "+arguments[0].getTypeName());
          if( quantity==null )
               throw new UDFArgumentTypeException(0,"No \"quantity\" field in input structure "+arguments[0].getTypeName());
          if( price==null )
               throw new UDFArgumentTypeException(0,"No \"price\" field in input structure "+arguments[0].getTypeName());
 
          // Are they of the correct types? (primitives WritableLong, WritableInt, WritableFloat)
          // We store these Object Inspectors for use in the evaluate() method.
          toi = target.getFieldObjectInspector();
          qoi = quantity.getFieldObjectInspector();
          poi = price.getFieldObjectInspector();
 
          // First, are they primitives?
          if(toi.getCategory() != ObjectInspector.Category.PRIMITIVE )
               throw new UDFArgumentTypeException(0,"Is input primitive? target field must be a bigint; found "+toi.getTypeName());
          if(qoi.getCategory() != ObjectInspector.Category.PRIMITIVE )
               throw new UDFArgumentTypeException(0,"Is input primitive? quantity field must be an int; found "+qoi.getTypeName());
          if(poi.getCategory() != ObjectInspector.Category.PRIMITIVE )
               throw new UDFArgumentTypeException(0,"Is input primitive? price field must be a float; found "+poi.getTypeName());
 
          // Second, are they the correct type of primitive?
          if( ((PrimitiveObjectInspector)toi).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG )
               throw new UDFArgumentTypeException(0,"Is input correct primitive? target field must be a bigint; found "+toi.getTypeName());
          if( ((PrimitiveObjectInspector)qoi).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT )
               throw new UDFArgumentTypeException(0,"Is input correct primitive? quantity field must be an int; found "+qoi.getTypeName());
          if( ((PrimitiveObjectInspector)poi).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.FLOAT )
               throw new UDFArgumentTypeException(0,"Is input correct primitive? price field must be a float; found "+poi.getTypeName());
 
          // If we get to here, the input is an array<struct>
 
          // HOW TO RETURN THE OUTPUT?
          // A struct<> is stored as an Object[], with the elements being the field values, in order
          // See GenericUDFNamedStruct
          // The object inspector that we set up below and return at the end of initialize() takes care of the names,
          // so the Object[] only holds the values.
          // A java ArrayList is converted to a hive array<>, so the output is an ArrayList
          ret = new ArrayList();
 
          // Now set up and return the object inspector for the output of the UDF
 
          // Define the field names for the struct<> and their types
          ArrayList structFieldNames = new ArrayList();
          ArrayList structFieldObjectInspectors = new ArrayList();
 
          structFieldNames.add("target");
          structFieldNames.add("quantity");
          structFieldNames.add("price");
          structFieldNames.add("externalId");
 
          // To get instances of PrimitiveObjectInspector, we use the PrimitiveObjectInspectorFactory
          structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableLongObjectInspector );
          structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableIntObjectInspector );
          structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableFloatObjectInspector );
          structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableStringObjectInspector );
 
          // Set up the object inspector for the struct<> for the output
          StructObjectInspector si2;
          si2 = ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
 
          // Set up the list object inspector for the output, and return it
          ListObjectInspector li2;
          li2 = ObjectInspectorFactory.getStandardListObjectInspector( si2 );
          return li2;
     }
 
     @Override
     // The evaluate() method. The input is passed in as an array of DeferredObjects, so that
     // computation is not wasted on deserializing them if they're not actually used
     public Object evaluate(DeferredObject[] arguments) throws HiveException {
 
          // Empty the return array (re-used between calls)
          ret.clear();
 
          // Should be exactly one argument
          if( arguments.length!=1 )
               return null;
 
          // If passed a null, return a null
          if( arguments[0].get()==null )
               return null;
 
          // Iterate over the elements of the input array
          // Convert the struct<>'s to the new format
          // Put them into the output array
          // Return the output array
 
          int nelements = loi.getListLength(arguments[0].get());
          for( int i=0; i<nelements; i++ ) {
 
               // getStructFieldData() returns an Object; however, it's actually a LazyLong
               // (How do we know it's a LazyLong, as the documentation says that getStructFieldData() returns an Object?
               // We know, because during development, the error in the hadoop task log was "can't cast LazyLong to ...")
               // How do you get the data out of a LazyLong? Using a LongObjectInspector...
               LazyLong LLtarget = (LazyLong)(soi.getStructFieldData( loi.getListElement(arguments[0].get(),i), soi.getStructFieldRef("target")));
               long tt = ((LongObjectInspector)toi).get( LLtarget );
 
               LazyInteger LIquantity = (LazyInteger)(soi.getStructFieldData( loi.getListElement(arguments[0].get(),i), soi.getStructFieldRef("quantity")));
               int qq = ((IntObjectInspector)qoi).get( LIquantity );
 
               LazyFloat LFprice = (LazyFloat)(soi.getStructFieldData( loi.getListElement(arguments[0].get(),i), soi.getStructFieldRef("price")));
               float pp = ((FloatObjectInspector)poi).get( LFprice );
 
               // The struct<> we're returning is stored as an Object[] of length 4 (it has 4 fields)
               Object[] e;
               e = new Object[4];
 
               // The field values must be inserted in the same order as defined in the ObjectInspector for the output
               // The fields must also be hadoop writable/text classes
               e[0] = new LongWritable(tt);
               e[1] = new IntWritable(qq);
               e[2] = new FloatWritable(pp);
               e[3] = new Text();
 
               ret.add(e);
 
          }
 
          return ret;
     }
 
     @Override
     public String getDisplayString(String[] children) {
          assert( children.length>0 );
 
          StringBuilder sb = new StringBuilder();
          sb.append("AddExternalIdToPurchaseDetails(");
          sb.append(children[0]);
          sb.append(")");
 
          return sb.toString();
     }
}
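Once the class is compiled and packed into a jar, registering and using the UDF looks roughly like this (the jar name and the table and column names are hypothetical):

```sql
ADD JAR add-external-id-udf.jar;
CREATE TEMPORARY FUNCTION add_external_id
     AS 'GenericUDFAddExternalIdToPurchaseDetails';

-- The view that merges the old- and new-schema tables
CREATE VIEW all_purchases AS
SELECT visit_id, add_external_id(purchase_details) AS purchase_details
FROM purchases_old
UNION ALL
SELECT visit_id, purchase_details FROM purchases_new;
```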

So there you have it: a Hive GenericUDF that does stuff with array<struct<>> Hive data. Hope it’s interesting/useful.

[And I’ll send a Baynote t-shirt to the first person who emails me (rdm at baynote.com) pointing out where the input validation is incomplete.]