不确定这是否可能,但这就是我要做的: -
我想提取函数的部分(步骤(作为单个节点(到目前为止还行(,但问题是我在步骤之上有一个迭代器,它依赖于数据集上的一些逻辑,即重复相同的操作(独立(在数据集的逻辑分区上。
示例代码
def single_node(list_of_numbers):
modified_list = [] # to store all output
for x in list_of_numbers: # iteration logic
x+=1 # Step 1
x=str(x) # Step 2
x+="_suffix" # Step 3
modified_list.append(x) # append to final output
return modified_list # return
上下文
- 在提供的示例中,假设当前我有一个执行所有步骤的节点。
- 因此,当前管道有一个节点,该节点接受 1 个输入并返回 1 个输出。
- 随着步骤复杂性的增加,我想将它们公开为单个节点。因此,我创建了另一个管道,将这 3 个步骤作为单独的节点,并将它们连接在一起。(他们的输入和输出(
- 但是我的总体要求没有变化,我想遍历
list_of_numbers
中的所有值,对于此列表中的每个元素,我想调用此新管道。最后,我想合并所有运行的输出并生成一个输出。
似乎有点类似于基于数据集扩展的动态图(管道的多个动态实例(。
需要考虑的其他要点,
- 我的输入是一个文件。假设我根据定义为节点的某些逻辑对数据集进行分区。因此,此节点可以有多个输出。(确切的计数完全取决于数据集,这里是列表的大小(
- 对于数据迭代器节点的每个输出,我需要"生成"一个管道。
- 最后,合并所有"生成"管道的输出。(此逻辑可以再次在具有多个动态输入的合并节点中定义(。
有没有办法做到这一点?谢谢!
看起来分区数据集或增量数据集可能对您有用。
它们允许您将类似的数据隔离为由文件确定的单独块,并根据需要对这些块重复操作。
因此,与其踢出包含 y 个节点的 x 个管道,不如拥有一个包含 y 个节点的管道来处理 x 个数据块。
有关增量数据集的更多信息,请观看此视频:https://www.youtube.com/watch?v=v7JSSiYgqpg
# nodes.py
from typing import Any, Dict, Callable
def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):
# Apply your function to the dictionary mapping
return {k: fun(v) for k, v in dictionary.items()}
def node_0(list_of_strings: Dict[str, str]):
return _dict_mapper(list_of_strings, lambda x: int(x))
def node_1(list_of_numbers: Dict[str, int]):
return _dict_mapper(list_of_numbers, lambda x: x+1)
def node_2(list_of_numbers: Dict[str, int]):
return _dict_mapper(list_of_numbers, lambda x: str(x))
def node_3(list_of_strings: Dict[str, str]):
return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')
# catalog.yml
data:
type: IncrementalDataSet
dataset: text.TextDataSet
path: folder/with/text_files/each/containing/single/number/
filename_suffix: .txt
# pipeline.py
Pipeline([
node(node_0, inputs='data', outputs='0'),
node(node_1, inputs='0', outputs='1'),
node(node_2, inputs='1', outputs='2'),
node(node_3, inputs='2', outputs='final_output'),
])