Spark 3.0和Cassandra Spark/Python Conenctors:在编写之前没有创建表



我目前正在尝试将我的应用程序升级到Spark 3.0.1。对于表创建,我使用cassandra驱动程序(Python-cassandra连接器(拖放并创建一个表。然后,我使用spark-cassandra连接器将数据帧写入表中。没有一个很好的替代方案可以只使用spark-cassandra连接器来创建和删除表。

使用Spark 2.4时,drop-create写入流没有任何问题。但是使用Spark 3.0,应用程序似乎不按特定顺序执行这些操作,通常在删除和创建之前尝试进行编写。我不知道如何确保先删除并创建表。我知道即使应用程序在写入时出错,删除和创建也会发生,因为当我通过cqlsh查询Cassandra时,我可以看到表被删除并重新创建。对Spark 3.0中的这种行为有什么想法吗?

注意:由于模式发生了更改,因此需要删除并重新创建此特定表,而不是直接覆盖。

请求的代码片段:

session = self._get_python_cassandra_session(self.env_conf, self.database)
# build drop table query
drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
session.execute(drop_table_query)
df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
# build create query
create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
# execute table creation
session.execute(create_table_query)
session.shutdown()

# spark-cassandra connection options
copts = _cassandra_cluster_spark_options(self.env_conf)
# set write mode
copts['confirm.truncate'] = overwrite
mode = 'overwrite' if overwrite else 'append'
# write dataframe to cassandra
get_dataframe_writer(df, 'cassandra', keyspace=self.database, 
table=tablename, mode=mode, copts=copts).save()

在Spark Cassandra Connector 3.0+中,您可以使用新的功能-操作keyspace&表通过目录API。您可以创建/更改/删除密钥空间&表使用Spark SQL。例如,您可以使用以下命令在Cassandra中创建一个表:

CREATE TABLE casscatalog.ksname.table_name (
key_1 Int, 
key_2 Int, 
key_3 Int, 
cc1 STRING, 
cc2 String, 
cc3 String, 
value String) 
USING cassandra
PARTITIONED BY (key_1, key_2, key_3)
TBLPROPERTIES (
clustering_key='cc1.asc, cc2.desc, cc3.asc',
compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)

正如您在这里看到的,您可以指定相当复杂的主键,也可以指定表选项。casscatalog片段是链接到特定Cassandra集群的前缀(您可以同时使用多个(-它是在启动Spark作业时指定的,例如:

spark-shell --packages com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 
--conf spark.sql.catalog.casscatalog=com.datastax.spark.connector.datasource.CassandraCatalog

更多的例子可以在文档中找到:

我最终构建了一个time.sleep(5(延迟,超时100秒,以周期性地ping Cassandra查找表,然后在找到表时写入。

最新更新