TimeBasedPartitioner出现Kafka Connect S3连接器OutOfMemory错误



我目前正在使用Kafka Connect S3 Sink Connector 3.3.1将Kafka消息复制到S3,并且在处理后期数据时出现OutOfMemory错误。

我知道这看起来是一个很长的问题,但我尽力让它变得清晰易懂。我非常感谢你的帮助。

高级信息

  • 连接器对Kafka消息进行简单的逐字节复制,并在字节数组的开头添加消息的长度(用于解压缩目的)。
    • 这是CustomByteArrayFormat类的角色(请参阅下面的配置)
  • 根据Record时间戳对数据进行分区和分块
    • CustomTimeBasedPartitioner扩展了io.confluent.connect.storage.partitioner.TimeBasedPartitioner,其唯一目的是覆盖generatePartitionedPath方法,将主题放在路径的末尾
  • Kafka Connect进程的总堆大小为24GB(只有一个节点)
  • 连接器每秒处理8000到10000条消息
  • 每条邮件的大小接近1 KB
  • Kafka主题有32个分区

内存不足的上下文错误

  • 这些错误只有在连接器关闭数小时并必须获取数据时才会发生
  • 当重新打开连接器时,它开始追赶,但很快就会因OutOfMemory错误而失败

可能但不完整的解释

  • 发生OOM错误时,连接器的timestamp.extractor配置设置为Record
  • 将此配置切换到Wallclock(即Kafka Connect进程的时间)不会抛出OOM错误,并且可以处理所有后期数据,但后期数据不再正确加框
    • 所有后期数据都将在连接器重新打开时的YYYY/MM/dd/HH/mm/topic-name中进行装箱
  • 因此,我的猜测是,当连接器试图根据Record时间戳正确地存储数据时,它进行了太多的并行读取,导致OOM错误
    • "partition.duration.ms": "600000"参数使连接器存储桶数据以每小时6条10分钟的路径传输(2018/06/20/12/[00|10|20|30|40|50]适用于2018-06-20下午12点)
    • 因此,对于24小时的延迟数据,连接器将不得不在24h * 6 = 144不同的S3路径中输出数据
    • 每个10分钟的文件夹包含10000条消息/秒*600秒=60000条消息(大小为6 GB)
    • 如果它确实是并行读取的,那么将有864GB的数据进入内存
  • 我认为我必须正确配置一组给定的参数,以避免OOM错误,但我觉得我看不到全局
    • "flush.size": "100000"意味着,如果读取的消息超过100000条,则应将它们提交到文件(从而释放内存)
      • 对于1KB的消息,这意味着每100MB提交一次
      • 但是,即使有144个并行读取,也只能得到14.4 GB的总容量,这比可用的24GB堆大小还小
      • "flush.size"是提交前每个分区要读取的记录数吗?或者每个连接器的任务
    • 我理解"rotate.schedule.interval.ms": "600000"配置的方式是,即使flush.size的100000条消息尚未到达,数据也将每10分钟提交一次

我的主要问题是,在给定的情况下,允许我计划内存使用的数学是什么:

  • 每秒记录数
  • 记录的大小
  • 我阅读的主题的Kafka分区数
  • 连接器任务的数量(如果相关)
  • 每小时写入的存储桶数(由于"partition.duration.ms": "600000"配置,此处为6)
  • 要处理的延迟数据的最大小时数

配置

S3接收器连接器配置

{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}

工人配置

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX

编辑

我忘了添加一个错误的例子:

