Apache Flink: LEFT JOIN with a table function does not return the expected result



Flink version: 1.3.1

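I created two tables: one from an in-memory collection and one from a UDTF (user-defined table function). When I tested a JOIN and a LEFT JOIN between them, both returned the same result. I expected the LEFT JOIN to return more rows than the JOIN.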

My test code:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class ExerciseUDF {

    public static void main(String[] args) throws Exception {
        test_3();
    }

    public static void test_3() throws Exception {
        // 1. set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        // 2. register the DataSet as table "WordCount" and the table function
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        tEnv.registerFunction("myTableUpperFunc", new MyTableFunc_2());

        Table table;
        DataSet<WCUpper> resultUpper;

        // 3. table LEFT JOIN user-defined table function
        System.out.println("table left join user defined table");
        table = tEnv.sql(
                "SELECT S.word AS word, S.frequency AS frequency, T.myupper AS myupper "
                + "FROM WordCount AS S "
                + "LEFT JOIN LATERAL TABLE(myTableUpperFunc(S.word)) AS T(word, myupper) "
                + "ON S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print(); // output: WCUpper Ciao 1 CIAO, but the "Hello" rows are missing

        // 4. table (inner) JOIN user-defined table function
        System.out.println("table join user defined table");
        table = tEnv.sql(
                "SELECT S.word AS word, S.frequency AS frequency, T.myupper AS myupper "
                + "FROM WordCount AS S "
                + "JOIN LATERAL TABLE(myTableUpperFunc(S.word)) AS T(word, myupper) "
                + "ON S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print(); // output: WCUpper Ciao 1 CIAO
    }

    public static class WC {
        public String word;
        public long frequency;

        // public no-argument constructor, required for a Flink POJO
        public WC() {
        }

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

    // Result POJO. Not shown in the original question; reconstructed from the
    // printed output "WCUpper Ciao 1 CIAO".
    public static class WCUpper {
        public String word;
        public long frequency;
        public String myupper;

        public WCUpper() {
        }

        @Override
        public String toString() {
            return "WCUpper " + word + " " + frequency + " " + myupper;
        }
    }

    // user-defined table function: emits (word, WORD) for every word except "Hello"
    public static class MyTableFunc_2 extends TableFunction<Tuple2<String, String>> {
        public void eval(String str) {
            System.out.println("upper func executed for " + str);
            if (str.equals("Hello")) {
                return; // emit no row for "Hello", so only an outer join should keep it
            }
            collect(new Tuple2<String, String>(str, str.toUpperCase()));
        }
    }
}

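The LEFT JOIN and JOIN queries produce the same output. In both cases only one row is returned:

WCUpper Ciao 1 CIAO

However, I think the LEFT JOIN query should keep the "Hello" rows.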
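The difference the question expects between the two queries can be modeled in plain Java, outside Flink. This is a sketch of SQL lateral-join semantics under that assumption, not Flink's implementation; `LateralJoinDemo`, `innerJoin`, `leftJoin`, and `row` are hypothetical names, `upperFunc` mimics `MyTableFunc_2`, and the `frequency` column is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LateralJoinDemo {

    // Mimics MyTableFunc_2: emits (word, WORD) for every word except "Hello".
    public static List<String[]> upperFunc(String str) {
        List<String[]> out = new ArrayList<>();
        if (!str.equals("Hello")) {
            out.add(new String[] {str, str.toUpperCase()});
        }
        return out;
    }

    // Formats a result row as "word myupper"; a null myupper prints as "null".
    private static String row(String word, String upper) {
        return word + " " + upper;
    }

    // Inner lateral join: a left row appears once for every row the function emits,
    // so a word for which the function emits nothing disappears.
    public static List<String> innerJoin(List<String> words, Function<String, List<String[]>> fn) {
        List<String> result = new ArrayList<>();
        for (String w : words) {
            for (String[] t : fn.apply(w)) {
                result.add(row(w, t[1]));
            }
        }
        return result;
    }

    // Left outer lateral join: same as above, but a word for which the function
    // emits nothing is kept once, with the function's columns padded with null.
    public static List<String> leftJoin(List<String> words, Function<String, List<String[]>> fn) {
        List<String> result = new ArrayList<>();
        for (String w : words) {
            List<String[]> rows = fn.apply(w);
            if (rows.isEmpty()) {
                result.add(row(w, null));
            }
            for (String[] t : rows) {
                result.add(row(w, t[1]));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "Ciao", "Hello");
        System.out.println("join:      " + innerJoin(words, LateralJoinDemo::upperFunc));
        System.out.println("left join: " + leftJoin(words, LateralJoinDemo::upperFunc));
    }
}
```

Running `main` prints `[Ciao CIAO]` for the inner join and `[Hello null, Ciao CIAO, Hello null]` for the left join: under these semantics the left join keeps both "Hello" rows, with `myupper` padded to null.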

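Yes, you are right.

This is a bug in how predicates of outer joins with table functions are handled, and it needs to be fixed.

Thanks, Fabian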
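The answer places the bug in the predicate of the outer join. A plain-Java model (again not Flink code; `OuterJoinPredicateDemo`, `wordsEqual`, and `padThenFilter` are hypothetical names) shows where an ON predicate belongs in a left lateral join: `leftJoin` tests it only against the rows the function emits and pads a left row with null when none of them match. `padThenFilter` is an assumed failure mode, not taken from Flink's source: it pads first and then applies the predicate as a filter, so `S.word = T.word` meets the NULL-padded `T.word` and drops the row, which would reproduce the single-row output reported in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

public class OuterJoinPredicateDemo {

    // Mimics MyTableFunc_2: emits (word, WORD) for every word except "Hello".
    public static List<String[]> upperFunc(String str) {
        List<String[]> out = new ArrayList<>();
        if (!str.equals("Hello")) {
            out.add(new String[] {str, str.toUpperCase()});
        }
        return out;
    }

    // Models "S.word = T.word" with SQL semantics: a comparison with NULL is never TRUE.
    public static boolean wordsEqual(String s, String[] t) {
        return s != null && t[0] != null && s.equals(t[0]);
    }

    // Expected semantics: the ON predicate selects among the function's rows;
    // a left row with no matching function row is kept once, padded with null.
    public static List<String> leftJoin(List<String> words,
                                        Function<String, List<String[]>> fn,
                                        BiPredicate<String, String[]> on) {
        List<String> result = new ArrayList<>();
        for (String w : words) {
            boolean matched = false;
            for (String[] t : fn.apply(w)) {
                if (on.test(w, t)) {
                    result.add(w + " " + t[1]);
                    matched = true;
                }
            }
            if (!matched) {
                result.add(w + " null");
            }
        }
        return result;
    }

    // Hypothetical faulty variant: pad first, then use the predicate as a filter.
    // The padded row has T.word = NULL, so the predicate is not TRUE and the row
    // is dropped, making the left join behave like an inner join.
    public static List<String> padThenFilter(List<String> words,
                                             Function<String, List<String[]>> fn,
                                             BiPredicate<String, String[]> on) {
        List<String> result = new ArrayList<>();
        for (String w : words) {
            List<String[]> rows = new ArrayList<>(fn.apply(w));
            if (rows.isEmpty()) {
                rows.add(new String[] {null, null}); // null-padded T row
            }
            for (String[] t : rows) {
                if (on.test(w, t)) {
                    result.add(w + " " + t[1]);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "Ciao", "Hello");
        BiPredicate<String, String[]> onWord = OuterJoinPredicateDemo::wordsEqual;
        BiPredicate<String, String[]> onTrue = (s, t) -> true;
        System.out.println("ON S.word = T.word: " + leftJoin(words, OuterJoinPredicateDemo::upperFunc, onWord));
        System.out.println("ON TRUE:            " + leftJoin(words, OuterJoinPredicateDemo::upperFunc, onTrue));
        System.out.println("pad then filter:    " + padThenFilter(words, OuterJoinPredicateDemo::upperFunc, onWord));
    }
}
```

With the expected semantics, `ON S.word = T.word` and `ON TRUE` give the same three rows here, because `MyTableFunc_2` always emits the input word as its first field. Dropping the redundant predicate (`... AS T(word, myupper) ON TRUE`) may therefore be a workaround worth trying on 1.3.1 until the fix lands; this has not been verified against Flink here.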
