How to correctly implement a Neuraxle pipeline step that filters the data inputs



I am trying to implement a BaseStep in Neuraxle (0.5.2) that filters data_inputs (and the corresponding expected_outputs accordingly).

class DataFrameQuery(NonFittableMixin, InputAndOutputTransformerMixin, BaseStep):
    def __init__(self, query):
        super().__init__()
        self.query = query

    def transform(self, data_input):
        data_input, expected_output = data_input
        # verify that input and output are either pd.DataFrame or pd.Series
        # ... [redacted] ...
        new_data_input = data_input.query(self.query)
        if all(output is None for output in expected_output):
            new_expected_output = [None] * len(new_data_input)
        else:
            new_expected_output = expected_output.loc[new_data_input.index]
        return new_data_input, new_expected_output

This naturally (in most cases) leads to a change in len(data_inputs) (and of expected_outputs). In recent versions of Neuraxle, this raises an AssertionError:

data_input = pd.DataFrame([{"A": 1, "B": 1}, {"A": 2, "B": 2}], index=[1, 2])
expected_output = pd.Series([1, 2], index=[1, 2])
pipeline = Pipeline([
    DataFrameQuery("A == 1")
])
pipeline.fit_transform(data_input, expected_output)
AssertionError: InputAndOutputTransformerMixin: 
Caching broken because there is a different len of current ids, and data inputs. 
Please use InputAndOutputTransformerWrapper if you plan to change the len of the data inputs.
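The filtering-and-alignment logic itself can be exercised with plain pandas, independent of Neuraxle's container handling. This is a minimal standalone sketch of what the transform above does (the helper function name is made up for illustration; nothing Neuraxle-specific is assumed):

```python
import pandas as pd

def filter_inputs_and_outputs(data_inputs, expected_outputs, query):
    """Filter rows of a DataFrame by a query string and align the
    expected outputs by index (hypothetical helper, for illustration)."""
    new_data_inputs = data_inputs.query(query)
    if all(output is None for output in expected_outputs):
        # No labels available: keep a matching-length list of Nones.
        new_expected_outputs = [None] * len(new_data_inputs)
    else:
        # Labels present: select exactly the rows that survived the filter.
        new_expected_outputs = expected_outputs.loc[new_data_inputs.index]
    return new_data_inputs, new_expected_outputs

data_inputs = pd.DataFrame([{"A": 1, "B": 1}, {"A": 2, "B": 2}], index=[1, 2])
expected_outputs = pd.Series([1, 2], index=[1, 2])
di, eo = filter_inputs_and_outputs(data_inputs, expected_outputs, "A == 1")
# di keeps only the row with A == 1; eo is the matching slice of the Series
```

The length of both outputs shrinks together, which is exactly the situation that breaks the current-ids assertion inside the pipeline.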

As I understand it, this is where Neuraxle's handler methods should come into play. So far, however, I have not found one that I could use to update the current_ids of the transformed inputs and outputs (I would have guessed _did_transform, but it does not seem to be called).

More generally:

  • What is the correct way to update the current_ids for transformed inputs and expected outputs (within the same step)?
  • What aspects need to be considered when applying side effects to the data_container? Are the identifiers used, for example, to split the data for SIMD-style parallelism? Would the new identifiers typically just be a sequence of integers?

Edit: I have also tried setting savers and using InputAndOutputTransformerWrapper as described below. I still get the following error (probably because I am not sure where to call handle_transform):

AssertionError: InputAndOutputTransformerWrapper: 
Caching broken because there is a different len of current ids, and data inputs.
Please resample the current ids using handler methods, or create new ones by setting the wrapped step saver to HashlibMd5ValueHasher using the BaseStep.set_savers method.

Edit: For now I have solved the problem as follows:


class OutputShapeChangingStep(NonFittableMixin, InputAndOutputTransformerMixin, BaseStep):
    def __init__(self, idx):
        super().__init__()
        self.idx = idx

    def _update_data_container_shape(self, data_container):
        assert len(data_container.expected_outputs) == len(data_container.data_inputs)
        data_container.set_current_ids(range(len(data_container.data_inputs)))
        data_container = self.hash_data_container(data_container)
        return data_container

    def _set_data_inputs_and_expected_outputs(self, data_container, new_inputs, new_expected_outputs) -> DataContainer:
        data_container.set_data_inputs(new_inputs)
        data_container.set_expected_outputs(new_expected_outputs)
        data_container = self._update_data_container_shape(data_container)
        return data_container

    def transform(self, data_inputs):
        data_inputs, expected_outputs = data_inputs
        return data_inputs[self.idx], expected_outputs[self.idx]

I am most likely overriding _set_data_inputs_and_expected_outputs of InputAndOutputTransformerMixin "wrongly" here (would _transform_data_container be the better choice?), but updating the current_ids like this (and re-hashing the container) at least seems to work. Still, I would be interested in how to do this in a way that is more in line with Neuraxle's API expectations.

Personally, my favourite way is to use only handler methods. It is much cleaner in my opinion.

Example usage of handler methods:

class WindowTimeSeries(ForceHandleMixin, BaseTransformer):
    def __init__(self):
        BaseTransformer.__init__(self)
        ForceHandleMixin.__init__(self)

    def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext) -> DataContainer:
        di = data_container.data_inputs
        new_di, new_eo = np.array_split(np.array(di), 2)
        return DataContainer(
            summary_id=data_container.summary_id,
            data_inputs=new_di,
            expected_outputs=new_eo
        )

This way, the current ids are recreated and hashed with the default behaviour. Note: the summary id is the most important one. It is created at the very beginning and is rehashed with the hyperparams... If needed, you can also generate new current ids with a custom saver such as HashlibMd5ValueHasher.
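The np.array_split call in _transform_data_container simply halves the incoming data into new data inputs and new expected outputs. A minimal standalone illustration of that split (plain NumPy, no Neuraxle required):

```python
import numpy as np

# A toy series of 10 samples: the first half becomes the new data inputs,
# the second half becomes the new expected outputs.
di = np.arange(10)
new_di, new_eo = np.array_split(di, 2)
# new_di -> [0 1 2 3 4], new_eo -> [5 6 7 8 9]
```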

Edit: there was indeed a bug. It is fixed here: https://github.com/Neuraxio/Neuraxle/pull/379

Example usage:

step = InputAndOutputTransformerWrapper(WindowTimeSeriesForOutputTransformerWrapper()) \
    .set_hashers([HashlibMd5ValueHasher()])

step = StepThatInheritsFromInputAndOutputTransformerMixin() \
    .set_hashers([HashlibMd5ValueHasher()])
