Kafka Connect: OffsetStorageReader throws an exception only on the first poll(); offsets are fine on subsequent polls



The line

context.offsetStorageReader().offset(sourcePartition());

throws an exception on the first poll. On subsequent polls there is no exception. Is it possible to fix this without an extra check around getLatestSourceOffset(), such as adding a field that tracks whether it is the first poll? Or is there no way to avoid it, and we should add the check?
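For illustration, the kind of guard meant here would look roughly like the sketch below (the firstPoll field is hypothetical and is exactly the sort of extra state I would prefer not to add):

// Hypothetical sketch of the workaround described above: skip the offset lookup
// that triggers the deserialization error on the very first poll.
private boolean firstPoll = true;

private Optional<Long> getLatestSourceOffsetGuarded() {
    if (firstPoll) {
        firstPoll = false;
        return Optional.empty(); // behave as if no offset has been stored yet
    }
    return getLatestSourceOffset();
}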

kafka-connect-api version: 0.10.2.0-cp1

2022-06-19 05:52:34,538 ERROR [pool-1-thread-1] (OffsetStorageReaderImpl.java:102) - CRITICAL: Failed to deserialize offset data when getting offsets for task with namespace CryptoPanicSourceConnector. No value for this data will be returned, which may break the task or cause it to skip some data. This could either be due to an error in the connector implementation or incompatible schema.
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:54)
at com.delphian.bush.CryptoPanicSourceTask.getLatestSourceOffset(CryptoPanicSourceTask.java:97)
at com.delphian.bush.CryptoPanicSourceTask.poll(CryptoPanicSourceTask.java:61)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

worker.properties

bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000

Source task

public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records = new ArrayList<>();
    Optional<Long> sourceOffset = getLatestSourceOffset();
    CryptoNewsResponse newsResponse = // getNewsFromApi
    // Filter which news to add to records based on sourceOffset. Shortened for brevity
    for (CryptoNews news : filteredNews) {
        records.add(generateRecordFromNews(news));
    }
    return records;
}


private Optional<Long> getLatestSourceOffset() {
    Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition());
    if (offset != null) {
        Object id = offset.get("id");
        if (id != null) {
            Long latestOffset = Long.valueOf((String) id);
            return Optional.of(latestOffset);
        }
    }
    return Optional.empty();
}

private SourceRecord generateRecordFromNews(CryptoNews cryptoNews) {
    return new SourceRecord(
            sourcePartition(),
            sourceOffset(cryptoNews),
            config.getString(TOPIC_CONFIG),
            null,
            CryptoNewsSchema.NEWS_KEY_SCHEMA,
            buildRecordKey(cryptoNews),
            CryptoNewsSchema.NEWS_SCHEMA,
            buildRecordValue(cryptoNews),
            Instant.now().toEpochMilli()
    );
}


private Map<String, String> sourceOffset(CryptoNews cryptoNews) {
    Map<String, String> map = new HashMap<>();
    map.put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId());
    return map;
}

UPDATE

I don't use Avro or Protobuf. My news schema:

public static final Schema NEWS_SCHEMA = SchemaBuilder.struct()
        .name(SCHEMA_NAME)
        .version(FIRST_VERSION)
        .field(NewsSourceSchema.SCHEMA_NAME, SOURCE_SCHEMA)
        .field(CurrencySchema.SCHEMA_NAME, SchemaBuilder.array(CURRENCY_SCHEMA).optional())
        .field(KIND_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(DOMAIN_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(PUBLISHED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(ID_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(CREATED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .build();

public Struct toConnectData(CryptoNews cryptoNews) {
    Struct struct = new Struct(CryptoNewsSchema.NEWS_SCHEMA)
            .put(NewsSourceSchema.SCHEMA_NAME, NewsSourceConverter.INSTANCE.toConnectData(cryptoNews.getSource()))
            .put(CryptoNewsSchema.KIND_FIELD, cryptoNews.getKind())
            .put(CryptoNewsSchema.DOMAIN_FIELD, cryptoNews.getDomain())
            .put(CryptoNewsSchema.TITLE_FIELD, cryptoNews.getTitle())
            .put(CryptoNewsSchema.PUBLISHED_AT_FIELD, cryptoNews.getPublishedAt())
            .put(CryptoNewsSchema.SLUG_FIELD, cryptoNews.getSlug())
            .put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId())
            .put(CryptoNewsSchema.URL_FIELD, cryptoNews.getUrl())
            .put(CryptoNewsSchema.CREATED_AT_FIELD, cryptoNews.getCreatedAt());
    List<Currency> currencies = Optional.ofNullable(cryptoNews.getCurrencies()).orElse(new ArrayList<>());
    final List<Struct> items = currencies.stream()
            .map(CONVERTER::toConnectData)
            .collect(Collectors.toList());
    struct.put(CurrencySchema.SCHEMA_NAME, items);
    return struct;
}

UPDATE 2

Connector properties

name=CryptoPanicSourceConnector
tasks.max=1
connector.class=com.delphian.bush.CryptoPanicSourceConnector
topic=crypto-news

Startup command:

connect-standalone config/worker.properties config/custom-connector.properties

When using plain JSON data with Connect, you may see this error message: org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

For plain JSON without a schema, you need to set the converter's schemas.enable option to false:

bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
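For context, with schemas.enable=true the JsonConverter expects every JSON document to be wrapped in a schema/payload envelope, roughly of this shape (illustrative example only, not taken from the actual offset data):

{
  "schema": { "type": "string", "optional": false },
  "payload": "some-value"
}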

LATEST UPDATE