2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我终于能够理解堆大小的使用是如何在Kafka Connect S3连接器中工作的

  • S3连接器将把每个Kafka分区的数据写入分区的paths
    • 这些paths的分区方式取决于partitioner.class参数
    • 默认情况下,它是按时间戳设置的,然后partition.duration.ms的值将确定每个分区paths的持续时间
  • S3连接器将为每个Kafka分区(用于读取的所有主题)和每个分区分配一个s3.part.size字节的缓冲区paths
    • 例如,读取20个分区,timestamp.extractor设置为Recordpartition.duration.ms设置为1h,s3.part.size设置为50MB
      • 则每小时所需的堆大小等于20 * 50 MB=1 GB
      • 但是,如果timestamp.extractor被设置为Record,则具有与读取它们的时间戳之前的时间戳相对应的时间戳的消息将被缓冲在该较早的时间缓冲器中。因此,在现实中,连接器将需要最小20 * 50 MB * 2h=2GB的内存,因为总是存在延迟事件,如果存在延迟超过1小时的事件,则需要更多
      • 请注意,如果timestamp.extractor设置为Wallclock,则情况并非如此,因为就Kafka Connect而言,实际上永远不会发生延迟事件
    • 这些缓冲区在3个条件下被刷新(即离开内存)
      • rotate.schedule.interval.ms时间已过
        • 此刷新条件总是触发
      • timestamp.extractor时间而言,rotate.interval.ms时间已经过去了
        • 这意味着,如果timestamp.extractor设置为Record,则10分钟的Record时间可以在更短或更长的时间内过去,而10分钟的实际时间
          • 例如,在处理延迟数据时,10分钟的数据将在几秒内处理,并且如果CCD_ 42被设置为10分钟,则该条件将每秒触发一次(应该如此)
          • 相反,如果事件流中存在暂停,则该条件将不会触发,直到它看到具有时间戳的事件,该时间戳显示自上次触发该条件以来已超过rotate.interval.ms
      • flush.size消息的读取时间少于min(rotate.schedule.interval.msrotate.interval.ms)
        • 对于rotate.interval.ms,如果没有足够的消息,则可能永远不会触发此条件
    • 因此,您至少需要计划Kafka partitions * s3.part.size堆大小
      • 如果使用Record时间戳进行分区,则应将其乘以max lateness in milliseconds / partition.duration.ms
        • 这是最坏的情况,在所有分区和所有范围的max lateness in milliseconds中都有不断延迟的事件
  • 当S3连接器从Kafka读取时,它还将为每个分区缓冲consumer.max.partition.fetch.bytes字节
    • 默认设置为2.1 MB
  • 最后,您不应该考虑所有的堆大小都可以用来缓冲Kafka消息,因为其中还有很多不同的对象
    • 一个安全的考虑是确保Kafka消息的缓冲不会超过总可用堆大小的50%

@raphael已经完美地解释了工作
粘贴我遇到的类似问题的一个小变化(需要处理的事件太少,但需要花费很多小时/天)。

在我的案例中,我有大约150个连接器,其中8个连接器因OOM而失败,因为它们必须处理大约7天的数据(我们的kafka在测试环境中关闭了大约2周)

遵循的步骤:

  1. 将所有连接器的s3.part.size从25MB减少到5MB。(在我们的场景中,rotate.interval设置为10分钟,flush.size设置为10000。我们的大多数活动应该很容易适应这个限制)
  2. 在此设置之后,只有一个连接器仍处于OOM,并且该连接器在启动后5秒内进入OOM(基于堆分析),堆利用率从200MB-1.5GB飙升。从卡夫卡偏移滞后来看,在所有7天内,只有8K事件需要处理。因此,这并不是因为要处理的事件太多,而是因为要处理/flush的事件太少
  3. 由于我们使用的是Hourly分区,在一个小时内几乎没有100个事件,所以这7天的所有缓冲区都是在没有刷新(没有释放到JVM)的情况下创建的-7 * 24 * 5MB * 3 partitions = 2.5GB(xmx-1.5GB)

修复:执行以下步骤之一,直到连接器赶上,然后恢复旧配置。(推荐方法-1)

  1. 更新连接器配置以处理100或1000条记录flush.size(取决于数据的结构)
    缺点:如果实际事件超过1000,则一小时内会创建过多的小文件
  2. 将Partition更改为Daily,这样就只有Daily分区了
    缺点:您现在将在S3中混合使用Hourly和Daily分区

最新更新