Flink State Processor API with EmbeddedRocksDBStateBackend



I'm trying to create a savepoint using this example: https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf

But instead of

.create(new FsStateBackend("file:///tmp/checkpoints"), 256)

I need the RocksDB state backend. Since RocksDBStateBackend is deprecated as of 1.13.1, I have to use EmbeddedRocksDBStateBackend. However,

.create(new EmbeddedRocksDBStateBackend(), 256)

does not work. This is the error:

org.apache.flink.util.FlinkException: Application failed unexpectedly.
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAndShutdownClusterAsync$0(ApplicationDispatcherBootstrap.java:170)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257)
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 13 more
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
... 10 more
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
at org.apache.flink.state.api.output.OperatorSubtaskStateReducer.<init>(OperatorSubtaskStateReducer.java:50)
at org.apache.flink.state.api.BootstrapTransformation.writeOperatorState(BootstrapTransformation.java:146)
at org.apache.flink.state.api.WritableSavepoint.lambda$writeOperatorStates$0(WritableSavepoint.java:134)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:546)
at org.apache.flink.state.api.WritableSavepoint.writeOperatorStates(WritableSavepoint.java:139)
at org.apache.flink.state.api.WritableSavepoint.write(WritableSavepoint.java:99)
at CreateSavepointJob.main(CreateSavepointJob.java:157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 13 more

What am I doing wrong?

Thanks

I just modified that example to use EmbeddedRocksDBStateBackend, and it works fine.

You need to have this dependency in your project:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
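
The `${scala.binary.version}` and `${flink.version}` placeholders in that dependency have to be defined as Maven properties somewhere in the POM. A minimal sketch of what that might look like, assuming Flink 1.13.1 (the version mentioned in the question) and Scala 2.12 (an assumption; use whichever Scala suffix your other Flink artifacts use). The second dependency is the State Processor API artifact, which a job like the one in the gist presumably already declares:

```xml
<properties>
  <!-- Assumed values for illustration; align these with your cluster -->
  <flink.version>1.13.1</flink.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
  <!-- State Processor API (provides Savepoint, OperatorTransformation) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- RocksDB state backend (provides EmbeddedRocksDBStateBackend) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Keeping both artifacts on the same `${flink.version}` avoids mixing classes from different Flink releases on the classpath.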

However, I suspect the problem lies in something you haven't shared.
