我正在研究使用Flink >(版本-1.4.2 (的项目,用于批量数据摄入到我的图形数据库( Janusgraph (。数据摄入有两个阶段,一个是顶点数据摄入,另一个是向图DB的边缘数据摄入。顶点数据摄入无需任何问题即可运行,但是在Edge Intestion 我遇到了一个错误,说丢失了与任务管理器任务Manageragername 的连接。flink-taskmanager-b6f46f6c8-fgtlw
中的详细错误追溯在下面附有:
2019-08-01 18:13:26,025 ERROR org.apache.flink.runtime.operators.BatchTask
- Error in task code: CHAIN Join(Remap EDGES id: TO) -> Map (Key Extractor) -> Combine (Deduplicate edges including bi-directional edges) (62/80)
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager 'flink-taskmanager-b6f46f6c8-gcxnm/10.xx.xx.xx:6121'.
This indicates that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:146)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
... 6 more
为了简化理解:
flink-taskmanager-b6f46f6c8-gcxnm
作为TM1和
flink-taskmanager-b6f46f6c8-fgtlw
as tm2
在调试时,我能够发现从TM2请求ResultPartition (RPP)
的 TM1,TM2开始将结果分配发送到TM1 。但是,在从 tm1 检查日志时,我们发现它等待了很长时间才能从TM2 获得RP,但是一段时间后>一段时间后,它开始消除所接受的任务。我们认为,在Netty远程传输异常之后,请退出任务导致TM2为特定作业发送Lost Taskmanager
错误。这两个 TaskManager都在单独的EC2实例(M4.2xlarge(中运行。我已经验证了两个实例的CPU和内存利用率,并且能够在limition 中看到所有指标。
您能告诉我为什么任务管理员会这样奇怪,也可以解决这个问题。
预先感谢
在上图中,基于信用的流量控制机制实际上位于" Netty Server"(和" Netty Client"(组件中,而录音撰写者正在编写的缓冲区始终将其添加到空置状态下的结果子分区中然后逐渐填充(序列化(记录。但是Netty什么时候真正获得缓冲区?显然,每当它们可用时都不能采用字节,因为这不仅会由于交叉通信和同步而增加大量成本,而且还会使整个缓冲过时。
在Flink中,有三种情况使Netty Server可以消费缓冲区:
在编写记录时,缓冲区会变得完整,或者缓冲区超时命中,或发送了一个特殊事件,例如检查点障碍。
齐平后齐平后
记录作者与当前记录的本地序列化缓冲区一起工作,并将逐渐将这些字节写入位于适当结果子分区队列的一个或多个网络缓冲区。尽管唱片作者可以在多个小区上工作,但每个小区只有一个记录作者编写数据。另一方面,Netty服务器是从多个结果子分区读取的,并如上所述将相应的服务器读取为单个通道。这是一种经典的生产者消费者模式,中间有网络缓冲区,如下图所示。(1(序列化和(2(将数据写入缓冲区之后,录音师相应地更新了缓冲区的作者索引。一旦完全填充缓冲区,记录作者将(3(从其本地缓冲池中获取新的缓冲区,以使用当前记录的任何剩余字节(或下一个记录(获取新的缓冲区,并将新的缓冲区添加到该小区队列中。这将(4(通知Netty服务器,如果数据尚不知道4。每当Netty具有处理此通知的能力时,它将(5(乘缓冲区并沿着适当的TCP通道发送。
图像1
如果队列中有更多的完成缓冲区,我们可以假设它已经获得通知。
缓冲超时后冲洗
为了支持低延迟用例,我们不仅要依靠缓冲区已满来向下游发送数据。在某些情况下,某个通信渠道没有太多的记录流通,并且不必要地增加了您实际拥有的少数记录的延迟。因此,定期过程将刷新堆栈中的任何可用数据:输出冲洗器。周期间隔可以通过StreamExecutionEnvironment#setBufferTimeout进行配置,并充当延迟5(对于低通量通道(上的上限。下图显示了它如何与其他组件进行交互:录制器序列化并将其写入网络缓冲区,但同时,输出冲洗器可能会(3,4(通知Netty的数据,如果Netty尚未知道(类似(到上面的"缓冲区完整"方案(。当Netty处理此通知(5(时,它将消耗缓冲区中的可用数据并更新缓冲区的读取器索引。缓冲区停留在队列中 - Netty服务器端的此缓冲区的任何进一步操作将继续从读者索引下次读取。
图像2
参考:
下面的链接可能会帮助您。
flink-network-stack-details
您可以检查 tm1> tm1 和 tm2 /strong>查看是否有完整的GC可能导致热速度超时。