Cassandra在阻塞同步请求的多个进程中的非同步执行



我有一个应用程序,它读取一系列XML文件,其中包含道路上车辆通行的日志。然后,应用程序处理每条记录,转换一些信息以匹配数据库列,并将其插入cassandra数据库(在远程服务器中运行单个节点[它在内部网络中,因此连接实际上不是问题](。在数据库中插入数据后,每个文件的进程将继续读取这些数据,并为汇总表生成信息,从而为在应用程序的不相关部分进行深入分析做好准备。

我使用多处理并行处理许多XML文件,遇到的问题是与cassandra服务器通信。从示意图上看,流程如下:

  1. 从XML文件读取记录
  2. 处理记录的数据
  3. 将处理后的数据插入数据库(使用.execute_async(query)(
  4. 重复1到3,直到XMl文件结束
  5. 等待我进行的所有插入查询的响应
  6. 从数据库中读取数据
  7. 处理读取的数据
  8. 在汇总表中插入已处理的数据

现在,这在多个并行进程中平稳运行,直到一个进程进入步骤6时,它的请求(使用.execute(query)发出,意味着我将等待响应(总是面临超时。我收到的错误是:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:UsersLucasMinicondalibmultiprocessingprocess.py", line 258, in _bootstrap
    self.run()
  File "C:UsersLucasPycharmProjectsnovo_importadorappcoreImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:UsersLucasPycharmProjectsnovo_importadorappcoreCalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:UsersLucasPycharmProjectsnovo_importadorappcoreCalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:UsersLucasMinicondalibsite-packagescassandracluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:UsersLucasMinicondalibsite-packagescassandracluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}

我已经将服务器中的超时更改为荒谬的时间量(例如500000000毫秒(,并且我还尝试在客户端中设置超时限制,使用.execute(query, timeout=3000),但仍然没有成功。

现在,当更多的进程遇到相同的问题,并且在多个进程中从步骤1-3开始的激烈写入停止时,进入步骤6的最后一个进程成功地遵循了该过程,这让我觉得问题是,cassandra优先处理我每秒请求的数万个插入请求,要么忽略我的读取请求,要么把它放回行中。

在我看来,解决这个问题的一种方法是,如果我能以任何方式要求卡桑德拉优先处理我的读取请求,这样我就可以继续处理,即使这意味着要放慢其他进程的速度。

现在,顺便说一句,你可能会认为我的流程建模不是最佳的,我很想听听对此的意见,但就这个应用程序的现实而言,在我们看来,这是最好的方法。因此,我们实际上已经广泛考虑过优化流程,但(如果cassandra服务器能够处理(这对我们的现实来说是最优的。

所以,TL;DR:在执行数以万计的非同步查询时,有没有一种方法可以优先考虑查询?如果没有,是否有一种方法可以每秒执行数以万计的插入查询和读取查询,使请求不会超时?另外,你建议我怎么解决这个问题?并行运行较少的进程显然是一个解决方案,但我正在努力避免。所以,我很想听听大家的想法。

在插入时存储数据,这样我就不需要再次阅读它来进行摘要了,这是不可能的,因为XML文件很大,内存也是一个问题。

我不知道有什么方法可以优先考虑读取查询。我相信Cassandra内部有用于读写操作的独立线程池,所以这些线程池是并行运行的。如果没有看到您正在进行的模式和查询,很难说您是在进行非常昂贵的读取操作,还是系统被写入淹没,无法跟上读取速度。

您可能需要在应用程序运行时尝试监视Cassandra中发生的情况。有几个工具可以用来监视正在发生的事情。例如,如果您ssh到Cassandra节点并运行:

watch -n 1 nodetool tpstats

这将显示线程池统计信息(每秒更新一次(。您将能够看到队列是否已满或操作是否被阻止。如果任何"丢弃的"计数器增加,这表明你没有足够的容量来完成你要做的事情。如果是这样,那么通过添加更多的节点来增加容量,或者改变你的模式和方法,使节点有更少的工作要做。

其他有用的监控(在linux上使用watch-n1来连续监控(:

nodetool compactionstats
nodetool netstats
nodetool cfstats <keyspace.table name>
nodetool cfhistograms <keyspace> <table name>

使用top和iostat等linux命令监视节点以检查CPU利用率和磁盘利用率也很好。

我从你所说的话中得到的印象是,你的单个节点没有足够的容量来完成你给它的所有工作,所以你要么需要在单位时间内处理更少的数据,要么添加更多的Cassandra节点来分散工作负载。

由于分区的行太多,我目前正面临自己的超时错误,因此我可能不得不向分区键添加基数,以使每个分区的内容更小。

最新更新