I don't quite understand how to register a schema so that it can be used by a JDBC source or sink connector, and then how to read the data in Spark.
This is the Avro schema I want to use to retrieve records from an MS SQL database:
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    { "name": "int1", "type": "int" },
    { "name": "str1", "type": "string" },
    { "name": "str2", "type": "string" }
  ]
}
I want to use this schema with this source connector:
{"name": "mssql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "int1",
"tasks.max": "1",
"table.whitelist": "Hello",
"mode": "incrementing",
"topic.prefix": "mssql-",
"name": "mssql-source",
"connection.url":
"jdbc:sqlserver://XXX.XXX.X;databaseName=XXX;username=XX;password=XX"
}
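From what I understand, the AvroConverter should register the value schema automatically under the subject "mssql-Hello-value" (topic name plus "-value") once the connector produces records, so something like this sketch should show it in the registry (registry URL assumed from the config above):

import scala.collection.JavaConverters._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// List every subject the registry knows about; "mssql-Hello-value" should
// show up once the connector has produced at least one record.
val client = new CachedSchemaRegistryClient("http://localhost:8081", 10)
client.getAllSubjects.asScala.foreach(println)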
This is the Spark consumer I am using:
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class SparkAvroConsumer {

    private static Injection<GenericRecord, byte[]> recordInjection;

    private static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"int1\", \"type\":\"int\" },"
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" }"
            + "]}";

    static {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("schema.registry.url", "http://localhost:8081");

        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        directKafkaStream
                .map(message -> recordInjection.invert(message._2).get())
                .foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println("int1= " + record.get("int1")
                                + ", str1= " + record.get("str1")
                                + ", str2=" + record.get("str2"));
                    });
                });

        ssc.start();
        ssc.awaitTermination();
    }
}
Every schema has a schemaId. When you register a schema with the Confluent Schema Registry, it creates an int ID for it, and that ID is attached to every message the source system sends. You can use CachedSchemaRegistryClient to fetch the schema from the Schema Registry; you can do something like the following (it's Scala code):
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}

val url = "http://schema.registry.url:8181"
val schemaRegistry: SchemaRegistryClient = new CachedSchemaRegistryClient(url, 10) // 10 = max schemas to cache
val schema = schemaRegistry.getByID(schemaId) // consult the Schema Registry if you know the schemaId in advance (you get it when registering your schema)
// CachedSchemaRegistryClient also has a getAllSubjects API that returns all the subjects in your registry.
println(schema)
If you want to get the schema ID from an incoming message instead, do the following:
import java.nio.ByteBuffer
import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

def getSchema(buffer: Array[Byte]): Schema = { // buffer is your incoming binary Avro message
  val url = "http://schema.registry.url:8181"
  val schemaRegistry = new CachedSchemaRegistryClient(url, 10)
  val bb = ByteBuffer.wrap(buffer)
  bb.get()                 // consume MAGIC_BYTE
  val schemaId = bb.getInt // consume schemaId
  schemaRegistry.getByID(schemaId) // consult the Schema Registry
}
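Once you have the writer's schema, you can also decode the remaining bytes yourself; a rough sketch that skips the 5-byte Confluent header (magic byte + schema id) and reads the payload with Avro's GenericDatumReader could look like this:

import java.nio.ByteBuffer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Sketch: decode a Confluent-framed Avro message once the writer schema is known.
def decode(buffer: Array[Byte], schema: Schema): GenericRecord = {
  val bb = ByteBuffer.wrap(buffer)
  bb.get()   // skip MAGIC_BYTE
  bb.getInt  // skip schemaId
  val payload = new Array[Byte](bb.remaining())
  bb.get(payload)
  val reader = new GenericDatumReader[GenericRecord](schema)
  reader.read(null, DecoderFactory.get().binaryDecoder(payload, null))
}

This is also why the bijection-based consumer above fails on Connect-produced messages: GenericAvroCodecs.toBinary expects a bare Avro payload without the 5-byte header, so either strip the header as above or use io.confluent.kafka.serializers.KafkaAvroDeserializer, which handles the header for you.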
I hope this helps.