KAFKA流 - 根据流数据发送不同的主题



我有一个Kafka流应用程序,等待在主题user_activity上发布记录。它将收到JSON数据,并取决于对密钥的值,我想将该流将其推入不同的主题。

这是我的流应用程序代码:

KStream<String, String> source_user_activity = builder.stream("user_activity");
        source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                System.out.println("value: " +  value);
                ArrayList<String> keywords = new ArrayList<String>();
                try {
                    JSONObject send = new JSONObject();
                    JSONObject received = new JSONObject(value);
                    send.put("current_date", getCurrentDate().toString());
                    send.put("activity_time", received.get("CreationTime"));
                    send.put("user_id", received.get("UserId"));
                    send.put("operation_type", received.get("Operation"));
                    send.put("app_name", received.get("Workload"));
                    keywords.add(send.toString());
                    // apply regex to value and for each match add it to keywords
                } catch (Exception e) {
                    // TODO: handle exception
                    System.err.println("Unable to convert to json");
                    e.printStackTrace();
                }
                return keywords;
            }
        }).to("user_activity_by_date");

在此代码中,我想检查操作类型,然后根据我想将流将流推入相关主题的不同。

我该如何实现?

编辑:

我已更新了代码:

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch( 
      (key, value) -> (value.contains("Operation":"SharingSet") && value.contains("ItemType":"File")),
      (key, value) -> (value.contains("Operation":"AddedToSecureLink") && value.contains("ItemType":"File")),
      (key, value) -> true
     );
branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");

您可以使用branch方法来拆分流。此方法采用将源流分为几个流的谓词。

以下代码取自Kafka-streams-escamples:

KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);
forks[0].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
forks[1].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
    .to(ORDER_VALIDATIONS.name(), Produced
  .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

原始KStream.branch方法由于混合阵列和仿制药而引起不便,并且因为它迫使人们使用'魔法数'从结果中提取正确的分支(请参阅例如Kafka-5488问题(。从Spring-Kafka 2.2.4开始,可以使用Kafkastreambrancher类。有了它,可以更方便分支:

        
new KafkaStreamBrancher<String, String>()
    .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
    .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
    .defaultBranch(ks->ks.to("C"))
    .onTopOf(builder.stream("source"))
    //onTopOf returns the provided stream so we can continue with method chaining 
    //and do something more with the original stream

也有KIP-418,因此A在Kafka本身中也有可能在其他版本中改善分支。

另一种可能性是使用A topic -nameExtractor

动态路由事件。

https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing

您需要提前创建主题,

val outputTopic: TopicNameExtractor[String, String] = (_, value: String, _) => defineOutputTopic(value)
builder
  .stream[String, String](inputTopic)
  .to(outputTopic)

defineOutputtopic 可以返回一个定义的主题之一,给定值(或该问题的键或记录上下文(。PD:对不起Scala代码,在链接中有一个Java示例。

相关内容

  • 没有找到相关文章

最新更新