Apache Flink typed KafkaSource



我实现了一个到kafka流的连接,如下所述。现在我尝试使用Jdbc接收器将数据写入postgres数据库。

现在,带有Kafka的源代码似乎没有类型。因此,当为SQL编写语句时,它看起来都像Nothing类型。

我如何使用fromSource,我实际上有一个Kafka的类型化源?

到目前为止,我所尝试的是:

object Main {
def main(args: Array[String]) {
val builder = KafkaSource.builder
builder.setBootstrapServers("localhost:29092")
builder.setProperty("partition.discovery.interval.ms", "10000")
builder.setTopics("created")
builder.setBounded(OffsetsInitializer.latest)
builder.setStartingOffsets(OffsetsInitializer.earliest)
builder.setDeserializer(KafkaRecordDeserializationSchema.of(new CreatedEventSchema))
val source = builder.build()

val env = StreamExecutionEnvironment.getExecutionEnvironment
val streamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks, "Kafka Source")
streamSource.addSink(JdbcSink.sink(
"INSERT INTO conversations (timestamp, active_conversations, total_conversations) VALUES (?,?,?)",
(statement, event) => {
statement.setTime(1, event.date)
statement.setInt(1, event.a)
statement.setInt(3, event.b)
},JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://localhost:5432/reporting")
.withDriverName("org.postgresql.Driver")
.withUsername("postgres")
.withPassword("veryverysecret:-)")
.build()
))
env.execute()
}
}

不能编译,因为event的类型是Nothing。但我认为它一定不是,因为与CreatedEventSchema Flink应该能够反序列化。也许还需要注意的是,实际上我只想处理kafka消息的值。

在Java中可以这样做:

KafkaSource<Event> source =
KafkaSource.<Event>builder()
.setBootstrapServers("localhost:9092")
.setTopics(TOPIC)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new EventDeserializationSchema())
.build();

的值反序列化器如下:

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) {
objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
}
@Override
public Event deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, Event.class);
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}

对不起,我手边没有Scala的例子,但希望这能给你指明正确的方向。

最新更新