使用pyspark将数据从pyspark数据帧插入到另一个cassandra表中



我有一个cassandra表-测试

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  B |      EN |       2 |
+----+---------+---------+
|  C |      IQ |       1 |
+----+---------+---------+
|  D |      RU |       3 |
+----+---------+---------+

此外,我在同一空间中有一个表main,其中列为"country_main"one_answers"main_id"。在main_id列中,我有和测试表中相同的id,而且我有一些唯一的id。country_main的值为空,与测试中的值相同。例如:

+---------+--------------+---------+
| main_id | country_main |      ...|
+=========+==============+=========+
|  A      |              |      ...|
+---------+--------------+---------+
|  B      |      EN      |      ...|
+---------+--------------+---------+
|  Y      |      IQ      |      ...|
+---------+--------------+---------+
|  Z      |      RU      |      ...|
+---------+--------------+---------+

如何使用pyspark根据id在country_main中填充空值,将测试表中的数据插入main?

具有以下模式&数据:

create table test.ct1 (
id text primary key,
country text,
cnt int);
insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);

create table test.ct2 (
main_id text primary key,
country_main text,
cnt int);
insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);

应该是这样的:

from pyspark.sql.functions import *
ct1 = spark.read.format("org.apache.spark.sql.cassandra")
.option("table", "ct1").option("keyspace", "test").load()
ct2 = spark.read.format("org.apache.spark.sql.cassandra")
.option("table", "ct2").option("keyspace", "test").load()
.where(col("country_main").isNull())
res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"), 
col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")
.option("table", "ct2").option("keyspace", "test")
.mode("append").save()

代码的作用:

  1. ct2(对应于您的main表(中选择所有行,其中country_mainnull
  2. 执行与ct1(对应于您的test表(的联接,从中获取country的值(优化可以是从两个表中只选择必要的列(。此外,请注意,连接是由Spark完成的,而不是在Cassandra级别上-Cassandra级连接仅在即将发布的Spark Cassandra Connector版本(3.0,但已发布的alpha版本(中受支持
  3. 重命名列以匹配ct2表的结构
  4. 写回数据

结果:

cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C |   1 |           IQ
B |   2 |           EN
A |   1 |           RU
D |   3 |           RU

对于源数据:

cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------                                       
C |   1 |           IQ                                  
B |   2 |           EN                                                                                         
A |   1 |         null                                      
D |   3 |           RU

相关内容

  • 没有找到相关文章

最新更新