java - How can I update a broadcast variable in Spark Streaming?


I have what I believe is a relatively common use case for Spark Streaming:

I have a stream of objects that I would like to filter based on some reference data.

Initially, I thought this would be a very simple thing to achieve using a broadcast variable:

public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast =
        sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, albeit infrequently, my reference data will change periodically.

I was under the impression that I could modify and re-broadcast the variable on the driver and it would be propagated to each of the workers; however, the Broadcast object is not Serializable and needs to be final.

What alternatives do I have? The three solutions I can think of are:

  1. Move the reference data lookup into a forEachPartition or forEachRdd so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to somehow store a timer / counter to stop the remote service from being accessed for every element in the stream (see the sketch after this list).

  2. Restart the Spark Context every time the refdata changes, with a new Broadcast Variable.

  3. Convert the reference data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object.
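As a rough sketch of option 1: each executor keeps its own cached copy of the reference data behind a TTL, so the REST API is hit at most once per interval per executor instead of once per element. ReferenceData is the class from the code above; RefDataCache and fetchRefDataFromApi() are hypothetical names introduced only for illustration.

public class RefDataCache {

    private static ReferenceData cached;          // per-executor JVM cache
    private static long fetchedAt = 0L;
    private static final long TTL_MS = 60_000L;   // hit the API at most once a minute

    public static synchronized ReferenceData get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - fetchedAt > TTL_MS) {
            cached = fetchRefDataFromApi();       // hypothetical remote lookup
            fetchedAt = now;
        }
        return cached;
    }

    private static ReferenceData fetchRefDataFromApi() {
        // Hypothetical REST client call; wire up your own HTTP client here
        throw new UnsupportedOperationException("REST call not shown");
    }
}

// Usage: the TTL guard, not the filter granularity, is what prevents a
// remote call per element.
final JavaDStream<MyObject> filteredStream = objectStream.filter(
    obj -> obj.getField().equals(RefDataCache.get().getField()));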

Extending the answer by @Rohan Aletty, here is sample code of a BroadcastWrapper that refreshes the broadcast variable based on a TTL:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
        return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}
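A note on the design: updateAndGet must be called from driver-side code (for example inside a transform(), as below), since creating a broadcast requires the SparkContext, which only exists on the driver. The unpersist() call asynchronously removes the stale copies cached on the executors, so the old broadcast may linger briefly while the new one takes effect.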

Your code:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast =
            BroadcastWrapper.getInstance().updateAndGet(stream.context());

        return stream.filter(obj ->
            obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
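This works because the function passed to transform() is evaluated on the driver at every batch interval, so updateAndGet runs driver-side and can create a fresh broadcast once the TTL has expired; the per-element filter then only reads refdataBroadcast.getValue() on the executors.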

This worked for me on a multi-node cluster as well. Hope this helps.

