Why does an insert query cause my Dataflow pipeline to stop processing database inserts?



I have a Google Cloud Dataflow pipeline (streaming) that processes a large volume of inserts throughout the day and writes them to Cloud SQL. I am trying to populate another table with existing data, but whenever I run an insert into that table, the pipeline stops inserting. The second I kill the query, the pipeline's inserts resume without any problem.

I have tried other queries against my database, but only inserts cause the pipeline to stop processing its inserts.

My insert query is:

INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
SELECT DISTINCT c3, c4, MIN(created), MAX(created)
FROM t1
WHERE created <= '2018-12-01 23:59:59'
GROUP BY c3, c4
ON DUPLICATE KEY UPDATE lastUsed = VALUES(lastUsed)
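(The socket-read hang in the exception below is consistent with the pipeline's single-row inserts waiting on locks held by this long-running statement. One way to confirm that while the pipeline is stuck is to query InnoDB's lock-wait views — a sketch, assuming MySQL 5.7; on MySQL 8.0 the equivalent data lives in performance_schema.data_lock_waits:)

-- Show which transaction is waiting and which one is blocking it (MySQL 5.7)
SELECT r.trx_mysql_thread_id AS waiting_thread,
       r.trx_query           AS waiting_query,
       b.trx_mysql_thread_id AS blocking_thread,
       b.trx_query           AS blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id;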

The exception I receive on Google Cloud is:

Processing stuck in step ParDo(ProcessDatabaseEvent) for at least 05m00s without outputting or completing in state process
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:557)
at com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:735)
at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:674)
at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:966)
at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1165)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:937)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1116)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1066)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1396)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdate(ClientPreparedStatement.java:1051)
at mypardo.processElement(StatsCollector.java:173)
at mypardo.invokeProcessElement(Unknown Source)
timestamp 2019-04-04T17:40:01.724Z
logger com.google.cloud.dataflow.worker.DataflowOperationContext
severity WARNING
worker myworker
step ParDo(ProcessDatabaseEvent)
thread 34

Any help would be greatly appreciated!

Edit:

Adding the database schema:

T1 (about 10 million records):

Field    Type          Null  Key  Default
c1       varchar(255)  NO    PRI  NULL
c2       varchar(255)  NO    PRI  NULL
c3       varchar(255)  YES        NULL
c4       varchar(255)  YES        NULL
c5       varchar(255)  YES        NULL
c6       varchar(255)  YES        NULL
created  datetime      YES        NULL

T2:

Field      Type          Null  Key  Default
c1         varchar(255)  NO    PRI  NULL
c2         varchar(255)  NO    PRI  NULL
firstUsed  datetime      YES        NULL
lastUsed   datetime      YES        NULL

Since you are using GROUP BY, you don't need DISTINCT. Also, rather than VALUES(lastUsed) in the ON DUPLICATE KEY UPDATE clause, refer to the new value by name. t1 itself has no lastUsed column, so alias the aggregates inside a derived table and reference that alias:

INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
SELECT * FROM (
    SELECT c3, c4, MIN(created) AS firstUsed, MAX(created) AS lastUsed
    FROM t1
    WHERE created <= '2018-12-01 23:59:59'
    GROUP BY c3, c4
) AS src
ON DUPLICATE KEY UPDATE lastUsed = src.lastUsed
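Beyond the syntax, note that a single INSERT ... SELECT over ~10 million rows holds its locks on t2 (and, depending on the isolation level, shared locks on t1) until the whole statement commits, which by itself can stall concurrent single-row inserts for minutes. A common mitigation, sketched here with an assumed month-sized slice, is to run the backfill in small time-bounded chunks so each statement commits quickly; LEAST/GREATEST keep firstUsed/lastUsed correct across chunks:

-- Backfill one time slice per statement so locks are held only briefly;
-- repeat with successive ranges until the full history is covered.
INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
SELECT * FROM (
    SELECT c3, c4, MIN(created) AS firstUsed, MAX(created) AS lastUsed
    FROM t1
    WHERE created >= '2018-11-01 00:00:00'
      AND created <  '2018-12-01 00:00:00'   -- one month; tune the slice size
    GROUP BY c3, c4
) AS src
ON DUPLICATE KEY UPDATE
    firstUsed = LEAST(firstUsed, src.firstUsed),
    lastUsed  = GREATEST(lastUsed, src.lastUsed);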
