使用数据加载器连接 Kafka 数据



我使用数据加载器来推断 Kafka 中的数据,但它不起作用

这是我的代码

class kfkdataset(Dataset):
def __init__(self,consumer,image_size):
super(kfkdataset).__init__()
self.image_size=image_size
self.consumer = consumer
def __getitem__(self, index):
info = json.loads(next(self.consumer).value)
image_osspath = info['path']
image = prep_image_batch(image_osspath,self.image_size)
return image,image_osspath

def __len__(self):
# You should change 0 to the total size of your dataset.
return 9000000

consumer = KafkaConsumer('my-topic',bootstrap_servers=[])
prodataset = kfkdataset(consumer,image_size=608)#)
k = DataLoader(prodataset,
batch_size=batch_size,
num_workers=16)
for inputimage,osspath in k:
inputimage = inputimage.to(device)
detections,_ = model(inputimage)
detections = non_max_suppression(detections, 0.98, 0.4)

当num_workers为 1 时有效

当num_workers>1: 错误出来了

File "batch_upload.py", line 80, in <module>
for inputimage,osspath in k:
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 801, in__next__
return self._process_data(data)
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 846,in_process_data
data.reraise()
File "/usr/local/lib/python3.6/dist-packages/torch/_utils.py", line 369, in reraise
raise self.exc_type(msg)
FileExistsError: Caught FileExistsError in DataLoader worker process 1.
Original Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/worker.py", line 178, in _worker_loop
data = fetcher.fetch(index)
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in fetch
data = [self.dataset[idx] for idx in possibly_batched_index]
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in <listcomp>
data = [self.dataset[idx] for idx in possibly_batched_index]
File "/appbatch/utils/utils.py", line 49, in __getitem__
info = json.loads(next(self.consumer).value)
File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1192, in __next__
return self.next_v2()
File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1200, in next_v2
return next(self._iterator)
File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 654, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 701, in _poll_once
self._client.poll(timeout_ms=timeout_ms)
File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 600, in poll
self._poll(timeout / 1000)
File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 629, in _poll
self._register_send_sockets()
File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 619, in _register_send_sockets
self._selector.modify(key.fileobj, events, key.data)
File "/usr/lib/python3.6/selectors.py", line 261, in modify
key = self.register(fileobj, events, data)
File "/usr/lib/python3.6/selectors.py", line 412, in register
self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists

我想知道如何让它工作

基本上,在 PyTorch 的 DataLoader 中设置num_workers > 1是创建多个工作进程,这些进程又竞标到同一个套接字端口,因为只有一个消费者。

并行化和改进从 Kafka 导入数据的一种方法是在同一使用者组中为该主题创建多个使用者。

相关内容

  • 没有找到相关文章