I'm running a simple Kafka Docker instance and trying to insert data into an Elasticsearch instance, but I keep seeing this exception:
[2018-01-08 16:17:20,839] ERROR Failed to execute batch 36528 of 1 records after total of 6 attempt(s) (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:48)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:57)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:34)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:350)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:327)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:313)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
My connector configuration is as follows:
{
"name": "elasticsearch-analysis",
"config": {
"tasks.max": 1,
"topics": "analysis",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topic.index.map": "analysis:analysis",
"schema.ignore": true,
"key.ignore": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema_registry:8081",
"type.name": "analysis",
"batch.size": 200,
"flush.timeout.ms": 600000,
"transforms":"insertKey,extractId",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields": "Id",
"transforms.extractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field":"Id"
}
}
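To apply the config above and check whether the task has stopped after the retries are exhausted, the connector can be created and inspected through the Kafka Connect REST API. This is a minimal sketch under a few assumptions: the worker's REST interface is reachable at http://localhost:8083 (8083 is Kafka Connect's default REST port, but the actual host in this Docker setup is an assumption), and the JSON above is saved in a hypothetical file named elasticsearch-analysis.json.

```shell
#!/usr/bin/env bash
# Hypothetical Connect worker address; 8083 is the default Kafka Connect REST port.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the status endpoint URL for a connector name.
status_url() {
  printf '%s/connectors/%s/status' "$CONNECT_URL" "$1"
}

# Create the connector from a file containing the {"name": ..., "config": {...}} JSON.
create_connector() {
  curl -s -X POST "$CONNECT_URL/connectors" \
    -H "Content-Type: application/json" \
    -d @"$1"
}

# Print connector and task state; a FAILED task also includes its stack trace.
connector_status() {
  curl -s "$(status_url "$1")"
}

# Usage (requires a running Connect worker; the file name is hypothetical):
#   create_connector elasticsearch-analysis.json
#   connector_status elasticsearch-analysis
```

POST /connectors expects the name/config wrapper used above; if only the inner "config" object is sent, PUT /connectors/elasticsearch-analysis/config is the endpoint to use instead.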
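There isn't much data in the topic, only about 70,000 unique messages.
As you can see, I've increased the flush timeout (flush.timeout.ms) and reduced the batch size (batch.size), but I still get these timeouts.
I haven't been able to find anything that fixes it.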
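It may be that the index is refreshing too often (the default refresh interval is 1 second). Try setting it to a less frequent value, or even turning refresh off entirely during the initial load (a refresh_interval of -1 disables it).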
curl -X PUT "http://$ES_HOST/$ELASTICSEARCH_INDEX_NEW/_settings" \
-H "Content-Type: application/json" -d '
{
"index" : {
"refresh_interval" : "15s"
}
}'
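The "turn it off initially" suggestion amounts to two settings updates: disable refresh before the connector starts loading the topic, then restore it once the backlog is indexed. A minimal sketch, assuming ES_HOST is set as in the curl command above and using the index name analysis from topic.index.map; the helper function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: disable index refresh during the initial load, then restore it.
# ES_HOST is a placeholder, as in the curl command above.

# Print the _settings request body for a given refresh_interval value.
refresh_body() {
  printf '{"index":{"refresh_interval":"%s"}}' "$1"
}

# PUT a new refresh_interval on an index: set_refresh_interval <index> <interval>
set_refresh_interval() {
  curl -s -X PUT "http://$ES_HOST/$1/_settings" \
    -H "Content-Type: application/json" \
    -d "$(refresh_body "$2")"
}

# Usage (requires a reachable Elasticsearch):
#   set_refresh_interval analysis -1   # turn refresh off before the connector starts
#   ...let the connector load the topic...
#   set_refresh_interval analysis 1s   # restore the default afterwards
```

Leaving refresh disabled means newly indexed documents will not show up in searches, so the restore step should not be skipped once the initial load is done.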