如何使用线程在两个字典之间进行比较



我目前正在进行一项比较,我试图解决如何在两个字典之间进行比较,其中第一个请求执行GET并将数据刮到字典中,然后我想使用相同的方法与下一个请求进行比较,并查看网页上是否有任何更改。我目前已经完成:

import random
import threading
import time
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
import requests
from bs4 import BeautifulSoup
URLS = [
'https://github.com/search?q=hello+world',
'https://github.com/search?q=python+3',
'https://github.com/search?q=world',
'https://github.com/search?q=i+love+python',
'https://github.com/search?q=sport+today',
'https://github.com/search?q=how+to+code',
'https://github.com/search?q=banana',
'https://github.com/search?q=android+vs+iphone',
'https://github.com/search?q=please+help+me',
'https://github.com/search?q=batman',
]

def doRequest(url):
response = requests.get(url)
time.sleep(random.randint(10, 30))
return response, url

def doScrape(response):
soup = BeautifulSoup(response.text, 'html.parser')
return {
'title': soup.find("input", {"name": "q"})['value'],
'repo_count': soup.find("span", {"data-search-type": "Repositories"}).text.strip()
}

def checkDifference(parsed, url):

def threadPoolLoop():
with ThreadPoolExecutor(max_workers=1) as executor:
future_tasks = [
executor.submit(
doRequest,
url
) for url in URLS]
for future in as_completed(future_tasks):
response, url = future.result()
if response.status_code == 200:
checkDifference(doScrape(response), url)

while True:
t = threading.Thread(target=threadPoolLoop, )
t.start()
print('Joining thread and waiting for it to finish...')
t.join()

我的问题是,每当title或/和repo_count发生更改时,我不知道如何打印出来(重点是我将全天候运行此脚本,并且每当发生更改时,我总是希望它打印出来)

如果您正在寻找一种比较两个字典的简单方法,那么有几个不同的选项。

一些好的资源开始:

  • mCoding:压缩Python dicts
  • StackOverflow:比较两个字典并检查有多少(键、值)对相等

让我们从两个字典开始比较一些添加的元素,一些删除的,一些更改的,一些相同的。

dict1 = {
"value_2": 2,
"value_3": 3,
"value_4": 4,
"value_5": "five",
"value_6": "six",
}
dict2 = {
"value_1": 1, 
"value_2": 2, 
"value_4": 4
}

您可能会使用unittest库。像这样:

>>> from unittest import TestCase
>>> TestCase().assertDictEqual(dict1, dict1)  # <-- No output, because they are the same
>>> TestCase().assertDictEqual(dict1, dict2)  # <-- Will raise error and display elements which are different
AssertionError: {'value_2': 2, 'value_3': 3, 'value_4': 4, 'value_5': 'five', 'value_6': 'six'} != {'value_1': 1, 'value_2': 3, 'value_4': 4}
- {'value_2': 2, 'value_3': 3, 'value_4': 4, 'value_5': 'five', 'value_6': 'six'}
+ {'value_1': 1, 'value_2': 3, 'value_4': 4}

但面临的挑战是,当它们不同时,会引发错误;这可能不是你想要的。你只是想看看他们什么时候不同。

另一种方法是deepdiff库。像这样:

>>> from deepdiff import DeepDiff
>>> from pprint import pprint
>>> pprint(DeepDiff(dict1, dict2))
{'dictionary_item_added': [root['value_1']],
'dictionary_item_removed': [root['value_3'], root['value_5'], root['value_6']],
'values_changed': {"root['value_2']": {'new_value': 3, 'old_value': 2}}}

或者,你可以很容易地制作自己的功能。像这样(从这里复制的功能)

>>> from pprint import pprint
>>> def compare_dict(d1, d2):
...    return {k: d1[k] for k in d1 if k in d2 and d1[k] == d2[k]}
>>> pprint(compare_dict(dict1, dict2))
{'value_4': 4}
>>> def dict_compare(d1, d2):
...     d1_keys = set(d1.keys())
...     d2_keys = set(d2.keys())
...     shared_keys = d1_keys.intersection(d2_keys)
...     added = d1_keys - d2_keys
...     removed = d2_keys - d1_keys
...     modified = {o: {"old": d1[o], "new": d2[o]} for o in shared_keys if d1[o] != d2[o]}
...     same = set(o for o in shared_keys if d1[o] == d2[o])
...     return {"added": added, "removed": removed, "modified": modified, "same": same}
>>> pprint(dict_compare(dict1, dict2))
{'added': {'value_6', 'value_3', 'value_5'},
'modified': {'value_2': {'old': 2, 'new': 3}},
'removed': {'value_1'},
'same': {'value_4'}}

最新更新