借hbase-rdd二次開發(fā)談如何在Spark Core之上擴(kuò)建自己的模塊
我是51CTO學(xué)院講師張敏,在51CTO學(xué)院 “4.20 IT充電節(jié)”(4月19~20日) 到來之際,和大家分享一下Spark Core之上擴(kuò)建自己的模塊的經(jīng)驗(yàn)。正文來啦~~~
hbase-rdd是一個(gè)構(gòu)建在SparkContext基礎(chǔ)之上的用于對Hbase進(jìn)行增刪改查的第三方開源模塊,目前***版本為0.7.1。目前該rdd在操作hbase時(shí),默認(rèn)調(diào)用隱式方法。
- implicitdef stringToBytes(s: String): Array[Byte] = {
- Bytes.toBytes(s)
- }
將RDD的key轉(zhuǎn)換成字節(jié)b,然后調(diào)用Hbase的put(b)方法保存rowkey,之后將RDD的每一行存入hbase。
在軌跡圖繪制項(xiàng)目數(shù)據(jù)計(jì)算中,我們考慮到hbase的rowkey的設(shè)計(jì)——盡量減少rowkey存儲的開銷。雖然hbase-rdd最終的rowkey默認(rèn)都是采用字節(jié)數(shù)組,但這個(gè)地方我們希望按自己的方式組裝rowkey。使用MD5(imei)+dateTime組成的字節(jié)數(shù)組作為rowkey。因此默認(rèn)的hbase-rdd提供的方法是不滿足我們存儲需求的,需要對源代碼進(jìn)行修改。在toHbase方法中,有一個(gè)convert方法,該方法將對RDD中的每一行數(shù)據(jù)進(jìn)行轉(zhuǎn)化,使用RDD中的key生成Put(Bytes.toBytes(key))對象,該對象為之后存儲Hbase提供rowkey。
在convert函數(shù)中,對其實(shí)現(xiàn)進(jìn)行了改造,hbase-rdd默認(rèn)使用stringToBytes隱式函數(shù)將RDD的String類型的key轉(zhuǎn)換成字節(jié)數(shù)組,這里我們需要改造,不使stringToBytes隱式方法,而是直接生成字節(jié)數(shù)據(jù)。
- protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = {
- val strs = id.split(",")
- val imei = strs {0}
- val dateTime = strs {1}
- val b1 = MD5Utils.computeMD5Hash(imei.getBytes())
- val b2 = Bytes.toBytes(dateTime.toLong)
- val key = b1.++(b2)
- val p = new Put(key)//改造
- var empty = true
- for {
- (family, content) <- values
- (key, value) <- content
- } {
- empty = false
- if (StrUtils.isNotEmpty(family) &&StrUtils.isNotEmpty(key)) {
- put(p, family, key, value)
- }
- }
- if (empty) None else Some(new ImmutableBytesWritable, p)
- }
這樣就實(shí)現(xiàn)了使用自己的方式構(gòu)建rowkey,當(dāng)然基于此思想我們可以使用任意的方式構(gòu)建rowkey。
在使用hbase-rdd插件的過程中,我在思考,默認(rèn)的RDD上是沒有toHbase方法的,那為什么引入hbase-rdd包之后,RDD之上就有toHbase方法了?經(jīng)過查看源碼,發(fā)現(xiàn)hbase-rdd包中提供了兩個(gè)隱式方法:
- implicitdef toHBaseRDDSimple[A](rdd: RDD[(String, Map[String, A])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[A] =new HBaseWriteRDDSimple(rdd, pa[A])
- implicit def toHBaseRDDSimpleTS[A](rdd: RDD[(String, Map[String, (A, Long)])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[(A, Long)] =new HBaseWriteRDDSimple(rdd, pa[A])
這兩個(gè)方法在發(fā)現(xiàn)RDD上沒有toHbase方法時(shí)會自動嘗試調(diào)用,從隱式定義中嘗試找到解決方案,嘗試之后發(fā)現(xiàn)有定義toHBaseRDDSimple隱式方法,于是調(diào)用該隱式方法新建HBaseWriteRDDSimple類,返回hBaseWriteRDDSimple,而在hBaseWriteRDDSimple對象中是有toHbase方法的,因此在引入hbase-rdd之后,可以發(fā)現(xiàn)原本沒有toHbase方法的RDD上有toHbase方法了。這一切都要?dú)w功于Scala強(qiáng)大的隱式轉(zhuǎn)換功能。
那明白了原理,是否我們可以基于RDD寫自己的模塊,說干就干!
***步:新建Trait
- traitHaha{
- implicitdef gaga[A](rdd: RDD[String]): Hehe=
- newHehe(rdd)
- }
第二步:新建Hehe類
- final class Hehe(rdd:RDD[String]) {
- def wow(tableName:String,family:String): Unit ={
- println("---------------------------------------------")
- println("tableName:"+tableName+" - family:"+family)
- println("size:"+rdd.count())
- rdd.collect().foreach(data=>println(data))
- println("---------------------------------------------")
- }
- }
第三步:新建包對象
- package object test extends Haha
第四步:新建test類
- object Test{
- def main(args: Array[String]) {
- valsparkConf = new SparkConf().setAppName("Test")
- valsc = new SparkContext(sparkConf)
- sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T")
- }
- }
項(xiàng)目結(jié)構(gòu)圖:
運(yùn)行效果圖:
希望對大家以后的開發(fā)有幫助,同時(shí)借鑒本案例,在Spark Core之上構(gòu)建自己的小模塊。
51CTO學(xué)院 4.20 IT充電節(jié)
(19-20號兩天,100門視頻課程免單搶,更有視頻課程會員享6折,非會員享7折,套餐折上8折,微職位立減2000元鉅惠)
活動鏈接:http://edu.51cto.com/activity/lists/id-47.html?wenzhang
相關(guān)視頻教程:
【大數(shù)據(jù) Spark2.x 流數(shù)據(jù)處理】精通Spark流數(shù)據(jù)處理(持續(xù)完畢)