How can I update a broadcast variable in Spark Streaming?
I have, I believe, a relatively common use case for Spark Streaming:

I have a stream of objects that I would like to filter based on some reference data.

Initially, I thought that this would be a very simple thing to achieve using a broadcast variable:
public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
However, albeit infrequently, my reference data changes periodically. I was under the impression that I could modify and re-broadcast the variable on the driver and it would be propagated to each of the workers, but the Broadcast object is not Serializable and needs to be final.
What alternatives do I have? The three solutions I can think of are:

1. Move the reference data lookup into a forEachPartition or forEachRdd so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to somehow store a timer/counter to stop the remote API being accessed for every element in the stream (see the sketch after this list).

2. Restart the Spark context every time the refdata changes, with a new broadcast variable.

3. Convert the reference data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this ships the reference data with every object.
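For reference, here is a minimal sketch of how option 1 could look. RefDataCache and fetchRefDataFromApi() are hypothetical names, not part of the question's code: the idea is a lazily-initialized, TTL-guarded static cache that lives in each executor JVM, so the REST API is hit at most once per TTL per executor instead of once per element.

public class RefDataCache {
    private static final long TTL_MS = 60_000;    // refresh at most once per minute
    private static volatile ReferenceData cached; // per-executor cached copy
    private static volatile long fetchedAt = 0L;

    public static ReferenceData get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - fetchedAt > TTL_MS) {
            synchronized (RefDataCache.class) {
                if (cached == null || now - fetchedAt > TTL_MS) {
                    cached = fetchRefDataFromApi(); // hypothetical REST call
                    fetchedAt = now;
                }
            }
        }
        return cached;
    }

    private static ReferenceData fetchRefDataFromApi() {
        // Call the REST API and deserialize the response (omitted here)
        throw new UnsupportedOperationException("wire up your REST client");
    }
}

// Usage inside the stream: the lookup runs on the executors, not the driver
objectStream.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        ReferenceData refData = RefDataCache.get();
        while (partition.hasNext()) {
            MyObject obj = partition.next();
            if (obj.getField().equals(refData.getField())) {
                // Process obj
            }
        }
    });
    return null;
});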
Extending the answer by @Rohan Aletty, here is sample code of a BroadcastWrapper that refreshes the broadcast variable based on a TTL:
public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
        return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}
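One caveat worth knowing: unpersist() without arguments is asynchronous, so the old broadcast copies are removed from the executors in the background; call unpersist(true) if you need to block until they are actually gone.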
Your code:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast =
                BroadcastWrapper.getInstance().updateAndGet(stream.context());
        return stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
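This works because the function passed to transform() is evaluated on the driver at every batch interval, so updateAndGet() runs on the driver and the (possibly refreshed) broadcast variable is then captured by the filter closure shipped to the executors.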
This worked for me on a multi-node cluster as well. Hope this helps.