我有一个 Flink 演示程序，用于在另一个数据集中查找数据集 1 的某一列（见下方带 NOT IN 子查询的 SQL）。我是用 Flink SQL 编写的。代码看起来没有问题，但运行失败。
我使用的版本是:
- flink.version:1.7.1
- java.version:1.8
- scala.binary.version:2.12
这是我的 Flink 演示代码：
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.List;
/**
 * Flink batch demo: registers a (user, item, score) data set and an item catalogue as SQL
 * tables, runs a {@code NOT IN} sub-query against them and prints the rows that come back.
 */
public class TestUnScoreItem {

    /** Items placed, in this order, into {@code allItemTable} (single column {@code item2}). */
    private static final String[] CATALOGUE_ITEMS = {"Item1", "Item2", "Item3", "Item4", "Item5"};

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Materialise both in-memory inputs before handing them to Flink.
        List<Tuple3<String, String, Integer>> scoreRows = sampleUserScores();
        List<Tuple1<String>> catalogueRows = catalogue();

        DataSource<Tuple3<String, String, Integer>> userScoreSet = env.fromCollection(scoreRows);
        DataSource<Tuple1<String>> allItemSet = env.fromCollection(catalogueRows);

        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataSet("userScoreTable", userScoreSet, "user,item,score");
        tableEnv.registerDataSet("allItemTable", allItemSet, "item2");

        // Rows of userScoreTable whose item does not occur in allItemTable. With the sample data
        // every scored item is also in the catalogue, so this query is expected to return no rows.
        // NOTE(review): "user" is a reserved word in standard SQL; if the parser misreads it,
        // quote the column with backticks.
        String sql = "select user, item from userScoreTable "
                + "where item not in (select item2 from allItemTable) ";
        Table unScoreTable = tableEnv.sqlQuery(sql);

        tableEnv.toDataSet(unScoreTable, Row.class).print();
    }

    /** Sample ratings: each tuple is (user id, item id, score). */
    private static List<Tuple3<String, String, Integer>> sampleUserScores() {
        List<Tuple3<String, String, Integer>> rows = new ArrayList<>();
        rows.add(new Tuple3<>("U1", "Item1", 4));
        rows.add(new Tuple3<>("U1", "Item3", 7));
        rows.add(new Tuple3<>("U1", "Item5", 2));
        rows.add(new Tuple3<>("U2", "Item2", 9));
        rows.add(new Tuple3<>("U2", "Item3", 3));
        rows.add(new Tuple3<>("U3", "Item1", 3));
        return rows;
    }

    /** Wraps every entry of {@link #CATALOGUE_ITEMS} in a one-field tuple, preserving order. */
    private static List<Tuple1<String>> catalogue() {
        List<Tuple1<String>> rows = new ArrayList<>(CATALOGUE_ITEMS.length);
        for (String itemId : CATALOGUE_ITEMS) {
            rows.add(new Tuple1<>(itemId));
        }
        return rows;
    }
}
运行时抛出了以下异常：
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/ProcessFunction
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.flink.table.plan.nodes.dataset.DataSetAggregate.translateToPlan(DataSetAggregate.scala:107)
at org.apache.flink.table.plan.nodes.dataset.DataSetSingleRowJoin.translateToPlan(DataSetSingleRowJoin.scala:99)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:147)
at com.jychan.easycode.recommend.training.TestUnScoreItem.main(TestUnScoreItem.java:65)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.ProcessFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 21 more
Process finished with exit code 1
有人知道如何解决这个问题吗？或者有没有其他方法可以得到相同的结果？谢谢！
补充：pom.xml 中的依赖配置如下
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--
      Default (compile) scope on purpose. While translating a batch plan
      (DataSetAggregate.translateToPlan), flink-table loads
      org.apache.flink.streaming.api.functions.ProcessFunction from this jar.
      It therefore must be on the runtime classpath when main() is launched directly.
      With <scope>provided</scope> the jar is compile-time only, and the job fails with
      NoClassDefFoundError. If you package for a Flink cluster that already ships this jar,
      "provided" is appropriate again; re-add the jar for local runs (for example via an IDE profile).
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
哦，我找到问题所在了，是这个依赖：
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- Culprit: the "provided" scope below keeps this jar off the runtime classpath when main()
     is run locally, so ProcessFunction cannot be loaded. Removing this line fixes the error. -->
<scope>provided</scope>
</dependency>
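删除 `<scope>provided</scope>` 后就能正常运行了。原因如下：provided 作用域的依赖只在编译时可用。在 IDE 中直接运行 main 方法时，它通常不会被加入运行时类路径。而 flink-table 在翻译执行计划时需要加载 ProcessFunction 类（见上面的堆栈中的 DataSetAggregate.translateToPlan）。谢谢大家。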