实现原理

使用Cgo将Go代码编译成动态链接库,再通过Java调用动态链接库来开发Hive UDF。

完整示例

https://github.com/superbear/hive-udf-in-go

Hive UDF简介

UDF是User Defined Function的简称,即用户自定义函数。按功能分,Hive 的UDF可分为以下三类:

UDF分类描述
UDF(User Defined Scalar Function)自定义标量函数。输入一行,输出一行
UDTF(User Defined Table-valued Function)自定义表值函数。输入一行,输出多行
UDAF(User Defined Aggregation Function)自定义聚合函数。输入多行,输出一行

使用场景

  • 无法通过Hive内置函数实现
  • 处理逻辑复杂且相对固化

本文介绍的是第一种,即自定义标量函数。

场景举例

某个字段是用Go SDK加密的,无法通过Hive内置函数解密,且没有Java SDK。

几种解决方案

Hive Transform函数

Transform 支持运行自定义Map/Reduce脚本。原理是worker启一个子进程,子进程从标准输入读取数据,将处理结果写到标准输出。 实现起来比较简单,理论上支持标准输入输出的程序都可以,awk都行。 Python、PHP这类解释型语言的话,需要配置好环境。

拿Go举例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"
)

func main() {
    reader := bufio.NewReader(os.Stdin)
    for {
	input, err := reader.ReadString('\n')
	if err != nil {
	    break
	}

	// 输入/输出多个字段通过制表符分隔
	list := strings.Split(input, "\t")
	// 处理逻辑
	// 可以返回多行或一行,行尾记得输出换行符
	fmt.Println(strings.Join(list, "\t"))
    }
}
1
GOOS=linux GOARCH=amd64 go build main.go

编译产出体积比较大?压缩方法:build时加-ldflags “-s -w"参数,再使用upx 对编译产出进行二次压缩。

1
2
3
4
5
-- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform
-- SELECT里只能写一个TRANSFORM,不能有其他字段
SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 INT)

Java SDK + Java UDF

UDF基类已标注废弃,改用GenericUDF吧1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 参考:https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToString.java
package com.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

// 帮助信息,可通过DESC FUNCTION xxx查看
@Description(name = "upper", value = "_FUNC_(str) - string to upper")
public class GenericUDFUpper extends GenericUDF {
    private transient StringObjectInspector inputOI0;

    // 初始化的时候校验参数数量及类型 返回值类型
    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
	if (arg0.length != 1) {
	    throw new UDFArgumentLengthException("UPPER requires 1 argument, got " + arg0.length);
	}
	// create an ObjectInspector for the input
	ObjectInspector input0 = arg0[0];
	// check to make sure the input is a string
	if (!(input0 instanceof StringObjectInspector)) {
	    throw new UDFArgumentTypeException(0, "UPPER only takes string, got " + input0.getTypeName());
	}
	this.inputOI0 = (StringObjectInspector) input0;

	// 返回值类型
	return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
	if (arg0[0].get() == null) {
	    return null;
	}

	String str = this.inputOI0.getPrimitiveJavaObject(arg0[0].get()).toString();

	return str.toUpperCase();
    }

    @Override
    public String getDisplayString(String[] children) {
	assert (children.length == 1);
	return getStandardDisplayString("upper", children);
    }
}

打包成jar之后上传至hdfs。

1
2
3
4
5
6
-- https://cwiki.apache.org/confluence/display/Hive/HivePlugins
ADD JAR hdfs://path/to/jar;

CREATE TEMPORARY FUNCTION upper as 'com.hive.udf.Upper';

SELECT upper(xxx) FROM xxx WHERE;

Go SDK + Java UDF

没有Java SDK。在方案二的基础上,Java SDK改成跨语言调用Go shared libraries。可参考:Java调用Go

方案对比

方案优点缺点
Transform简单,语言限制较少;复用已有SDK子进程,开销较大,离线处理可接受一定延迟;返回多列时使用不是很方便,Hive知道的上下文比较少,不知道输出是多行还是一行,所以限制了SELECT里只能直接写Transform语句(一条)?一般是不处理的字段原样(字符串)返回。怎么知道哪些字段不需要处理呢?可以给脚本传参数。
Java SDK + Java UDF原生实现,性能最好;使用上和内置函数区别不大需要了解Java和Hive UDF API;可能需要移植下其他语言的处理逻辑
Go SDK + Java UDF原生实现,性能好;复用已有SDKJava调用本地Native shared libraries,有一定开销;需要了解Java和Hive UDF API

总结:没想到很完美的方案。方案一实现简单,使用起来不是很方便; 而方案二性能最好,有一定学习成本,可能有SDK移植成本(人力成本),方案三比方案二少了SDK移植的过程,增加了跨语言调用的成本(机器成本);

目前没有Java SDK,最终选了方案三,不需要维护多语言的SDK。

参考资料

https://github.com/apache/hive/tree/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic

https://cwiki.apache.org/confluence/display/Hive/HivePlugins