输出Kafka流输入到控制台



我一直在为我正在开发的一个java应用程序查看大量Kafka文档。我已经尝试过Java 8中引入的lambda语法,但我对这方面的知识还不太了解,而且我对它是否应该是我目前所使用的还不太有信心。

我有一个Kafka/Zookeeper服务运行得很顺利,我想做的是写一个小的例子程序,根据输入将它写出来,但不做单词计数,因为已经有很多这样的例子了。

对于样本数据,我将得到以下结构的字符串:

<标题> 示例数据
This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.
<标题>

我希望能够提取3个字母的关键字,并打印它们与System.out.println。我如何得到一个包含输入的字符串变量?我知道如何应用正则表达式,甚至只是通过搜索字符串来获得关键字。

<标题> 代码
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    final Serde<String> stringSerde = Serdes.String();
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}

我有zookeeper, kafka,生产者和消费者运行都连接到同一个主题,所以我想基本上看到相同的String出现在所有的实例(生产者,消费者和流)。

如果你使用Kafka Streams,你需要在你的数据流上应用函数/操作符。在您的示例中,您创建了一个KStream对象,因此,您希望对source应用操作符。

根据你想做的事情,有操作符可以独立地对流中的每个记录应用函数(例如:map()),或其他将一个函数应用于多个记录的运算符(例如:aggregateByKey())。您应该查看一下文档:http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl和示例https://github.com/confluentinc/kafka-streams-examples

因此,你永远不会像上面的例子中那样使用Kafka流创建局部变量,而是将所有内容嵌入到操作符/函数中,并链接在一起。

例如,如果您想将所有输入记录打印到标准输出,您可以执行

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
    void apply(String key, String value) {
        System.out.println(key + ": " + value);
    }
 });

因此,在通过streams.start()启动应用程序之后,它将使用来自输入主题的记录,并且对于您的主题的每个记录,将调用apply(...),它将在stdout上打印记录。

当然,将流打印到控制台的更本机的方法是使用source.print()(其内部基本上与所示的foreach()操作符与已经给定的ForeachAction相同)

对于将字符串分配给局部变量的示例,您需要将代码放入apply(...)并在那里执行regex-stuff等以"提取3个字母的关键字"。

然而,表达这一点的最佳方式是通过flatMapValues()print()的组合(即source.flatMapValues(...).print())。对于每个输入记录调用flatMapValues()(在您的情况下,我假设key将是null,因此您可以忽略它)。在flatMapValue函数中,应用正则表达式,对于每个匹配项,将匹配项添加到最终返回的值列表中。
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();
        // apply regex to value and for each match add it to keywords
        return keywords;
    }
}

flatMapValues的输出将再次成为KStream,包含每个找到的关键字的记录(即,输出流是ValueMapper#apply()中返回的所有列表的"联合")。最后,您只需通过print()将结果打印到控制台。(当然,您也可以使用单个foreach而不是flatMapValue + print,但这将不那么模块化。)

相关内容

  • 没有找到相关文章

最新更新