我试图在pyspark中获取cassandra表的分区键的不同值。然而,pyspark似乎不理解我,并完全迭代所有数据(这是一个很大的数据),而不是查询索引。
这是我使用的代码,对我来说很简单:
from pyspark.sql import SparkSession
spark = SparkSession
.builder
.appName("Spark! This town not big enough for the two of us.")
.getOrCreate()
ct = spark.read
.format("org.apache.spark.sql.cassandra")
.options(table="avt_sensor_data", keyspace="ipe_smart_meter")
.load()
all_sensors = ct.select("machine_name", "sensor_name")
.distinct()
.collect()
列"machine_name"one_answers"sensor_name"一起构成分区键(有关完整的架构,请参阅下文)。在我看来,这应该非常快,事实上,如果我在cql中执行这个查询,只需要几秒钟:
select distinct machine_name,sensor_name from ipe_smart_meter.avt_sensor_data;
然而,火花作业需要大约10个小时才能完成。从spark告诉我的计划来看,它似乎真的想迭代所有数据:
== Physical Plan ==
*HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
+- Exchange hashpartitioning(machine_name#0, sensor_name#1, 200)
+- *HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@2ee2f21d [machine_name#0,sensor_name#1] ReadSchema: struct<machine_name:string,sensor_name:string>
我不是专家,但对我来说,这不像是"使用卡桑德拉指数">
我做错了什么?有没有什么方法可以告诉spark委托从卡桑德拉那里获得独特价值观的任务?如有任何帮助,我们将不胜感激!
如果这有帮助的话,下面是底层cassandra表的模式描述:
CREATE KEYSPACE ipe_smart_meter WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'} AND durable_writes = true;
CREATE TABLE ipe_smart_meter.avt_sensor_data (
machine_name text,
sensor_name text,
ts timestamp,
id bigint,
value double,
PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = '[PRODUCTION] Table for raw data from AVT smart meters.'
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
自动cassandra服务器端下推谓词似乎只有在选择、筛选或排序时才能工作。
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
所以,在distinct()
的情况下,spark得到所有行,然后,得到distinct()
。
解决方案1
你说你的cqlselect distinct...
已经超快了。我想分区键的数量相对较少(machine_name和sensor_name的组合),但有很多"t"。
所以,最简单的解决方案就是使用cql(例如,cassandra驱动程序)。
解决方案2
由于cassandra是一个查询优先的数据库,只需再创建一个表,该表只有您的独特查询所需的分区键。
CREATE TABLE ipe_smart_meter.avt_sensor_name_machine_name (
machine_name text,
sensor_name text,
PRIMARY KEY ((machine_name, sensor_name))
);
然后,每次在原始表中插入一行时,都要在新表中插入machine_name和sensor_name。由于它只有分区键,因此对于查询来说,这是一个自然的不同表。获取所有行。也许超级快。无需区分流程。
解决方案3
我认为解决方案2是最好的。但是,如果您不想对一条记录进行两次插入,还有一个解决方案是更改表并创建一个物化视图表。
CREATE TABLE ipe_smart_meter.ipe_smart_meter.avt_sensor_data (
machine_name text,
sensor_name text,
ts timestamp,
id bigint,
value double,
dist_hint_num smallint,
PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
;
CREATE MATERIALIZED VIEW IF NOT EXISTS ipe_smart_meter.avt_sensor_data_mv AS
SELECT
machine_name
,sensor_name
,ts
,dist_hint_num
FROM ipe_smart_meter.avt_sensor_data
WHERE
machine_name IS NOT NULL
AND sensor_name IS NOT NULL
AND ts IS NOT NULL
AND dist_hint_num IS NOT NULL
PRIMARY KEY ((dist_hint_num), machine_name, sensor_name, ts)
WITH
AND CLUSTERING ORDER BY (machine_name ASC, sensor_name DESC, ts DESC)
;
dist_hint_num
列用于限制查询要迭代和分发记录的分区总数。
例如,从0到15。随机整数random.randint(0, 15)
或基于散列的整数hash_func(machine_name + sensor_name) % 16
是可以的。然后,当您按照以下方式进行查询时。cassandra只从16个分区获取所有记录,这可能比您当前的情况更有效率。
但是,无论如何,必须读取所有记录,然后读取distinct()
(发生混洗)。空间效率不高。我认为这不是一个好的解决方案。
functools.reduce(
lambda df, dist_hint_num: df.union(
other=spark_session.read.format(
'org.apache.spark.sql.cassandra',
).options(
keyspace='ipe_smart_meter',
table='avt_sensor_data_mv',
).load().filter(
col('dist_hint_num') == expr(
f'CAST({dist_hint_num} AS SMALLINT)'
)
).select(
col('machine_name'),
col('sensor_name'),
),
),
range(0, 16),
spark_session.createDataFrame(
data=(),
schema=StructType(
fields=(
StructField(
name='machine_name',
dataType=StringType(),
nullable=False,
),
StructField(
name='sensor_name',
dataType=StringType(),
nullable=False,
),
),
),
),
).distinct().persist().alias(
'df_all_machine_sensor',
)