Luigi task and wrapper failing



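I asked a similar question before, but decided to break my pipeline into fewer steps so I can better understand where I am going wrong and keep debugging as simple as possible.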

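In my first class, I take a huge csv and split it into multiple csvs based on the user's current state. I then created another task that checks whether a given user moved from one state to the next, returning 1 or 0 depending on whether that happened.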

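Finally, I have a wrapper class that should dynamically assign parameter values to the previous class. However, my pipeline doesn't seem to run, and I'm not sure what I'm doing wrong.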

Here is what I have:

separate_csv.py:

import luigi
import pandas as pd

class data_filter(luigi.Task):
    file = luigi.Parameter()

    def run(self):
        # read the source csv once and collect the distinct states
        file_pd = pd.read_csv(self.file)
        actions = file_pd.state.unique()
        # write one csv per state
        for current in actions:
            filter_file = file_pd.loc[file_pd.state.str.contains(current,na=False)]
            filter_file.to_csv('/Users/emm/Documents/AttributionData/Data/'+str(current)+'.csv')

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/complete.csv')

state_to_state_transitions.py:

import luigi
import pandas as pd
import separate_csv as sep

class state_to_state(luigi.Task):
    first_file = luigi.Parameter()
    second_file = luigi.Parameter()

    def run(self):
        #iterate through states and find probability of anonymous id existing in next state
        path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
        first = pd.read_csv(path+self.first_file)
        second = pd.read_csv(path+self.second_file)
        first['probability'] = first.anonymous_id.isin(second.anonymous_id).astype(int)
        #save anonymous id along with probability (1,0) of whether or not it exists in the next state
        with self.output().open('w') as out_csv:
            out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))

    def requires(self):
        files_two = [sep.data_filter(file='/Users/emm/Desktop/Attribution/finalcleanattributiondata.csv')]
        return files_two

    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file))

wrapper.py

import state_to_state_transitions2 as sst
import pandas as pd
import luigi

class wrapper(luigi.WrapperTask):
    def requires(self):
        files = ['Session.csv', 'lead.csv', 'opportunity.csv', 'complete.csv']
        task_list = []
        for i in range(1, len(files)):
            task_list.append(sst.state_to_state(first_file=files[i-1],second_file=files[i]))
        return task_list

    def run(self):
        print('Wrapper ran')
        pd.DataFrame().to_csv('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')

    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')

if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

Here is part of my error message:

  File "/Users/emm/Documents/GitHub/AttributionModel/Capstone/state_to_state_transitions2.py", line 15, in run
    out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))
TypeError: write() argument must be str, not None
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   state_to_state_lead_csv_opportunity_csv_b31ac9d110   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 4 pending tasks possibly being run by other workers
DEBUG: There are 4 pending tasks unique to this worker
DEBUG: There are 4 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=152474850, workers=1, host=Emms-MacBook-Pro.local, username=***, pid=***) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 1 present dependencies were encountered:
    - 1 data_filter(file=/Users/emm/Desktop/Attribution/finalcleanattributiondata.csv)
* 3 failed:
    - 3 state_to_state(first_file=Session.csv, second_file=lead.csv) ...
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 wrapper()
This progress looks :( because there were failed tasks

Your problem is here:

with self.output().open('w') as out_csv:
    out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))

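Rather than first[['anonymous_id','probability']].to_csv(file_name), which writes to the file file_name and returns None, you need first[['anonymous_id','probability']].to_csv(), which returns a string containing the csv data. That None is what write() receives, hence the TypeError.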

So overall, you should have:

with self.output().open('w') as out_csv:
    out_csv.write(first[['anonymous_id','probability']].to_csv())
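To see the difference in isolation, here is a minimal sketch that does not need Luigi. It assumes a small made-up DataFrame in place of the question's first, and a plain file handle in a temporary directory standing in for self.output().open('w'); the sample values and the file name are hypothetical.

```python
import os
import tempfile

import pandas as pd

# Hypothetical sample data standing in for the question's `first` DataFrame.
first = pd.DataFrame({"anonymous_id": ["a1", "b2"], "probability": [1, 0]})

with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "Sessiontolead.csv")

    # Given a path, to_csv writes the file itself and returns None.
    # This None is what write() was handed in the failing task.
    returned_with_path = first[["anonymous_id", "probability"]].to_csv(csv_path)

    # Without a path, to_csv returns the csv data as a string.
    csv_text = first[["anonymous_id", "probability"]].to_csv()

    # A plain file handle stands in for self.output().open('w') here.
    with open(csv_path, "w") as out_csv:
        out_csv.write(csv_text)

    # Read the file back to confirm the string was written as-is.
    with open(csv_path) as written:
        round_trip = written.read()

print(returned_with_path is None)
print(isinstance(csv_text, str))
```

If the unnamed index column is not wanted in the output, to_csv(index=False) drops it.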
