如何更改 @KafkaStreamsStateStore 的默认 serdes?我知道在 Spring Cloud Stream Kafka Streams binder 的 3.0.1 新版本中,具体做法在这里有说明:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.1.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_state_store 。但由于我使用的是 2.1.12 版本,能否给我一些代码示例?我找了很多地方都没找到。
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "????", valueSerde = "????")
下面这个链接也没有帮助:
https://www.bountysource.com/issues/87943127-consider-changing-the-default-serdes-of-kafkastreamsstatestore
我尝试过:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")
/**
 * Container class for the {@code Entity} serde defined below.
 */
public class CustomSerde {

    /**
     * Serde for {@code Entity} built from a {@link JsonPOJOSerializer} and a
     * {@link JsonPOJODeserializer}, each created through its no-argument constructor.
     */
    public static final class CustomSerdes extends WrapperSerde<Entity> {

        /**
         * Creates the serde with a fresh serializer/deserializer pair.
         */
        public CustomSerdes() {
            super(new JsonPOJOSerializer<>(), new JsonPOJODeserializer<>());
        }
    }
}
/**
 * Name of the nested serde class in {@code Outer$Nested} form
 * ({@code CustomSerde$CustomSerdes}); the value carries no package prefix.
 *
 * NOTE(review): the annotation shown with this snippet passes the quoted literal
 * "VALUE_SERDE" rather than this constant - confirm which one was intended.
 */
public static final String VALUE_SERDE = "CustomSerde$CustomSerdes";
/**
 * Kafka {@link Deserializer} that converts JSON bytes into instances of {@code T}
 * using a Jackson {@link ObjectMapper}.
 *
 * <p>The target class must be known before {@link #deserialize(String, byte[])} is
 * called with non-null data. It can be supplied through
 * {@link #JsonPOJODeserializer(Class)} or through the {@code "JsonPOJOClass"} entry
 * passed to {@link #configure(Map, boolean)}.
 *
 * @param <T> type produced by this deserializer
 */
public class JsonPOJODeserializer<T> implements Deserializer<T> {

    /** Key under which {@link #configure(Map, boolean)} looks up the target class. */
    private static final String TARGET_CLASS_CONFIG = "JsonPOJOClass";

    private final ObjectMapper objectMapper = new ObjectMapper();

    private Class<T> tClass;

    /**
     * Default constructor needed by Kafka. The target class has to be provided
     * afterwards through {@link #configure(Map, boolean)}.
     */
    public JsonPOJODeserializer() {
    }

    /**
     * Creates a deserializer that is usable without a call to
     * {@link #configure(Map, boolean)}, e.g. when the instance is created directly
     * instead of being instantiated and configured from properties.
     *
     * @param tClass class that JSON payloads are mapped to
     */
    public JsonPOJODeserializer(Class<T> tClass) {
        this.tClass = tClass;
    }

    /**
     * Reads the target class from the {@code "JsonPOJOClass"} entry of {@code props}.
     * When the entry is absent, a class set earlier is kept instead of being
     * overwritten with {@code null}.
     *
     * @param props configuration map; the {@code "JsonPOJOClass"} value is cast to {@link Class}
     * @param isKey not used by this implementation
     */
    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        Object configured = props.get(TARGET_CLASS_CONFIG);
        if (configured != null) {
            tClass = (Class<T>) configured;
        }
    }

    /**
     * Maps the given JSON bytes to an instance of the target class.
     *
     * @param topic not used by this implementation
     * @param bytes JSON payload, may be {@code null}
     * @return the mapped object, or {@code null} when {@code bytes} is {@code null}
     * @throws SerializationException if no target class has been set or the payload
     *         cannot be mapped
     */
    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        if (tClass == null) {
            // Fail with an explicit message instead of letting the mapper reject a null type.
            throw new SerializationException(
                "No target class set for JSON deserialization: pass it to the constructor "
                    + "or provide \"" + TARGET_CLASS_CONFIG + "\" to configure()");
        }
        try {
            return objectMapper.readValue(bytes, tClass);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
public class JsonPOJOSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
public JsonPOJOSerializer() {
}
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props, boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON
message", e);
}
}
@Override
public void close() {
}
}
但不起作用。请告知。
在上面的代码片段中,您有以下内容:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")
VALUE_SERDE 究竟是什么?它是否实现了 Serde 接口?只要它实现了合适的 Serde 接口,就应该可以正常工作。binder 会在内部把这个值传递给 StoreBuilder。你遇到了什么错误吗?如果您仍然面临问题,请与我们分享一个小的示例应用程序,我们可以进一步调查。
找到了一个帮助我的解决方案:
/**
 * Demo application that joins a stream of page views with a table of user profiles,
 * counts the joined records per region in 7-day windows advancing every second, and
 * writes the counts to an output topic. All records are (de)serialized as JSON through
 * {@link JSONSerde}.
 */
