The line
context.offsetStorageReader().offset(sourcePartition());
throws an exception on the first poll. On subsequent polls there is no exception. Is it possible to fix this without an extra check around getLatestSourceOffset(), such as adding a field that tracks whether it is the first poll? Or is there no way to avoid it, and should we add the check?
kafka-connect-api version: 0.10.2.0-cp1
2022-06-19 05:52:34,538 ERROR [pool-1-thread-1] (OffsetStorageReaderImpl.java:102) - CRITICAL: Failed to deserialize offset data when getting offsets for task with namespace CryptoPanicSourceConnector. No value for this data will be returned, which may break the task or cause it to skip some data. This could either be due to an error in the connector implementation or incompatible schema.
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:54)
at com.delphian.bush.CryptoPanicSourceTask.getLatestSourceOffset(CryptoPanicSourceTask.java:97)
at com.delphian.bush.CryptoPanicSourceTask.poll(CryptoPanicSourceTask.java:61)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
worker.properties
bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000
Source task
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records = new ArrayList<>();
    Optional<Long> sourceOffset = getLatestSourceOffset();
    CryptoNewsResponse newsResponse = // getNewsFromApi
    // Filter which news to add to records based on sourceOffset. Shortened for brevity
    for (CryptoNews news : filteredNews) {
        records.add(generateRecordFromNews(news));
    }
    return records;
}
private Optional<Long> getLatestSourceOffset() {
    Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition());
    if (offset != null) {
        Object id = offset.get("id");
        if (id != null) {
            Long latestOffset = Long.valueOf((String) id);
            return Optional.of(latestOffset);
        }
    }
    return Optional.empty();
}
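As a side note on the null handling here: offset() returns null both on a genuinely first run and, per the "No value for this data will be returned" line in the log, after a failed deserialization, so returning Optional.empty() already covers both cases without a first-poll flag. A self-contained sketch of the same parsing logic (OffsetParseDemo and latestOffset are names invented for this illustration; the real code receives the map from OffsetStorageReader):

```java
import java.util.Map;
import java.util.Optional;

public class OffsetParseDemo {
    // Stand-in for getLatestSourceOffset(): extract the "id" entry from the
    // offset map and parse it as a Long. A null map (nothing stored yet, or a
    // failed deserialization as in the log above) and a missing key both
    // yield Optional.empty().
    public static Optional<Long> latestOffset(Map<String, Object> offset) {
        if (offset == null) {
            return Optional.empty();
        }
        Object id = offset.get("id");
        if (id == null) {
            return Optional.empty();
        }
        return Optional.of(Long.valueOf(id.toString()));
    }

    public static void main(String[] args) {
        System.out.println(latestOffset(null));                // empty on first run
        System.out.println(latestOffset(Map.of("id", "42")));  // parsed offset
    }
}
```

Using id.toString() instead of a (String) cast also keeps the parse from throwing a ClassCastException if the stored value ever comes back as a number rather than a string.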
private SourceRecord generateRecordFromNews(CryptoNews cryptoNews) {
    return new SourceRecord(
            sourcePartition(),
            sourceOffset(cryptoNews),
            config.getString(TOPIC_CONFIG),
            null,
            CryptoNewsSchema.NEWS_KEY_SCHEMA,
            buildRecordKey(cryptoNews),
            CryptoNewsSchema.NEWS_SCHEMA,
            buildRecordValue(cryptoNews),
            Instant.now().toEpochMilli()
    );
}
private Map<String, String> sourceOffset(CryptoNews cryptoNews) {
    Map<String, String> map = new HashMap<>();
    map.put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId());
    return map;
}
Update
I do not use Avro or Protobuf. My news schema:
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct()
        .name(SCHEMA_NAME)
        .version(FIRST_VERSION)
        .field(NewsSourceSchema.SCHEMA_NAME, SOURCE_SCHEMA)
        .field(CurrencySchema.SCHEMA_NAME, SchemaBuilder.array(CURRENCY_SCHEMA).optional())
        .field(KIND_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(DOMAIN_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(PUBLISHED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(ID_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(CREATED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .build();
public Struct toConnectData(CryptoNews cryptoNews) {
    Struct struct = new Struct(CryptoNewsSchema.NEWS_SCHEMA)
            .put(NewsSourceSchema.SCHEMA_NAME, NewsSourceConverter.INSTANCE.toConnectData(cryptoNews.getSource()))
            .put(CryptoNewsSchema.KIND_FIELD, cryptoNews.getKind())
            .put(CryptoNewsSchema.DOMAIN_FIELD, cryptoNews.getDomain())
            .put(CryptoNewsSchema.TITLE_FIELD, cryptoNews.getTitle())
            .put(CryptoNewsSchema.PUBLISHED_AT_FIELD, cryptoNews.getPublishedAt())
            .put(CryptoNewsSchema.SLUG_FIELD, cryptoNews.getSlug())
            .put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId())
            .put(CryptoNewsSchema.URL_FIELD, cryptoNews.getUrl())
            .put(CryptoNewsSchema.CREATED_AT_FIELD, cryptoNews.getCreatedAt());

    List<Currency> currencies = Optional.ofNullable(cryptoNews.getCurrencies()).orElse(new ArrayList<>());
    final List<Struct> items = currencies.stream()
            .map(CONVERTER::toConnectData)
            .collect(Collectors.toList());
    struct.put(CurrencySchema.SCHEMA_NAME, items);
    return struct;
}
Update 2
Connector properties
name=CryptoPanicSourceConnector
tasks.max=1
connector.class=com.delphian.bush.CryptoPanicSourceConnector
topic=crypto-news
Startup command:
connect-standalone config/worker.properties config/custom-connector.properties
When using plain JSON data with Connect, you may see this error message: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
For plain JSON without a schema, you need to set the converter's schemas.enable parameter to false.
bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
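One caveat worth noting: the failing read in the log comes from OffsetStorageReaderImpl, and on this Connect version the offset file is handled by the internal.* converters, which the worker.properties above configures with schemas enabled. If changing only the key/value converters does not help, the internal converters likely need the same treatment (these internal.* settings are deprecated in later Connect releases, where schemaless JSON is the hardcoded default for internal data):

```properties
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
```

An offsets file already written under the old settings (offsets/standalone.offsets) may also need to be deleted so it is not re-read in a mismatched format.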