flink: Flink Shell throws NullPointerException


  1. I am using the Flink interactive shell to execute a WordCount program. It works with a file size of 10 MB, but with a 100 MB file the shell throws the following NullPointerException:

    java.lang.NullPointerException
        at org.apache.flink.api.common.accumulators.SerializedListAccumulator.deserializeList(SerializedListAccumulator.java:93)
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:549)
        at .<init>(<console>:22)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
        at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
        at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
        at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
        at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
        at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
        at scala.reflect.io.File.applyReader(File.scala:82)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
        at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
        at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
        at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
        at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
        at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
        at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
        at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
        at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
        at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
        at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
        at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

I work on a Linux system (16 GB of RAM). Could the problem lie there?

My code (adapted from https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html):

    var filename = new String(<myfilename>)
    var text = env.readTextFile(filename)
    var counts = text.flatMap { _.toLowerCase.split("\\W+") }
                     .map { (_, 1) }
                     .groupBy(0)
                     .sum(1)
    var result = counts.collect()
  2. I noticed that Flink executes the program on only one core. After setting the parallelism with env.getConfig.setParallelism(4) and running the program again, the following exception occurred:

Part 1:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
        at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:68)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
        at .<init>(<console>:28)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
        at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
        at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
        at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
        at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
        at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
        at scala.reflect.io.File.applyReader(File.scala:82)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
        at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
        at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
        at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
        at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
        at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
        at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
        at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
        at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
        at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
        at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
        at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

Part 2:

    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (CHAIN DataSource (at .<init>(<console>:26) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at .<init>(<console>:27)) -> Map (Map at .<init>(<console>:27)) -> Combine(SUM(1)) (2/4)) @ (unassigned) - [SCHEDULED] > with groupID < fc507fbb50fea681c726ca1d824c7577 > in sharing group < SlotSharingGroup [fc507fbb50fea681c726ca1d824c7577, fb90f780c9d5a4a9dbf983cb06bec946, 52b8abe5a21ed808f0473a599d89f046] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
        at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:250)
        at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:126)
        at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:271)
        at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:430)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:307)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:508)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:606)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
        ... 18 more

Does this refer to taskmanager.numberOfTaskSlots? In flink-conf.yaml I have set this key to 4. How can I set it in the shell?

You asked two questions:

  1. Why does print() not work for big datasets?

When you use count(), collect(), or print() on a DataSet, the data that has been partitioned across the task managers is transferred through the job manager to the client. It is best to use these methods only for testing or for materializing small datasets. For large data, please use one of the sinks provided by Apache Flink, e.g. writeAsText(...); one output file is then created per parallel task. A minimal sketch follows below.
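A sketch of the sink-based variant for the WordCount above (the output path and job name are just examples):

    // Write the result with a file sink instead of collecting it to the client.
    // Each parallel task writes its own part file under the given directory.
    counts.writeAsText("file:///tmp/wordcount-output")
    // Sinks are lazy: trigger execution explicitly.
    env.execute("WordCount with file sink")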

If you still want to transfer the data to the client, you may try increasing the frame size of Akka, the message-passing library Flink uses under the hood. To do so, set akka.framesize in flink-conf.yaml. The default is 10485760 bytes (10 MB); setting akka.framesize: 100mb increases it to 100 MB.
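In flink-conf.yaml that would look roughly like this (value as suggested above):

    # flink-conf.yaml
    # maximum size of messages exchanged between the job manager and the client;
    # a result returned by collect() must fit into this limit
    akka.framesize: 100mb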

For Apache Flink 1.0, the committers have considered removing this limit, and there is a pull request to use another means of transportation for large materialized datasets.

  2. What are task slots and how do they relate to parallelism?

The default configuration of Flink starts one task slot per task manager. When you start the Scala shell in local mode, it starts a single task manager, so the total number of task slots is one. When you change the parallelism to n, you need at least n task slots to execute the operation in parallel. Either increase the number of task slots in flink-conf.yaml or start additional task managers. If you run locally, I would advise increasing the number of task slots (a sketch of the relevant configuration follows below). For more information, see the Flink documentation at http://flink.apache.org.
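The relevant flink-conf.yaml entries could look like this; the values are examples for a four-core machine:

    # flink-conf.yaml
    # number of slots each task manager offers; one slot per CPU core is a common rule of thumb
    taskmanager.numberOfTaskSlots: 4
    # default parallelism used when a job does not set one explicitly
    parallelism.default: 4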

Edit: If you run the Scala shell, an embedded Flink cluster with a single task manager is started. Alternatively, you can start a local cluster using ./bin/start-local.sh and connect to it using the Scala shell's host and port parameters (host: localhost, port: 6123), as sketched below.
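Roughly (the second command is an assumption; the exact way to pass host and port to the Scala shell depends on your Flink version):

    # start a local Flink cluster; it picks up taskmanager.numberOfTaskSlots from conf/flink-conf.yaml
    ./bin/start-local.sh
    # point the Scala shell at that cluster instead of the embedded one, e.g.
    ./bin/start-scala-shell.sh localhost 6123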

