Because I work with a lot of .csv files and often have to convert them between formats, I'd like to write a small Python tool for this so I don't have to do it by hand in Notepad++ every time.
The code below works correctly in single-process mode. However, I would like to add multiprocessing so that several files are processed at once.
My problem is that with multiprocessing the script finishes without any errors, but also without printing anything.
Can you help me fix it?
import pandas as pd
from multiprocessing import Process
import os
import time

start_time = time.time()
thread_count = 2

def timer():
    print('Script ran for ' + time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)))

class Imported:
    def __init__(self, filename, convert_type):
        self.filename, self.convert_type = filename, convert_type
        if self.convert_type == 'to_hana':
            self.delim = ';'
            self.decim = ','
        if self.convert_type == 'from_hana':
            self.delim = ','
            self.decim = '.'
        if self.convert_type not in ['to_hana', 'from_hana']:
            raise ValueError('Convert type is wrong!')

    def convert(self):
        if self.convert_type == 'to_hana':
            self.file = pd.read_csv(self.filename, encoding='utf-8', sep=self.delim, decimal=self.decim)
            self.error_check()
            self.delim = ','
            self.decim = '.'
            process_id = os.getpid()
            print(f'Process ID: {process_id}')
            self.file.to_csv(self.filename, index=None, header=True, decimal=self.decim, sep=self.delim)
            print('test1')
            timer()
        if self.convert_type == 'from_hana':
            self.file = pd.read_csv(self.filename, encoding='utf-8', sep=self.delim, decimal=self.decim)
            self.error_check()
            self.delim = ';'
            self.decim = ','
            process_id = os.getpid()
            print(f'Process ID: {process_id}')
            self.file.to_csv(self.filename, index=None, header=True, decimal=self.decim, sep=self.delim)
            print('test2')
            timer()

    def error_check(self):
        if len(list(self.file.columns.values)[0]) > 20:
            raise ValueError('The file is probably corrupted: the name of the first column is too long.')

if __name__ == '__main__':
    filenames = [['data.csv', 'from_hana'], ['klasifikace.csv', 'to_hana'], ['klasifikace_statistiky.csv', 'to_hana']]
    processes = []

    #for i in enumerate(filenames):
    #    Imported.convert(Imported(filenames[i[0]][0], filenames[i[0]][1]))

    for i in enumerate(filenames):
        process = Process(target=Imported.convert, args=(filenames[i[0]][0], filenames[i[0]][1]))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print('DONE')
You can fix it by creating an object of the class first and then starting the process with the target set to obj.convert. In your version, Process(target=Imported.convert, args=(filename, convert_type)) calls the unbound method with the filename string as self plus an extra argument, so each worker dies with a TypeError instead of converting anything (tracebacks from child processes are easy to miss, which is why the script appears to finish cleanly). With a bound method, the whole instance is pickled and sent to the worker:
import pandas as pd
from multiprocessing import Process
import os
import time

start_time = time.time()
thread_count = 2

def timer():
    print('Script ran for ' + time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)))

class Imported:
    def __init__(self, filename, convert_type):
        self.filename, self.convert_type = filename, convert_type
        if self.convert_type == 'to_hana':
            self.delim = ';'
            self.decim = ','
        if self.convert_type == 'from_hana':
            self.delim = ','
            self.decim = '.'
        if self.convert_type not in ['to_hana', 'from_hana']:
            raise ValueError('Convert type is wrong!')

    def convert(self):
        if self.convert_type == 'to_hana':
            self.file = pd.read_csv(self.filename, encoding='utf-8', sep=self.delim, decimal=self.decim)
            self.error_check()
            self.delim = ','
            self.decim = '.'
            process_id = os.getpid()
            print('Process ID:', process_id)
            self.file.to_csv(self.filename, index=None, header=True, decimal=self.decim, sep=self.delim)
            print('test1')
            timer()
        if self.convert_type == 'from_hana':
            self.file = pd.read_csv(self.filename, encoding='utf-8', sep=self.delim, decimal=self.decim)
            self.error_check()
            self.delim = ';'
            self.decim = ','
            process_id = os.getpid()
            print('Process ID:', process_id)
            self.file.to_csv(self.filename, index=None, header=True, decimal=self.decim, sep=self.delim)
            print('test2')
            timer()

    def error_check(self):
        if len(list(self.file.columns.values)[0]) > 20:
            raise ValueError('The file is probably corrupted: the name of the first column is too long.')

if __name__ == '__main__':
    filenames = [['data.csv', 'from_hana'], ['klasifikace.csv', 'to_hana'], ['klasifikace_statistiky.csv', 'to_hana']]
    processes = []

    for filename, convert_type in filenames:
        # Create the instance first, then pass the bound method as the process target
        obj = Imported(filename, convert_type)
        process = Process(target=obj.convert)
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print('DONE')
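Another approach is to hand the conversions to a multiprocessing.Pool: submit one task per file with apply_async, then collect a per-file result (or error message) once the pool has finished. In the sketch below the file list and your_function are placeholders, with a simple .csv-to-.xlsx conversion standing in for your converter, so swap in your own paths and logic: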
import subprocess
from multiprocessing import Pool, cpu_count

import pandas as pd

def multi_processor(worker, file_path, output_format):
    file_list = []
    #file_list = subprocess.check_output(f'find {file_path} -type f -iname "*.csv*"', shell=True).decode().splitlines()
    #file_list = sorted(file_list)

    # Test: put 6 strings in the list so worker runs six times, in parallel on 6 processors
    file_list.append("file_path1")
    file_list.append("file_path2")
    file_list.append("file_path3")
    file_list.append("file_path4")
    file_list.append("file_path5")
    file_list.append("file_path6")

    # Use the number of system processors minus one
    pool = Pool(processes=cpu_count() - 1)
    results = {}

    # For every file in the file list, submit a new task to the pool
    for each_file in file_list:
        results[each_file] = pool.apply_async(worker, args=(output_format, each_file))

    # Wait for all tasks to finish before proceeding
    pool.close()
    pool.join()

    # Results (and any error messages) are returned per file
    return {file_name: result.get() for file_name, result in results.items()}

def your_function(output_format, file_name):
    try:
        df = pd.read_csv(file_name)
        # Writing .xlsx requires an Excel engine such as openpyxl
        with pd.ExcelWriter(f"{file_name}{output_format}") as writer:
            df.to_excel(writer)
        return "Success!"
    except Exception as e:
        return str(e)

if __name__ == "__main__":
    some_results = multi_processor(your_function, "some_path_to_csv_files", ".xlsx")
    print(some_results)
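Because your_function catches its own exceptions and returns the message as a string, result.get() never re-raises in the parent process: the returned dict maps each file to either "Success!" or its error text, so you can see at a glance which conversions failed. One caveat: Pool(processes=cpu_count() - 1) assumes at least two cores, so on a single-core machine you would want max(1, cpu_count() - 1).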