部分业务场景需要将Hive(离线数仓)数据同步到Elasticsearch集群里,供在线业务使用; 或将Elasticsearch的数据写入Hive,用于离线分析。

完整步骤

1、确认目标Elasticsearch集群版本(以7.15.0为例)
2、前往maven仓库 下载和Elasticsearch版本一致的jar包

1
2
3
4
-- 测试阶段可先不下载,直接从maven仓库拉取即可
ADD JAR ivy://org.elasticsearch:elasticsearch-hadoop:7.15.0;
-- 生产环境建议上传至hdfs集群
-- ADD JAR /${path}/elasticsearch-hadoop-${version}.jar;

3、新建Hive外表。完整配置项及字段类型映射见Apache Hive integration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- DROP TABLE default.es_xxx
-- 仅同步数据到Elasticsearch可建临时外表,在当前任务session中有效
-- CREATE TEMPORARY EXTERNAL TABLE
CREATE EXTERNAL TABLE IF NOT EXISTS default.es_xxx(
    doc_id STRING COMMENT 'ID',
    field1 STRING COMMENT '字段1',
    field2 STRING COMMENT '字段2',
    field3 BIGINT COMMENT '字段3'
)
COMMENT '表注释'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.batch.write.refresh' = 'false',
    'es.index.auto.create' = 'false',
    'es.input.use.sliced.partitions' = 'false',
    -- es.mapping.id只能填一个字段,多个字段作为主键的话,可以在Hive SQL里拼接出一个字段
    'es.mapping.id' = 'doc_id',
    'es.mapping.exclude' = 'doc_id',
    'es.net.http.auth.user' = 'xxxx',
    'es.net.http.auth.pass' = 'xxxx',
    'es.nodes' = 'node1,node2',
    'es.nodes.wan.only' = 'true',
    'es.port' = 'xxx',
    'es.resource' = 'index',
    'es.batch.size.bytes' = '1mb', -- bulk size大小,默认1mb,可通过这两个参数限速
    'es.batch.size.entries' = '1000', -- bulk操作单批次大小
    'es.write.operation' = 'upsert'
);

4、表建好之后就可以像操作普通Hive表一样,读写Elasticsearch数据了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
-- 写入
INSERT INTO
    default.es_xxx(doc_id, field1, field2, field3)
SELECT xxx,
    xxx,
    xxx,
    xxx
FROM default.source_table_xxx
WHERE
   dt='{@date}';

-- 读取
SELECT xxx,
    xxx,
    xx
FROM default.es_xxx
LIMIT 100;

5、进行修改表结构(如新增字段)或更改PROPERTIES(如更换索引、集群)等操作时,重建表即可。不会删除Elasticsearch集群的数据。

1
2
-- 非原生Hive表不支持通过ALTER语句修改
DROP TABLE default.es_xxx

遇到的问题

1、写了一部分数据,任务失败了。报错信息:Exception in thread “xxx” java.lang.NoClassDefFoundError: org/antlr/runtime/tree/CommonTree
原因:用作doc_id的字段存在NULL值。处理下这部分数据,任务就可以正常运行了。

2、如何限速

  • 限制同时运行的executor数量。如Hive On Spark引擎可通过spark.dynamicAllocation.maxExecutors参数限制;

  • 限制单次写es的数据量,可通过es.batch.size.bytes、es.batch.size.entries参数限制单次写入条数来限速。更多配置详见:Elasticsearch for Apache Hadoop Configuration

参考文档

Elasticsearch 与 Hive 集成
Elasticsearch for Apache Hadoop Configuration
https://github.com/elastic/elasticsearch-hadoop
Hive SQL Language Manual