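I have a program that can be started or stopped at any time. The program downloads data from web pages. First, the user defines a bunch of web pages in a .csv file, saves that .csv file, and then starts the program. The program reads the .csv file and turns it into a list of jobs. Next, the jobs are split among 5 separate downloader functions, which work in parallel but may take different amounts of time to download.
After a downloader (there are 5 of them) finishes downloading a web page, I need it to open the .csv file and remove the link. That way, as time passes, the .csv file gets smaller and smaller. The problem is that sometimes two downloader functions try to update the .csv file at the same time, and the program crashes. How can I handle this?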
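If this is a continuation of your project, write the remaining links back over the input file once, when the "downloader" exits. There is no reason to keep writing down the changes constantly.
If you want to know (say, from an external process) which URLs have been downloaded even while the "downloader" is running, write a new line to downloaded.dat each time a process returns with a successful download.
Of course, in both cases the writing happens in the main process/thread, so you don't have to worry about mutexes.
UPDATE - Here is how to do it with an additional file, using the same code base as yesterday: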
from itertools import cycle
from multiprocessing import Pool

# Downloader is the class from the earlier code base (not shown here)

def init_downloader(params):  # our downloader initializer
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return params  # job finished, return the same params for identification

if __name__ == "__main__":  # important protection for cross-platform use
    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]
    downloader_cycle = cycle(downloader_params)  # use a cycle for round-robin distribution

    with open("downloaded_links.dat", "a+") as diff_file:  # open your diff file
        diff_file.seek(0)  # rewind the diff file to the beginning to capture all lines
        diff_links = {row.strip() for row in diff_file}  # load downloaded links into a set
        with open("input_links.dat", "r+") as input_file:  # open your input file
            available_links = []
            download_jobs = []  # store our downloader parameters + a link here
            # read our file line by line and filter out downloaded links
            for row in input_file:  # loop through our file
                link = row.strip()  # remove the extra whitespace to get the link
                if link not in diff_links:  # make sure link is not already downloaded
                    available_links.append(row)
                    download_jobs.append([next(downloader_cycle), link])
            input_file.seek(0)  # rewind our input file
            input_file.truncate()  # clear out the input file
            input_file.writelines(available_links)  # store back the available links
        diff_file.seek(0)  # rewind the diff file
        diff_file.truncate()  # blank out the diff file now that the input is updated
        # and now let's get to business...
        if download_jobs:
            download_pool = Pool(processes=5)  # make our pool use 5 processes
            # run asynchronously so we can capture results as soon as they are available
            for response in download_pool.imap_unordered(init_downloader, download_jobs):
                # since it returns the same parameters, the second item is a link
                # add the link to our `diff` file so it doesn't get downloaded again
                diff_file.write(response[1] + "\n")
        else:
            print("Nothing left to download...")
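As I wrote in the comments, the whole idea is to use a file to store the downloaded links as they are downloaded, and then, on the next run, to filter out the downloaded links and update the input file. That way, even if you forcefully kill it, it will always resume where it left off (except for partial downloads).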
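The resume logic on its own can be sketched without the Downloader class. This is only an illustration: load_pending and mark_done are hypothetical helper names, not part of the code above, and the flush plus os.fsync calls are an assumption about what is needed for a finished link to survive an abrupt kill.

```python
import os


def load_pending(input_path, done_path):
    """Return the links in input_path that are not yet listed in done_path."""
    done = set()
    if os.path.exists(done_path):
        with open(done_path) as done_file:
            done = {line.strip() for line in done_file if line.strip()}
    with open(input_path) as input_file:
        links = [line.strip() for line in input_file if line.strip()]
    return [link for link in links if link not in done]


def mark_done(done_file, link):
    """Append one finished link and push it to disk right away."""
    done_file.write(link + "\n")
    done_file.flush()             # move Python's buffer to the operating system
    os.fsync(done_file.fileno())  # ask the operating system to write it to disk
```

Calling mark_done from the main process for every result that comes back keeps the diff file current, so a later load_pending call only returns the links that still need downloading.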
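Use a Lock from the multiprocessing library to serialize operations on the file.
You will need to pass the lock to each process. Each process should "acquire" the lock before opening the file and "release" it after closing the file.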
https://docs.python.org/2/library/multiprocessing.html
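A minimal sketch of that approach, assuming one link per line in the file. The names remove_link and worker, the links.csv file in a temporary directory, and the example.com URLs are made up for the illustration.

```python
import multiprocessing
import os
import tempfile


def remove_link(lock, path, link):
    """Remove one link from the file while holding the shared lock."""
    with lock:  # acquired before the file is opened, released after it is closed
        with open(path, "r+") as links_file:
            remaining = [row for row in links_file if row.strip() != link]
            links_file.seek(0)
            links_file.truncate()
            links_file.writelines(remaining)


def worker(lock, path, link):
    # a real worker would download `link` here before updating the file
    remove_link(lock, path, link)


if __name__ == "__main__":
    lock = multiprocessing.Lock()  # one lock, created in the parent process
    path = os.path.join(tempfile.mkdtemp(), "links.csv")
    links = ["http://example.com/%d" % i for i in range(5)]
    with open(path, "w") as links_file:
        links_file.write("\n".join(links) + "\n")
    processes = [
        multiprocessing.Process(target=worker, args=(lock, path, link))
        for link in links
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    with open(path) as links_file:
        print(repr(links_file.read()))  # whatever is left in the file
```

Here the lock is handed to each Process as an argument. With a Pool, a Lock cannot be sent as a task argument; it is usually handed to the workers through the Pool's initializer and initargs parameters instead.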
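Look into file locking in Python. Locking the file makes the next process wait until the file is unlocked before modifying it. File locking is platform specific, so you will have to use whichever method applies to your operating system. If you need to figure out the operating system, use an if/elif chain like this.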
import os

def my_lock(f):
    if os.name == "posix":
        # Unix or OS X specific locking here
        pass
    elif os.name == "nt":
        # Windows specific locking here
        pass
    else:
        print("Unknown operating system, lock unavailable")
Then I would take a look at this post and figure out exactly how you want to implement the lock.
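One way the platform branches might be filled in, as a sketch rather than a definitive implementation: it assumes fcntl.flock on POSIX systems and msvcrt.locking on Windows, and the helper names lock_file, unlock_file, and remove_link are hypothetical. Both mechanisms only protect against other processes that take the same lock, and the Windows branch in particular should be treated as untested.

```python
import os

if os.name == "posix":
    import fcntl

    def lock_file(f):
        fcntl.flock(f, fcntl.LOCK_EX)  # blocks until the exclusive lock is free

    def unlock_file(f):
        fcntl.flock(f, fcntl.LOCK_UN)

elif os.name == "nt":
    import msvcrt

    def lock_file(f):
        f.seek(0)  # msvcrt locks a byte range starting at the current position
        # LK_LOCK retries about once a second and raises OSError after 10 tries
        msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)

    def unlock_file(f):
        f.seek(0)  # unlock the same byte that was locked
        msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)

else:
    raise RuntimeError("Unknown operating system, lock unavailable")


def remove_link(path, link):
    """Remove one link from the file while holding an OS-level file lock."""
    with open(path, "r+") as links_file:
        lock_file(links_file)
        try:
            remaining = [row for row in links_file if row.strip() != link]
            links_file.seek(0)
            links_file.truncate()
            links_file.writelines(remaining)
            links_file.flush()  # make sure the data is written before unlocking
        finally:
            unlock_file(links_file)
```

Because the lock lives on the file itself, this variant does not need a lock object passed between processes, at the cost of platform-specific code.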