为什么Cassandra Python驱动程序不返回所有行的COUNT() ?



我在Data Stax的cassandra数据库中插入了一个大约14k行的df。我使用的是Data Stax的免费版本,你可以获得25mb的存储限制。我的数据集大约有1.5 MB大小。我的代码显示插入和抓取后没有错误。但是在我计算抓取后的行数之后,我只得到大约1.5k行。我似乎想不出问题出在哪里。是插入代码还是抓取代码?我绞尽脑汁,在谷歌上搜索了很多次,还是想不出来。以下是我的代码-:

cassandraDBLoad.py

def progressbar(it, prefix="", size=60, out=sys.stdout): # Python3.3+
count = len(it)
def show(j):
x = int(size*j/count)
print("{}[{}{}] {}/{}".format(prefix, u"█"*x, "."*(size-x), j, count), 
end='r', file=out, flush=True)
show(0)
for i, item in enumerate(it):
yield item
show(i+1)
print("n", flush=True, file=out)
def cassandraDBLoad(config_path):
try:
config = read_params(config_path)
execution_profile = ExecutionProfile(request_timeout=10)
cassandra_config = {'secure_connect_bundle': config["connect_cassandra"]["cloud_config"]}
auth_provider = PlainTextAuthProvider(
config["connect_cassandra"]["client_id"],
config["connect_cassandra"]["client_secret"]
)
cluster = Cluster(cloud=cassandra_config, auth_provider=auth_provider)
session = cluster.connect()
session.default_timeout = None
connect_db = session.execute("select release_version from system.local")
set_keyspace = session.set_keyspace(config["cassandra_db"]["keyspace"])

table_ = config["cassandra_db"]["data_table"]
define_columns = config["cassandra_db"]["define_columns"]

create_table = f"CREATE TABLE IF NOT EXISTS {table_}({define_columns});"
start_create = time.process_time()
table_result = session.execute(create_table)

train = pd.read_csv(config["data_source"]["train_source"])
test = pd.read_csv(config["data_source"]["test_source"])

#Combine test and train into one file
train['source']='train'
test['source']='test'
df = pd.concat([train, test],ignore_index=True)
df = df.fillna('NA')
columns = list(df)
for col in columns:
df[col] = df[col].map(str)

columns = config["cassandra_db"]["columns"]
insert_qry = f"INSERT INTO {table_}({columns}) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) IF NOT EXISTS"
statement = session.prepare(insert_qry)

start_insert = time.process_time()
batch = BatchStatement()
for i in progressbar(range(len(df)), "Inserting: ", 40):
time.sleep(0.1)            
session.execute_async(
statement,
[
df.iat[i,0], df.iat[i,1], df.iat[i,2], df.iat[i,3], df.iat[i,4], df.iat[i,5], 
df.iat[i,6], df.iat[i,7], df.iat[i,8], df.iat[i,9], df.iat[i,10], df.iat[i,11], 
df.iat[i,12]
]
)
print("Time taken to insert df - " + str((time.process_time() - start_insert)/60) + " minutes")
except Exception as e:
raise Exception("(cassandraDBLoad): Something went wrong in the CassandraDB Load operationsn" + str(e))

上面的代码占用了30分钟插入行。我有12 GB RAM和2个CPU内核。

preprocess_data.py

def pandas_factory(colnames, rows):
return pd.DataFrame(rows, columns=colnames)
def preprocess_data(config_path):
try:
config = read_params(config_path)
cassandra_config = {'secure_connect_bundle': config["connect_cassandra"]["cloud_config"]}
auth_provider = PlainTextAuthProvider(
config["connect_cassandra"]["client_id"],
config["connect_cassandra"]["client_secret"]
)
cluster = Cluster(cloud=cassandra_config, auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace(config["cassandra_db"]["keyspace"])
session.row_factory = pandas_factory
#session.default_fetch_size = None
count_query = f"SELECT COUNT(*) from {config['cassandra_db']['data_table']} LIMIT 20000"
count_rslt = session.execute(count_query, timeout=None)
print(count_rslt._current_rows)
query = f"SELECT * from {config['cassandra_db']['data_table']}"
simple_statement = SimpleStatement(query, consistency_level=ConsistencyLevel.ONE, fetch_size=None)
execute_result = session.execute(simple_statement, timeout=None)
data = execute_result._current_rows

print(data.shape)

except Exception as e:
raise Exception("(preprocessData): " + str(e))

CSV文件链接- https://drive.google.com/drive/folders/1O03lNTMfSwhUKG61zOs7fNxXIRe44GRp?usp=sharing请帮助插入完整的数据框或根据问题所在获取所有行。

我的最佳猜测是,在您认为的正在计数的和表中的实际行之间存在不匹配。

我说这是一个"猜测"因为如果不知道表的CQL模式,就不可能知道自己在做什么。让我用一些例子来说明。

对于具有简单主键的表,每个分区只能有一行。例如:

CREATE TABLE users (
username text,
realname text,
email text,
PRIMARY KEY (username)
)

如果表中有500个用户,该查询也将返回500行:

SELECT COUNT(username) FROM users

对于具有复合主键的表(由分区键组成)+至少一个集群键),每个分区可以有一行或多行。例如:

CREATE TABLE user_emails (
username text,
email_type text,
email_address text
...
PRIMARY KEY (username, email_type)
)

一个用户(分区)可以有一个或多个(行)电子邮件——个人的,工作的,等等。如果user_emails表中有500个用户,该查询将简单地返回分区(用户)的数量,而不管每个分区有多少行:

SELECT COUNT(username) FROM user_emails

作为旁注,在Cassandra中计算行数与在传统关系数据库中计算记录数是不同的。我已经在这篇DBA Stack Exchange文章中详细解释了它——为什么COUNT()在Cassandra中不好?

正如我在你的其他帖子中提到的,如果你必须计数记录,然后使用DataStax Bulk Loader(DSBulk),这是为此目的而开发的。DSBulk有一个很好的特性,可以以分布式方式对大型表中的数据进行计数。它是开源的,所以Apache Cassandra用户可以免费使用它。干杯!