Kafka custom deserializer



So far, I have been able to create a KStream from a topic.

KStream<String, Object> testqa2 = builder.stream("testqa2", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues(value -> {
        System.out.println(value);
        return value;
    });

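It doesn't print anything, so while debugging I realized that I am only creating my KStream. There is no data in it.

I'm also having a bit of trouble creating a serializer/deserializer for the Worker class.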

package com.copart.mwa.Avro;

public class Worker {
    private static String WorkerActivityName;
    private static String WorkerSid;
    private static String WorkerPreviousActivityName;
    private static String WorkerPreviousActivitySid;

    public String getWorkerActivityName() {
        return WorkerActivityName;
    }
    public void setWorkerActivityName(String workerActivityName) {
        WorkerActivityName = workerActivityName;
    }
    public static String getWorkerSid() {
        return WorkerSid;
    }
    public void setWorkerSid(String workerSid) {
        WorkerSid = workerSid;
    }
    public String getWorkerPreviousActivityName() {
        return WorkerPreviousActivityName;
    }
    public void setWorkerPreviousActivityName(String workerPreviousActivityName) {
        WorkerPreviousActivityName = workerPreviousActivityName;
    }
    public String getWorkerPreviousActivitySid() {
        return WorkerPreviousActivitySid;
    }
    public void setWorkerPreviousActivitySid(String workerPreviousActivitySid) {
        WorkerPreviousActivitySid = workerPreviousActivitySid;
    }

    @Override
    public String toString() {
        return "Worker(" + WorkerSid + ", " + WorkerActivityName + ")";
    }
}

The messages sent from the producer to the consumer are JSON:

{
"WorkerActivityName": "Available",
"EventType": "worker.activity.update",
"ResourceType": "worker",
"WorkerTimeInPreviousActivityMs": "237",
"Timestamp": "1626114642",
"WorkerActivitySid": "WAc9030ef021bc1786d3ae11544f4d9883",
"WorkerPreviousActivitySid": "WAf4feb231e97c1878fecc58b26fdb95f3",
"WorkerTimeInPreviousActivity": "0",
"AccountSid": "AC8c5cd8c9ba538090da104b26d68a12ec",
"WorkerName": "Dorothy.Finegan@Copart.Com",
"Sid": "EV284c8a8bc27480e40865263f0b42e5cf",
"TimestampMs": "1626114642204",
"P": "WKe638256376188fab2a98cccb3c803d67",
"WorkspaceSid": "WS38b10d521442ecb74fcc263d5a4d726e",
"WorkspaceName": "Copart-MiPhone",
"WorkerPreviousActivityName": "Unavailable(RNA)",
"EventDescription": "Worker Dorothy.Finegan@Copart.Com updated to Available Activity",
"ResourceSid": "WKe638256376188fab2a98cccb3c803d67",
"WorkerAttributes": "{"miphone_dept":["USA_YRD_OPS"],"languages":["en"],"home_region":"GL","roles":["supervisor"],"miphone_yards":["81"],"miphone_enabled":true,"miphone_states":["IL"],"home_state":"IL","skills":["YD_SELLER","YD_TITLE"],"home_division":"Northern","miphone_divisions":["Northern"],"miphone_functions":["outbound_only"],"full_name":"Dorothy Finegan","miphone_regions":["GL"],"home_country":"USA","copart_user_id":"USA3204","home_yard":"81","home_dept":"USA_YRD_OPS","email":"dorothy.finegan@copart.com","home_dept_category":"OPS","contact_uri":"client:Dorothy_2EFinegan_40Copart_2ECom","queue_activity":"Available","teams":[],"remote_employee":false,"miphone_call_center_units":["USA_YRD_OPS|81"],"miphone_call_center_teams":[]}"
}

I want to implement a custom deserializer where

"WorkspaceSid": "WS38b10d521442ecb74fcc263d5a4d726e"

is the key, and the remaining attributes become the value of the key-value pair.

Thanks, Anmol

It doesn't print anything

If there is data in the testqa2 topic and you have auto.offset.reset=earliest, then it should.
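
As a minimal sketch of where that setting goes, the properties handed to a Kafka Streams application can be built with plain string keys. The class name StreamsProps, the application id "testqa2-reader", and the broker address "localhost:9092" are placeholders invented for illustration, not values from the question. Note that auto.offset.reset only takes effect when the application has no committed offsets yet, so an application id that has already consumed the topic would probably not re-read it.

```java
import java.util.Properties;

public class StreamsProps {
    // Builds the settings for a Kafka Streams app using plain string keys,
    // so the sketch compiles without the Kafka client library.
    public static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // also used as the consumer group id
        props.put("bootstrap.servers", bootstrapServers);
        // Start from the beginning of the topic when no committed offset exists.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with your own application id and brokers.
        Properties props = build("testqa2-reader", "localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object is what you would pass to the KafkaStreams constructor together with the built topology.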

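having a bit of trouble creating a serializer/deserializer for the Worker class

Kafka has a built-in JSON serializer and deserializer, and you can build a Serde from them. You don't need to write your own.

"WorkspaceSid" is the key

If you want to modify the key, use selectKey or map, not mapValues.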

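// JsonSerializer/JsonDeserializer come from org.apache.kafka.connect.json (connect-json artifact)
Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);
// Re-key each record by the WorkspaceSid field of its JSON value
KStream<String, JsonNode> testqa2 = builder
    .stream("testqa2", Consumed.with(Serdes.String(), jsonNodeSerde))
    .selectKey((k, json) -> json.get("WorkspaceSid").asText());
// print() returns void, so it cannot be part of the assignment above
testqa2.print(Printed.toSysOut());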
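
If you also want WorkspaceSid removed from the value, so that only the remaining attributes are left, the re-keying logic could look roughly like the sketch below. It uses a plain Map instead of JsonNode so that it runs without Kafka or Jackson on the classpath; RekeySketch and rekey are hypothetical names made up for this illustration. In a real topology you would put the equivalent logic inside map((k, v) -> KeyValue.pair(...)), because selectKey leaves the value unchanged.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;

public class RekeySketch {
    // Returns (key, value): the key is the WorkspaceSid field,
    // the value holds every other field of the event.
    public static Map.Entry<String, Map<String, String>> rekey(Map<String, String> event) {
        Map<String, String> rest = new LinkedHashMap<>(event); // copy, so the input is not modified
        String key = rest.remove("WorkspaceSid");              // null if the field is missing
        return new SimpleEntry<>(key, rest);
    }

    public static void main(String[] args) {
        // A few fields taken from the sample message in the question.
        Map<String, String> event = new LinkedHashMap<>();
        event.put("WorkerActivityName", "Available");
        event.put("WorkspaceSid", "WS38b10d521442ecb74fcc263d5a4d726e");
        event.put("WorkspaceName", "Copart-MiPhone");

        Map.Entry<String, Map<String, String>> record = rekey(event);
        System.out.println(record.getKey() + " -> " + record.getValue());
    }
}
```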

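Alternatively, fix your producer code to extract the Sid from the value and set the key there…

If you want to use Avro, you wouldn't write a Worker class by hand; you would generate it from an Avro schema.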
