我有一个非常简单的Java/Spring应用程序来演示KStream功能,但不幸的是,我无法使KStream加载数据。想法是创建一个KStream对象,并使用控制器GET方法简单地检索其内容。样本代码:
@RestController
@RequestMapping("/resources/")
public class StreamController {
private KafkaStreams streams;
private KStream<String, ResourceMessage> resourceStream;
StreamController() {
// configure streams/consumer
Properties props = new Properties();
// make sure stream starts from the beginning
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.STATE_DIR_CONFIG, Path.of(System.getProperty("java.io.tmpdir")).toAbsolutePath().toString());
//create POJO serdes
StreamsBuilder builder = new StreamsBuilder();
Map<String, Object> serdeProps = new HashMap<>();
Serializer<ResourceMessage> resourceSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", ResourceMessage.class);
resourceSerializer.configure(serdeProps, false);
Deserializer<ResourceMessage> resourceDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", Resource.class);
resourceDeserializer.configure(serdeProps, false);
Serde<ResourceMessage> resourceSerde = Serdes.serdeFrom(resourceSerializer, resourceDeserializer);
// create KStream with POJO serdes for value
resourceStream = builder.stream("Resources", Consumed.with(Serdes.String(), resourceSerde));
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
// GET method that enumerates KStream and returns contents
@GetMapping(value = "/resource")
public List<Resource> getResources() {
List<ResourceMessage> messages = new LinkedList<ResourceMessage>();
// problem is here - there are messages in the topic but KStream returns no values in foreach(...)
resourceStream.foreach((key, value) -> messages.add(value));
return messages.stream().map(m -> Resource.builder()
.id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
}
}
问题-主题中有消息,但foreach(…(中的KStream枚举没有检索到任何结果;RUNNING";并且日志中没有错误。
生成随机APPLICATION_ ID并将AUTO_;最早的";没有帮助。使用Kafka工具,我可以清楚地看到主题中的一些消息。在控制器运行时添加新消息也没有帮助。关于卡夫卡流媒体,我有什么遗漏或不了解的吗?
PS我使用的是POJO序列化程序和反序列化程序的示例。
Kafka Streams是一个用于实时流处理的Kafka客户端。在您的情况下,您不需要Kafka Streams客户端(它不起作用(,您需要一个简单的Kafka Consumer,它从Kafka轮询记录并使用Rest API将其发回。例如:
@RestController
@RequestMapping("/resources/")
public class StreamController {
private KafkaStreams streams;
private Consumer<String, ResourceMessage> consumer;
StreamController() {
// configure consumer properties
Properties props = new Properties();
// make the right properties with your Serialize and deserialiser
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Create the consumer using props.
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
}
// GET method that enumerates KStream and returns contents
@GetMapping(value = "/resource")
public List<Resource> getResources() {
List<ResourceMessage> messages = new LinkedList<ResourceMessage>();
ConsumerRecords<String, ResourceMessage> consumerRecords =
consumer.poll(1000);
messages = consumerRecords... // Convert the records to your custom POJO
return messages.stream().map(m -> Resource.builder()
.id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
}
}
你可以在这里找到一个完整的示例链接。
更新
此外,您应该知道RestControlled是请求范围的,因此为每个请求创建了一个控制器实例。因此,最终您将不会得到API响应。如果你想使用Kafka Streams,你可以在你的主方法中启动它,同时你有一个Spring启动应用程序。例如,您可以看到链接。