假设我有一个列表,datalist
其中包含几个示例(对于我的用例而言,这些示例属于torch_geometric.data.Data
类型)。每个示例都有一个属性num_nodes
出于演示目的,可以使用以下代码片段创建此类datalist
import torch
from torch_geometric.data import Data # each example is of this type
import networkx as nx # for creating random data
import numpy as np
# the python list containing the examples
datalist = []
for num_node in [9, 11]:
for _ in range(1024):
edge_index = torch.from_numpy(
np.array(nx.fast_gnp_random_graph(num_node, 0.5).edges())
).t().contiguous()
datalist.append(
Data(
x=torch.rand(num_node, 5),
edge_index=edge_index,
edge_attr=torch.rand(edge_index.size(1))
)
)
从上面的datalist
对象中,我可以通过使用DataLoader构造函数天真地(没有任何约束)创建一个torch_geometric.loader.DataLoader
(torch.utils.data.DataLoader
子类
from torch_geometric.loader import DataLoader
dataloader = DataLoader(
datalist, batch_size=128, shuffle=True
)
我的问题是,如何使用DataLoader
类来确保给定批处理中的每个示例对num_nodes
属性具有相同的值?
附注: 我试图解决它,并想出了一个黑客解决方案,通过使用此处的combine_iterators
功能片段组合多个DataLoader
对象,如下所示:
def get_combined_iterator(*iterables):
nexts = [iter(iterable).__next__ for iterable in iterables]
while nexts:
next = random.choice(nexts)
try:
yield next()
except StopIteration:
nexts.remove(next)
datalists = defaultdict(list)
for data in datalist:
datalists[data.num_nodes].append(data)
dataloaders = (
DataLoader(data, batch_size=128, shuffle=True) for data in datalists.values()
)
batches = get_combined_iterator(*dataloaders)
但是,我认为必须有一些优雅/更好的方法来做到这一点,因此这个问题。
如果基础数据集是映射样式的,则可以使用 define 一个torch.utils.data.Sampler
,该返回要一起批处理的示例的索引。此实例将作为batch_sampler
kwarg 传递给您的DataLoader
,您可以删除batch_size
kwarg,因为采样器将根据您的实现方式为您形成批次。
按照 erip 的建议,我对torch.utils.data.sampler.Sampler
进行了子类化以创建一个新的采样器:BucketSampler
它使用torch.utils.data.sampler.SubsetRandomSampler
和torch.utils.data.sampler.BatchSampler
来实现对给定属性具有相同值的示例的批处理。
import torch
from torch.utils.data.sampler import Sampler, BatchSampler, SubsetRandomSampler
class BucketSampler(Sampler):
def __init__(self, dataset, batch_size, start_pos_data, generator=None) -> None:
self.dataset = dataset
self.batch_size = batch_size
self.generator = generator
start_pos_data = start_pos_data
start_end_indices = []
for i in range(len(start_pos_data) - 1):
start_end_indices.append((start_pos_data[i], start_pos_data[i+1]))
start_end_indices.append((start_pos_data[-1], len(self.dataset)))
ranges = [range(start, end) for start, end in start_end_indices]
subset_samplers = [SubsetRandomSampler(range_, generator=generator) for range_ in ranges]
self.samplers = [
BatchSampler(subset_sampler, batch_size, drop_last=False) for subset_sampler in subset_samplers
]
self._len = 0
for sampler in self.samplers:
self._len += len(sampler)
def __iter__(self):
iterators = [iter(sampler) for sampler in self.samplers]
while iterators:
randint = torch.randint(0, len(iterators),size=(1,), generator=self.generator)[0]
try:
yield next(iterators[randint])
except StopIteration:
iterators.pop(randint)
def __len__(self):
return self._len
除了通常的参数外,此类还将start_pos_data
作为参数,该参数是一个列表,其中包含datalist
中的第一个索引(来自相关示例的数据集),属性值在该索引处更改。因此,对于上面的示例,我们可以借助以下代码片段创建这样的列表:
# sort datalist to ensure that data items with the same number of nodes are grouped together
sorted_datalist = sorted(datalist, key = lambda data: data.num_nodes)
# initialize the start_pos_data by 0
start_pos_data = [0]
for i in range(1,len(sorted_datalist)):
if sorted_datalist[i].num_nodes != sorted_datalist[i-1].num_nodes:
# append when the number of nodes changes
start_pos_data.append(i)
现在,start_pos_data
可以传递给初始化采样器的构造函数BucketSampler
bucketSampler = BucketSampler(sorted_datalist, batch_size = 128, start_pos_data = start_pos_data)
在此之后,可以将bucketSampler
作为kwarg
传递给构造函数DataLoader
为:
from torch_geometric.loader import DataLoader
dataloader = DataLoader(sorted_datalist, batch_sampler = bucketSampler)
此dataloader
(迭代时)将以所需的方式生成批处理。