我一直在更新一个Flink处理器(Flink 1.9版(,它从Kafka中读取,然后写入Kafka。我们已经编写了这个处理器来运行Kafka 0.10.2集群,现在我们部署了一个运行2.2版本的新Kafka集群。因此,我开始更新处理器,以使用最新的FlinkKafkaConsumer和FlinkKavkaProducer(正如Flink文档所建议的那样(。然而,我遇到了一些问题与卡夫卡的制片人。我无法使用不推荐使用的构造函数使其序列化数据(这并不奇怪(,也无法在网上找到任何关于如何实现序列化程序的实现或示例(所有示例都使用旧的Kafka连接器(
当前的实现(对于Kafka 0.10.2(如下
FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
(FlinkKafkaPartitioner) null
);
当尝试实现以下FlinkKafkaProducer 时
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
null
);
我得到以下错误:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)
我一直不知道为什么。FlinkKafkaProducer的构造函数也被弃用,当我尝试实现非弃用的构造函数时,我不知道如何序列化数据。以下是它的外观:
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
return null;
}
},
producerProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
但我不知道如何实现KafkaSerializationSchema,我在网上或Flink文档中都找不到这样的例子。
有没有人有实现这一点的经验,或者关于FlinkProducer为什么在步骤中得到NullPointerException的提示?
如果您只是将字符串发送到Kafka:
public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{
private String topic;
public ProducerStringSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
}
}
用于发送Java对象:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{
private String topic;
private ObjectMapper mapper;
public ObjSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
byte[] b = null;
if (mapper == null) {
mapper = new ObjectMapper();
}
try {
b= mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
// TODO
}
return new ProducerRecord<byte[], byte[]>(topic, b);
}
}
在您的代码中
.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic),
params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
要处理FlinkKafkaProducer.Semantic.EXACTLY_ONCE情况下的超时,您应该阅读https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-011及更新版本,尤其是此部分:
语义.EXACTLY_ONCE模式依赖于提交在执行检查点之前、从所述检查点恢复之后启动的事务的能力。如果Flink应用程序崩溃到完成重启之间的时间大于Kafka的事务超时,则会出现数据丢失(Kafka会自动中止超过超时时间的事务(。考虑到这一点,请根据您的预期停机时间适当配置您的事务超时。
默认情况下,Kafka代理将transaction.max.timeout.ms设置为15分钟。此属性不允许为生产者设置大于其值的事务超时。FlinkKafkaProducer011默认情况下会将生产者配置中的transaction.timeout.ms属性设置为1小时,因此在使用Semantic.EXACTLY_ONCE模式之前,transaction.max.timeout.ms应该增加。