Dependency error message when running a Luigi pipeline



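I am trying to create a pipeline that starts with a class that splits a file into multiple CSVs based on the state a user is in. It then looks at the files created for the different states and tries to determine whether a user moved from one state to the next, returning 1 if the user did and 0 if not. It fits a Gaussian KDE to these "probabilities", saves the result as a pickle, then draws samples from the pickle and saves them as a CSV.

I am using luigi to build this pipeline, but I keep getting an error message when I try to run my code. The pipeline seems to fail when it runs the state_to_state class.

Here is the code I have written: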

separate_csv.py:

import luigi
import pandas as pd
import numpy as np
import os
import state_to_state_transitions2 as sst

class data_filter(luigi.Task):
    file = pd.read_csv('/Users/emmanuels/Desktop/Attribution/finalcleanattributiondata.csv')
    actions = file.state.unique()
    def run(self):
        for current in self.actions:
            filter_file = self.file.loc[self.file.state.str.contains(current,na=False)]
            filter_file.to_csv('/Users/emmanuels/Documents/AttributionData/Data/'+str(current)+'.csv')
    def requires(self):
        return []
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/'+str(self.actions)+'.csv')

state_to_state_transitions2:

import luigi
import pandas as pd
import separate_csv

class state_to_state(luigi.Task):
    first_file = luigi.Parameter()
    second_file = luigi.Parameter()
    def run(self):
        #iterate through states and find probability of anonymous id existing in next state
        first = pd.read_csv(self.first_file)
        second = pd.read_csv(self.second_file)
        first['probability'] = first.anonymous_id.isin(second.anonymous_id).astype(int)
        #save anonymous id along with probability (1,0) of whether or not it exists in the next state
        first[['anonymous_id','probability']].to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.first_file.split('/')[6][:-4]+'to'+self.second_file.split('/')[7][:-4]+'.csv'))
    def requires(self):
        return separate_csv.data_filter()
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.first_file.split('/')[6][:-4]+'to'+self.second_file.split('/')[6][:-4]+'.csv'))

gaussian_kdefit:

import pandas as pd
import pickle
from scipy import stats
import luigi
import state_to_state_transitions2 as sst

class save_distributions(luigi.Task):
    file_tag = luigi.Parameter()
    path = '/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'
    def run(self):
        data = pd.read_csv(path+self.file_tag)
        kernel = stats.gaussian_kde(data['probability'])
        #we fit the distribution and save as a pickle
        pickle.dump(kernel,open('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck','wb'))
    def requires(self):
        files = ['Session.csv','lead.csv','opportunity.csv','complete.csv']
        task_list = []
        for i in range(1,len(files)):
            one = self.path+str(files[i-1])
            two = self.path+str(files[i])
            task_list.append(sst.state_to_state(first_file=one,second_file=two))
        return task_list
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck')

get_samples:

import pandas as pd
import luigi
import gaussian_kdefit as gkde

#takes n samples and saves sample in csv
class sample_output(luigi.Task):
    file_tag = luigi.Parameter()
    size = luigi.Parameter()
    def run(self):
        kernel = pd.read_pickle('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck')
        kernel = kernel.resample(int(self.size))
        pd.DataFrame(kernel).transpose().to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'+sampleprobabs'+'.csv')
    def requires(self):
        files = ['Sessiontolead.csv', 'leadtoopportunity.csv', 'opportunitytocomplete.csv']
        return [gkde.save_distributions(file_tag=file) for file in files]
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'+sampleprobabs'+'.csv')

And my wrapper class:

import get_samples as getsamps
import pandas as pd
import luigi

class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead', 'leadtoopportunity', 'opportunitytocomplete']
        task_list = []
        size = 10
        for i in range(0,len(file_tag)):
            for k in range(1,size):
                task_list.append(getsamps.sample_output(file_tag=file_tag[i],size=size))
        return task_list
    def run(self):
        print('Wrapper ran')
        pd.DataFrame().to_csv('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')

if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

Here is a sample of the finalcleanattributiondata file:

{'Unnamed: 0': {0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
'uniques': {0: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/businessDetails/',
1: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/businessDetails/',
2: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/userDetails/',
3: '2018-05-17 20:00:000000c924-5959-4e2d-8757-0d10f96ca462http://m.facebook.com/https://www.yoc.com/signup/',
4: '2019-02-24 16:00:000002269a-1e39-4cdf-a43e-cecf0a277c1chttps://signup.yoc.com/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
'anonymous_id': {0: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
1: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
2: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
3: '0000c924-5959-4e2d-8757-0d10f96ca462',
4: '0002269a-1e39-4cdf-a43e-cecf0a277c1c'},
'user_id': {0: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
1: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
2: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
3: nan,
4: nan},
'ts': {0: '2019-06-18 09:11:14.409000',
1: '2019-06-18 09:11:15.028000',
2: '2019-06-18 09:12:03.118000',
3: '2018-05-17 20:31:32.203000',
4: '2019-02-24 16:08:32.661000'},
'url': {0: 'https://signup.yoc.com/signup/v1/step/businessDetails/',
1: 'https://signup.yoc.com/signup/v1/step/businessDetails/',
2: 'https://signup.yoc.com/signup/v1/step/userDetails/',
3: 'https://www.yoc.com/signup/',
4: 'https://signup.yoc.com/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
'path': {0: '/za/signup/v1/step/businessDetails/',
1: '/za/signup/v1/step/businessDetails/',
2: '/za/signup/v1/step/userDetails/',
3: '/za/signup/',
4: '/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
'referrer_domain': {0: 'signup.yoc.com',
1: 'signup.yoc.com',
2: 'signup.yoc.com',
3: 'm.facebook.com',
4: nan},
'utm_campaign': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
'utm_content': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
'utm_medium': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
'utm_source': {0: nan, 1: nan, 2: nan, 3: 'facebook', 4: nan},
'user_agent': {0: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
1: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
2: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
3: 'Mozilla/5.0 (Linux; Android 8.0.0; SM-G965F Build/R16NW; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/66.0.3359.158 Mobile Safari/537.36 [FB_IAB/FB4A;FBAV/172.0.0.66.93;]',
4: 'Opera/9.80 (Android; Opera Mini/38.1.2254/131.123; U; en) Presto/2.12.423 Version/12.16'},
'rank': {0: 1, 1: 2, 2: 3, 3: 1, 4: 1},
'state': {0: 'lead',
1: 'lead',
2: 'opportunity',
3: 'Session',
4: 'opportunity'}}

Here is the traceback I get:

Scheduled 9 tasks of which:
* 1 ran successfully:
- 1 data_filter(file=/Users/emmanuels/Desktop/Attribution/finalcleanattributiondata.csv)
* 1 failed:
- 1 state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
* 7 were left pending, among these:
* 7 had failed dependencies:
- 3 sample_output(file_tag=Sessiontolead, size=10) ...
- 3 save_distributions(file_tag=Sessiontolead.csv,leadtoopportunity.csv,opportunitytocomplete.csv)
- 1 wrapper()

....

INFO: [pid 45306] Worker Worker(salt=271561701, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=45306) running   state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
WARNING: Using wildcards in path /Users/emmanuels/Documents/AttributionData/Data/['lead' 'opportunity' 'Session' 'complete'].csv might lead to processing of an incomplete dataset; override exists() to suppress the warning.
ERROR: [pid 45306] Worker Worker(salt=271561701, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=45306) failed    state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
Traceback (most recent call last):
File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: data_filter__Users_emmanuels_c87d333278
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   state_to_state__Users_emmanuels__Users_emmanuels_c95a12621e   has status   FAILED

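After looking at your update, I notice that your first task doesn't actually have any parameters; it only has a couple of class-level objects. You shouldn't run pd.read_csv in a class-level variable declaration. Put it in the run method instead (and, if your requirements depend on the data, read it in the requires method as well). Change file to a luigi.Parameter with a default value. It is also hard to tell what self.actions is supposed to be. Parameters (and arguments) should be primitives or serializable to primitives.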
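
To illustrate why the class-level read is a problem, here is a minimal sketch in plain Python. The classes are ordinary classes standing in for luigi.Task, and load_rows, EagerTask and LazyTask are hypothetical names, with load_rows standing in for pd.read_csv. It shows that code in a class body runs as soon as the class is defined (that is, when the module is imported), while code in run only executes when the task is run.

```python
load_calls = []


def load_rows():
    """Hypothetical stand-in for pd.read_csv; records every call."""
    load_calls.append("load")
    return [{"state": "lead"}, {"state": "Session"}]


class EagerTask:
    # The class body executes once, at definition (import) time,
    # even if the task is never scheduled.
    rows = load_rows()


class LazyTask:
    def run(self):
        # The data is loaded only when the task actually runs.
        rows = load_rows()
        return sorted({row["state"] for row in rows})


# One load has already happened, although no task has been run yet.
calls_after_definition = len(load_calls)

# Running the lazy task triggers the second load.
states = LazyTask().run()
```

With luigi, the same idea would mean passing the path in as a luigi.Parameter and calling pd.read_csv(self.file) inside run.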

Also, you have a circular import, which may be messing things up: separate_csv imports state_to_state_transitions2, and state_to_state_transitions2 imports separate_csv.

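Honestly, there is a lot going on here, and the cause could be any of several things. I would try to simplify your workflow and get one component working at a time. You can also use luigi's input/output methods to pass data along the pipeline more cleanly.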

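My previous answer, which may still be part of the problem:

Your problem may be that you are not using Luigi's internal atomic file system. Instead of opening the file yourself in state_to_state_transitions2: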

class state_to_state(luigi.Task):
    def run(self):
        ...
        first[['anonymous_id','probability']].to_csv(...)

open the file through luigi's output method rather than writing it yourself:

class state_to_state(luigi.Task):
    def run(self):
        ...
        with self.output().open('w') as out_csv:
            out_csv.write(first[['anonymous_id','probability']].to_csv())

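If you write the file yourself instead of going through the atomic file system, the file is created even when the write fails partway through. Because the file exists, luigi takes that as a signal that the task is complete.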
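
The general pattern behind an atomic write can be sketched with the standard library alone. This is not luigi's implementation, only an illustration of the idea: write to a temporary file and rename it into place once the write has succeeded. The names atomic_write, failing_writer and demo are hypothetical.

```python
import os
import tempfile


def atomic_write(path, write_fn):
    """Write via a temp file in the same directory, then rename into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            write_fn(tmp)
        # Reached only if write_fn succeeded: publish the finished file.
        os.replace(tmp_path, path)
    except BaseException:
        # On failure, discard the partial temp file; `path` is never created.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def failing_writer(handle):
    """Simulates a task that crashes after writing part of its output."""
    handle.write("anonymous_id,probability\n")
    raise ValueError("simulated failure mid-write")


def demo():
    """Return (direct file exists, atomic file exists) after a failed write."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        direct = os.path.join(tmp_dir, "direct.csv")
        atomic = os.path.join(tmp_dir, "atomic.csv")
        try:
            with open(direct, "w") as f:
                failing_writer(f)
        except ValueError:
            pass
        try:
            atomic_write(atomic, failing_writer)
        except ValueError:
            pass
        return os.path.exists(direct), os.path.exists(atomic)
```

After the failed writes, the directly opened file is left on disk with partial content, while the atomically written path does not exist, so an existence check would not mistake the failed task for a finished one.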
