<T>'org.apache.kafka.streams.StoreQueryParameters)' 中的 'store(org.apache.kafka.streams.Kafk



我使用流表示例来解释如下:https://udemy.com/course/kafka-streams-real-time-stream-processing-master-class/learn/lecture/14244016#questions,当我将kafka依赖关系从2.x升级到3.3.2时,下面的方法失败了

错误:

store(org.apache.kafka.streams.StoreQueryParameters<T>)' in 'org.apache.kafka.streams.KafkaStreams' cannot be applied to '(java.lang.String, org.apache.kafka.streams.state.QueryableStoreType<org.apache.kafka.streams.state.ReadOnlyKeyValueStore<java.lang.Object,java.lang.Object>>)'

QueryServer.java

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import spark.Spark;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import java.util.ArrayList;
import java.util.List;
class QueryServer {
private static final Logger logger = LogManager.getLogger();
private final String NO_RESULTS = "No Results Found";
private final String APPLICATION_NOT_ACTIVE = "Application is not active. Try later.";
private final KafkaStreams streams;
private Boolean isActive = false;
private final HostInfo hostInfo;
private Client client;
QueryServer(KafkaStreams streams, String hostname, int port) {
this.streams = streams;
this.hostInfo = new HostInfo(hostname, port);
client = ClientBuilder.newClient();
}
void setActive(Boolean state) {
isActive = state;
}
private List<KeyValue<String, String>> readAllFromLocal() {
List<KeyValue<String, String>> localResults = new ArrayList<>();
ReadOnlyKeyValueStore<String, String> stateStore =
streams.store(AppConfigs.stateStoreName, QueryableStoreTypes.keyValueStore());
stateStore.all().forEachRemaining(localResults::add);
return localResults;
}
void start() {
logger.info("Starting Query Server at http://" + hostInfo.host() + ":" + hostInfo.port()
+ "/" + AppConfigs.stateStoreName + "/all");
Spark.port(hostInfo.port());
Spark.get("/" + AppConfigs.stateStoreName + "/all", (req, res) -> {
List<KeyValue<String, String>> allResults;
String results;
if (!isActive) {
results = APPLICATION_NOT_ACTIVE;
} else {
allResults = readAllFromLocal();
results = (allResults.size() == 0) ? NO_RESULTS
: allResults.toString();
}
return results;
});
}
void stop() {
client.close();
Spark.stop();
}
}

MainApp.java

public class StreamingTableApp {
private static final Logger logger = LogManager.getLogger();
public static void main(final String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
props.put(StreamsConfig.STATE_DIR_CONFIG, AppConfigs.stateStoreLocation);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName);
KT0.toStream().print(Printed.<String, String>toSysOut().withLabel("KT0"));
KTable<String, String> KT1 = KT0.filter((k, v) -> k.matches(AppConfigs.regExSymbol) && !v.isEmpty(),
Materialized.as(AppConfigs.stateStoreName));
KT1.toStream().print(Printed.<String, String>toSysOut().withLabel("KT1"));
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
//Query Server
QueryServer queryServer = new QueryServer(streams, AppConfigs.queryServerHost, AppConfigs.queryServerPort);
streams.setStateListener((newState, oldState) -> {
logger.info("State Changing to " + newState + " from " + oldState);
queryServer.setActive(newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING);
});
streams.start();
queryServer.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down servers");
queryServer.stop();
streams.close();
}));
}
}

您可以使用以下代码创建StoreQueryParameters的新对象。然后在store((中传递对象。

StoreQueryParameters<ReadOnlyKeyValueStore<Object, Object>> queryParameters = StoreQueryParameters.fromNameAndType(AppConfigs.stateStoreName, QueryableStoreTypes.keyValueStore()); 
streams.store(queryParameters);

相关内容

最新更新