将数据从 Spark 插入到 cassandra :如何验证一切正常



我正在尝试使用pyspark将数据从csv文件插入Cassandra。

这是代码:

我阅读了数据:

df =spark.read.format("csv") 
.option("header","true") 
.option("inferSchema","true") 
.option("nullValue","NA") 
.option("timestampFormat","ddMMMyyyy:HH:mm:ss") 
.option("quote", """) 
.option("delimiter", ";") 
.option("mode","failfast") 
.load("gs://tidy-centaur-b1/data/PRESCRIPTIONS_ANO.csv")

编辑:我把整个代码放在显示唯一键

dfi = df.withColumn("id", F.monotonically_increasing_id()) 
.withColumnRenamed("CHAIPRAT", "chaiprat") 
.withColumnRenamed("PRE_PRE_DTD", "pre_pre_dtd") 
.withColumnRenamed("NbMol", "nbmol") 
.withColumnRenamed("NumAno", "numano")

dfi.createOrReplaceTempView("prescription")

我计算行数并将数据保存到 cassandra 中

dfi.count()
> 4169826
dfi.write.format("org.apache.spark.sql.cassandra") 
.mode("overwrite") 
.option("confirm.truncate","true") 
.option("spark.cassandra.connection.host","10.142.0.4") 
.option("spark.cassandra.connection.port","9042") 
.option("keyspace","uasb03") 
.option("table","prescription") 
.save()

现在我从 cassandra 读取数据并计算行数。

presc = sql.read 
.format("org.apache.spark.sql.cassandra") 
.option("spark.cassandra.connection.host","10.142.0.4") 
.option("spark.cassandra.connection.port","9042") 
.load(table="prescription", keyspace="uasb03")
presc.count()
> 2148762

只有第一次计数的一半。

我在日志文件中找不到任何显示出错的内容。 有人有线索吗?

编辑:我试图更改cassandra.yaml中的所有超时值,但presc.count保持不变

编辑这里是卡桑德拉表的描述

cqlsh:uasb03> desc prescription;
CREATE TABLE uasb03.prescription (
id int PRIMARY KEY,
chaiprat int,
nbmol int,
numano int,
pre_pre_dtd timestamp
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';

为了执行验证,我还在csv文件中编写了输出,我得到了

chaiprat;pre_pre_dtd;nbmol;numano;id
29100476;03Feb2017:00:00:00;5;378369;8589934592
29100476;24Feb2017:00:00:00;1;378369;8589934593
29100476;27Feb2017:00:00:00;2;378369;8589934594

ID 大于整数。

最可能的原因是您的数据没有可能成为分区键的真正唯一的行标识符,因此在存储数据时,某些值会被覆盖。可以通过在保存数据之前使用正确的分区键和聚类列显式创建表来解决此问题。这可以通过调用数据框来完成createCassandraTable

例如:
createCassandraTable(
"uasb03", "prescription", 
partitionKeyColumns = Some(Seq("columnA")), 
clusteringKeyColumns = Some(Seq("columnB")))

相关内容

  • 没有找到相关文章

最新更新