Flink cannot recover the task manager from the state backend after a failure



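I'm new to Flink. I'm implementing a pattern-recognition module that does pattern matching without CEP. It reads a JSON stream from an EventHub topic and, when a pattern matches, pushes the event to another Event Hub topic. The module works as follows: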

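  1. Receive JSON payloads from an EventHub topic.

  2. A RichSourceFunction reads the patterns from an API and emits them to a broadcast stream.

  3. A Flink BroadcastProcessFunction processes the data against the list of patterns available in the broadcast state. I don't use a keyed stream or any state of my own in the program, because nothing needs to be remembered between events; I only need to check whether certain values exist in the JSON.

  4. I read that Flink maintains state internally even when no state is implemented explicitly.

  5. Below are the checkpoint settings. On average I receive about 1 million payloads per hour, sometimes more.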

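    // "interval" is defined elsewhere; setCheckpointInterval(12000) below overrides it
    env.enableCheckpointing(interval);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // RocksDB state backend; the second argument enables incremental checkpoints
    StateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath, true);
    env.setStateBackend(stateBackend);
    env.getCheckpointConfig().setCheckpointInterval(12000);         // checkpoint every 12 s
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // >= 1 s after the previous one completes
    env.getCheckpointConfig().setCheckpointTimeout(120000);         // abort a checkpoint after 2 min
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // keep the externalized checkpoint when the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);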
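To make the interaction of these settings concrete, here is a simplified plain-Java model (not Flink's actual scheduler) of when checkpoints start. It assumes maxConcurrentCheckpoints = 1, a first checkpoint at t = interval, and three hypothetical checkpoint durations; it also does the arithmetic for the stated load of about 1 million records per hour.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Flink's scheduler: with maxConcurrentCheckpoints = 1 a new
// checkpoint starts only after (a) the interval has passed since the previous start and
// (b) the minimum pause has passed since the previous checkpoint finished.
class CheckpointScheduleModel {

    // Start times (ms) of successive checkpoints, given how long each one takes.
    static List<Long> startTimes(long intervalMs, long minPauseMs, long timeoutMs, long[] durationsMs) {
        List<Long> starts = new ArrayList<>();
        long nextStart = intervalMs; // assumption: first checkpoint after one interval
        for (long duration : durationsMs) {
            starts.add(nextStart);
            // a checkpoint running longer than the timeout is aborted at the timeout
            long end = nextStart + Math.min(duration, timeoutMs);
            nextStart = Math.max(nextStart + intervalMs, end + minPauseMs);
        }
        return starts;
    }

    // Average number of records arriving during one checkpoint interval.
    static long recordsPerInterval(long recordsPerHour, long intervalMs) {
        return recordsPerHour * intervalMs / 3_600_000L;
    }

    public static void main(String[] args) {
        // 12 s interval, 1 s min pause, 120 s timeout (as configured above);
        // the three durations are hypothetical
        System.out.println(startTimes(12_000, 1_000, 120_000, new long[]{2_000, 15_000, 3_000}));
        System.out.println(recordsPerInterval(1_000_000, 12_000));
    }
}
```

With these inputs the model prints `[12000, 24000, 40000]` and `3333`: the second checkpoint (15 s) overruns the 12 s interval, so the third one starts at 40 s (39 s completion plus the 1 s pause) rather than 36 s, and each interval covers roughly 3,333 records.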
    

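However, when a task manager fails and Flink tries to restore the state from the state backend (I'm using RocksDB as the state backend), the restore fails with the error below. I'm using Flink 1.10.0 and Java 1.8.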

05:39:14.260 [Source: Custom Source -> Flat Map (5/12)] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(5/12) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
at org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap(LinkedOptionalMapSerializer.java:86) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.readDefaultKryoSerializerClasses(KryoSerializerSnapshotData.java:208) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom(KryoSerializerSnapshotData.java:72) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.readSnapshot(KryoSerializerSnapshot.java:77) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
... 15 more
05:39:14.261 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (5/12) 0b418e2ffcd028a58f39029d3f8be08e.
05:39:14.261 [Source: Custom Source -> Flat Map (3/12)] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(3/12) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
at org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap(LinkedOptionalMapSerializer.java:86) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.readDefaultKryoSerializerClasses(KryoSerializerSnapshotData.java:208) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom(KryoSerializerSnapshotData.java:72) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.readSnapshot(KryoSerializerSnapshot.java:77) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
... 15 more
05:39:14.262 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (3/12) a973d1d62f5086d1126d83d81278cc0a.
05:39:14.283 [Source: Custom Source -> Flat Map (1/12)] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(1/12) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:165) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
... 15 more
05:39:14.283 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (1/12) c1a83f3812be2a4099737d6eee5b41d0.
05:39:14.441 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: Cassandra Sink (1/4) caadf9ad0d011d308659cf47a3b74cc4.
05:40:36.616 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: f5741b19f3f1281ae65d67994dba045b, jobId: a0d922bbf1c20ed9417415827c32e1a3).
05:40:36.617 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: 5a92c83b6a105b726105cb0432980be6, jobId: a0d922bbf1c20ed9417415827c32e1a3).
05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: dd952690f30c88860b451b1ce4e2fc6d, jobId: a0d922bbf1c20ed9417415827c32e1a3).
05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService cisco - Remove job a0d922bbf1c20ed9417415827c32e1a3 from job leader monitoring.
05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Close JobManager connection for job a0d922bbf1c20ed9417415827c32e1a3.
05:40:36.621 [flink-akka.actor.default-dispatcher-110] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Close JobManager connection for job a0d922bbf1c20ed9417415827c32e1a3.
05:40:36.621 [flink-akka.actor.default-dispatcher-110] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService cisco - Cannot reconnect to job a0d922bbf1c20ed9417415827c32e1a3 because it is not registered.

If I'm doing something wrong, please help me fix this, and let me know if any more information is needed.

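Below is the code for the pattern source, the BroadcastProcessFunction, and the Cassandra sink, which I use to persist the state of incoming signals for auditing.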

================================Source Function To Read Patterns from API Call=================================================
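public class PatternSource extends RichSourceFunction<Map<String, Map<String, Pattern>>> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Map<String, Map<String, Pattern>>> ctx) throws Exception {
        // getGlobalJobParameters() returns a GlobalJobParameters object, not a String.
        // Assumes the job registered a ParameterTool; "patternUrl" is a placeholder key.
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String patternUrl = params.get("patternUrl");
        Map<String, Map<String, Pattern>> patterns = getPatternData(patternUrl);
        // emit under the checkpoint lock so the emission is atomic with respect to checkpoints
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(patterns);
        }
        // keep the source running; in Flink 1.10 checkpoints cannot be taken once a task has finished
        while (isRunning) {
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
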
=================================================================================================================================

====================================================BroadcastProcessFunction Class================================================
// Broadcast state: signal source name -> (pattern name -> Pattern).
// The value type must be Map<String, Pattern> to match the MapTypeInfo and the BroadcastState below.
public static final MapStateDescriptor<String, Map<String, Pattern>> patternPatternDescriptor =
        new MapStateDescriptor<>("PatternPatternDescriptor",
                BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Pattern.class));

public class PatternDetection extends BroadcastProcessFunction<Tuple2<String, InputSignal>, Tuple2<String, Map<String, Pattern>>, Tuple2<String, InputSignal>> {

    @Override
    public void processElement(Tuple2<String, InputSignal> value, ReadOnlyContext ctx, Collector<Tuple2<String, InputSignal>> out) throws Exception {
        InputSignal signal = value.f1;
        JSONObject inputSignalPayload = new JSONObject(signal.getSignalPayload());
        String sourceName = signal.getSignalHeader().getSignalSource().toUpperCase();
        Map<String, Pattern> patternList = ctx.getBroadcastState(patternPatternDescriptor).get(sourceName);
        if (patternList == null) {
            return; // no patterns have been broadcast for this source yet
        }
        for (Pattern pattern : patternList.values()) {
            String patternName = pattern.getPatternName();
            String patternDefinition = pattern.getPatternDefinition();
            /* Implemented my custom JSON data matcher */
            Matcher<?> jsonMatcher = this.buildMatcher(patternDefinition);
            if (jsonMatcher != null && jsonMatcher.matches(Arrays.asList(inputSignalPayload))) {
                // validSignalOutput: an OutputTag<JSONObject> defined elsewhere
                ctx.output(validSignalOutput, inputSignalPayload);
            }
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, Map<String, Pattern>> patternCondition, Context ctx, Collector<Tuple2<String, InputSignal>> out) throws Exception {
        String signalSource = patternCondition.f0.toUpperCase();
        BroadcastState<String, Map<String, Pattern>> state = ctx.getBroadcastState(patternPatternDescriptor);
        // store the patterns; without this put() processElement() never finds any
        state.put(signalSource, patternCondition.f1);
    }
}
======================================================================================================================================
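The broadcast-state flow above can be sketched in plain Java (this is a model, not the Flink API). The broadcast state is a map from signal source to its patterns; since Pattern and buildMatcher are not shown, a "pattern" is assumed here to be a set of required field/value pairs, and the names "router", "status", "DOWN" and "deviceDown" are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of PatternDetection: source -> (pattern name -> required field/value pairs)
class BroadcastStateModel {
    // stands in for ctx.getBroadcastState(patternPatternDescriptor)
    private final Map<String, Map<String, Map<String, String>>> broadcastState = new HashMap<>();

    // processBroadcastElement: must put() the patterns, otherwise lookups stay empty
    void processBroadcastElement(String source, Map<String, Map<String, String>> patterns) {
        broadcastState.put(source.toUpperCase(), patterns);
    }

    // processElement: read-only lookup, then test every pattern registered for the source
    List<String> processElement(String source, Map<String, String> payload) {
        List<String> matched = new ArrayList<>();
        Map<String, Map<String, String>> patterns = broadcastState.get(source.toUpperCase());
        if (patterns == null) {
            return matched; // nothing broadcast yet for this source
        }
        for (Map.Entry<String, Map<String, String>> pattern : patterns.entrySet()) {
            // matches when every required field is present with the required value
            boolean matches = pattern.getValue().entrySet().stream()
                    .allMatch(req -> req.getValue().equals(payload.get(req.getKey())));
            if (matches) {
                matched.add(pattern.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        BroadcastStateModel fn = new BroadcastStateModel();
        Map<String, String> rule = new HashMap<>();
        rule.put("status", "DOWN");
        Map<String, Map<String, String>> patterns = new HashMap<>();
        patterns.put("deviceDown", rule);
        fn.processBroadcastElement("router", patterns);

        Map<String, String> payload = new HashMap<>();
        payload.put("status", "DOWN");
        System.out.println(fn.processElement("ROUTER", payload)); // [deviceDown]
        System.out.println(fn.processElement("switch", payload)); // []
    }
}
```

Because lookups only see what processBroadcastElement stored, a version that reads the state but never calls put() silently matches nothing.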

====================================================Cassandra Sink====================================================================
public static void createInputSignalSink(DataStream<InputSignalSignalHistory> dataStream, Properties properties, int parallelism) {
    try {
        log.info(LogMessageBuilder.buildLogMessage("Inserting InputSignal signal history to cassandra database"));
        CassandraSink.addSink(dataStream)
                .setClusterBuilder(buildClusterBuilder(properties))
                // persist null fields as well instead of skipping them
                .setMapperOptions(() -> new Option[]{Option.saveNullFields(true)})
                .build()
                .setParallelism(parallelism);
        log.info(LogMessageBuilder.buildLogMessage("Cassandra sink cluster builder is ready"));
    } catch (Exception exp) {
        // note: the exception is only printed, so the job continues without this sink
        exp.printStackTrace();
    }
}
=====================================================================================================================================

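"I read that Flink maintains state internally even when no state is implemented explicitly."

Some built-in operators maintain state implicitly, for example:

  • some sources and sinks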
  • windows
  • CEP
  • reduce, sum, max, min
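Why an operator such as a keyed sum() is stateful can be shown with a plain-Java illustration (not Flink code): to emit a running total it must remember the previous total for every key. That hidden map is what a checkpoint has to persist and what a restore has to read back.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of the implicit per-key state behind a running sum
class ImplicitSumState {
    private final Map<String, Long> totals = new HashMap<>(); // the "hidden" per-key state

    // emit the updated running total for this key
    long process(String key, long value) {
        return totals.merge(key, value, Long::sum);
    }

    // what a checkpoint would have to capture
    Map<String, Long> snapshot() {
        return new HashMap<>(totals);
    }

    // what a restore would load back before processing resumes
    void restore(Map<String, Long> snapshot) {
        totals.clear();
        totals.putAll(snapshot);
    }

    public static void main(String[] args) {
        ImplicitSumState op = new ImplicitSumState();
        op.process("a", 1);
        op.process("a", 2);
        Map<String, Long> checkpoint = op.snapshot();

        ImplicitSumState recovered = new ImplicitSumState();
        recovered.restore(checkpoint);
        System.out.println(recovered.process("a", 3)); // 6
    }
}
```

Without the restored snapshot the recovered operator would emit 3 instead of 6, which is why Flink checkpoints this state even though the user never declared it.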

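Judging from the stack trace you shared, the checkpoint appears to contain state for a source operator that cannot be restored, possibly your custom source.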
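The root cause at the bottom of each trace is `java.io.IOException: Stream Closed`, thrown by `FileInputStream.read` while the restore reads a serializer snapshot via `DataInputStream.readInt`. The sketch below reproduces that low-level exception outside Flink; it only illustrates what the message means (the underlying file stream had already been closed when it was read), not why it was closed during the restore. The temporary file is a hypothetical stand-in for a checkpoint file.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Reproduces "java.io.IOException: Stream Closed" from DataInputStream.readInt()
class StreamClosedDemo {

    static String readAfterClose() {
        try {
            File file = File.createTempFile("state-handle", ".bin"); // stand-in for a checkpoint file
            file.deleteOnExit();
            try (FileOutputStream out = new FileOutputStream(file)) {
                out.write(new byte[]{0, 0, 0, 42});
            }
            FileInputStream raw = new FileInputStream(file);
            DataInputStream in = new DataInputStream(raw);
            raw.close(); // the underlying stream is closed before the reader uses it
            try {
                in.readInt();
                return "no exception";
            } catch (IOException e) {
                return e.getMessage(); // the message seen in the trace
            }
        } catch (IOException setupFailure) {
            throw new UncheckedIOException(setupFailure);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterClose());
    }
}
```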

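It would be easier to diagnose the problem if you shared the code of your custom source, but I would start by checking whether you implemented the CheckpointedFunction interface correctly, in particular the initializeState(FunctionInitializationContext context) method.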
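The CheckpointedFunction contract can be sketched as a plain-Java model of a source that tracks how far it has read. In Flink the real hooks are `snapshotState(FunctionSnapshotContext)` and `initializeState(FunctionInitializationContext)`, the flag would come from `context.isRestored()`, and the list would be a `ListState` obtained from `context.getOperatorStateStore()`; here those are replaced by a boolean and a plain `List`, so this is only a model of the contract, not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of CheckpointedFunction for a source that tracks its read offset
class CheckpointedSourceModel {
    private long offset;                                   // position in the input
    private final List<Long> listState = new ArrayList<>(); // stand-in for ListState<Long>

    // Called on (re)start; "restored" mirrors context.isRestored()
    void initializeState(boolean restored, List<Long> restoredState) {
        listState.clear();
        listState.addAll(restoredState);
        // only read state back when restoring; a fresh start begins at offset 0
        offset = (restored && !listState.isEmpty()) ? listState.get(0) : 0L;
    }

    // Called on every checkpoint: replace the old contents with the current offset
    List<Long> snapshotState() {
        listState.clear();
        listState.add(offset);
        return Collections.unmodifiableList(new ArrayList<>(listState));
    }

    void emitRecord() {
        offset++;
    }

    long offset() {
        return offset;
    }

    public static void main(String[] args) {
        CheckpointedSourceModel source = new CheckpointedSourceModel();
        source.initializeState(false, Collections.emptyList());
        source.emitRecord();
        source.emitRecord();
        List<Long> checkpoint = source.snapshotState();

        CheckpointedSourceModel recovered = new CheckpointedSourceModel();
        recovered.initializeState(true, checkpoint);
        System.out.println(recovered.offset()); // 2
    }
}
```

The key property is symmetry: initializeState must read back exactly what snapshotState wrote, with the same types, or the restore fails.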