@SuppressWarnings({"WeakerAccess", "unused"})
public class PageViewTypedDemo {

    /**
     * JSON serde for every type implementing {@link JSONSerdeCompatible}. The same object
     * acts as serializer, deserializer and serde. Concrete types must additionally be
     * listed in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}.
     *
     * @param <T> The concrete type of the class that gets de/serialized
     */
    public static class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        /** Returns this object, which is its own serializer. */
        @Override
        public Serializer<T> serializer() {
            return this;
        }

        /** Returns this object, which is its own deserializer. */
        @Override
        public Deserializer<T> deserializer() {
            return this;
        }

        /** No configuration is read. */
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
        }

        /**
         * Writes {@code data} as JSON bytes; {@code null} is passed through as {@code null}.
         *
         * @throws SerializationException if the object cannot be written
         */
        @Override
        public byte[] serialize(final String topic, final T data) {
            if (data == null) {
                return null;
            }
            try {
                return OBJECT_MAPPER.writeValueAsBytes(data);
            } catch (final Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }

        /**
         * Reads JSON bytes as a {@link JSONSerdeCompatible} and casts the result to {@code T};
         * {@code null} is passed through as {@code null}.
         *
         * @throws SerializationException if reading the payload fails with an {@link IOException}
         */
        @SuppressWarnings("unchecked")
        @Override
        public T deserialize(final String topic, final byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
            } catch (final IOException e) {
                throw new SerializationException(e);
            }
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }
    }

    /**
     * Marker interface for types handled by {@link JSONSerde}. The type name of each
     * registered subtype is carried in the {@code _t} property.
     */
    @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = PageView.class, name = "pv"),
        @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
        @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
        @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
        @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
    })
    public interface JSONSerdeCompatible {
    }

    // POJO classes

    /** Input record: a page viewed by a user. */
    public static class PageView implements JSONSerdeCompatible {
        public String user;
        public String page;
        public Long timestamp;
    }

    /** Input record: profile data joined onto page views. */
    public static class UserProfile implements JSONSerdeCompatible {
        public String region;
        public Long timestamp;
    }

    /** Join result: a page view together with the region taken from the profile. */
    public static class PageViewByRegion implements JSONSerdeCompatible {
        public String user;
        public String page;
        public String region;
    }

    /** Output key: window start plus region. */
    public static class WindowedPageViewByRegion implements JSONSerdeCompatible {
        public long windowStart;
        public String region;
    }

    /** Output value: count for a region. */
    public static class RegionCount implements JSONSerdeCompatible {
        public long count;
        public String region;
    }

    /**
     * Builds the topology, starts the streams application and blocks until the JVM
     * shutdown hook has closed it. Exits with status 1 when starting or waiting throws,
     * and with status 0 otherwise.
     *
     * @param args not used
     */
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, PageView> views =
            builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
        final KTable<String, UserProfile> users =
            builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));

        // Attach the region of the matching profile; views without a profile get "UNKNOWN".
        final KStream<String, PageViewByRegion> viewsWithRegion = views.leftJoin(users, (view, profile) -> {
            final PageViewByRegion joined = new PageViewByRegion();
            joined.user = view.user;
            joined.page = view.page;
            joined.region = profile == null ? "UNKNOWN" : profile.region;
            return joined;
        });

        // Re-key by region, count per window, then convert the windowed counts into output records.
        final KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsWithRegion
            .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
            .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
            .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
            .count()
            .toStream()
            .map((key, value) -> {
                final WindowedPageViewByRegion outKey = new WindowedPageViewByRegion();
                outKey.windowStart = key.window().start();
                outKey.region = key.key();

                final RegionCount outValue = new RegionCount();
                outValue.region = key.key();
                outValue.count = value;

                return new KeyValue<>(outKey, outValue);
            });

        // write to the result topic
        regionCount.to("streams-pageviewstats-typed-output");

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig());
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-pipe-shutdown-hook"));

        int exitCode = 0;
        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            e.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    /** Assembles the streams configuration used by {@link #main(String[])}. */
    private static Properties streamsConfig() {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
        config.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, JSONSerde.class);
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
        config.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class);
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return config;
    }
}