实现原理
使用Cgo将Go代码编译成动态链接库,再通过Java调用动态链接库来开发Hive UDF。
完整示例
https://github.com/superbear/hive-udf-in-go
Hive UDF简介
UDF是User Defined Function的简称,即用户自定义函数。按功能分,Hive
的UDF可分为以下三类:
UDF分类 | 描述 |
---|
UDF(User Defined Scalar Function) | 自定义标量函数。输入一行,输出一行 |
UDTF(User Defined Table-valued Function) | 自定义表值函数。输入一行,输出多行 |
UDAF(User Defined Aggregation Function) | 自定义聚合函数。输入多行,输出一行 |
使用场景
- 无法通过Hive内置函数实现
- 处理逻辑复杂且相对固化
本文介绍的是第一种,即自定义标量函数。
场景举例
某个字段是用Go SDK加密的,无法通过Hive内置函数解密,且没有Java SDK。
几种解决方案
Transform
支持运行自定义Map/Reduce脚本。原理是worker启一个子进程,子进程从标准输入读取数据,将处理结果写到标准输出。
实现起来比较简单,理论上支持标准输入输出的程序都可以,awk都行。
Python、PHP这类解释型语言的话,需要配置好环境。
拿Go举例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| package main
import (
"bufio"
"fmt"
"os"
"strings"
)
func main() {
reader := bufio.NewReader(os.Stdin)
for {
input, err := reader.ReadString('\n')
if err != nil {
break
}
// 输入/输出多个字段通过制表符分隔
list := strings.Split(input, "\t")
// 处理逻辑
// 可以返回多行或一行,行尾记得输出换行符
fmt.Println(strings.Join(list, "\t"))
}
}
|
1
| GOOS=linux GOARCH=amd64 go build main.go
|
编译产出体积比较大?压缩方法:build时加-ldflags “-s -w"参数,再使用upx
对编译产出进行二次压缩。
1
2
3
4
5
| -- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform
-- SELECT里只能写一个TRANSFORM,不能有其他字段
SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 INT)
|
Java SDK + Java UDF
UDF基类已标注废弃,改用GenericUDF吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| // 参考:https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToString.java
package com.hive.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
// 帮助信息,可通过DESC FUNCTION xxx查看
@Description(name = "upper", value = "_FUNC_(str) - string to upper")
public class GenericUDFUpper extends GenericUDF {
private transient StringObjectInspector inputOI0;
// 初始化的时候校验参数数量及类型 返回值类型
@Override
public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
if (arg0.length != 1) {
throw new UDFArgumentLengthException("UPPER requires 1 argument, got " + arg0.length);
}
// create an ObjectInspector for the input
ObjectInspector input0 = arg0[0];
// check to make sure the input is a string
if (!(input0 instanceof StringObjectInspector)) {
throw new UDFArgumentTypeException(0, "UPPER only takes string, got " + input0.getTypeName());
}
this.inputOI0 = (StringObjectInspector) input0;
// 返回值类型
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arg0) throws HiveException {
if (arg0[0].get() == null) {
return null;
}
String str = this.inputOI0.getPrimitiveJavaObject(arg0[0].get()).toString();
return str.toUpperCase();
}
@Override
public String getDisplayString(String[] children) {
assert (children.length == 1);
return getStandardDisplayString("upper", children);
}
}
|
打包成jar之后上传至hdfs。
1
2
3
4
5
6
| -- https://cwiki.apache.org/confluence/display/Hive/HivePlugins
ADD JAR hdfs://path/to/jar;
CREATE TEMPORARY FUNCTION upper as 'com.hive.udf.Upper';
SELECT upper(xxx) FROM xxx WHERE;
|
Go SDK + Java UDF
没有Java SDK。在方案二的基础上,Java SDK改成跨语言调用Go shared libraries。可参考:Java调用Go
。
方案对比
方案 | 优点 | 缺点 |
---|
Transform | 简单,语言限制较少;复用已有SDK | 子进程,开销较大,离线处理可接受一定延迟;返回多列时使用不是很方便,Hive知道的上下文比较少,不知道输出是多行还是一行,所以限制了SELECT里只能直接写Transform语句(一条)?一般是不处理的字段原样(字符串)返回。怎么知道哪些字段不需要处理呢?可以给脚本传参数。 |
Java SDK + Java UDF | 原生实现,性能最好;使用上和内置函数区别不大 | 需要了解Java和Hive UDF API;可能需要移植下其他语言的处理逻辑 |
Go SDK + Java UDF | 原生实现,性能好;复用已有SDK | Java调用本地Native shared libraries,有一定开销;需要了解Java和Hive UDF API |
总结:没想到很完美的方案。方案一实现简单,使用起来不是很方便;
而方案二性能最好,有一定学习成本,可能有SDK移植成本(人力成本),方案三比方案二少了SDK移植的过程,增加了跨语言调用的成本(机器成本);
目前没有Java SDK,最终选了方案三,不需要维护多语言的SDK。
参考资料
https://github.com/apache/hive/tree/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic
https://cwiki.apache.org/confluence/display/Hive/HivePlugins