我需要在SourceConnector.taskConfigs()
方法中读取偏移量,有没有办法做到这一点?
有一张票和公关
https://issues.apache.org/jira/browse/KAFKA-4794
现在,我设法使用反射(JOOR(获得offsetReader:
private OffsetStorageReader getOffsetStorageReader() {
try {
Object innerContext = on(context).get("this$0");
Object ctx = on(innerContext).get("ctx");
Object herder = on(ctx).get("herder");
String connectorName = on(ctx).get("connectorName");
Object worker = on(herder).get("worker");
Object internalKeyConverter = on(worker).get("internalKeyConverter");
Object internalValueConverter = on(worker).get("internalValueConverter");
Object offsetBackingStore = on(worker).get("offsetBackingStore");
return (OffsetStorageReader)
on(Class.forName("org.apache.kafka.connect.storage.OffsetStorageReaderImpl"))
.create(offsetBackingStore, connectorName, internalKeyConverter, internalValueConverter)
.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}