为什么 Ignite 连接器在重新连接时抛出 IgniteCheckedException(无法获取亲和节点)?



我的 kafka-ignite 连接器因为 Ignite 服务器重启而突然停止。当我重新启动连接器时,它仍然抛出同样的错误:无法获取特定数据(键)对应的亲和节点。根据异常堆栈跟踪,我的理解是它没有获取到特定分区的节点信息。请查看下面附上的堆栈跟踪并帮助我。

谢谢。

java.lang.IllegalStateException: Data streamer has been closed.
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.closedException(DataStreamerImpl.java:1102)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.lock(DataStreamerImpl.java:447)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:647)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:632)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:754)
at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to get affinity nodes [aff=AffinityInfo [affFunc=RendezvousAffinityFunction [parts=1024, mask=1023, exclNeighbors=false, exclNeighborsWarn=false, backupFilter=null, affinityBackupFilter=null], mapper=org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper@5c3c8bba, assignment=GridAffinityAssignment [topVer=AffinityTopologyVersion [topVer=13, minorTopVer=0], super=org.apache.ignite.internal.processors.affinity.GridAffinityAssignment@193], cacheObjCtx=org.apache.ignite.internal.processors.cache.CacheObjectContext@44a4e9d], key=UserKeyCacheObjectImpl [part=276, val=Person(name=123), hasValBytes=true]]
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.primary(GridAffinityProcessor.java:686)
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.affinityMap(GridAffinityProcessor.java:649)
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.keysToNodes(GridAffinityProcessor.java:382)
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.mapKeyToNode(GridAffinityProcessor.java:293)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.nodes(DataStreamerImpl.java:1124)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.load0(DataStreamerImpl.java:902)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.access$1500(DataStreamerImpl.java:132)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$1.run(DataStreamerImpl.java:997)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1024)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1012)
at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7085)
at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:971)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
... 3 more
[ERROR] 2021-09-10 15:56:28,439 [task-thread-map_2-0] org.apache.kafka.connect.runtime.WorkerTask doRun - WorkerSinkTask{id=map_2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Data streamer has been closed.
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.closedException(DataStreamerImpl.java:1102)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.lock(DataStreamerImpl.java:447)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:647)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:632)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:754)
at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to get affinity nodes [aff=AffinityInfo [affFunc=RendezvousAffinityFunction [parts=1024, mask=1023, exclNeighbors=false, exclNeighborsWarn=false, backupFilter=null, affinityBackupFilter=null], mapper=org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper@5c3c8bba, assignment=GridAffinityAssignment [topVer=AffinityTopologyVersion [topVer=13, minorTopVer=0], super=org.apache.ignite.internal.processors.affinity.GridAffinityAssignment@193], cacheObjCtx=org.apache.ignite.internal.processors.cache.CacheObjectContext@44a4e9d], key=UserKeyCacheObjectImpl [part=276, val=Person(name=123), hasValBytes=true]]
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.primary(GridAffinityProcessor.java:686)
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.affinityMap(GridAffinityProcessor.java:649)
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.keysToNodes(GridAffinityProcessor.java:382)
at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.mapKeyToNode(GridAffinityProcessor.java:293)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.nodes(DataStreamerImpl.java:1124)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.load0(DataStreamerImpl.java:902)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.access$1500(DataStreamerImpl.java:132)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$1.run(DataStreamerImpl.java:997)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1024)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1012)
at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7085)
at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:971)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)

我的服务器配置是:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">

<property name="workDirectory" value="/ignite/work"/>

<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
<property name="walPath" value="/ignite/wal"/>
<property name="walArchivePath" value="/ignite/walarchive"/>
</bean>
</property>
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="myCache"/>
<property name="cacheMode" value="PARTITIONED"/>
</bean>
</list>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder">
<constructor-arg>
<bean class="org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration">
<property name="namespace" value="myspacename"/>
<property name="serviceName" value="myservicename"/>
</bean>
</constructor-arg>
</bean>
</property>
</bean>
</property>
</bean>

我的连接器是一个胖客户端(thick client),它的 XML 配置文件使用默认配置,只配置了 discovery SPI。

另一个问题:这个错误是否是在我的 Ignite pod 因内存占满而重启时抛出的?我能否改变它的行为,把数据存放在持久化存储中而不是内存中?或者,如果它因为这个问题而重启,理想的解决方式是什么?

提前感谢。

看起来 Kafka 连接器无法确定以下键的主节点:

key=UserKeyCacheObjectImpl [part=276, val=Person(name=123), hasValBytes=true]

尝试确定与此键对应的节点:

ignite.affinity(cacheName).mapKeyToNode(key);

相关内容

  • 没有找到相关文章

最新更新