我正在尝试开发Job程序,该程序反过来调用Service程序。ChangeDetectorService
确实具有名为process
的过程。我不知道为什么我收到警告信息:
AttributeError:"ChangeDetectorService"对象没有属性"process">
"change_detector_job.py";以及CCD_ 3源代码。
# change_detector_job.py
import argparse
import datetime
# import multiprocessing
import os
# import sys
# import traceback
import sys
import traceback
from common.utils.config import Config, log_start, log_end
from ReplicationService.module.ChangeDetectorService import ChangeDetectorService
args = None
main_config = None
main_logger = None
app_name = os.path.basename(sys.argv[0]).replace('.py', '') + '_main'
class ChangeDetectorJob:
def __init__(self):
self.config = None
self.logger = None
self.env = 'test'
def setup(self):
self.env = args.env
self.config = Config(args.config, app_name)
self.logger = self.config.get_logger()
def process(self):
self.setup()
t1 = log_start("ChangeDetector job started (UTC): %s" % (datetime.datetime.utcnow()), with_memory=True,
logger=self.logger)
service = ChangeDetectorService(args.config, app_name)
success = service.process()
log_end("ChangeDetector job completed (UTC): %s" % (datetime.datetime.utcnow()), success=success, start_time=t1,
logger=self.logger)
if __name__ == '__main__':
try:
parser = argparse.ArgumentParser(description="Data ChangeDetector Job")
parser.add_argument("-e", "--env", default="test", choices=['test', 'dev', 'uat', 'prd'],
help="environment this job to be deployed (default %default)")
parser.add_argument("-c", "--config", default="../config/cmo_backend_local_config.ini",
help="config file (test only default %default)")
args = parser.parse_args()
ini_file = args.config
main_config = Config(ini_file, app_name)
main_logger = main_config.get_logger()
# multiprocessing.set_start_method('spawn')
main_logger.info("**** ChangeDetector job started (UTC): %s" % datetime.datetime.utcnow())
# service = ChangeDetectorService(args.config, app_name)
# success = service.process()
ChangeDetectorJob().process()
main_logger.info("**** ChangeDetector job completed (UTC): %s" % datetime.datetime.utcnow())
except Exception as error:
if main_logger is not None:
main_logger.error("Exception encountered: " + str(error))
main_logger.error(traceback.format_exc())
traceback.print_exc()
print("FINAL EXCEPTION OUT .... ")
# ChangeDetectorService.py
# import mysql.connector
from datetime import datetime
from datetime import timedelta
from common.database.mysql_manager import MysqlManager
from common.utils.config import Config, log_start, log_end
# mysql.connector.connect(host='localhost',port="3306",user='user',password='pass',database='dbname')
import mysql.connector
# rows_inserted = 0;
class ChangeDetectorService:
#
def __init__(self, ini_file, app_name):
self.config_object = Config(ini_file, app_name)
self.logger = self.config_object.get_logger()
self.mysql_manager = MysqlManager(self.config_object)
try:
def get_connection(self):`enter code here`
connection = mysql.connector.connect(user='user', password='zzzz',
host='host',
database='dbname'
)
return connection
def init_change_detector(self):
print("Change Detector started at " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
mysqlManager = MysqlManager
# conn = self.get_connection()
conn = self.mysqlManager.get_db_connection()
cursor = conn.cursor()
query = ('SELECT table_name, column_name, key_name '
'FROM cmooptimizationsvc.csj_chngdtct_tab_col '
'WHERE is_active=1 '
'ORDER BY table_name, column_name')
cursor.execute(query)
# get all records
records = cursor.fetchall()
for record in records:
self.process_col_tab_chg(record[0], record[1], record[2])
if conn.is_connected():
conn.close()
cursor.close()
def insert_change_log(self, table_name, key_name, attr_name, old_attr_value, new_attr_value):
# global rows_inserted;
insert_query = """INSERT INTO csj_shipment_changelog(table_name, key_name,
attr_name, old_attr_value,
new_attr_value)
VALUES (%s, %s, %s, %s, %s)"""
conn = self.get_connection()
cursor = conn.cursor()
tuple1 = (table_name, key_name, attr_name, old_attr_value, new_attr_value)
# tuples.append(tuple1)
cursor.execute(insert_query, tuple1)
# rows_inserted += 1
# print( rows_inserted );
# if (rows_inserted%10==0):
# cursor.executemany(insert_query, tuples)
conn.commit()
rows_inserted = 0
# tuples = []
# quit()
cursor.close()
conn.close()
# Look for Shipment, in past date
def find_past_shipment(self,
table_name,
key_name,
column_name,
before_date,
curr_key
):
saved_col_name = column_name
saved_key_name = key_name
conn = self.get_connection()
cursor = conn.cursor()
query = 'SELECT ' + saved_key_name + ' , ' + saved_col_name + ' FROM ' + table_name
+ ' where rec_cre_dt_utc < ' + "'" + before_date.strftime('%Y-%m-%d 00:00:00') + "'"
+ ' and shipment_num = ' + "'" + curr_key + "'" + ' order by rec_cre_dt_utc desc LIMIT 1'
cursor.execute(query)
records = cursor.fetchone()
cursor.close()
conn.close()
if records is not None:
past_attr_val = records[1]
return past_attr_val
else:
return 0
def process_col_tab_chg(self,table_name, column_name, key_name):
saved_key_name = key_name
saved_col_name = column_name
old_val = 0
ini_time_for_now = datetime.now()
date_before_1day = ini_time_for_now - timedelta(days=1)
query = 'SELECT ' + key_name + ' , ' + saved_col_name + ' , ' + ' rec_cre_dt_utc FROM ' + table_name
+ ' where rec_cre_dt_utc >= ' + "'" + date_before_1day.strftime('%Y-%m-%d 00:00:00') + "'"
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute(query)
for (key_name, column_name, rec_cre_dt_utc) in cursor:
curr_attr_val = column_name
curr_key_val = key_name
old_val = self.find_past_shipment(table_name,
saved_key_name,
saved_col_name,
rec_cre_dt_utc,
curr_key_val
)
if curr_attr_val != old_val
and old_val != 0:
self.insert_change_log(table_name, key_name, saved_col_name, old_val, curr_attr_val )
else:
continue
cursor.close
conn.close()
def cleanup():
print("Change Detector stopped " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def process():
start = datetime.now()
init_change_detector()
# cleanup()
# mysqlManager = MysqlManager
# conn = get_connection(self)
# conn = self.mysql_manager.get_db_connection()
# cursor = conn.cursor()
# query = ('SELECT table_name, column_name, key_name '
# 'FROM cmooptimizationsvc.csj_chngdtct_tab_col '
# 'WHERE is_active=1 '
# 'ORDER BY table_name, column_name')
# cursor.execute(query)
# # get all records
# records = cursor.fetchall()
# for record in records:
# self.process_col_tab_chg(record[0], record[1], record[2])
# if conn.is_connected():
# conn.close()
# cursor.close()
end = datetime.now()
time_diff = (end - start)
execution_time = time_diff.total_seconds()
print("Elapsed time(secs): " + str(execution_time))
except Exception as e:
print("Exception " + e)
finally:
cleanup()
# if __name__ == "__main__":
# main()
当您尝试使用对象(类(不存在的属性(方法或属性(时,会得到AttributeError
。在这种情况下,错误表明ChangeDetectorService
处于问题中:
AttributeError:"ChangeDetectorService"对象没有属性"process">
在您的错误消息中,就在上述行之前,您应该看到导致此错误的代码行:
success = service.process()
service
被分配给这个表达式:
service = ChangeDetectorService(args.config, app_name)
因此,service
属于类型(类(ChangeDetectorService
。它一定是…检查源代码,我们看到这个:
class ChangeDetectorService:
#
def __init__(self, ini_file, app_name):
self.config_object = Config(ini_file, app_name)
self.logger = self.config_object.get_logger()
self.mysql_manager = MysqlManager(self.config_object)
try:
...
CCD_ 9具有与CCD_ 10相同的缩进。因此,try
已经关闭了ChangeDetectorService
类的定义。这意味着,ChangeDetectorService
类只定义了构造函数(__init__
(。ChangeDetectorService
类没有定义任何其他方法。这意味着,它也没有process
方法。这就是你出错的原因。如果希望ChangeDetectorService
类具有process
方法,请确保try
或其他任何东西不会干扰该类的所有方法定义。