So far, I have been able to create a KStream from the topic.
KStream<String, Object> testqa2 = builder.stream("testqa2", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues(value -> {
            // Debug output: print each value as it flows through the stream
            System.out.println(value);
            return value;
        });
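It doesn't print anything, so while debugging I realized that I am only creating my KStream; there is no data in it.
I am having a bit of trouble creating a serializer/deserializer for the Worker class.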
package com.copart.mwa.Avro;

public class Worker {
    // Instance fields: if these were static, every Worker object would share one set of values.
    private String workerActivityName;
    private String workerSid;
    private String workerPreviousActivityName;
    private String workerPreviousActivitySid;

    public String getWorkerActivityName() {
        return workerActivityName;
    }

    public void setWorkerActivityName(String workerActivityName) {
        this.workerActivityName = workerActivityName;
    }

    public String getWorkerSid() {
        return workerSid;
    }

    public void setWorkerSid(String workerSid) {
        this.workerSid = workerSid;
    }

    public String getWorkerPreviousActivityName() {
        return workerPreviousActivityName;
    }

    public void setWorkerPreviousActivityName(String workerPreviousActivityName) {
        this.workerPreviousActivityName = workerPreviousActivityName;
    }

    public String getWorkerPreviousActivitySid() {
        return workerPreviousActivitySid;
    }

    public void setWorkerPreviousActivitySid(String workerPreviousActivitySid) {
        this.workerPreviousActivitySid = workerPreviousActivitySid;
    }

    @Override
    public String toString() {
        return "Worker(" + workerSid + ", " + workerActivityName + ")";
    }
}
The messages sent from the producer to the consumer are JSON:
{
"WorkerActivityName": "Available",
"EventType": "worker.activity.update",
"ResourceType": "worker",
"WorkerTimeInPreviousActivityMs": "237",
"Timestamp": "1626114642",
"WorkerActivitySid": "WAc9030ef021bc1786d3ae11544f4d9883",
"WorkerPreviousActivitySid": "WAf4feb231e97c1878fecc58b26fdb95f3",
"WorkerTimeInPreviousActivity": "0",
"AccountSid": "AC8c5cd8c9ba538090da104b26d68a12ec",
"WorkerName": "Dorothy.Finegan@Copart.Com",
"Sid": "EV284c8a8bc27480e40865263f0b42e5cf",
"TimestampMs": "1626114642204",
"P": "WKe638256376188fab2a98cccb3c803d67",
"WorkspaceSid": "WS38b10d521442ecb74fcc263d5a4d726e",
"WorkspaceName": "Copart-MiPhone",
"WorkerPreviousActivityName": "Unavailable(RNA)",
"EventDescription": "Worker Dorothy.Finegan@Copart.Com updated to Available Activity",
"ResourceSid": "WKe638256376188fab2a98cccb3c803d67",
"WorkerAttributes": "{"miphone_dept":["USA_YRD_OPS"],"languages":["en"],"home_region":"GL","roles":["supervisor"],"miphone_yards":["81"],"miphone_enabled":true,"miphone_states":["IL"],"home_state":"IL","skills":["YD_SELLER","YD_TITLE"],"home_division":"Northern","miphone_divisions":["Northern"],"miphone_functions":["outbound_only"],"full_name":"Dorothy Finegan","miphone_regions":["GL"],"home_country":"USA","copart_user_id":"USA3204","home_yard":"81","home_dept":"USA_YRD_OPS","email":"dorothy.finegan@copart.com","home_dept_category":"OPS","contact_uri":"client:Dorothy_2EFinegan_40Copart_2ECom","queue_activity":"Available","teams":[],"remote_employee":false,"miphone_call_center_units":["USA_YRD_OPS|81"],"miphone_call_center_teams":[]}"
}
I want to implement a custom deserializer where
"WorkspaceSid">WS38b10d521442ecb74fcc263d5a4d726e
is the key, and the remaining attributes form the value of the key-value pair. Thanks, Anmol
"It doesn't print anything"
If the testqa2 topic has data and you have auto.offset.reset=earliest, then it should.
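As a minimal sketch of the setting mentioned above, the following builds the configuration with plain java.util.Properties and string keys, so it runs without the Kafka libraries. The application id "testqa2-reader" and the broker address "localhost:9092" are placeholders assumed for illustration; they do not come from the question.

```java
import java.util.Properties;

final class StreamsConfigSketch {
    // Builds the properties that would be handed to the Kafka Streams application.
    static Properties streamsProps() {
        Properties props = new Properties();
        // Hypothetical application id; Kafka Streams uses it as the consumer group id.
        props.setProperty("application.id", "testqa2-reader");
        // Assumed broker address; replace with the real cluster.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Start from the beginning of the topic when no committed offsets exist.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration (iteration order is not guaranteed).
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that auto.offset.reset only takes effect when the group has no committed offsets, so an application id that has already consumed the topic will not re-read old records. The topology also has to be started before anything is consumed or printed.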
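"I am having a bit of trouble creating a serializer/deserializer for the Worker class"
Kafka has a built-in JSON serializer and deserializer, and you can build a Serde from them. You don't need to write your own.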
"WorkspaceSid" ... is the key
If you want to modify the key, use selectKey or map, not mapValues.
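// JsonSerializer / JsonDeserializer are assumed to be the JsonNode-based classes
// from Kafka's connect-json module (org.apache.kafka.connect.json)
Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);
KStream<String, JsonNode> testqa2 = builder.stream("testqa2", Consumed.with(Serdes.String(), jsonNodeSerde))
        // asText() converts the JsonNode field into the String key
        .selectKey((k, json) -> json.get("WorkspaceSid").asText());
// print() returns void, so it cannot be part of the assignment above
testqa2.print(Printed.toSysOut());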
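To make the difference between mapValues and selectKey concrete, here is a plain-Java model of the two operations on a single record. It is only an illustration of the semantics, not the Kafka Streams API: the class RekeySketch and its helper methods are hypothetical, and the record is represented as a Map.Entry with the JSON fields held in a Map.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

final class RekeySketch {
    // Models mapValues: the value is transformed, the key passes through unchanged.
    static <K, V, V2> Map.Entry<K, V2> mapValues(Map.Entry<K, V> rec, Function<V, V2> f) {
        return new SimpleEntry<>(rec.getKey(), f.apply(rec.getValue()));
    }

    // Models selectKey: a new key is computed from (key, value), the value passes through unchanged.
    static <K, V, K2> Map.Entry<K2, V> selectKey(Map.Entry<K, V> rec, BiFunction<K, V, K2> f) {
        return new SimpleEntry<>(f.apply(rec.getKey(), rec.getValue()), rec.getValue());
    }

    // A record shaped like the question's message, with a null key as an unkeyed producer would send it.
    static Map.Entry<String, Map<String, String>> sampleRecord() {
        Map<String, String> event = Map.of(
                "WorkspaceSid", "WS38b10d521442ecb74fcc263d5a4d726e",
                "WorkerActivityName", "Available");
        return new SimpleEntry<>(null, event);
    }

    public static void main(String[] args) {
        Map.Entry<String, Map<String, String>> rec = sampleRecord();
        // mapValues cannot change the key, so it stays null.
        System.out.println(mapValues(rec, v -> v.get("WorkerActivityName")).getKey());
        // selectKey derives the key from the value.
        System.out.println(selectKey(rec, (k, v) -> v.get("WorkspaceSid")).getKey());
    }
}
```

In the real topology, changing the key with selectKey also marks the stream for repartitioning if a key-based operation such as a grouping or join follows.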
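Alternatively, fix your producer code to take the Sid from the value and set the key there...
If you want to use Avro, you would not write the Worker class by hand; you would generate it from an Avro schema.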