Throws an exception when consuming Kafka and sinking to ClickHouse
dataStreamSource.addSink(sinkInstance);
Error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: org.apache.http.protocol.HttpRequestExecutor@6ea2bc93 is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
at org.data.dataflow.KafkaToFlink.main(KafkaToFlink.java:36)
Caused by: java.io.NotSerializableException: org.apache.http.protocol.HttpRequestExecutor
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
... 11 more
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaToFlink {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment().setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "10.227.89.202:9092");
    properties.setProperty("group.id", "test-consumer-group");
    SingleOutputStreamOperator<String> dataStreamSource = streamExecutionEnvironment
        .addSource(new FlinkKafkaConsumer<>(
            "my-topic",
            new CustomKafkaDeserializationSchema(),
            properties).setStartFromEarliest()
        ).map((MapFunction<String, String>) s -> {
          // simulate per-record processing time
          Thread.sleep(ThreadLocalRandom.current().nextInt(0, 500));
          return s;
        });
    SinkFunction<String> sinkInstance = new FlinkToCK("xxx", "8123", "default", "");
    dataStreamSource.addSink(sinkInstance);
    streamExecutionEnvironment.execute("flink consume kafka topic");
  }
}
class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<String> {
  @Override
  public boolean isEndOfStream(String s) {
    return false;
  }

  @Override
  public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
    return consumerRecord.toString();
  }

  @Override
  public TypeInformation<String> getProducedType() {
    return BasicTypeInfo.STRING_TYPE_INFO;
  }
}
class FlinkToCK extends RichSinkFunction<String> {
  FlinkToCK(String host, String port, String user, String pwd) throws SQLException {
    super();
    this.connection_ = DriverManager.getConnection("jdbc:clickhouse://" + host + ":" + port, user, pwd);
    this.statement_ = connection_.createStatement();
    statement_.execute("CREATE DATABASE IF NOT EXISTS test");
  }

  @Override
  public void invoke(String value, SinkFunction.Context context) throws SQLException {
    statement_.execute("INSERT INTO test VALUES ('" + value + "')");
  }

  @Override
  public void close() throws SQLException {
    connection_.close();
  }

  private final Connection connection_;
  private final Statement statement_;
}
This happens because Connection is not serializable: it holds a reference to HttpRequestExecutor. What you can try is making the Connection and Statement fields transient. Then, instead of assigning them in the constructor, override the open() method and open the DB connection and create a new Statement there, which does mean they can no longer be final in this case.
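A minimal sketch of that change, keeping the class from the question; it assumes the standard RichFunction lifecycle hook open(Configuration), which needs one extra import, org.apache.flink.configuration.Configuration. The connection parameters are held as plain String fields, which serialize fine, while the JDBC objects are only created on the task manager inside open():

class FlinkToCK extends RichSinkFunction<String> {
  // Plain strings serialize without trouble, so the sink instance can be shipped to the cluster.
  private final String host;
  private final String port;
  private final String user;
  private final String pwd;
  // transient: skipped by Java serialization; assigned in open() on the task manager,
  // so the non-serializable JDBC internals never reach the ClosureCleaner.
  private transient Connection connection_;
  private transient Statement statement_;

  FlinkToCK(String host, String port, String user, String pwd) {
    this.host = host;
    this.port = port;
    this.user = user;
    this.pwd = pwd;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    connection_ = DriverManager.getConnection("jdbc:clickhouse://" + host + ":" + port, user, pwd);
    statement_ = connection_.createStatement();
    statement_.execute("CREATE DATABASE IF NOT EXISTS test");
  }

  @Override
  public void invoke(String value, SinkFunction.Context context) throws SQLException {
    statement_.execute("INSERT INTO test VALUES ('" + value + "')");
  }

  @Override
  public void close() throws SQLException {
    connection_.close();
  }
}

Note the fields are no longer final, since open() assigns them after construction. For anything beyond a quick test, a PreparedStatement with a bound parameter would also be safer than concatenating value into the SQL string.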