如何使用Spark大规模并行构建索引

toly_me 9年前

来自: http://my.oschina.net/u/1027043/blog/612528


使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。 

先看下,整体的拓扑图: 





然后,再来看下,使用scala写的spark程序: 

Java代码   收藏代码
</div>
  1. package com.easy.build.index  
  2.   
  3. import java.util  
  4.   
  5. import org.apache.solr.client.solrj.beans.Field  
  6. import org.apache.solr.client.solrj.impl.HttpSolrClient  
  7. import org.apache.spark.rdd.RDD  
  8. import org.apache.spark.{SparkConf, SparkContext}  
  9.   
  10. import scala.annotation.meta.field  
  11. /** 
  12.   * Created by qindongliang on 2016/1/21. 
  13.   */  
  14.   
  15. //注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下  
  16. case class Record(  
  17.                    @(Field@field )("rowkey")     rowkey:String,  
  18.                    @(Field@field )("title")  title:String,  
  19.                    @(Field@field)("content") content:String,  
  20.                    @(Field@field)("isdel") isdel:String,  
  21.                    @(Field@field)("t1") t1:String,  
  22.                    @(Field@field)("t2")t2:String,  
  23.                    @(Field@field)("t3")t3:String,  
  24.                    @(Field@field)("dtime") dtime:String  
  25.   
  26.   
  27.                  )  
  28.   
  29. /*** 
  30.   * Spark构建索引==>Solr 
  31.   */  
  32. object SparkIndex {  
  33.   
  34.   //solr客户端  
  35.   val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");  
  36.   //批提交的条数  
  37.   val batchCount=10000;  
  38.   
  39.   def main2(args: Array[String]) {  
  40.   
  41.     val d1=new Record("row1","title","content","1","01","57","58","3");  
  42.     val d2=new Record("row2","title","content","1","01","57","58","45");  
  43.     val d3=new Record("row3","title","content","1","01","57","58",null);  
  44.     client.addBean(d1);  
  45.     client.addBean(d2)  
  46.     client.addBean(d3)  
  47.     client.commit();  
  48.     println("提交成功!")  
  49.   
  50.   
  51.   }  
  52.   
  53.   
  54.   /*** 
  55.     * 迭代分区数据(一个迭代器集合),然后进行处理 
  56.     * @param lines 处理每个分区的数据 
  57.     */  
  58.   def  indexPartition(lines:scala.Iterator[String] ): Unit ={  
  59.           //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等  
  60.           val datas = new util.ArrayList[Record]()  
  61.           //迭代处理每条数据,符合条件会提交数据  
  62.           lines.foreach(line=>indexLineToModel(line,datas))  
  63.           //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据  
  64.           commitSolr(datas,true);  
  65.   }  
  66.   
  67.   /*** 
  68.     *  提交索引数据到solr中 
  69.     * 
  70.     * @param datas 索引数据 
  71.     * @param isEnd 是否为最后一次提交 
  72.     */  
  73.   def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={  
  74.           //仅仅最后一次提交和集合长度等于批处理的数量时才提交  
  75.           if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {  
  76.             client.addBeans(datas);  
  77.             client.commit(); //提交数据  
  78.             datas.clear();//清空集合,便于重用  
  79.           }  
  80.   }  
  81.   
  82.   
  83.   /*** 
  84.     * 得到分区的数据具体每一行,并映射 
  85.     * 到Model,进行后续索引处理 
  86.     * 
  87.     * @param line 每行具体数据 
  88.     * @param datas 添加数据的集合,用于批量提交索引 
  89.     */  
  90.   def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={  
  91.     //数组数据清洗转换  
  92.     val fields=line.split("\1",-1).map(field =>etl_field(field))  
  93.     //将清洗完后的数组映射成Tuple类型  
  94.     val tuple=buildTuble(fields)  
  95.     //将Tuple转换成Bean类型  
  96.     val recoder=Record.tupled(tuple)  
  97.     //将实体类添加至集合,方便批处理提交  
  98.     datas.add(recoder);  
  99.     //提交索引到solr  
  100.     commitSolr(datas,false);  
  101.   }  
  102.   
  103.   
  104.   /*** 
  105.     * 将数组映射成Tuple集合,方便与Bean绑定 
  106.     * @param array field集合数组 
  107.     * @return tuple集合 
  108.     */  
  109.   def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={  
  110.      array match {  
  111.        case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)  
  112.      }  
  113.   }  
  114.   
  115.   
  116.   /*** 
  117.     *  对field进行加工处理 
  118.     * 空值替换为null,这样索引里面就不会索引这个字段 
  119.     * ,正常值就还是原样返回 
  120.     * 
  121.     * @param field 用来走特定规则的数据 
  122.     * @return 映射完的数据 
  123.     */  
  124.   def etl_field(field:String):String={  
  125.     field match {  
  126.       case "" => null  
  127.       case _ => field  
  128.     }  
  129.   }  
  130.   
  131.   /*** 
  132.     * 根据条件清空某一类索引数据 
  133.     * @param query 删除的查询条件 
  134.     */  
  135.   def deleteSolrByQuery(query:String): Unit ={  
  136.     client.deleteByQuery(query);  
  137.     client.commit()  
  138.     println("删除成功!")  
  139.   }  
  140.   
  141.   
  142.   def main(args: Array[String]) {  
  143.     //根据条件删除一些数据  
  144.     deleteSolrByQuery("t1:03")  
  145.     //远程提交时,需要提交打包后的jar  
  146.     val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";  
  147.     //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统  
  148.     System.setProperty("user.name""webmaster");  
  149.     //初始化SparkConf  
  150.     val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");  
  151.     //上传运行时依赖的jar包  
  152.     val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"  
  153.     conf.setJars(seq)  
  154.     //初始化SparkContext上下文  
  155.     val sc = new SparkContext(conf);  
  156.     //此目录下所有的数据,将会被构建索引,格式一定是约定好的  
  157.     val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");  
  158.     //通过rdd构建索引  
  159.     indexRDD(rdd);  
  160.     //关闭索引资源  
  161.     client.close();  
  162.     //关闭SparkContext上下文  
  163.     sc.stop();  
  164.   
  165.   
  166.   }  
  167.   
  168.   
  169.   /*** 
  170.     * 处理rdd数据,构建索引 
  171.     * @param rdd 
  172.     */  
  173.   def indexRDD(rdd:RDD[String]): Unit ={  
  174.     //遍历分区,构建索引  
  175.     rdd.foreachPartition(line=>indexPartition(line));  
  176.   }  
  177.   
  178.   
  179.   
  180. }  
</div>

ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client )  模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建 
</div>