Kafka Connect: how to handle a List of custom objects when specifying the schema and building the SourceRecord value



I have a DTO, CryptoNews. It contains

List<Currencies> currencies

I want to save the "currencies" field to the SourceRecord when constructing it. I can't figure out how to:

  1. Declare it in the schema.
  2. Pass it to the Struct object when creating the value.

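My attempts end in this exception: Invalid Java object for schema type STRUCT: class com.dto.Currencies

Kafka Connect doesn't provide an explicit example of how to handle the case where the objects in a List require their own Schema. I also tried to apply an approach similar to the one in the Kafka test cases, but it doesn't work. https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java L95-L98

How can this be done?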

kafka-connect-api version: 0.10.2.0-cp1
value and key converter: org.apache.kafka.connect.json.JsonConverter
no avro used

class CryptoNews implements Serializable {
    // omitted fields
    private List<Currencies> currencies;
}

class Currencies {
    private String code;
    private String title;
    private String slug;
    private String url;
}

SchemaConfiguration

public static final Integer FIRST_VERSION = 1;
public static final String CURRENCIES_SCHEMA_NAME = "currencies";



public static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
        .version(FIRST_VERSION)
        .field(CURRENCIES_SCHEMA_NAME, CURRENCIES_SCHEMA)
        // simple fields omitted for brevity.
        .build();



public static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(
                SchemaBuilder.struct()
                        .field(CODE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
                        .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
                        .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
                        .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
                        .optional()
                        .build()
        )
        .optional()
        .name(CURRENCIES_SCHEMA_NAME)
        .version(FIRST_VERSION)
        .build();

SourceTask

return new SourceRecord(
        sourcePartition(),
        sourceOffset(cryptoNews),
        config.getString(TOPIC_CONFIG),
        null,
        CryptoNewsSchema.NEWS_KEY_SCHEMA,
        buildRecordKey(cryptoNews),
        CryptoNewsSchema.NEWS_SCHEMA,
        buildRecordValue(cryptoNews),
        Instant.now().toEpochMilli()
);


public Struct buildRecordValue(CryptoNews cryptoNews) {
    Struct valueStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA);

    // Produces Invalid Java object for schema type STRUCT: class com.dto.Currencies
    List<Currencies> currencies = cryptoNews.getCurrencies();
    if (currencies != null) {
        valueStruct.put(CurrenciesSchema.CURRENCIES_SCHEMA_NAME, currencies);
    }

    return valueStruct;
}

Update:

worker.properties

bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000

You need to provide a List<Struct>.
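Applied to the code in the question, that could look roughly like the sketch below. Struct.put validates the value against the field's schema, and for an ARRAY of STRUCT it expects a java.util.List whose elements are Struct instances built from the element schema, which Schema.valueSchema() returns for an array schema. The class name CurrenciesToStructSketch, the stand-in Currencies class with its getters, and the literal field names are assumptions made for illustration, not the asker's actual code.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CurrenciesToStructSketch {

    // Hypothetical stand-in for the question's com.dto.Currencies (getters assumed).
    static class Currencies {
        private final String code;
        private final String title;
        private final String slug;
        private final String url;

        Currencies(String code, String title, String slug, String url) {
            this.code = code;
            this.title = title;
            this.slug = slug;
            this.url = url;
        }

        String getCode() { return code; }
        String getTitle() { return title; }
        String getSlug() { return slug; }
        String getUrl() { return url; }
    }

    // Same shape as the question's CURRENCIES_SCHEMA: an optional array of optional structs.
    static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(
                    SchemaBuilder.struct()
                            .field("code", Schema.OPTIONAL_STRING_SCHEMA)
                            .field("title", Schema.OPTIONAL_STRING_SCHEMA)
                            .field("slug", Schema.OPTIONAL_STRING_SCHEMA)
                            .field("url", Schema.OPTIONAL_STRING_SCHEMA)
                            .optional()
                            .build())
            .optional()
            .name("currencies")
            .version(1)
            .build();

    static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
            .version(1)
            .field("currencies", CURRENCIES_SCHEMA)
            .build();

    static Struct buildRecordValue(List<Currencies> currencies) {
        Struct valueStruct = new Struct(NEWS_SCHEMA);
        if (currencies != null) {
            // The element schema of the array; every list item must be a Struct of this schema.
            Schema itemSchema = CURRENCIES_SCHEMA.valueSchema();
            List<Struct> items = new ArrayList<>();
            for (Currencies c : currencies) {
                items.add(new Struct(itemSchema)
                        .put("code", c.getCode())
                        .put("title", c.getTitle())
                        .put("slug", c.getSlug())
                        .put("url", c.getUrl()));
            }
            // Passing List<Struct> (not List<Currencies>) satisfies the schema validation.
            valueStruct.put("currencies", items);
        }
        return valueStruct;
    }

    public static void main(String[] args) {
        Struct value = buildRecordValue(Arrays.asList(
                new Currencies("BTC", "Bitcoin", "bitcoin", "https://example.com/btc")));
        value.validate(); // throws DataException if the value does not match the schema
        System.out.println(value.getArray("currencies").size());
    }
}
```

Reusing CURRENCIES_SCHEMA.valueSchema() rather than rebuilding the element schema avoids a schema mismatch, since the nested Struct's schema has to equal the array's element schema.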

Here is a full unit test example.

First, an interface that will help:
public interface ConnectPOJOConverter<T> {
    Schema getSchema();
    T fromConnectData(Struct s);
    Struct toConnectData(T t);
}

class ArrayStructTest {

    public static final Schema CURRENCY_ITEM_SCHEMA = SchemaBuilder.struct()
            .version(1)
            .name(Currency.class.getName())
            .doc("A currency item")
            .field("code", Schema.OPTIONAL_STRING_SCHEMA)
            .field("title", Schema.OPTIONAL_STRING_SCHEMA)
            .field("slug", Schema.OPTIONAL_STRING_SCHEMA)
            .field("url", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

    static final ConnectPOJOConverter<Currency> CONVERTER = new CurrencyConverter();

    @Test
    void myTest() {
        // Given
        List<Currency> currencies = new ArrayList<>();
        // TODO: Get from external source
        currencies.add(new Currency("200", "Hello", "/slug", "http://localhost"));
        currencies.add(new Currency("200", "World", "/slug", "http://localhost"));

        // When: build Connect Struct data
        Schema valueSchema = SchemaBuilder.struct()
                .name("CryptoNews")
                .doc("A record holding a list of currency items")
                .version(1)
                .field("currencies", SchemaBuilder.array(CURRENCY_ITEM_SCHEMA).required().build())
                .build();

        final List<Struct> items = currencies.stream()
                .map(CONVERTER::toConnectData)
                .collect(Collectors.toList());

        // In the SourceTask, this is what goes into the SourceRecord along with the valueSchema
        Struct value = new Struct(valueSchema);
        value.put("currencies", items);

        // Then
        assertDoesNotThrow(value::validate);

        Object itemsFromStruct = value.get("currencies");
        assertInstanceOf(List.class, itemsFromStruct);
        //noinspection unchecked
        List<Object> data = (List<Object>) itemsFromStruct; // could also use List<Struct>
        assertEquals(2, data.size(), "same size");
        assertInstanceOf(Struct.class, data.get(0), "Object list still has type information");

        Struct firstStruct = (Struct) data.get(0);
        assertEquals("Hello", firstStruct.get("title"));

        currencies = data.stream()
                .map(o -> (Struct) o)
                .map(CONVERTER::fromConnectData)
                .filter(Objects::nonNull)  // in case converter has errors, could return null
                .collect(Collectors.toList());
        assertTrue(currencies.size() <= data.size());
        assertEquals("World", currencies.get(1).getTitle(), "struct parsing data worked");
    }

    static class CurrencyConverter implements ConnectPOJOConverter<Currency> {

        @Override
        public Schema getSchema() {
            return CURRENCY_ITEM_SCHEMA;
        }

        @Override
        public Currency fromConnectData(Struct s) {
            // simple conversion, but more complex types could throw errors
            // argument order (code, title, slug, url) matches the constructor calls in the test
            return new Currency(
                    s.getString("code"),
                    s.getString("title"),
                    s.getString("slug"),
                    s.getString("url")
            );
        }

        @Override
        public Struct toConnectData(Currency c) {
            Struct s = new Struct(getSchema());
            s.put("code", c.getCode());
            s.put("title", c.getTitle());
            s.put("url", c.getUrl());
            s.put("slug", c.getSlug());
            return s;
        }
    }
}

The alternative approach is to just use a String schema, use a Jackson ObjectMapper to get a JSON string, and then let JSONConverter handle the rest.

final ObjectMapper om = new ObjectMapper();
final Schema valueSchema = Schema.STRING_SCHEMA;

// for-each currency
Map<String, JsonNode> output = new HashMap<>(); // declared before first use
output.put("schema", new TextNode("TODO")); // replace with JSONConverter schema
try {
    output.put("payload", om.readTree(om.writeValueAsBytes(currency))); // write and parse to not double-encode

    String value = om.writeValueAsString(output);
    SourceRecord r = new SourceRecord(...., valueSchema, value);
    records.add(r); // poll return result
} catch (IOException e) {
    // TODO: handle
}
// end for-each
return records;
