部分业务场景需要将Hive(离线数仓)数据同步到Elasticsearch集群里,供在线业务使用;
或将Elasticsearch的数据写入Hive,用于离线分析。
完整步骤
1、确认目标Elasticsearch集群版本(以7.15.0为例)
2、前往maven仓库
下载和Elasticsearch版本一致的jar包
1
2
3
4
| -- 测试阶段可先不下载,直接从maven仓库拉取即可
ADD JAR ivy://org.elasticsearch:elasticsearch-hadoop:7.15.0;
-- 生产环境建议上传至hdfs集群
-- ADD JAR /${path}/elasticsearch-hadoop-${version}.jar;
|
3、新建Hive外表。完整配置项及字段类型映射见Apache Hive integration
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| -- DROP TABLE default.es_xxx
CREATE EXTERNAL TABLE IF NOT EXISTS default.es_xxx(
doc_id STRING COMMENT 'ID',
field1 STRING COMMENT '字段1',
field2 STRING COMMENT '字段2',
field3 BIGINT COMMENT '字段3'
)
COMMENT '表注释'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.batch.write.refresh' = 'false',
'es.index.auto.create' = 'false',
'es.input.use.sliced.partitions' = 'false',
-- es.mapping.id只能填一个字段,多个字段作为主键的话,可以在Hive SQL里拼接出一个字段
'es.mapping.id' = 'doc_id',
'es.mapping.exclude' = 'doc_id',
'es.net.http.auth.user' = 'xxxx',
'es.net.http.auth.pass' = 'xxxx',
'es.nodes' = 'node1,node2',
'es.nodes.wan.only' = 'true',
'es.port' = 'xxx',
'es.resource' = 'index',
'es.scroll.size' = '10000',
'es.write.operation' = 'upsert'
);
|
4、表建好之后就可以像操作普通Hive表一样,读写Elasticsearch数据了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| -- 写入
INSERT INTO
default.es_xxx(doc_id, field1, field2, field3)
SELECT xxx,
xxx,
xxx,
xxx
FROM default.source_table_xxx
WHERE
dt='{@date}';
-- 读取
SELECT xxx,
xxx,
xx
FROM default.es_xxx
LIMIT 100;
|
5、进行修改表结构(如新增字段)或更改PROPERTIES(如更换索引、集群)等操作时,重建表即可。不会删除Elasticsearch集群的数据。
1
2
| -- 非原生Hive表不支持通过ALTER语句修改
DROP TABLE default.es_xxx
|
遇到的问题
1、写了一部分数据,任务失败了。报错信息:Exception in thread “xxx” java.lang.NoClassDefFoundError: org/antlr/runtime/tree/CommonTree
原因:用作doc_id的字段存在NULL值。处理下这部分数据,任务就可以正常运行了。
2、如何限速
在elasticsearch-hadoop
找了下没有直接限速方案。可通过控制计算引擎的worker数量来间接限速。
1
2
| -- Hive on Spark
SET spark.executor.instances=xx;
|
参考文档
Elasticsearch 与 Hive 集成
https://github.com/elastic/elasticsearch-hadoop
Hive SQL Language Manual