Flink: Flink Shell throws NullPointerException
- I am using the Flink interactive shell to execute the WordCount example. It works with a file size of 10 MB, but with a 100 MB file the shell throws a NullPointerException:
java.lang.NullPointerException
    at org.apache.flink.api.common.accumulators.SerializedListAccumulator.deserializeList(SerializedListAccumulator.java:93)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:549)
    at .<init>(<console>:22)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
    at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
    at scala.reflect.io.File.applyReader(File.scala:82)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
    at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
    at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
    at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
    at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
    at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
I am working on a Linux system with 16 GB of RAM. Could the problem be there?
My code (adapted from https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html):

var filename = new String(<myfilename>)
var text = env.readTextFile(filename)
var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
var result = counts.collect()
- I also noticed that Flink executes the program on only one core. After setting the parallelism with env.getConfig.setParallelism(4) and running the program again, another exception occurred:

Part 1:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
    at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:68)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at .<init>(<console>:28)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
    at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
    at scala.reflect.io.File.applyReader(File.scala:82)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
    at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
    at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
    at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
    at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
    at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
Part 2:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (CHAIN DataSource (at .<init>(<console>:26) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at .<init>(<console>:27)) -> Map (Map at .<init>(<console>:27)) -> Combine(SUM(1)) (2/4)) @ (unassigned) - [SCHEDULED] > with groupID < fc507fbb50fea681c726ca1d824c7577 > in sharing group < SlotSharingGroup [fc507fbb50fea681c726ca1d824c7577, fb90f780c9d5a4a9dbf983cb06bec946, 52b8abe5a21ed808f0473a599d89f046] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:250)
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:126)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:271)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:430)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:307)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:508)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:606)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    ... 18 more
Does this refer to taskmanager.numberOfTaskSlots? In my flink-conf.yaml that key is set to 4. How can I set it in the shell?
You asked two questions:
- Why doesn't print() work for big DataSets?
When you use count(), collect(), or print() on a DataSet, the data that has been partitioned across the task managers is transferred through the job manager to the client. It is best to use these methods only for testing or for materializing small DataSets. For large data, please use one of the sinks provided in Apache Flink, e.g. writeAsTextFile(..). One output file is then created for each parallel task.
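As an illustration, the WordCount from the question can write its result with a parallel file sink instead of collecting it to the client. This is a minimal sketch for the Scala shell, where `env` is already bound; the input and output paths are hypothetical placeholders, and it uses `writeAsText`, the sink's name in the Scala DataSet API:

```scala
// Sketch: write the result with a parallel sink instead of collect().
// Paths are placeholders; adjust them to your environment.
val text = env.readTextFile("/path/to/input.txt")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
                 .map { (_, 1) }
                 .groupBy(0)
                 .sum(1)
counts.writeAsText("/path/to/output")   // one output file per parallel task
env.execute("WordCount with file sink") // sinks need an explicit execute(),
                                        // unlike collect()/print()
```

Because nothing is shipped back through the job manager, this avoids the Akka framesize limit entirely.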
If you still want to transfer the data to the client, you may try increasing the framesize of Akka, the message-passing library Flink uses under the hood. To do so, set akka.framesize in flink-conf.yaml. The default is 10485760 bytes (10 MB); setting it to 100 MB raises the limit accordingly.
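A minimal flink-conf.yaml fragment for this, assuming (as the 10485760 default suggests) that the value is given in bytes:

```yaml
# flink-conf.yaml
# Default Akka framesize is 10485760 bytes (10 MB); raise it to 100 MB
akka.framesize: 104857600
```

Restart the cluster (and the shell) after changing the configuration so the new framesize takes effect.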
For Apache Flink 1.0, the committers have considered removing this limit, and there is a pull request to use another means of transportation for large materialized DataSets.
- What are task slots, and how do they relate to the parallelism?
The default configuration of Flink starts one task slot per task manager. When you start the Scala shell in local mode, it starts a single task manager, so the total number of task slots is one. When you change the parallelism to n, you need at least n task slots to execute the operation in parallel. Either increase the number of task slots in flink-conf.yaml or start additional task managers. If you run locally, I would advise increasing the number of task slots. For more information, see the Flink documentation at http://flink.apache.org.
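In flink-conf.yaml, the relevant key is taskmanager.numberOfTaskSlots; a fragment matching a requested parallelism of 4 would look like this:

```yaml
# flink-conf.yaml
# A TaskManager must offer at least as many slots as the requested parallelism
taskmanager.numberOfTaskSlots: 4
```

With one task manager and 4 slots, env.getConfig.setParallelism(4) no longer triggers the NoResourceAvailableException above.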
EDIT: If you run the Scala shell, an embedded Flink cluster with one task manager is started. Alternatively, you can start a local cluster using ./bin/start-local.sh and connect to it using the Scala shell's host and port parameters (host: localhost, port: 6123).
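On the command line, the local-cluster route might look like the following; the exact argument form of the shell launcher varies between Flink releases, so treat the second command as an assumption to check against your version's documentation:

```shell
# From the Flink installation directory:
# start a local cluster (JobManager + TaskManager)
./bin/start-local.sh

# Connect the Scala shell to it via host and port
./bin/start-scala-shell.sh localhost 6123
```

The slot configuration from flink-conf.yaml then applies to the task manager of this local cluster.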