让 SQLite3 与多个线程一起工作



我正在用Python制作一个网络爬虫,它收集重定向/链接,将它们添加到数据库中,如果链接不存在,则将它们作为新行输入。我想使用多线程但遇到麻烦,因为我必须实时检查是否有具有给定 URL 的条目。

我最初使用的是sqlite3但意识到我不能在不同的线程上同时使用它。我真的不想使用 MySQL(或类似的东西(,因为它需要更多的磁盘空间并作为单独的服务器运行。有没有办法使sqlite3与多个线程一起工作?

Python sqlite3 模块的threadsafety级别为 1,这意味着尽管您无法在线程之间共享数据库连接,但多个线程可以同时使用该模块。 因此,您可以让每个线程创建自己的数据库连接。

这种方法的问题在于SQLite的写入并发性很差,因此让多个线程一次执行大量INSERT会给你可怕的"数据库被锁定"错误。 你可以通过使用PRAGMA JOURNAL_MODE = 'WAL'来改善事情,但这只能到此为止。

如果性能是一个问题,并且无法切换到客户端-服务器数据库,那么您可能必须做的是保留 URL 的内存中缓存,并安排程序,以便有一个线程将此缓存与 SQLite 数据库同步。

一种解决方案是获取一个锁以直接从程序访问数据库。这样,多个线程或进程将等待其他进程在执行请求之前插入链接。

这就是使 sqlite 在多个线程中工作的方法。

将 BlockingCollection 与 ThreadPool.QueueUserWorkItem 结合使用。任何数据库查询都按 FIFO(先进先出(顺序排队和执行。现在,从任何线程执行任何 SQL 事务时,数据库永远不会被锁定。这是 C# 中的一个示例。

public class DatabaseQueueBus
{
    private BlockingCollection<TransportBean> _dbQueueBus = new BlockingCollection<TransportBean>(new ConcurrentQueue<TransportBean>());
    private CancellationTokenSource __dbQueueBusCancelToken;
    public CancellationTokenSource _dbQueueBusCancelToken { get => __dbQueueBusCancelToken; set => __dbQueueBusCancelToken = value; }
    public DatabaseQueueBus()
    {
        _dbQueueBusCancelToken = new CancellationTokenSource();
        DatabaseQueue();
    }
    public void AddJob(TransportBean dto)
    {
        _dbQueueBus.Add(dto);
    }
    private void DatabaseQueue()
    {
        ThreadPool.QueueUserWorkItem((param) =>
        {
            try
            {
                do
                {
                    string job = "";
                    TransportBean dto = _dbQueueBus.Take(_dbQueueBusCancelToken.Token);
                    try
                    {
                        job = (string)dto.DictionaryTransBean["job"];
                        switch (job)
                        {
                            case "SaveClasse":
                                //Save to table here
                                break;
                            case "SaveRegistrant":
                                //Save Registrant here
                                break;
                          
                        }
                    }
                    catch (Exception ex)
                    {//TODO: Handle this exception or not
                    }
                } while (_dbQueueBusCancelToken.Token.IsCancellationRequested != true);
            }
            catch (OperationCanceledException)
            {
            }
            catch (Exception ex)
            {
            }
        });
    }
}

相关内容

  • 没有找到相关文章

最新更新