java - Spark: Running multiple queries on multiple files, optimization
I am using Spark 1.5.0.
I have a set of files on S3 containing JSON data in sequence file format, worth around 60 GB. I have to fire around 40 queries on this dataset and store the results back to S3.
- All queries are SELECT statements with a condition on the same field, e.g.

  select a, b, c from t where event_type = 'alpha'
  select x, y, z from t where event_type = 'beta'

  etc.
- I am using an AWS EMR 5-node cluster with 2 core nodes and 2 task nodes.
- There are fields missing in the input. E.g. the field a may be missing, so the first query, which selects a, would fail. To avoid this I have defined a schema for each event_type. So, for event_type alpha, the schema is {"a": "", "b": "", "c": "", "event_type": ""}.
- Based on the schema defined for each event, I'm creating a DataFrame from the input RDD for each event with its corresponding schema.
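To make the per-event schema document concrete, here is a small sketch (a hypothetical helper, not part of the actual job) that generates such a one-line, all-empty-values schema string from a field list in plain Java:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class SchemaSketch {
    // Hypothetical helper: build the one-line JSON schema document
    // (all values empty strings) for a given event type's field list.
    static String schemaFor(List<String> fields) {
        StringJoiner json = new StringJoiner(", ", "{", "}");
        for (String f : fields) {
            json.add("\"" + f + "\": \"\"");
        }
        // Every event type's schema also carries the event_type field itself.
        json.add("\"event_type\": \"\"");
        return json.toString();
    }

    public static void main(String[] args) {
        // For event_type alpha with fields a, b, c:
        System.out.println(schemaFor(Arrays.asList("a", "b", "c")));
        // → {"a": "", "b": "", "c": "", "event_type": ""}
    }
}
```

Feeding one such string through `sqlContext.read().json(...)` and taking `.schema()` is what the loop below does per event type.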
I'm using the following code:

    JavaPairRDD<LongWritable, BytesWritable> inputRDD =
        jsc.sequenceFile(bucket, LongWritable.class, BytesWritable.class);

    JavaRDD<String> events = inputRDD.map(
        new Function<Tuple2<LongWritable, BytesWritable>, String>() {
            public String call(Tuple2<LongWritable, BytesWritable> tuple)
                    throws JSONException, UnsupportedEncodingException {
                String valueAsString = new String(tuple._2.getBytes(), "UTF-8");
                JSONObject data = new JSONObject(valueAsString);
                JSONObject payload = new JSONObject(data.getString("payload"));
                return payload.toString();
            }
        }
    );

    events.cache();

    for (String event_type : events_list) {
        String query = ...;            // read query from S3 file event_type.query
        String jsonSchemaString = ...; // read schema from S3 file event_type.json
        List<String> jsonSchema = Arrays.asList(jsonSchemaString);
        JavaRDD<String> jsonSchemaRDD = jsc.parallelize(jsonSchema);
        DataFrame df_schema = sqlContext.read().option("header", "true").json(jsonSchemaRDD);
        StructType schema = df_schema.schema();
        DataFrame df_query = sqlContext.read().schema(schema).option("header", "true").json(events);
        df_query.registerTempTable(tableName);
        DataFrame df_results = sqlContext.sql(query);
        df_results.write().format("com.databricks.spark.csv").save("s3n://some_location");
    }
This code is inefficient; it takes around 6-8 hours to run. How can I optimize it?
Should I try using HiveContext?
I think the current code is taking multiple passes over the data, although I'm not sure since I have cached the RDD. How can I do this in a single pass, if that is the problem?
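The single-pass idea can be illustrated without Spark: instead of re-scanning the whole dataset once per event type (N passes, which is what the per-query filters amount to), group the records by event_type in one traversal. The sketch below uses plain Java over an in-memory list purely to show the access pattern, not the Spark API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SinglePassSketch {
    // One traversal: bucket every record by its event_type field,
    // instead of filtering the full dataset once per query.
    static Map<String, List<Map<String, String>>> groupByType(
            List<Map<String, String>> events) {
        return events.stream()
                     .collect(Collectors.groupingBy(e -> e.get("event_type")));
    }

    public static void main(String[] args) {
        // Toy records standing in for the parsed JSON payloads.
        List<Map<String, String>> events = Arrays.asList(
            Map.of("event_type", "alpha", "a", "1"),
            Map.of("event_type", "beta",  "x", "2"),
            Map.of("event_type", "alpha", "a", "3")
        );

        Map<String, List<Map<String, String>>> byType = groupByType(events);
        System.out.println(byType.get("alpha").size()); // 2
        System.out.println(byType.get("beta").size());  // 1
    }
}
```

In Spark terms the analogous move would be to parse the JSON into a DataFrame once, cache that DataFrame, and derive each per-event result from it by filtering on event_type, rather than re-running `sqlContext.read().json(events)` inside the loop.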