Example CSV file:
test.csv
process_cd
ramsize
dbsize
protocal
random
The function is called with the following arguments:
self.complete_stage_purge_process(self.targetCnxn, self.targetTable, self.processCode)
Example process codes:
test
protocal
forensic
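Every time the function is called, it needs to read the process codes from the CSV file. If the process code matches one of them, the internal delete call should be bypassed.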
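def complete_stage_purge_process(self, target_cnxn, stage_table, process_cd):
    # schemaName and run_pk_sql are defined elsewhere in the original code (not shown in the post).
    delete_sql = ("delete from " + schemaName.STAGE.value + "." +
                  stage_table + " where run_pk in (" + run_pk_sql + ")")
    try:
        trgt_cursor = target_cnxn.cursor()
        trgt_cursor.execute(delete_sql)
        target_cnxn.commit()
        self.logger.debug("deletes processed successfully ")
        target_cnxn.close()
    except Exception:
        # Log the full traceback, then re-raise so the caller sees the failure.
        self.logger.exception('Error in processing deletes')
        raise
    # The `if` that this `else` was meant to pair with is not shown in the post.
    else:
        self.logger.debug('purge process is not required for this process')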
How can I implement the CSV reading loop?
I tried the code snippet below, but the code still purges the process instead of searching for the process code in the loop:
non_purge_process_file = open(self.file_path)
reader = csv.reader(non_purge_process_file)
for row in reader:
    if process_cd in row:
        self.logger.debug("Do not perform stage purge process.")
        return
    else:
        delete_dt = datetime.today() - timedelta(days=30)
        delete_dt = str(delete_dt)
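A likely cause, assuming the `else` is paired with the `if` inside the loop: the purge branch runs for the first row that does not match, before the remaining rows have been checked. Below is a minimal sketch that reads the whole file first and decides afterwards. It assumes one process code per row in the first column; `load_skip_codes` and `should_purge` are hypothetical names, and an in-memory string stands in for test.csv.

```python
import csv
import io


def load_skip_codes(csv_file):
    """Collect the first column of every non-empty row into a set."""
    reader = csv.reader(csv_file)
    return {row[0].strip() for row in reader if row}


def should_purge(process_cd, skip_codes):
    """Purge only when the process code is absent from the skip list."""
    return process_cd not in skip_codes


# In-memory stand-in for test.csv (values taken from the example above).
# The first line is treated like any other value, not as a header.
sample = io.StringIO("process_cd\nramsize\ndbsize\nprotocal\nrandom\n")
skip_codes = load_skip_codes(sample)

for code in ("test", "protocal", "forensic"):
    print(code, "purge" if should_purge(code, skip_codes) else "skip purge")
```

Because the decision is made only after every row has been read, a non-matching first row can no longer trigger the purge on its own.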
I solved the problem above with the following approach:
def check_process_cd(self, process_cd):
    self.logger.debug(datetime.now())
    self.logger.debug('check_process_cd')
    purge_process_flag = 0
    # purge_process_codes_file is an open file object created elsewhere (not shown in the post).
    reader = csv.reader(purge_process_codes_file)
    for row in reader:
        # Skip blank rows so row[0] cannot raise IndexError.
        if row and process_cd == row[0]:
            self.logger.debug("Perform stage purge process.")
            purge_process_flag = 1
    return purge_process_flag
Then make the function call based on the flag:
if self.purge_process_flag == 1:
    self.complete_stage_purge_process(self.targetCnxn, self.targetTable, self.processCode)
else:
    self.logger.debug('Do not perform purge process')
import csv

def validate_and_purge(input_param, csv_file):
    found = False  # flag to indicate whether input_param is found in the CSV file
    with open(csv_file, 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            if input_param in row:
                found = True
                break
    return found
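As a usage sketch, the function above can be exercised against a throwaway CSV file. The version here is condensed with `any()` but is meant to behave the same way; the temporary file and the sample codes are assumptions for illustration only. Note that `input_param in row` matches a value in any column, whereas `check_process_cd` compares against the first column only.

```python
import csv
import os
import tempfile


def validate_and_purge(input_param, csv_file):
    # Condensed form of the function above: True if any row contains input_param.
    with open(csv_file, "r", newline="") as f:
        return any(input_param in row for row in csv.reader(f))


# Write a throwaway CSV with the example codes, one per row.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("process_cd\nramsize\ndbsize\nprotocal\nrandom\n")
    csv_path = tmp.name

try:
    matched = validate_and_purge("protocal", csv_path)   # code listed in the file
    missing = validate_and_purge("forensic", csv_path)   # code not listed
finally:
    os.remove(csv_path)

print(matched, missing)
```

The caller can then branch on the returned boolean to decide whether to run or skip the purge.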