如何转换 PySpark 或多处理的代码?



对于措辞不佳的问题,我提前道歉。

我想用不同的处理器拆分以下计算。使用 1 个处理器花费的时间太长。我可以使用 48 个处理器来拆分此过程。

基本上,每 2 小时我想扫描日志文件以查找在 Airflow 上运行的进程/任务。 为了找到需要扫描的文件夹,我使用bsh_cmd_find。 输出为我提供了字符串形式的路径。此路径将添加到列表中,paths。然后打开列表中的每个路径,以查看文件夹中是否存在.log文件。这些日志文件路径将添加到名为files的新列表中。 还有很多多余的文件,所以我找到每个文件的修改时间并将其过滤到前 2 小时。这总共需要~10分钟左右。上下文不需要扫描日志的实际代码。

是否可以使用PySparkmultiprocessing分发此任务?我也没有经验。

two_hours_ago = dt.datetime.now() - dt.timedelta(minutes=125)  ## using 125 for delay.

print("Running bash command to find processes that ran within the past 2 hours...")
## Setting modification time for 30 minutes
mtime = -2/24;
bsh_cmd_find = "find /home/storage/user/airflow/logs -maxdepth 3 -type d -mtime {0} -wholename *_*/[A-Za-z]*".format(mtime)
args = shlex.split(bsh_cmd_find)
for out in Popen(args,stdout = PIPE).stdout:
out = out.decode("utf-8")  # Converting the output path from a byte to a string
out = out.rstrip() # Removing trailing whitespace from the path string
paths.append(out)
print("Extracting all the logs...")
#d = directories, f = files, r = roots
for path in paths:
for r, d, f in os.walk(path):
for file in f:
if '.log' in file:
files.append(os.path.join(r, file))
files = [i for i in files if 'dag_processor_manager' not in i]
files = [i for i in files if 'scheduler' not in i]
print("Extracting modification time of the logs...")
for f in files:
mod_time = dt.datetime.fromtimestamp(os.path.getmtime(f))
mod_time_list.append(mod_time)
tuple_list = list(zip(files, mod_time_list))
my_df = pd.DataFrame(tuple_list, columns = ['Files', 'Date'])
my_df = my_df[(my_df['Date'] >= two_hours_ago) & (my_df['Date'] <= current_time)]
files = my_df['Files'].tolist()

是的,这是可能的。您从spark官方网站开始,在线有大量文档。 我不确定您是否希望有人将您的代码直接转换为可在 Spark 上运行的东西,但这不太可能发生。

相关内容

  • 没有找到相关文章

最新更新