我正在尝试实现类似于本教程的内容。但是,它之所以起作用,是因为数据集很小。我该如何做较大的桌子?因为我不断出现错误的记忆错误。我的日志是
ka.connect.runtime.rest.RestServer:60)
[2018-04-04 17:16:17,937] INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator ip-172-31-14-140.ec2.internal:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-04-04 17:16:17,938] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:218)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,939] ERROR Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | connect-sink-redshift': (org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread:51)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,940] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2018-04-04 17:16:17,940] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2018-04-04 17:16:17,940] ERROR WorkerSinkTask{id=sink-redshift-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,940] ERROR WorkerSinkTask{id=sink-redshift-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,940] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:96)
[2018-04-04 17:16:17,941] INFO WorkerSourceTask{id=production-db-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-04-04 17:16:17,940] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-statuses,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,946] INFO WorkerSourceTask{id=production-db-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-04-04 17:16:17,954] ERROR WorkerSourceTask{id=production-db-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,960] ERROR WorkerSourceTask{id=production-db-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,960] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)
[2018-04-04 17:16:17,960] INFO Stopped ServerConnector@64f4bfe4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2018-04-04 17:16:17,967] INFO Stopped o.e.j.s.ServletContextHandler@2f06a90b{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
我还尝试通过此处的建议增加内存,但我无法将整个表加载到内存中。有没有办法限制产生的数据数?
对于JDBC连接器,您可能应用的最重要的属性就是您所要求的。
batch.max.rows
在对新数据进行轮询时,最大的行数包括在单批中。该设置可用于限制数据量 内部在连接器中进行缓冲。
没有必要"将整个桌子缓冲到内存中",较小的批次,更频繁的民意调查和提交,您可以确保几乎所有行都会被扫描,并且您不会有大量的风险批处理失败,然后连接器停止一段时间,然后在下一个民意调查中重新启动并缺少几行。
否则,请确保您没有执行散装表模式,因为它将尝试一次又一次地扫描整个表。
query
选项也可以在表上进行列投影。
您可以在文档中找到更多配置选项,但是任何OOM错误都需要通过逐案仔细检查,通过启用JMX监视和将这些值导出到某些聚合系统中,您可以像Prometheus一样更仔细地监视您而不是仅仅看到OOM错误,而不知道更改任何特定参数是否真的有帮助。
另一个选项是使用基于CDC的连接器,例如另一个博客文章Show