Manually committing offsets in a Kafka direct stream in Python



I am porting a streaming application written in Scala to Python, and I want to manually commit the offsets of the DStream. In Scala this is done as follows:

stream = KafkaUtils.createDirectStream(someConfigs)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

But I cannot find an equivalent API in Python. Can you guide me on how to manually commit offsets with the Python client?
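
For context: the Python API for the 0.8 direct stream does let you read each batch's offset ranges (the transform/offsetRanges pattern below appears in the Spark Streaming + Kafka integration guide), but it exposes no CanCommitOffsets counterpart, so the commit itself has to go through a separate Kafka client. Here is a minimal, untested sketch of that idea using the external kafka-python package; the bootstrap server, group id, and the decision to commit once per batch are assumptions for illustration, not part of the pyspark API:

from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

offset_ranges = []  # filled on the driver, once per batch

def store_offset_ranges(rdd):
    # transform() runs before any other operation, so rdd is still a KafkaRDD here
    global offset_ranges
    offset_ranges = rdd.offsetRanges()
    return rdd

def commit_offset_ranges(rdd):
    # Assumption: push the batch's end offsets to Kafka via a plain kafka-python
    # consumer in the same group (the direct stream itself never commits them).
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                             group_id="myUserGroup",
                             enable_auto_commit=False)
    offsets = dict((TopicPartition(o.topic, o.partition),
                    OffsetAndMetadata(o.untilOffset, ''))
                   for o in offset_ranges)
    consumer.commit(offsets=offsets)
    consumer.close()

dstream.transform(store_offset_ranges).foreachRDD(commit_offset_ranges)

Creating a consumer per batch is only for illustration; a real job would create it once on the driver and reuse it. Note also that the direct stream still resumes from whatever you pass in fromOffsets, so a commit like this mainly helps external monitoring tools.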

I worked around this by going back to the pyspark 2.2 library, since it has an API to obtain the offset ranges, and by storing the offsets in Redis. I had to fall back to Python 2.7 because there is no `long` support in Python 3.6.

import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD

def get_offset_ranges(topic):
    """Load the last stored offsets for `topic` from Redis, or None on the first run."""
    ranges = None
    rk = '{topic}:offsets'.format(topic=topic)
    cache = redis.Redis()
    if cache.exists(rk):
        mapping = cache.hgetall(rk)  # {partition: last untilOffset}
        ranges = dict()
        for k, v in mapping.items():
            tp = TopicAndPartition(topic, int(k))
            ranges[tp] = long(v)  # `long` is why this needs Python 2
    return ranges

def update_offset_ranges(offset_ranges):
    """Persist each partition's untilOffset to Redis after a batch completes."""
    cache = redis.Redis()
    for rng in offset_ranges:
        rk = '{rng.topic}:offsets'.format(rng=rng)
        print("updating redis_key: {}, partition: {}, lastOffset: {}".format(rk, rng.partition, rng.untilOffset))
        cache.hset(rk, rng.partition, rng.untilOffset)

def do_some_work(partition_iter):
    # Placeholder for the actual per-partition processing.
    pass

def process_dstream(rdd):
    # Do the real work first; offsets are stored only after outputs complete.
    rdd.foreachPartition(lambda it: do_some_work(it))
    # Re-wrap the plain RDD as a KafkaRDD to get access to its offset ranges.
    krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    off_ranges = krdd.offsetRanges()
    for o in off_ranges:
        print(str(o))
    update_offset_ranges(off_ranges)

sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)
kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",   # offsets are managed in Redis instead
    "auto.offset.reset": "smallest"  # only applies when Redis has no offsets yet
}
topic = "mytopic"
offset_ranges = get_offset_ranges(topic)  # None on the first run
dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)
# Start the streaming context and block until the job terminates
ssc.start()
try:
    ssc.awaitTermination()
except Exception as e:
    ssc.stop()
    raise e  # to exit with error condition
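
On a restart the job resumes from wherever update_offset_ranges last left off, because get_offset_ranges rebuilds fromOffsets before the stream is created; on the very first run it returns None and auto.offset.reset applies. If `long` really is the only blocker on Python 3, a small compatibility shim like this untested sketch should be all the code above needs:

try:
    long  # Python 2 has a separate arbitrary-precision long type
except NameError:
    long = int  # Python 3: int is unbounded, so it can hold any Kafka offset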
