在本地模式和纱线群集中运行flink时,结果不同



我使用 flink java api运行代码,从 kafka 获取一些字节,然后通过插入 cassandra来解析其跟随它数据库使用另一个库 static 方法(解析和插入结果均由库完成(。在IDE中的本地运行代码,我得到了所需的答案,但是在 yarn 群集上运行parse方法无法正常工作!

public class Test {
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();
    public static void main(String[] args) throws Exception {
        CassandraConnection.connect();
        Parser.setInsert(true);
        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
                // Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}

Parser类中有一个静态哈希图字段,该字段的配置基于其信息,并且数据将在执行过程中插入。在纱线上运行的问题是该数据不适合taskmanagers,它们只是打印config is not available!

所以我重新定义了哈希图作为 parse方法的参数,但结果没有差异!

如何解决问题?

我更改了 static 方法和字段到非静态,并使用RichFlatMapFunction解决了问题。

stream.flatMap(new RichFlatMapFunction<byte[], Void>() {
            CassandraConnection con = new CassandraConnection();
            int i = 0 ;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                con.connect();
            }
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                ByteBuffer tb = ByteBuffer.wrap(value);
                np.parse(tb, ConfigHashMap, con);
            }
        });

相关内容

  • 没有找到相关文章

最新更新