加入2个kafka流时出现问题(使用自定义时间戳提取程序)



我在加入从我的事件字段中提取日期的2个kafka流时遇到问题。当我没有定义自定义TimeStampExtractor时,联接工作正常,但当我定义时,联接不再工作。我的拓扑结构很简单:

val builder = new StreamsBuilder()
val couponConsumedWith = Consumed.`with`(Serdes.String(),
getAvroCouponSerde(schemaRegistryHost, schemaRegistryPort))
val couponStream: KStream[String, Coupon] = builder.stream(couponInputTopic, couponConsumedWith)
val purchaseConsumedWith = Consumed.`with`(Serdes.String(),
getAvroPurchaseSerde(schemaRegistryHost, schemaRegistryPort))
val purchaseStream: KStream[String, Purchase] = builder.stream(purchaseInputTopic, purchaseConsumedWith)
val couponStreamKeyedByProductId: KStream[String, Coupon] = couponStream.selectKey(couponProductIdValueMapper)
val purchaseStreamKeyedByProductId: KStream[String, Purchase] = purchaseStream.selectKey(purchaseProductIdValueMapper)
val couponPurchaseValueJoiner = new ValueJoiner[Coupon, Purchase, Purchase]() {
@Override
def apply(coupon: Coupon, purchase: Purchase): Purchase = {
val discount = (purchase.getAmount * coupon.getDiscount) / 100
new Purchase(purchase.getTimestamp, purchase.getProductid, purchase.getProductdescription, purchase.getAmount - discount)
}
}
val fiveMinuteWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
val outputStream: KStream[String, Purchase] = couponStreamKeyedByProductId.join(purchaseStreamKeyedByProductId,
couponPurchaseValueJoiner,
fiveMinuteWindow
)
outputStream.to(outputTopic)
builder.build()

正如我所说,当我不使用自定义TimeStampExtractor,而是通过设置StreamsConfig时,这段代码就像一个魅力。DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG到我的自定义提取器类(我已经仔细检查了该类是否正确提取了日期(,联接不再工作。

我正在通过运行单元测试并将以下事件传递给它来测试拓扑:

val coupon1 = new Coupon("Dec 05 2018 09:10:00.000 UTC", "1234", 10F)
// Purchase within the five minutes after the coupon - The discount should be applied
val purchase1 = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 25.00F)
val purchase1WithDiscount = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 22.50F)
val couponRecordFactory1 = couponRecordFactory.create(couponInputTopic, "c1", coupon1)
val purchaseRecordFactory1 = purchaseRecordFactory.create(purchaseInputTopic, "p1", purchase1)
testDriver.pipeInput(couponRecordFactory1)
testDriver.pipeInput(purchaseRecordFactory1)
val outputRecord1 = testDriver.readOutput(outputTopic,
new StringDeserializer(),
JoinTopologyBuilder.getAvroPurchaseSerde(
schemaRegistryHost,
schemaRegistryPort).deserializer())
OutputVerifier.compareKeyValue(outputRecord1, "1234", purchase1WithDiscount)

不确定选择新钥匙的步骤是否删除了正确的日期。我测试了很多没有运气的组合:(

任何帮助都将不胜感激!

我不确定,因为我不知道你测试了多少代码,但我的猜测是:

1( 你的代码使用默认的时间戳提取器,因为它使用你将记录发送到管道中的时间作为时间戳记录,所以基本上它会起作用,因为在你的测试中,你会一个接一个地不停顿地发送数据。

2( 您正在使用TopologyTestDriver进行测试!请注意,它对于将业务代码和拓扑作为一个单元进行测试非常有用(我有什么作为输入,什么是正确的输出(,但在这些测试中没有运行Kafka Stream应用程序。

在您的情况下,您可以在TopologyTestDriver类中使用方法advanceWallClockTime(long)来模拟系统时间行走。

如果你想启动拓扑,你必须使用嵌入式kafka集群进行集成测试(在kafka库上有一个运行良好!(。

如果有帮助,请告诉我:-(

感谢您的回复。我昨天正在处理这个问题,我想我发现了问题。正如您所说,我正在使用TopologyTestDriver来运行我的测试,当您初始化TopologyTest Driver类时,它使用initialWallClockTime,如果您不提供值,TopologyTEST Driver将获取currentTimeMillis:

public TopologyTestDriver(Topology topology, Properties config) {
this(topology, config, System.currentTimeMillis());
} 

还有另一个构造函数允许您传入initialWallClockTime。我一直在测试这种方法,但由于某种原因,它对我不起作用

总之,我的解决方案是创建具有当前时间戳的Purchase和Coupon对象。我仍然在使用我的自定义时间戳提取器,但我总是得到当前的时间戳,而不是硬编码日期,这样连接就可以很好地工作了。

对我的最终解决方案不太满意,因为我不知道为什么最初的WallClockTime对我不起作用,但至少测试现在运行得很好。

最新更新