Improving es-hadoop Bulk Efficiency with a Custom Spark Partitioner
Published: 2019-06-28


Both http://www.jianshu.com/p/cccc56e39429/comments/2022782 and https://github.com/elastic/elasticsearch-hadoop/issues/745 mention improving es-hadoop bulk efficiency with a custom Spark Partitioner, but neither provides runnable code, so I implemented the idea myself in the spark-shell.

The idea:

A Spark Streaming job watches /tmp/data for new files and writes every line of each new file into the web/blog index in ES.

Note: the code routes documents by their doc ID, and that ID is an auto-generated UUID. So after starting ES, you need to run:

curl -s -XPUT localhost:9200/web -d '{
    "mappings": {
        "blog": {
            "_id": {
                "path": "uuid"
            },
            "properties": {
                "title": {
                    "type":  "string",
                    "index": "analyzed"
                }
            }
        }
    }
}'

This tells ES to use the uuid field of each blog document as its _id. For ES 2.0 and later, see http://stackoverflow.com/questions/32334709/how-to-set-id-in-elasticsearch-2-0
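On ES 2.0 and later the _id.path mapping above is rejected, so the usual alternative is to let es-hadoop derive the _id from a document field through its es.mapping.id option. A minimal sketch of that route (my addition, assuming the same web/blog index and a uuid field in each document):

// Minimal sketch: let es-hadoop use the "uuid" field of each document as the ES _id.
import org.elasticsearch.spark._

val docs = sc.makeRDD(Seq(
  Map("uuid" -> java.util.UUID.randomUUID.toString, "content" -> "hello"),
  Map("uuid" -> java.util.UUID.randomUUID.toString, "content" -> "world")
))
docs.saveToEs("web/blog", Map("es.mapping.id" -> "uuid"))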

Here is the spark-shell code:

import org.apache.spark._
import org.apache.spark.streaming._
import org.elasticsearch.spark._
import org.apache.spark.Partitioner
import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.cfg.Settings
import org.elasticsearch.hadoop.rest.RestRepository
import scala.collection.JavaConversions._

// For easy testing, below is my own Scala port of the ES hash function.
// Important: in production, use the function shipped in the ES jar instead:
// https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/routing/Murmur3HashFunction.java
object Murmur3HashFunction {
  def hash(routing: String): Int = {
    val bytesToHash = Array.ofDim[Byte](routing.length * 2)
    for (i <- 0 until routing.length) {
      val c = routing.charAt(i)
      val b1 = c.toByte
      val b2 = (c >>> 8).toByte
      assert(((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c)
      bytesToHash(i * 2) = b1
      bytesToHash(i * 2 + 1) = b2
    }
    hash(bytesToHash, 0, bytesToHash.length)
  }

  def hash(bytes: Array[Byte], offset: Int, length: Int): Int = {
    murmurhash3_x86_32(bytes, offset, length, 0)
  }

  def murmurhash3_x86_32(data: Array[Byte], offset: Int, len: Int, seed: Int): Int = {
    val c1 = 0xcc9e2d51
    val c2 = 0x1b873593
    var h1 = seed
    val roundedEnd = offset + (len & 0xfffffffc)
    var i = offset
    while (i < roundedEnd) {
      var k1 = (data(i) & 0xff) | ((data(i + 1) & 0xff) << 8) |
        ((data(i + 2) & 0xff) << 16) | (data(i + 3) << 24)
      k1 *= c1
      k1 = (k1 << 15) | (k1 >>> 17)
      k1 *= c2
      h1 ^= k1
      h1 = (h1 << 13) | (h1 >>> 19)
      h1 = h1 * 5 + 0xe6546b64
      i += 4
    }
    // Tail bytes: the reference Java implementation relies on switch fall-through,
    // so replicate that here with cascading ifs instead of a non-falling-through match.
    var k1 = 0
    val rem = len & 0x03
    if (rem == 3) k1 = (data(roundedEnd + 2) & 0xff) << 16
    if (rem >= 2) k1 |= (data(roundedEnd + 1) & 0xff) << 8
    if (rem >= 1) {
      k1 |= (data(roundedEnd) & 0xff)
      k1 *= c1
      k1 = (k1 << 15) | (k1 >>> 17)
      k1 *= c2
      h1 ^= k1
    }
    h1 ^= len
    h1 ^= h1 >>> 16
    h1 *= 0x85ebca6b
    h1 ^= h1 >>> 13
    h1 *= 0xc2b2ae35
    h1 ^= h1 >>> 16
    h1
  }
}

// Custom Partitioner: one Spark partition per ES primary shard, so every bulk
// request produced by a partition targets a single shard.
class ESShardPartitioner(settings: String) extends Partitioner {
  protected var _numPartitions = -1

  override def numPartitions: Int = {
    val newSettings = new PropertiesSettings().load(settings)
    // In production, set the index/type you actually write to; web/blog is the index used in this experiment.
    newSettings.setResourceRead("web/blog")   // ******** !!! modify it !!! ********
    newSettings.setResourceWrite("web/blog")  // ******** !!! modify it !!! ********
    val repository = new RestRepository(newSettings)
    val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
    repository.close()
    _numPartitions = targetShards.size()
    _numPartitions
  }

  override def getPartition(docID: Any): Int = {
    var shardId = Murmur3HashFunction.hash(docID.toString()) % _numPartitions
    if (shardId < 0) {
      shardId += _numPartitions
    }
    shardId
  }
}

sc.getConf.setMaster("local").setAppName("RDDTest")
  .set("es.nodes", "127.0.0.1")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("es.index.auto.create", "true")

val ssc = new StreamingContext(sc, Seconds(2))
val fileStream = ssc.textFileStream("/tmp/data")

fileStream.foreachRDD { rdd =>
  // Each line becomes a (uuid, document) pair; the uuid also ends up as the ES _id.
  def makeItem(content: String): (String, Map[String, String]) = {
    val uuid = java.util.UUID.randomUUID.toString()
    (uuid, Map("content" -> content, "uuid" -> uuid))
  }
  println("********************start*************************")
  val r2 = rdd.map(makeItem)
  val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)
  val settings = sparkCfg.save()
  // Shuffle so that documents destined for the same primary shard share a Spark partition.
  val r3 = r2.partitionBy(new ESShardPartitioner(settings))
  r3.map(x => x._2).saveToEs("web/blog")
  println("data count: " + rdd.count.toString)
  println("*********************end************************")
}

ssc.start()
ssc.awaitTermination()
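To get a feel for how the hash spreads document IDs, the following sanity check (my addition, not part of the original post) can be pasted into the spark-shell after the code above; the 5 here is just an assumed number of primary shards, whereas ESShardPartitioner fetches the real count from the cluster:

// Sanity check: hash 1000 random uuids and count how many land on each of 5 assumed shards.
val assumedShards = 5
val sample = (1 to 1000).map(_ => java.util.UUID.randomUUID.toString)
val spread = sample
  .map(id => (((Murmur3HashFunction.hash(id) % assumedShards) + assumedShards) % assumedShards, 1))
  .groupBy(_._1)
  .mapValues(_.size)
println(spread.toSeq.sortBy(_._1).mkString(", "))   // expect a roughly even distribution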

How to run:

 ./spark-shell --jars ../lib/elasticsearch-spark-1.2_2.10-2.1.2.jar

Then run the code above inside the spark shell.

Generate fake data with a shell script:

mkdir -p /tmp/data
# rm -rf "/tmp/data"
rm -f /tmp/data/*
for ((j=0;j<30;j++)); do
    {
        for ((i=0;i<20;i++)); do
            file_name=`python -c 'import random;print random.random()'`
            echo "$j $i is sad story." > "/tmp/data/$file_name.log"
        done
        sleep 1
    }
done
echo "OK, waiting..."
echo "done"

Run the script above and watch the corresponding batch output appear in the spark shell.

For the underlying implementation of ES routing, see http://www.cnblogs.com/bonelee/p/6078956.html
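For reference, the routing rule that the partitioner mirrors boils down to the following (a simplified sketch of mine, assuming the Murmur3HashFunction object defined above is in scope; the real implementation linked above also honours an explicit routing value):

// Simplified sketch of ES document routing: without an explicit routing value,
// the _id is hashed and taken modulo the number of primary shards.
def targetShard(id: String, numPrimaryShards: Int): Int = {
  val h = Murmur3HashFunction.hash(id) % numPrimaryShards
  if (h < 0) h + numPrimaryShards else h
}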
