I ran into an error while using Flink to insert data into an Apache Hudi table.



Environment:
Flink: 1.15.2
Hudi Flink bundle: hudi-flink1.15-bundle-0.12.0.jar

When I execute the following statements:

Flink SQL> CREATE TABLE t1(
>   uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
>   name VARCHAR(10),
>   age INT,
>   ts TIMESTAMP(3),
>   `partition` VARCHAR(20)
> )
> PARTITIONED BY (`partition`)
> WITH (
>   'connector' = 'hudi',
>   'path' = 's3a://flink-hudi/t1',
>   'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 77f98b6de7db94a52206b2a4f7961141

The Hudi table t1 is created, but it only contains the metadata directory .hoodie.

The error that appears:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
    at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.hudi.exception.HoodieException: Exception while scanning the checkpoint meta files under path: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta
    at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:169)
    at org.apache.hudi.sink.meta.CkpMetadata.lastPendingInstant(CkpMetadata.java:175)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.lastPendingInstant(AbstractStreamWriteFunction.java:243)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.initializeState(AbstractStreamWriteFunction.java:151)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2344)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2226)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2160)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1961)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1940)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1940)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$15(HoodieWrapperFileSystem.java:365)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:364)
    at org.apache.hudi.sink.meta.CkpMetadata.scanCkpMetadata(CkpMetadata.java:216)
    at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:167)
    ... 18 more

I expected the data to be inserted into the Hudi table successfully.

It looks like the Flink job is trying to restore from state, but Hudi hits an error caused by No such file or directory: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta. Since you confirmed the bucket was clean at startup, the best suggestion I can offer is to use the s3p:// file system, which forces Flink to use the Presto implementation (rather than the Hadoop one). See the discussion of why using the Hadoop file system can lead to the "file not found" situation.
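For reference, a minimal sketch of what that suggestion could look like, assuming the flink-s3-fs-presto jar has been copied from the distribution's opt/ directory into a plugins/s3-fs-presto/ subdirectory and the credentials (s3.access-key, s3.secret-key, and s3.endpoint if needed) are set in flink-conf.yaml. Only the path scheme in the DDL changes; the Hudi options stay the same:

-- Sketch only: same table as above, but pointing at the Presto S3 file system (s3p://)
-- instead of the Hadoop S3A implementation. Bucket name and table path are taken
-- from the original DDL and may differ in your environment.
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 's3p://flink-hudi/t1',   -- only the scheme differs from the original statement
  'table.type' = 'MERGE_ON_READ'
);

Whether this avoids the FileNotFoundException on .hoodie/.aux/ckp_meta depends on your setup; switching to s3p:// only changes which S3 file-system plugin Flink loads for that path.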

Latest update