我正在尝试从Cassandra表中提取数据以用作数据集,但是遇到了两个问题。
首先是 cassandraInputFormat 只返回一个元组,我宁愿没有元组 12,而只是使用 pojo 来定义它期望返回的内容。所以我不知道这是否只是我必须接受的事情,是否有一种方法可以使用pojo代替cassandraConnector(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html),或者使用cassandraInputFormat不是获取数据的最佳方式。
另一个问题是,即使我从cassandraInputFormat(无论是否元组)中提取数据,我也不知道将其设置为数据源的方法。对于文件,csv和HDFS,有很多方法(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/ExecutionEnvironment.html#ExecutionEnvironment--),但没有一个是明确用于cassandra的。所以我的猜测是,我需要使用 cassandraInputFormat 提取数据,并使用 .fromElements() 或 .fromCollecton() 之类的东西,以及正确的方法是什么。
提前感谢您的任何帮助!
更新:
这"有效"(并感谢Chesnay Schepler的帮助):
DataSet<Tuple2<String, String>> testSet =
exEnv.createInput(cassandraInputFormat, TypeInformation.of(newTypeHint<Tuple2<String, String>>(){}));
但是这个错误现在正在发生...
Exception in thread "main" org.apache.flink.optimizer.CompilerException:
Error translating node 'Data Source "at execute(CodeBatchProcessorImpl.java:85)
(org.apache.flink.batch.connectors.cassandra.CassandraInputFormat)" : NONE
[[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]':
Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: flink.streaming.code.CodeBatchProcessorImpl
再往下,它包括:
Caused by: java.io.NotSerializableException: org.apache.flink.api.java.LocalEnvironment
更新 2:
必须将环境设置为瞬态。现已修复!
您可以通过调用 ExecutionEnvironment#createInput(InputFormat) 来使用 CassandraInputFormat 和所有 InputFormat。
目前没有将元素直接读取为 POJO 的选项。最简单的解决方法是在接收器之后添加一个 MapFunction,将元组转换为所需的 POJO。