我有一个像下面的卡桑德拉桌子,想使用某些条件从卡桑德拉获取记录并将其放在蜂巢桌中。
Cassandra Table(员工(条目:
Id Name Amount Time
1 abc 1000 2017041801
2 def 1000 2017041802
3 ghi 1000 2017041803
4 jkl 1000 2017041804
5 mno 1000 2017041805
6 pqr 1000 2017041806
7 stu 1000 2017041807
假设此表列是数据类型字符串的。我们在Hive也有相同的模式。
现在,我想将2017041801至2017041804之间的Cassandra记录进口到Hive或HDFS。在第二次运行中,我将根据上一条运行来取增量记录。
我能够使用以下语法将Cassandra数据加载到RDD中。
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("mydb", "Employee")
现在我的问题是如何根据条件之间的条件过滤此记录,并在Hive或Hive外部表路径中持续过滤后的记录。
不幸的是,我的时间列没有在Cassandra表中群集键。所以我无法使用.where((条款。
我是Scala和Spark的新手。因此,请在此过滤器逻辑或使用数据框架实现此逻辑的任何其他更好的方法中提供帮助,请告诉我。
预先感谢。
- 我建议使用Connector DataFrame API从C* https://github.com/datastax/spark-cassandra-connector/blob/blob/master/master/doc/14_data_frames.md加载。
- 使用df.filter((呼叫predicates
- Safeastable((方法将数据存储在Hive中。
这是您的情况的Spark 2.0示例
val df = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "Employee", "keyspace" -> "mydb" ))
.load()
df.filter("time between 2017041801 and 2017041804")
.write.mode("overwrite").saveAsTable("hivedb.employee");