scala - Using Custom Hadoop input format for processing binary file in Spark
I have developed a Hadoop-based solution that processes a binary file. It uses the classic Hadoop MR technique. The binary file is about 10 GB and is divided into 73 HDFS blocks, and the business logic, written as a map process, operates on each of these 73 blocks. I have developed a CustomInputFormat and a CustomRecordReader in Hadoop that return a key (IntWritable) and a value (BytesWritable) to the map function. The value is nothing but the contents of an HDFS block (binary data). The business logic knows how to read this data.
Now I would like to port this code to Spark. I am a beginner in Spark and have been able to run simple examples (word count, the pi example). However, I could not find a straightforward example of processing binary files in Spark. I see two solutions for this use case. In the first, I avoid the custom input format and record reader: find a method (approach) in Spark that creates an RDD from those HDFS blocks, then use a map-like method that feeds the HDFS block content to the business logic. If that is not possible, I would like to re-use the custom input format and custom record reader through methods such as the Hadoop API, HadoopRDD, etc.
My problem: I do not know whether the first approach is possible or not. If it is possible, can anyone please provide some pointers that contain examples? I have been trying the second approach, but highly unsuccessfully. Here is the code snippet I used:
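    package org {

      import org.apache.hadoop.io.{BytesWritable, IntWritable}
      import org.apache.spark.{SparkConf, SparkContext}
      // RandomAccessInputFormat is the custom input format described above;
      // import it from wherever it is defined.

      object driver {

        // Prints the key and the payload size of one record, then counts it as 1.
        def myfunc(key: IntWritable, content: BytesWritable): Int = {
          println(key.get())
          println(content.getSize())
          1
        }

        def main(args: Array[String]): Unit = {
          // create Spark context
          val conf = new SparkConf().setAppName("dummy").setMaster("spark://<host>:7077")
          val sc = new SparkContext(conf)
          println(sc)
          val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/mybin.dat",
            classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
          // one record per block: map each record to 1 and sum the results
          val count = rd.map(x => myfunc(x._1, x._2)).reduce(_ + _)
          println("the count *****************************" + count)
        }
      }
    }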
Please note that the print statement in the main method prints 73, which is the number of blocks, whereas the print statements inside the map function print 0.
Can someone tell me what I am doing wrong here? I think I am not using the API the right way, but I failed to find any documentation or usage examples.
A couple of problems at a glance. You define myfunc but call func. Your myfunc has no return type, so you can't call collect(). If your myfunc truly doesn't have a return value, you can use foreach instead of map.
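A minimal sketch of that difference, assuming an RDD of (IntWritable, BytesWritable) pairs like the one newAPIHadoopFile returns in the question. The names BlockSizes, blockSize, sizesOnDriver and printOnExecutors are made up for illustration and are not part of the original code.

```scala
import org.apache.hadoop.io.{BytesWritable, IntWritable}
import org.apache.spark.rdd.RDD

object BlockSizes {
  // Runs on the executors. Returns plain Ints instead of printing, so the
  // result can be shipped back to the driver.
  def blockSize(key: IntWritable, content: BytesWritable): (Int, Int) =
    (key.get(), content.getLength)

  // map + collect: every (key, size) pair ends up in an array on the driver.
  def sizesOnDriver(rdd: RDD[(IntWritable, BytesWritable)]): Array[(Int, Int)] =
    rdd.map { case (k, v) => blockSize(k, v) }.collect()

  // foreach: nothing is returned. On a cluster the println output goes to
  // each executor's stdout log, not to the driver's console.
  def printOnExecutors(rdd: RDD[(IntWritable, BytesWritable)]): Unit =
    rdd.foreach { case (k, v) => println(s"${k.get()} -> ${v.getLength}") }
}
```

With the rd from the question, BlockSizes.sizesOnDriver(rd).foreach(println) would print the sizes in the driver's own output, which is usually the easier place to check them.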
collect() pulls the data in an RDD to the driver so that you can work with it locally (on the driver).
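As for the first approach in the question (no custom input format), SparkContext has a binaryRecords method that reads a flat binary file as fixed-length byte arrays. The sketch below assumes the data can be cut into equal-sized chunks and that the file length is an exact multiple of the chunk size, which may not hold for your file. FixedChunks, process and chunkSizes are hypothetical names, and process only stands in for the real business logic.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object FixedChunks {
  // Hypothetical stand-in for the business logic: it only reports the
  // number of bytes it was handed.
  def process(chunk: Array[Byte]): Int = chunk.length

  // Reads `path` as consecutive records of exactly `chunkBytes` bytes each
  // and applies `process` to every record on the executors.
  // Assumption: the file length is an exact multiple of chunkBytes.
  def chunkSizes(sc: SparkContext, path: String, chunkBytes: Int): RDD[Int] =
    sc.binaryRecords(path, chunkBytes).map(process)
}
```

If the chunks are not all the same size, or the logic needs the block index as a key, keeping the custom input format with newAPIHadoopFile is probably the better fit.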