我使用 flink java api运行代码,从 kafka 获取一些字节,然后通过插入 cassandra来解析其跟随它数据库使用另一个库 static 方法(解析和插入结果均由库完成(。在IDE中的本地运行代码,我得到了所需的答案,但是在 yarn 群集上运行parse方法无法正常工作!
public class Test {
static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();
public static void main(String[] args) throws Exception {
CassandraConnection.connect();
Parser.setInsert(true);
stream.flatMap(new FlatMapFunction<byte[], Void>() {
@Override
public void flatMap(byte[] value, Collector<Void> out) throws Exception {
Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
// Parser.parse(ByteBuffer.wrap(value));
}
});
env.execute();
}
}
Parser
类中有一个静态哈希图字段,该字段的配置基于其信息,并且数据将在执行过程中插入。在纱线上运行的问题是该数据不适合taskmanagers
,它们只是打印config is not available!
所以我重新定义了哈希图作为 parse
方法的参数,但结果没有差异!
如何解决问题?
我更改了 static 方法和字段到非静态,并使用RichFlatMapFunction
解决了问题。
stream.flatMap(new RichFlatMapFunction<byte[], Void>() {
CassandraConnection con = new CassandraConnection();
int i = 0 ;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
con.connect();
}
@Override
public void flatMap(byte[] value, Collector<Void> out) throws Exception {
ByteBuffer tb = ByteBuffer.wrap(value);
np.parse(tb, ConfigHashMap, con);
}
});