通过HBase Observer同步数据到ElasticSearch
原文 http://guoze.me/2015/04/23/hbase-observer-sync-elasticsearch/
Observer希望解决的问题
众所周知,HBase是一个分布式的存储体系,数据按照RowKey分成不同的Region,再分配给RegionServer管理。但是 RegionServer只承担了存储的功能,如果Region能拥有一部分的计算能力,从而实现一个HBase框架上的MapReduce,那 HBase的操作性能将进一步提升。正是为了解决这一问题,HBase 0.92版本后推出了Coprocessor — 协处理器,一个工作在Master/RegionServer中的框架,能运行用户的代码,从而灵活地完成分布式数据处理的任务。
Coprocessor包含两个组件,一个是EndPoint(类似关系型数据库的存储过程),用以加快特定查询的响应,另一个就是 Observer(类似关系型数据库的触发器)。Observer也分为几个类型,其中RegionObserver提供了一组表数据操作的钩子函数,覆盖了Get、Put、Scan、Delete等操作(通常有pre和post两种情况,表示在操作发生之前或发生之后),我们可以通过重载这些钩子函数,利用RegionServer实现特定的数据处理需求。
应用场景
我们在同一批主机上同时建立了一个HBase集群和一个ElasticSearch集群,然后存储到HBase的数据必须实时地同步到ElasticSearch。而恰好HBase和ElasticSearch都没有更新的概念,我们的需求可以简化为两步:
- 当一个新的Put操作产生时,将Put数据转化为json,索引到ElasticSearch,并把RowKey作为新文档的ID
- 当一个新的Delete操作产生时,获取Delete数据的RowKey,删除ElasticSearch中对应的ID
Java实现
Observer的Java实现并不复杂,只需要继承BaseRegionObserver的基类,然后重载postPut和postDelete两个函数。考虑到未来HBase的写入比较频繁,我们利用ElasticSearch的 Bulk API 做了一层缓冲,不是每次提交HBase数据都触发索引操作,而是积累到一定数量或者到达一定时间间隔才去批量操作,从而降低了RegionServer的网络I/O压力。
完整项目请参见: HBaseObserver
Observer的部署
Observer提供了两种部署方式:
- 全局部署。把jar包的路径加入HBASE_CLASSPATH并且修改hbase-site.xml,这样Observer会对每一个表都生效。
- 单表部署。通过HBase Shell修改表结构,加入coprocessor信息。
显然后一种更加灵活。通过HBase Shell安装Observer的详细步骤如下:
- 把Java项目打包为jar包,上传到HDFS的特定路径
- 进入HBase Shell,disable你希望加载的表
- 通过以下指令激活Observer:
alter 'table_name' , METHOD => ' table_att ', ' coprocessor ' => ' hdfs : ///your/jar/path/on/hdfs|com.foo.bar|1001|arg1=1,arg2=2'
coprocessor对应的格式以|分隔,依次为:
- jar包的HDFS路径
- Observer的主类
- 优先级(一般不用改)
- 参数(一般不用改)
新安装的coprocessor会自动生成名称:coprocessor + $ + 序号(通过describe table_name可查看)
因为一张表可能拥有多个coprocessor,卸载coprocessor需要输入对应的coprocessor名称,比如:
alter 'table_name' , METHOD => ' table_att_unset ', NAME => ' coprocessor $1'
需要注意的是,HBase Observer的部署有一个大坑:
修改Java代码后,上传到HDFS的jar包文件必须和之前不一样,否则就算卸载掉原有的coprocessor再重新安装也不能生效