部分业务场景需要将Hive(离线数仓)数据同步到Elasticsearch集群里,供在线业务使用; 或将Elasticsearch的数据写入Hive,用于离线分析。

完整步骤

1、确认目标Elasticsearch集群版本(以7.15.0为例)
2、前往maven仓库 下载和Elasticsearch版本一致的jar包

1
2
3
4
-- 测试阶段可先不下载,直接从maven仓库拉取即可
ADD JAR ivy://org.elasticsearch:elasticsearch-hadoop:7.15.0;
-- 生产环境建议上传至hdfs集群
-- ADD JAR /${path}/elasticsearch-hadoop-${version}.jar;

3、新建Hive外表。完整配置项及字段类型映射见Apache Hive integration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- DROP TABLE default.es_xxx
CREATE EXTERNAL TABLE IF NOT EXISTS default.es_xxx(
    doc_id STRING COMMENT 'ID',
    field1 STRING COMMENT '字段1',
    field2 STRING COMMENT '字段2',
    field3 BIGINT COMMENT '字段3'
)
COMMENT '表注释'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.batch.write.refresh' = 'false',
    'es.index.auto.create' = 'false',
    'es.input.use.sliced.partitions' = 'false',
    -- es.mapping.id只能填一个字段,多个字段作为主键的话,可以在Hive SQL里拼接出一个字段
    'es.mapping.id' = 'doc_id',
    'es.mapping.exclude' = 'doc_id',
    'es.net.http.auth.user' = 'xxxx',
    'es.net.http.auth.pass' = 'xxxx',
    'es.nodes' = 'node1,node2',
    'es.nodes.wan.only' = 'true',
    'es.port' = 'xxx',
    'es.resource' = 'index',
    'es.scroll.size' = '10000',
    'es.write.operation' = 'upsert'
);

4、表建好之后就可以像操作普通Hive表一样,读写Elasticsearch数据了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
-- 写入
INSERT INTO
    default.es_xxx(doc_id, field1, field2, field3)
SELECT xxx,
    xxx,
    xxx,
    xxx
FROM default.source_table_xxx
WHERE
   dt='{@date}';

-- 读取
SELECT xxx,
    xxx,
    xx
FROM default.es_xxx
LIMIT 100;

5、进行修改表结构(如新增字段)或更改PROPERTIES(如更换索引、集群)等操作时,重建表即可。不会删除Elasticsearch集群的数据。

1
2
-- 非原生Hive表不支持通过ALTER语句修改
DROP TABLE default.es_xxx

遇到的问题

1、写了一部分数据,任务失败了。报错信息:Exception in thread “xxx” java.lang.NoClassDefFoundError: org/antlr/runtime/tree/CommonTree
原因:用作doc_id的字段存在NULL值。处理下这部分数据,任务就可以正常运行了。

2、如何限速
elasticsearch-hadoop 找了下没有直接限速方案。可通过控制计算引擎的worker数量来间接限速。

1
2
-- Hive on Spark
SET spark.executor.instances=xx;

参考文档

Elasticsearch 与 Hive 集成
https://github.com/elastic/elasticsearch-hadoop
Hive SQL Language Manual