Plain java.lang.NullPointerException when running on another machine



I wrote some code that consumes a stream from Kafka and then sinks it to MySQL.

The code runs fine in my IDE and inserts data into the target table as expected.

However, when I submit the jar to the Flink cluster deployed at 192.168.95.2 (via its web UI), it throws a NullPointerException at the statement preparedStatement = connection.prepareStatement(sql);. The related parts of the code are:

private static PreparedStatement preparedStatement;
sql = "insert into kafka_ccu values(?,?)";

I'm confused... If there were any problem with the connection or access, why does the code run fine in my IDE? Any help is appreciated.

The full exception is:

java.lang.NullPointerException
at DemoKafka2Mysql$2.invoke(DemoKafka2Mysql.java:92)
at DemoKafka2Mysql$2.invoke(DemoKafka2Mysql.java:72)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

My code (the streaming source here is Kinesis rather than Kafka, but the same problem occurs…):

public class DemoKinesis2Mysql {
    private static Connection connection;
    private static PreparedStatement preparedStatement;
    private static String sql = "";
    private static String username;
    private static String password;
    private static String drivername;
    private static String dburl;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);
        getConn();
        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "aaa");
        consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "ddd");
        consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
                "target_stream",
                new SimpleStringSchema(),
                consumerConfig));
        kinesis.print();
        DataStream<JSONObject> mapStream = kinesis.map(new MapFunction<String, JSONObject>() {
            public JSONObject map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                return jsonObject;
            }
        }).filter(new FilterFunction<JSONObject>() {
            public boolean filter(JSONObject jsonObject) throws Exception {
                if (jsonObject.containsKey("Body")) {
                    return true;
                }
                return false;
            }
        });

        mapStream.addSink(new RichSinkFunction<JSONObject>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void close() throws Exception {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }

            public void invoke(JSONObject value, Context context) throws Exception {
                JSONArray arr = JSON.parseObject(value.getString("Body")).getJSONArray("metrics");
                log.info("json array:" + arr);
                for (int i = 0; i < arr.size(); i++) {
                    JSONObject o = arr.getJSONObject(i);
                    preparedStatement = connection.prepareStatement(sql);
                    String k = "";
                    if (o.containsKey("key")) {
                        k = o.getString("key");
                    }
                    String d = o.getJSONObject("properties").getString("datetime");
                    if (k.equals("ccu")) {
                        String t = o.getJSONObject("properties").getString("ccu");
                        preparedStatement.setObject(1, t);
                        preparedStatement.setObject(2, d);
                        preparedStatement.executeUpdate();
                    }
                }
            }
        });
        see.execute();
    }

    private static void getConn() throws SQLException {
        username = "user";
        password = "123456";
        drivername = "com.mysql.jdbc.Driver";
        dburl = "jdbc:mysql://192.168.95.2:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        sql = "insert into tmp_stream_kinesis_ccu (ccu,data_time) values(?,?)";
        try {
            Class.forName(drivername);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        connection = DriverManager.getConnection(dburl, username, password);
    }

}

Try this:

if (connection == null || connection.isClosed()) return;
PreparedStatement statement = connection.prepareStatement(sql);

If you want to use side effects like this in your user code, make sure everything is initialized inside the user function itself. When the job runs on a cluster, the sink function is serialized and executed in a TaskManager JVM, where the static fields you assign in main() on the client are never set, which is why connection is null there even though it works in your IDE. Here is a rough sketch.

public class MyMap extends RichMapFunction {
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = ...;
        statement = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        statement.close();
        connection.close();
    }
    ...
}
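
Applied to the sink in this question, a minimal sketch could look like the one below. It keeps the table, SQL and JDBC settings from the question's getConn(), and assumes the fastjson JSON/JSONObject/JSONArray classes already used above; the class name MysqlCcuSink is made up for illustration. The key point is that connection and preparedStatement are non-static, transient fields created in open() on the TaskManager, not in main() on the client.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical sink class (name made up) that owns its JDBC resources per parallel instance.
public class MysqlCcuSink extends RichSinkFunction<JSONObject> {

    // transient and non-static: created in open() on the TaskManager, never serialized
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        // connection settings copied from the question's getConn()
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.95.2:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
                "user", "123456");
        preparedStatement = connection.prepareStatement(
                "insert into tmp_stream_kinesis_ccu (ccu,data_time) values(?,?)");
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        JSONArray arr = JSON.parseObject(value.getString("Body")).getJSONArray("metrics");
        for (int i = 0; i < arr.size(); i++) {
            JSONObject o = arr.getJSONObject(i);
            if ("ccu".equals(o.getString("key"))) {
                // same column order as the INSERT statement: (ccu, data_time)
                preparedStatement.setObject(1, o.getJSONObject("properties").getString("ccu"));
                preparedStatement.setObject(2, o.getJSONObject("properties").getString("datetime"));
                preparedStatement.executeUpdate();
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}

The sink would then be attached with mapStream.addSink(new MysqlCcuSink()); and getConn() plus the static fields in main() are no longer needed. Preparing the statement once in open() instead of once per record also avoids leaking statements inside the loop.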
