I have a streaming Google Cloud Dataflow pipeline that processes a high volume of inserts throughout the day and writes them to Cloud SQL. I am trying to backfill another table from the existing data, but whenever I run the insert query below against those tables, the pipeline stops inserting. The moment I kill the query, the pipeline resumes inserting without any problem.
I have tried other queries against the database, but only this insert causes it to stop processing inserts.
My insert query is:
INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
SELECT DISTINCT c3, c4, MIN(created), MAX(created) FROM t1 WHERE created
<= '2018-12-01 23:59:59' GROUP BY c3, c4
ON DUPLICATE KEY UPDATE lastUsed = VALUES(lastUsed)
The exception I am getting on Google Cloud is:
Processing stuck in step ParDo(ProcessDatabaseEvent) for at least 05m00s without outputting or completing in state process
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:557)
at com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:735)
at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:674)
at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:966)
at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1165)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:937)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1116)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1066)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1396)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdate(ClientPreparedStatement.java:1051)
at mypardo.processElement(StatsCollector.java:173)
at mypardo.invokeProcessElement(Unknown Source)
timestamp 2019-04-04T17:40:01.724Z
logger com.google.cloud.dataflow.worker.DataflowOperationContext
severity WARNING
worker myworker
step ParDo(ProcessDatabaseEvent)
thread 34
Any help would be greatly appreciated!
Edit:
Adding the database schema.
T1 (about 10 million records):
Field      Type          Null  Key  Default
c1         varchar(255)  NO    PRI  NULL
c2         varchar(255)  NO    PRI  NULL
c3         varchar(255)  YES        NULL
c4         varchar(255)  YES        NULL
c5         varchar(255)  YES        NULL
c6         varchar(255)  YES        NULL
created    datetime      YES        NULL
T2:
Field      Type          Null  Key  Default
c1         varchar(255)  NO    PRI  NULL
c2         varchar(255)  NO    PRI  NULL
firstUsed  datetime      YES        NULL
lastUsed   datetime      YES        NULL
If you use GROUP BY, you don't need DISTINCT. Note, however, that t1 has no lastUsed column, so writing ON DUPLICATE KEY UPDATE lastUsed = t1.lastUsed would fail with an unknown-column error; VALUES(lastUsed) is the correct way to refer to the value the row would have been inserted with (here, MAX(created)). The simplified query:
INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
SELECT c3, c4, MIN(created), MAX(created)
FROM t1
WHERE created <= '2018-12-01 23:59:59'
GROUP BY c3, c4
ON DUPLICATE KEY UPDATE lastUsed = VALUES(lastUsed)