读取CSV文件并验证输入参数是否在CSV文件中,然后绕过清除过程,否则使用python启动清除过程



CSV文件示例:

test.csv
process_cd
ramsize
dbsize
protocal
random

函数将使用以下参数调用

self.complete_stage_purge_process(self.targetCnxn, self.targetTable, self.processCode)

程序代码示例:

test
protocal
forensic

每次调用time函数都需要读取CSV文件中的进程代码,如果进程代码匹配,则绕过内部delete调用

def complete_stage_purge_process(self, target_cnxn, stage_table, process_cd):

delete_sql = "delete from " + schemaName.STAGE.value + "." + 
stage_table + " where run_pk in (" + run_pk_sql + ")"
try:
trgt_cursor = target_cnxn.cursor()
trgt_cursor.execute(delete_sql)
target_cnxn.commit()
self.logger.debug("deletes processed successfully ")
target_cnxn.close()
except:
self.logger.exception('Error in processing deletes')
raise
else:
self.logger.debug('purge process is not required for this process')

如何实现CSV读取循环

尝试使用下面的代码片段,但代码仍然要清除进程,而不是在循环

中运行进程代码搜索
non_purge_process_file = open(self.file_path)
reader = csv.reader(non_purge_process_file)
for row in reader:
if process_cd in row:
self.logger.debug("Do not perform stage purge process.")
return
else:
delete_dt = datetime.today() - timedelta(days=30)
delete_dt = str(delete_dt) 

我用下面的方法解决了上面的问题:

def check_process_cd(self, process_cd):
self.logger.debug(datetime.now())
self.logger.debug('check_process_cd')
purge_process_flag = 0
reader = csv.reader(purge_process_codes_file)
for row in reader:
if process_cd == row[0]:
self.logger.debug("Perform stage purge process.")
purge_process_flag = 1
return purge_process_flag

基于标志执行函数调用

if self.purge_process_flag == 1:
self.complete_stage_purge_process(self.targetCnxn, self.targetTable, self.processCode)
else:
self.logger.debug('Do not perform purge process')
import csv

def validate_and_purge(input_param, csv_file):
found = False  # flag to indicate whether input_param is found in the CSV file
with open(csv_file, 'r') as f:
reader = csv.reader(f)
for row in reader:
if input_param in row:
found = True
break
if found:
return True

return False

最新更新