我目前正在尝试将我的应用程序升级到Spark 3.0.1。对于表创建,我使用cassandra驱动程序(Python-cassandra连接器(拖放并创建一个表。然后,我使用spark-cassandra连接器将数据帧写入表中。没有一个很好的替代方案可以只使用spark-cassandra连接器来创建和删除表。
使用Spark 2.4时,drop-create写入流没有任何问题。但是使用Spark 3.0,应用程序似乎不按特定顺序执行这些操作,通常在删除和创建之前尝试进行编写。我不知道如何确保先删除并创建表。我知道即使应用程序在写入时出错,删除和创建也会发生,因为当我通过cqlsh查询Cassandra时,我可以看到表被删除并重新创建。对Spark 3.0中的这种行为有什么想法吗?
注意:由于模式发生了更改,因此需要删除并重新创建此特定表,而不是直接覆盖。
请求的代码片段:
session = self._get_python_cassandra_session(self.env_conf, self.database)
# build drop table query
drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
session.execute(drop_table_query)
df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
# build create query
create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
# execute table creation
session.execute(create_table_query)
session.shutdown()
# spark-cassandra connection options
copts = _cassandra_cluster_spark_options(self.env_conf)
# set write mode
copts['confirm.truncate'] = overwrite
mode = 'overwrite' if overwrite else 'append'
# write dataframe to cassandra
get_dataframe_writer(df, 'cassandra', keyspace=self.database,
table=tablename, mode=mode, copts=copts).save()
在Spark Cassandra Connector 3.0+中,您可以使用新的功能-操作keyspace&表通过目录API。您可以创建/更改/删除密钥空间&表使用Spark SQL。例如,您可以使用以下命令在Cassandra中创建一个表:
CREATE TABLE casscatalog.ksname.table_name (
key_1 Int,
key_2 Int,
key_3 Int,
cc1 STRING,
cc2 String,
cc3 String,
value String)
USING cassandra
PARTITIONED BY (key_1, key_2, key_3)
TBLPROPERTIES (
clustering_key='cc1.asc, cc2.desc, cc3.asc',
compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)
正如您在这里看到的,您可以指定相当复杂的主键,也可以指定表选项。casscatalog
片段是链接到特定Cassandra集群的前缀(您可以同时使用多个(-它是在启动Spark作业时指定的,例如:
spark-shell --packages com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
--conf spark.sql.catalog.casscatalog=com.datastax.spark.connector.datasource.CassandraCatalog
更多的例子可以在文档中找到:
我最终构建了一个time.sleep(5(延迟,超时100秒,以周期性地ping Cassandra查找表,然后在找到表时写入。