我正在使用Spark 2.1和Cassandra(3.9)作为数据源。C* 有一个包含 50 列的大表,这对于我的用例来说不是一个很好的数据模型。因此,我为每个传感器创建了拆分表以及分区键和群集键列。
All sensor table
-----------------------------------------------------
| Device | Time | Sensor1 | Sensor2 | Sensor3 |
| dev1 | 1507436000 | 50.3 | 1 | 1 |
| dev2 | 1507436100 | 90.2 | 0 | 1 |
| dev1 | 1507436100 | 28.1 | 1 | 1 |
-----------------------------------------------------
Sensor1 table
-------------------------------
| Device | Time | value |
| dev1 | 1507436000 | 50.3 |
| dev2 | 1507436100 | 90.2 |
| dev1 | 1507436100 | 28.1 |
-------------------------------
现在我正在使用 Spark 将数据从旧表复制到新表。
df = spark.read
.format("org.apache.spark.sql.cassandra")
.options(table="allsensortables", keyspace="dataks")
.load().cache()
df.createOrReplaceTempView("data")
query = ('''select device,time,sensor1 as value from data ''' )
vgDF = spark.sql(query)
vgDF.write
.format("org.apache.spark.sql.cassandra")
.mode('append')
.options(table="sensor1", keyspace="dataks")
.save()
对于单个表,逐个复制数据需要花费大量时间 (2.1) 小时。 有什么方法可以为每个传感器select *
和创建多个DF并一次保存吗?(甚至按顺序)。
代码中的一个问题是缓存
df = spark.read
.format("org.apache.spark.sql.cassandra")
.options(table="allsensortables", keyspace="dataks")
.load().cache()
在这里,除了保存之外,我没有看到 df 是如何多次使用的。所以这里的缓存适得其反。您正在读取数据,对其进行过滤并将其保存到单独的 cassandra 表中。现在,数据帧上发生的唯一操作是保存,没有其他操作。
因此,在此处缓存数据没有任何好处。删除缓存会给你一些速度。
按顺序创建多个表。我建议使用 partitionBy 并首先将数据作为分区数据 w.r.t 传感器写入 HDFS,然后将其写回 cassandra。