I have a stream that pulls data from a Postgres table, defined as follows:
CREATE TABLE "user" (
"_uid" UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
"_created" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
"_updated" TIMESTAMP(3) NULL,
"_disabled" TIMESTAMP(3) NULL,
"display_name" VARCHAR(100) NOT NULL,
"email" VARCHAR(100) NOT NULL UNIQUE,
"password" TEXT NOT NULL
);
In ksqlDB I created a SOURCE CONNECTOR like this:
CREATE SOURCE CONNECTOR "source-postgres-api_auth" WITH (
"connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
"connection.url"='jdbc:postgresql://postgres:5432/api_auth',
"connection.user"='postgres',
"connection.password"='postgres',
"mode"='bulk',
"topic.prefix"='source-postgres-api_auth-',
"table.blacklist"='_changelog, _changelog_lock'
);
So that I can detect changes and build up a history, I have a STREAM like this:
CREATE STREAM "stream-api_auth-user" (
"_uid" STRING,
"_created" TIMESTAMP,
"_updated" TIMESTAMP,
"_disabled" TIMESTAMP,
"display_name" STRING,
"email" STRING,
"password" STRING
) WITH (
KAFKA_TOPIC = 'source-postgres-api_auth-user',
VALUE_FORMAT = 'AVRO'
);
From this STREAM I created a TABLE:
CREATE TABLE "table-api_auth-user" WITH (
KAFKA_TOPIC = 'table-api_auth-user',
VALUE_FORMAT = 'AVRO'
) AS SELECT
"_uid",
LATEST_BY_OFFSET("_created") AS "_created",
LATEST_BY_OFFSET("_updated") AS "_updated",
LATEST_BY_OFFSET("_disabled") AS "_disabled",
LATEST_BY_OFFSET("display_name") AS "display_name",
LATEST_BY_OFFSET("email") AS "email",
LATEST_BY_OFFSET("password") AS "password"
FROM "stream-api_auth-user"
GROUP BY "_uid"
EMIT CHANGES;
Finally, I have a sink to Elasticsearch like this:
CREATE SINK CONNECTOR "sync-elasticsearch-user" WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = 'kafka-connect',
'topics' = 'table-api_auth-user'
);
My problem is that when I look in Elasticsearch, the TIMESTAMP fields arrive as numbers. The data in the topic that the TABLE writes to is already serialized as epoch milliseconds rather than ISO 8601:
ksql> print "table-api_auth-user";
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2022/12/01 21:13:36.844 Z, key: [a2d9ff97-2c95-4da0-98e0-5492@7293921773168638261/-], value: {"_created":1669926069726,"_updated":null,"_disabled":null,"display_name":"Super User","email":"superuser@email.com","password":"4072d7365233d8ede7ca8548543222dfb96b17780aa8d6ff93ab69c0985ef21fc8105d03590a61b9"}, partition: 0
rowtime: 2022/12/01 21:13:36.847 Z, key: [b60448d2-e518-4479-9aff-2734@3631370472181359666/-], value: {"_created":1669916433173,"_updated":1669916803008,"_disabled":1669916803008,"display_name":"Cremin 7a8c281c4bed","email":"Byrne.8dd1dcf3bfa4@yahoo.com","password":"e89af05eae87f0667eba762fdd382ce942bb76b796b8fe20d9e71f142bac9f7a6fbbfc6b51d4527e"}, partition: 0
Is there anything I can do so that when the table writes to its topic, these timestamp fields are emitted as ISO 8601 instead?
Can anyone help me out?

You can transform the fields on the Elasticsearch side with an ingest pipeline:
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
From what I've read, there is no option to specify an ingest pipeline on the sink connector:
https://github.com/confluentinc/kafka-connect-elasticsearch/issues/72
So you have to create an index template that matches the index by name and applies the pipeline.
Step 1: Create the ingest pipeline
I'll use the date processor to convert your format (UNIX_MS) to ISO 8601:
https://www.elastic.co/guide/en/elasticsearch/reference/current/date-processor.html
PUT _ingest/pipeline/parsedate
{
"processors": [
{
"date": {
"field": "date",
"formats": [
"UNIX_MS"
],
"target_field": "date_converted",
"ignore_failure": true
}
}
]
}
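Before wiring anything up, you can test the pipeline with the _simulate API. The request below is a sketch that reproduces the output shown next, feeding in one of the epoch-millisecond values from your topic as the sample document:
# Dry-run the parsedate pipeline against a sample document
POST _ingest/pipeline/parsedate/_simulate
{
  "docs": [
    {
      "_source": {
        "date": 1669916803008
      }
    }
  ]
}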
Test output (date vs date_converted):
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"date": 1669916803008,
"date_converted": "2022-12-01T17:46:43.008Z"
},
"_ingest": {
"timestamp": "2022-12-02T07:54:02.731666786Z"
}
}
}
]
}
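The demo pipeline above converts a generic date field. For your documents you would need one date processor per timestamp column. Here is a sketch, assuming the field names from your table, overwriting each field in place and reusing the same pipeline name so the template below picks it up (ignore_failure lets documents with null _updated or _disabled pass through untouched):
PUT _ingest/pipeline/parsedate
{
  "description": "Convert UNIX_MS timestamps from the ksqlDB table to ISO 8601",
  "processors": [
    {
      "date": {
        "field": "_created",
        "formats": ["UNIX_MS"],
        "target_field": "_created",
        "ignore_failure": true
      }
    },
    {
      "date": {
        "field": "_updated",
        "formats": ["UNIX_MS"],
        "target_field": "_updated",
        "ignore_failure": true
      }
    },
    {
      "date": {
        "field": "_disabled",
        "formats": ["UNIX_MS"],
        "target_field": "_disabled",
        "ignore_failure": true
      }
    }
  ]
}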
Step 2: Create the index template
Assuming the index name matches table-api_auth-user*:
PUT _index_template/template_1
{
"index_patterns": ["table-api_auth-user*"],
"template": {
"settings": {
"index.default_pipeline": "parsedate"
}
}
}
From now on, every document you send to an index matching table-api_auth-user* will go through the ingest pipeline you set up in step 1.
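To verify end to end, you can index a document by hand into an index matching the pattern and read it back; the index name and values below are only illustrative:
# The template matches the index by name and applies the parsedate pipeline
POST table-api_auth-user-test/_doc
{
  "_created": 1669916803008,
  "display_name": "Super User"
}

GET table-api_auth-user-test/_search
The stored _created should come back as "2022-12-01T17:46:43.008Z" instead of the epoch milliseconds.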