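I'm running a Kafka cluster with Docker Compose on an AWS EC2 instance. I want to receive all tweets for a specific keyword and push them to Kafka. That part works fine. But I also want to count the most frequently used words in those tweets.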
Here is the WordCount code:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsBuilder;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.concurrent.CountDownLatch;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
public class WordCount {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder
.stream("test-topic");
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // "\\W+" splits on runs of non-word characters
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to("test-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, "streams-word-count");
props.put(BOOTSTRAP_SERVERS_CONFIG, "ec2-ip:9092");
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(
new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
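To check what the topology computes without a broker, here is a minimal plain-Java sketch that mirrors the `flatMapValues` → `groupBy` → `count` chain on an in-memory list of tweets. The class and method names (`WordCountSketch`, `countWords`) are hypothetical, and the sample tweets are made up; it needs Java 9+ for `List.of`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {
    // Mirrors flatMapValues(split) -> groupBy(word) -> count() for a list of tweets
    public static Map<String, Long> countWords(List<String> tweets) {
        Map<String, Long> counts = new TreeMap<>(); // sorted by word for stable printing
        for (String tweet : tweets) {
            // Same lowercasing and "\\W+" split as the Streams code
            for (String word : tweet.toLowerCase().split("\\W+")) {
                counts.merge(word, 1L, Long::sum); // add 1 to this word's running count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(List.of("Kafka Streams rocks", "kafka, kafka!"));
        System.out.println(counts); // {kafka=3, rocks=1, streams=1}
    }
}
```

One detail this exposes: a tweet that starts with a non-word character such as `#` or `@` yields a leading empty token `""`, which both this sketch and the topology above would count. Adding `.filter((key, word) -> !word.isEmpty())` before `groupBy` would drop it.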
When I check the output topic in Control Center, it looks like this:
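[Screenshot: Control Center message view of test-output, with Key and Value columns]
It looks like the tweets are being split into individual words. But the count values are not in Long format, even though that is specified in the code.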
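A likely reason the counts look wrong in a text view: `Serdes.Long()` writes each count as 8 raw bytes, not as the digits of the number. The sketch below reproduces that big-endian 8-byte layout with `java.nio.ByteBuffer` (assumed here to match Kafka's `LongSerializer`, without depending on kafka-clients) and shows that decoding those bytes as UTF-8 does not give the text `"3"`. The class name `LongBytesSketch` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LongBytesSketch {
    // Big-endian 8-byte encoding, the same layout LongSerializer produces
    public static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static void main(String[] args) {
        byte[] bytes = serializeLong(3L);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 0, 0, 0, 0, 3]
        // A UTF-8 viewer turns these bytes into control characters, not the text "3"
        String asText = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(asText.equals("3")); // false
    }
}
```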
When I consume from this topic with kafka-console-consumer, it says:
"Size of data received by LongDeserializer is not 8"
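This message comes from a length check: a long deserializer expects exactly 8 bytes per record, so any record that was written some other way (for example a string-encoded value from an earlier run, or a key read with the wrong deserializer) is rejected. The sketch below imitates that check; it is a simplified stand-in, not Kafka's code, and throws `IllegalArgumentException` where the real `LongDeserializer` throws a `SerializationException`. `LongDeserializeSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongDeserializeSketch {
    // Simplified imitation of LongDeserializer: null passes through, otherwise exactly 8 bytes
    public static Long deserializeLong(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("Size of data received by LongDeserializer is not 8");
        }
        return ByteBuffer.wrap(data).getLong(); // big-endian decode
    }

    public static void main(String[] args) {
        byte[] textEncoded = "3".getBytes(StandardCharsets.UTF_8); // 1 byte, not 8
        try {
            deserializeLong(textEncoded);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Size of data received by LongDeserializer is not 8
        }
    }
}
```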
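By default, the Control Center UI and the console consumer can only render UTF-8 data.
You need to explicitly pass LongDeserializer to the console consumer, since the value deserializer only …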
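For a Java consumer reading `test-output`, the same fix means configuring a `StringDeserializer` for the keys (the words) and a `LongDeserializer` for the values (the counts). The sketch below only builds the `Properties`, using plain string keys so it compiles without kafka-clients; the group id is a hypothetical placeholder and `ec2-ip:9092` is the address from the question.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Consumer settings for reading word counts from test-output
    public static Properties countConsumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "word-count-reader"); // hypothetical group name
        props.put("auto.offset.reset", "earliest"); // start from the oldest record if no offset is stored
        // Keys are the words, written by Serdes.String()
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values are the counts, written by Serdes.Long() as 8-byte longs
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = countConsumerProps("ec2-ip:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties can be handed to `new KafkaConsumer<String, Long>(props)`. With kafka-console-consumer, the equivalent is adding `--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer` (and `--property print.key=true` to see the words next to their counts).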
Alternatively, try using a KTable:
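// Additional imports: org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.kstream.{Consumed, Grouped, KTable}
final Serde<String> stringSerde = Serdes.String();
KStream<String, String> textLines = builder.stream("test-topic", Consumed.with(stringSerde, stringSerde));
// Split each tweet into lowercase words, re-key by word, and count occurrences per word (Grouped needs Kafka 2.1+)
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value, Grouped.with(stringSerde, stringSerde))
.count();
// to() returns void, so write the KTable out in a separate statement; values are 8-byte longs
wordCounts.toStream().to("test-output", Produced.with(stringSerde, Serdes.Long()));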
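Because `wordCounts` is a KTable, `toStream()` emits an update record each time a word's count changes (record caching may merge some intermediate updates), so `test-output` can hold several records for the same word. To get the most-used words on the consumer side, one approach is to keep only the latest count per word and then sort. The sketch below does that on in-memory data; the class, methods and sample updates are hypothetical, and it needs Java 9+ for `List.of`/`Map.entry`.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopWordsSketch {
    // Each update replaces the previous count for that word (last write wins)
    public static Map<String, Long> latestCounts(List<Map.Entry<String, Long>> updates) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : updates) {
            latest.put(update.getKey(), update.getValue());
        }
        return latest;
    }

    // Highest counts first; ties broken alphabetically so the result is deterministic
    public static List<String> topWords(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> updates = List.of(
                Map.entry("kafka", 1L), Map.entry("streams", 1L),
                Map.entry("kafka", 2L), Map.entry("rocks", 1L), Map.entry("kafka", 3L));
        Map<String, Long> counts = latestCounts(updates);
        System.out.println(counts); // {kafka=3, streams=1, rocks=1}
        System.out.println(topWords(counts, 2)); // [kafka, rocks]
    }
}
```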