java - Spark: Running multiple queries on multiple files, optimization


I am using Spark 1.5.0.

I have a set of files on S3 containing JSON data in sequence file format, around 60 GB in total. I have to fire around 40 queries on this dataset and store the results back to S3.

  1. All the queries are select statements with a condition on the same field, e.g. select a,b,c from t where event_type='alpha', select x,y,z from t where event_type='beta', etc.
  2. I am using an AWS EMR 5-node cluster with 2 core nodes and 2 task nodes.
  3. Some fields may be missing in the input, e.g. a may be missing, so the first query, which selects a, would fail. To avoid this I have defined a schema for each event_type; for event_type alpha, the schema is {"a": "", "b": "", "c": "", "event_type": ""} (a small sketch of this follows the list).
  4. Based on the schemas defined for each event, I'm creating a DataFrame from the input RDD for each event with the corresponding schema.
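
To illustrate point 3, this is roughly how one of these one-record schema documents turns into a StructType (untested sketch mirroring the full code below; the string literal is just the alpha example from above):

String jsonSchemaString = "{\"a\": \"\", \"b\": \"\", \"c\": \"\", \"event_type\": \"\"}";
JavaRDD<String> jsonSchemaRDD = jsc.parallelize(Arrays.asList(jsonSchemaString));
// Spark infers a StructType with all four (string) columns, so a query that
// selects "a" gets null for records missing that field instead of failing.
StructType schema = sqlContext.read().json(jsonSchemaRDD).schema();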

I'm using the following code:

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.StructType;
import org.json.JSONException;
import org.json.JSONObject;
import scala.Tuple2;

// Read the sequence files from S3 and pull the JSON payload out of each record.
JavaPairRDD<LongWritable, BytesWritable> inputRDD =
        jsc.sequenceFile(bucket, LongWritable.class, BytesWritable.class);

JavaRDD<String> events = inputRDD.map(
        new Function<Tuple2<LongWritable, BytesWritable>, String>() {
            public String call(Tuple2<LongWritable, BytesWritable> tuple)
                    throws JSONException, UnsupportedEncodingException {
                // Bound by getLength(): getBytes() can return a padded buffer.
                String valueAsString =
                        new String(tuple._2.getBytes(), 0, tuple._2.getLength(), "UTF-8");
                JSONObject data = new JSONObject(valueAsString);
                JSONObject payload = new JSONObject(data.getString("payload"));
                return payload.toString();
            }
        }
);
events.cache();

for (String event_type : events_list) {
    String query = // read the query from the S3 file event_type.query
    String jsonSchemaString = // read the schema from the S3 file event_type.json

    // Build a StructType from the one-record schema document for this event type.
    List<String> jsonSchema = Arrays.asList(jsonSchemaString);
    JavaRDD<String> jsonSchemaRDD = jsc.parallelize(jsonSchema);
    DataFrame df_schema = sqlContext.read().option("header", "true").json(jsonSchemaRDD);
    StructType schema = df_schema.schema();

    // Re-parse the cached events with this event type's schema and run its query.
    DataFrame df_query = sqlContext.read().schema(schema).option("header", "true").json(events);
    df_query.registerTempTable(tablename);
    DataFrame df_results = sqlContext.sql(query);

    df_results.write().format("com.databricks.spark.csv").save("s3n://some_location");
}

This code is inefficient; it takes around 6-8 hours to run. How can I optimize it?

  • Should I try using HiveContext?

  • I think the current code is taking multiple passes at the data, though I'm not sure since I have cached the RDD. How can I do this in a single pass, if that is the case? (See the sketch below for what I have in mind.)
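
To make the question concrete, here is roughly the single-pass shape I have in mind (untested sketch; it assumes the JSON can be parsed once into one DataFrame whose inferred schema is the union of all event types' fields, so every queried column must appear somewhere in the data, otherwise the per-event_type schemas are still needed; the per-event output path is a placeholder):

// Sketch, not a verified fix: parse the cached events once, cache the
// resulting DataFrame, and run every query against the same temp table
// instead of re-reading the JSON for each event type.
DataFrame allEvents = sqlContext.read().json(events); // single parse, union schema
allEvents.cache();
allEvents.registerTempTable("t");

for (String event_type : events_list) {
    String query = // read the query from the S3 file event_type.query, as before
    DataFrame df_results = sqlContext.sql(query); // runs against the cached table
    df_results.write().format("com.databricks.spark.csv")
            .save("s3n://some_location/" + event_type); // hypothetical per-type path
}

Would something like this work, or does each query still force a separate scan?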

