我正在寻找一个使用Kafka流的示例,以进行此类操作,即使用地址表加入客户表并将数据沉入ES: -
客户
+------+------------+----------------+-----------------------+
| id | first_name | last_name | email |
+------+------------+----------------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Davidson | ed@walker.com |
| 1004 | Anne | Kim | annek@noanswer.org |
+------+------------+----------------+-----------------------+
地址
+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street | city | state | zip | type |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 | 1001 | 3183 Moore Avenue | Euless | Texas | 76036 | SHIPPING |
| 11 | 1001 | 2389 Hidden Valley Road | Harrisburg | Pennsylvania | 17116 | BILLING |
| 12 | 1002 | 281 Riverside Drive | Augusta | Georgia | 30901 | BILLING |
| 13 | 1003 | 3787 Brownton Road | Columbus | Mississippi | 39701 | SHIPPING |
| 14 | 1003 | 2458 Lost Creek Road | Bethlehem | Pennsylvania | 18018 | SHIPPING |
| 15 | 1003 | 4800 Simpson Square | Hillsdale | Oklahoma | 73743 | BILLING |
| 16 | 1004 | 1289 University Hill Road | Canehill | Arkansas | 72717 | LIVING |
+----+-------------+---------------------------+------------+--------------+-------+----------+
输出Elasticsearch索引
"hits": [
{
"_index": "customers_with_addresses",
"_type": "_doc",
"_id": "1",
"_score": 1.3278645,
"_source": {
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com",
"addresses": [{
"street": "3183 Moore Avenue",
"city": "Euless",
"state": "Texas",
"zip": "76036",
"type": "SHIPPING"
}, {
"street": "2389 Hidden Valley Road",
"city": "Harrisburg",
"state": "Pennsylvania",
"zip": "17116",
"type": "BILLING"
}],
}
}, ….
表数据来自Debezium主题,我认为我需要一些Java在中间加入流,然后将其输出到一个新主题中,然后将其沉入ES?
有人会有任何示例代码吗?
谢谢。
- 将主题作为流消费。
- 使用客户ID汇总列表中的地址流并将流转换为表。
- 使用地址表加入客户流
以下是一个示例(考虑数据以JSON格式消费(:
KStream<String,JsonNode> customers = builder.stream("customer", Consumed.with(stringSerde, jsonNodeSerde));
KStream<String,JsonNode> addresses = builder.stream("address", Consumed.with(stringSerde, jsonNodeSerde));
// Select the customer ID as key in order to join with address.
KStream<String,JsonNode> customerRekeyed = customers.selectKey(value-> value.get("id").asText());
ObjectMapper mapper = new ObjectMapper();
// Select Customer_id as key to aggregate the addresses and join with customer
KTable<String,JsonNode> addressTable = addresses
.selectKey(value-> value.get("customer_id").asText())
.groupByKey()
.aggregate(() ->mapper::createObjectNode, //initializer
(key,value,aggregate) -> aggregate.add(value),
Materialized.with(stringSerde, jsonNodeSerde)
); //adder
// Join Customer Stream with Address Table
KStream<String,JsonNode> customerAddressStream = customerRekeyed.leftJoin(addressTable,
(left,right) -> {
ObjectNode finalNode = mapper.createObjectNode();
ArrayList addressList = new ArrayList<JsonNode>();
// Considering the address is arrayNode
((ArrayNode)right).elements().forEachRemaining(addressList ::add);
left.putArray("addresses").allAll(addressList);
return left;
},Joined.keySerde(stringSerde).withValueSerde(jsonNodeSerde));
您可以在此处引用有关所有类型连接类型的详细信息:
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#joining
取决于您要求在一个客户节点中嵌套多个地址的严格要求,您可以在KSQL中执行此操作(这是在Kafka流的顶部构建的(。
将一些测试数据填充到KAFKA中(在您的情况下是通过Debezium完成的(:
$ curl -s "https://api.mockaroo.com/api/ffa9ff20?count=10&key=ff7856d0" | kafkacat -b localhost:9092 -t addresses -P
$ curl -s "https://api.mockaroo.com/api/9b868890?count=4&key=ff7856d0" | kafkacat -b localhost:9092 -t customers -P
启动ksql,首先检查数据:
ksql> PRINT 'addresses' FROM BEGINNING ;
Format:JSON
{"ROWTIME":1558519823351,"ROWKEY":"null","id":1,"customer_id":1004,"street":"8 Moulton Center","city":"Bronx","state":"New York","zip":"10474","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":2,"customer_id":1001,"street":"5 Hollow Ridge Alley","city":"Washington","state":"District of Columbia","zip":"20016","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":3,"customer_id":1000,"street":"58 Maryland Point","city":"Greensboro","state":"North Carolina","zip":"27404","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":4,"customer_id":1002,"street":"55795 Derek Avenue","city":"Temple","state":"Texas","zip":"76505","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":5,"customer_id":1002,"street":"164 Continental Plaza","city":"Modesto","state":"California","zip":"95354","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":6,"customer_id":1004,"street":"6 Miller Road","city":"Louisville","state":"Kentucky","zip":"40205","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":7,"customer_id":1003,"street":"97 Shasta Place","city":"Pittsburgh","state":"Pennsylvania","zip":"15286","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":8,"customer_id":1000,"street":"36 Warbler Circle","city":"Memphis","state":"Tennessee","zip":"38109","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":9,"customer_id":1001,"street":"890 Eagan Circle","city":"Saint Paul","state":"Minnesota","zip":"55103","type":"SHIPPING"}
{"ROWTIME":1558519823354,"ROWKEY":"null","id":10,"customer_id":1000,"street":"8 Judy Terrace","city":"Washington","state":"District of Columbia","zip":"20456","type":"SHIPPING"}
^C
Topic printing ceased
ksql>
ksql> PRINT 'customers' FROM BEGINNING;
Format:JSON
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1001,"first_name":"Jolee","last_name":"Handasyde","email":"jhandasyde0@nhs.uk"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1002,"first_name":"Rebeca","last_name":"Kerrod","email":"rkerrod1@sourceforge.net"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1003,"first_name":"Bobette","last_name":"Brumble","email":"bbrumble2@cdc.gov"}
{"ROWTIME":1558519852368,"ROWKEY":"null","id":1004,"first_name":"Royal","last_name":"De Biaggi","email":"rdebiaggi3@opera.com"}
现在,我们在数据上声明STREAM
(KAFKA主题 架构(,以便我们可以进一步操纵它:
ksql> CREATE STREAM addresses_RAW (ID INT, CUSTOMER_ID INT, STREET VARCHAR, CITY VARCHAR, STATE VARCHAR, ZIP VARCHAR, TYPE VARCHAR) WITH (KAFKA_TOPIC='addresses', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM customers_RAW (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
我们将将customers
建模为TABLE
,并做需要正确键入KAFKA消息(以及它们具有空键的时刻,从上面的PRINT
输出中的"ROWKEY":"null"
可以看出(。您可以配置Debezium以设置消息键,因此对于KSQL而言,您可能不需要此步骤:
ksql> CREATE STREAM CUSTOMERS_KEYED WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_RAW PARTITION BY ID;
Message
----------------------------
Stream created and running
----------------------------
现在,我们声明了一个TABLE
( state 为给定键,从kafka主题 架构实例化(:
ksql> CREATE TABLE CUSTOMER (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='CUSTOMERS_KEYED', VALUE_FORMAT='JSON', KEY='ID');
Message
---------------
Table created
---------------
现在我们可以加入数据:
ksql> CREATE STREAM customers_with_addresses AS
SELECT CUSTOMER_ID,
FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME,
FIRST_NAME,
LAST_NAME,
TYPE AS ADDRESS_TYPE,
STREET,
CITY,
STATE,
ZIP
FROM ADDRESSES_RAW A
INNER JOIN CUSTOMER C
ON A.CUSTOMER_ID = C.ID;
Message
----------------------------
Stream created and running
----------------------------
这将创建一个新的KSQL流,进而填充一个新的Kafka主题。
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Format
------------------------------------------------------------------------------------------
CUSTOMERS_KEYED | CUSTOMERS_KEYED | JSON
ADDRESSES_RAW | addresses | JSON
CUSTOMERS_RAW | customers | JSON
CUSTOMERS_WITH_ADDRESSES | CUSTOMERS_WITH_ADDRESSES | JSON
流具有模式:
ksql> DESCRIBE CUSTOMERS_WITH_ADDRESSES;
Name : CUSTOMERS_WITH_ADDRESSES
Field | Type
------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
CUSTOMER_ID | INTEGER (key)
FULL_NAME | VARCHAR(STRING)
FIRST_NAME | VARCHAR(STRING)
ADDRESS_TYPE | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
STREET | VARCHAR(STRING)
CITY | VARCHAR(STRING)
STATE | VARCHAR(STRING)
ZIP | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
我们可以查询流:
ksql> SELECT * FROM CUSTOMERS_WITH_ADDRESSES WHERE CUSTOMER_ID=1002;
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | LIVING | Kerrod | 55795 Derek Avenue | Temple | Texas | 76505
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | SHIPPING | Kerrod | 164 Continental Plaza | Modesto | California | 95354
我们还可以使用Kafka Connect将其流式传输到Elasticsearch:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d '{
"name": "sink-elastic-customers_with_addresses-00",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "CUSTOMERS_WITH_ADDRESSES",
"connection.url": "http://elasticsearch:9200",
"type.name": "type.name=kafkaconnect",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
结果:
$ curl -s http://localhost:9200/customers_with_addresses/_search | jq '.hits.hits[0]'
{
"_index": "customers_with_addresses",
"_type": "type.name=kafkaconnect",
"_id": "CUSTOMERS_WITH_ADDRESSES+0+2",
"_score": 1,
"_source": {
"ZIP": "76505",
"CITY": "Temple",
"ADDRESS_TYPE": "LIVING",
"CUSTOMER_ID": 1002,
"FULL_NAME": "Rebeca Kerrod",
"STATE": "Texas",
"STREET": "55795 Derek Avenue",
"LAST_NAME": "Kerrod",
"FIRST_NAME": "Rebeca"
}
}
我们在Debezium Blog上构建了一个非常有用的情况(流向Elasticsearch的流式汇总(的演示和博客文章。
要记住的一个问题是,该解决方案(基于Kafka流,但我认为KSQL是相同的(,很容易公开中介加入结果。例如。假设您在一次交易中插入客户和10个地址。流联接方法可能首先会产生客户及其前五个地址的汇总,此后不久将所有10个地址提供完整的汇总。对于您的特定用例,这可能是或不可能的。我还记得,处理删除并不是微不足道(例如,如果您删除了10个地址之一,那么您必须再次使用其余9个地址来制作汇总,但可能会不会被触及(。
。可以考虑的替代方案可以是您本质上产生明确的事件的输出模式,并从应用程序本身内部进行了预先计算的汇总。IE。它需要该应用程序的一点帮助,但随后它避免了事实之后产生加入结果的微妙之处。