如何使用 PyTorch DataLoader 创建批处理,以便给定批处理中的每个示例都具有相同的属性值?



假设我有一个列表,datalist其中包含几个示例(对于我的用例而言,这些示例属于torch_geometric.data.Data类型)。每个示例都有一个属性num_nodes

出于演示目的,可以使用以下代码片段创建此类datalist

import torch
from torch_geometric.data import Data # each example is of this type
import networkx as nx # for creating random data
import numpy as np
# the python list containing the examples
datalist = []
for num_node in [9, 11]:
for _ in range(1024):
edge_index = torch.from_numpy(
np.array(nx.fast_gnp_random_graph(num_node, 0.5).edges())
).t().contiguous()
datalist.append(
Data(
x=torch.rand(num_node, 5), 
edge_index=edge_index, 
edge_attr=torch.rand(edge_index.size(1))
)
)

从上面的datalist对象中,我可以通过使用DataLoader构造函数天真地(没有任何约束)创建一个torch_geometric.loader.DataLoader(torch.utils.data.DataLoader子类

):
from torch_geometric.loader import DataLoader
dataloader = DataLoader(
datalist, batch_size=128, shuffle=True
)

我的问题是,如何使用DataLoader类来确保给定批处理中的每个示例对num_nodes属性具有相同的值?

附注: 我试图解决它,并想出了一个黑客解决方案,通过使用此处的combine_iterators功能片段组合多个DataLoader对象,如下所示:

def get_combined_iterator(*iterables):
nexts = [iter(iterable).__next__ for iterable in iterables]
while nexts:
next = random.choice(nexts)
try:
yield next()
except StopIteration:
nexts.remove(next)
datalists = defaultdict(list)
for data in datalist:
datalists[data.num_nodes].append(data)
dataloaders = (
DataLoader(data, batch_size=128, shuffle=True) for data in datalists.values()
)
batches = get_combined_iterator(*dataloaders)

但是,我认为必须有一些优雅/更好的方法来做到这一点,因此这个问题。

如果基础数据集是映射样式的,则可以使用 define 一个torch.utils.data.Sampler,该返回要一起批处理的示例的索引。此实例将作为batch_samplerkwarg 传递给您的DataLoader,您可以删除batch_sizekwarg,因为采样器将根据您的实现方式为您形成批次。

按照 erip 的建议,我对torch.utils.data.sampler.Sampler进行了子类化以创建一个新的采样器:BucketSampler它使用torch.utils.data.sampler.SubsetRandomSamplertorch.utils.data.sampler.BatchSampler来实现对给定属性具有相同值的示例的批处理。

import torch
from torch.utils.data.sampler import Sampler, BatchSampler, SubsetRandomSampler
class BucketSampler(Sampler):
def __init__(self, dataset, batch_size, start_pos_data, generator=None) -> None:
self.dataset = dataset
self.batch_size = batch_size
self.generator = generator
start_pos_data = start_pos_data
start_end_indices = []
for i in range(len(start_pos_data) - 1):
start_end_indices.append((start_pos_data[i], start_pos_data[i+1]))
start_end_indices.append((start_pos_data[-1], len(self.dataset)))
ranges  = [range(start, end) for start, end in start_end_indices]
subset_samplers = [SubsetRandomSampler(range_, generator=generator) for range_ in ranges]
self.samplers = [
BatchSampler(subset_sampler, batch_size, drop_last=False) for subset_sampler in subset_samplers
]
self._len = 0
for sampler in self.samplers:
self._len += len(sampler)

def __iter__(self):
iterators = [iter(sampler) for sampler in self.samplers]
while iterators:
randint = torch.randint(0, len(iterators),size=(1,), generator=self.generator)[0]
try:
yield next(iterators[randint])
except StopIteration:
iterators.pop(randint)
def __len__(self):
return self._len

除了通常的参数外,此类还将start_pos_data作为参数,该参数是一个列表,其中包含datalist中的第一个索引(来自相关示例的数据集),属性值在该索引处更改。因此,对于上面的示例,我们可以借助以下代码片段创建这样的列表:

# sort datalist to ensure that data items with the same number of nodes are grouped together
sorted_datalist = sorted(datalist, key = lambda data: data.num_nodes)
# initialize the start_pos_data by 0
start_pos_data = [0]
for i in range(1,len(sorted_datalist)):
if sorted_datalist[i].num_nodes != sorted_datalist[i-1].num_nodes:
# append when the number of nodes changes 
start_pos_data.append(i)

现在,start_pos_data可以传递给初始化采样器的构造函数BucketSampler

bucketSampler = BucketSampler(sorted_datalist, batch_size = 128, start_pos_data = start_pos_data)

在此之后,可以将bucketSampler作为kwarg传递给构造函数DataLoader为:

from torch_geometric.loader import DataLoader
dataloader = DataLoader(sorted_datalist, batch_sampler = bucketSampler)

dataloader(迭代时)将以所需的方式生成批处理。

相关内容

  • 没有找到相关文章

最新更新