Confluent JDBC连接器和Flink消费者



我们正在尝试使用kafkaavroserializer使用SQL-Server JDBC连接器,并在将其发送到Kafka之前,还将自定义的ProducterInterceptor提供加密数据。

在消费者方面,我们希望使用Flink Connector进行解密,然后使用适当的避难所。

我们有几个问题以实现这一目标:

1(如果我们为解密数据提供自定义的ConsumerInterceptor,那么当我们在Flink中创建DataStream时,应该通过属性文件传递?

Properties properties = new Properties();
        ...
    `properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
    ...
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));

在配置上正确或我需要设置任何其他属性,以便我可以将消费者互动器传递给flink?

2(另一个问题是关于Flink中的Deserializer。我在网上查找了它,发现很少的代码片段如下:

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> { 
    private final String schemaRegistryUrl; 
    private final int identityMapCapacity; 
    private KafkaAvroDecoder kafkaAvroDecoder; 
    public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) { 
        this(schemaRegistyUrl, 1000); 
    }

因此,如果我们使用JDBC连接器将数据传递给KAFKA而没有任何修改(除了加密数据之外(,那么我们在测试时应该提供什么数据类型?我们将在避免之前解密数据。

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> { 

预先感谢

只是添加最终结果,以便可以帮助任何正在寻找相同的人:

public class ConfluentAvroDeserializationSchema
            implements DeserializationSchema<GenericRecord> {
        private final String schemaRegistryUrl;
        private final int identityMapCapacity;
        private transient KafkaAvroDecoder kafkaAvroDecoder;

        public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
            this(schemaRegistyUrl, 1000);
        }
        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
                identityMapCapacity) {
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.identityMapCapacity = identityMapCapacity;
        }
        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            if (kafkaAvroDecoder == null) {
                SchemaRegistryClient schemaRegistry = new
                        CachedSchemaRegistryClient(this.schemaRegistryUrl,
                        this.identityMapCapacity);
                this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
            }
            return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
        }
        @Override
        public boolean isEndOfStream(GenericRecord string) {
            return false;
        }
        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeExtractor.getForClass(GenericRecord.class);
        }
    }

最新更新