对于措辞不佳的问题,我提前道歉。
我想用不同的处理器拆分以下计算。使用 1 个处理器花费的时间太长。我可以使用 48 个处理器来拆分此过程。
基本上,每 2 小时我想扫描日志文件以查找在 Airflow 上运行的进程/任务。 为了找到需要扫描的文件夹,我使用bsh_cmd_find
。 输出为我提供了字符串形式的路径。此路径将添加到列表中,paths
。然后打开列表中的每个路径,以查看文件夹中是否存在.log
文件。这些日志文件路径将添加到名为files
的新列表中。 还有很多多余的文件,所以我找到每个文件的修改时间并将其过滤到前 2 小时。这总共需要~10分钟左右。上下文不需要扫描日志的实际代码。
是否可以使用PySpark
或multiprocessing
分发此任务?我也没有经验。
two_hours_ago = dt.datetime.now() - dt.timedelta(minutes=125) ## using 125 for delay.
print("Running bash command to find processes that ran within the past 2 hours...")
## Setting modification time for 30 minutes
mtime = -2/24;
bsh_cmd_find = "find /home/storage/user/airflow/logs -maxdepth 3 -type d -mtime {0} -wholename *_*/[A-Za-z]*".format(mtime)
args = shlex.split(bsh_cmd_find)
for out in Popen(args,stdout = PIPE).stdout:
out = out.decode("utf-8") # Converting the output path from a byte to a string
out = out.rstrip() # Removing trailing whitespace from the path string
paths.append(out)
print("Extracting all the logs...")
#d = directories, f = files, r = roots
for path in paths:
for r, d, f in os.walk(path):
for file in f:
if '.log' in file:
files.append(os.path.join(r, file))
files = [i for i in files if 'dag_processor_manager' not in i]
files = [i for i in files if 'scheduler' not in i]
print("Extracting modification time of the logs...")
for f in files:
mod_time = dt.datetime.fromtimestamp(os.path.getmtime(f))
mod_time_list.append(mod_time)
tuple_list = list(zip(files, mod_time_list))
my_df = pd.DataFrame(tuple_list, columns = ['Files', 'Date'])
my_df = my_df[(my_df['Date'] >= two_hours_ago) & (my_df['Date'] <= current_time)]
files = my_df['Files'].tolist()
是的,这是可能的。您从spark官方网站开始,在线有大量文档。 我不确定您是否希望有人将您的代码直接转换为可在 Spark 上运行的东西,但这不太可能发生。