用python接收UDP数据包,数据包丢失



我在python中使用UDP时有很多数据包丢失。我知道如果我不想丢失数据包,我应该使用TCP,但我不能(完全)控制发送者。

这是一款使用UDP多播每秒发送15张图像的相机。

下面是我现在编写的代码。它使用多处理来允许生产者和消费者功能并行工作。生产者函数捕获数据包,消费者函数处理数据包并将图像写入.bmp文件。

我已经编写了一个类PacketStream,它将包中的字节写入.bmp文件。

当相机发送新图像时,它首先发送一个数据包,第一个字节=0x01。其中包含有关图像的信息。然后发送612个分组,其中第一个字节=0x02。这些包含图像中的字节(508字节/包)。

由于每秒发送15个图像,因此每秒发送约9000个数据包。尽管这种情况在每个图像的突发中发生的速度更快,约为22个数据包/毫秒。

我可以使用tcpdump或wireshark完美地接收所有数据包。但是使用下面的代码,数据包会丢失。我的Windows7电脑肯定能处理这个问题吗?我还在树莓派3上使用它,但或多或少会丢失相同数量的数据包。因此,我认为这是代码的问题。

我尝试了很多不同的方法,比如线程而不是多处理,管道而不是队列。

我还尝试使用增加套接字缓冲区

sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)

但无济于事。

这在python中可能吗?

提前感谢

import time
from multiprocessing import Process, Queue
import socket
import struct
from PIL import Image

class PacketStream:
def __init__(self, output_path):
self.output_path = output_path
self.data_buffer = ''
self.img_id = -1  # -1 = waiting for start of new image
def process(self, data):
message_id = data[0]
if message_id == 'x01':
self.wrap_up_last_image()
self.img_id = ord(data[3])
self.data_buffer = ''
if message_id == 'x02':
self.data_buffer += data[6:]
def wrap_up_last_image(self):
if self.img_id > 0:
n_bytes = len(self.data_buffer)
if n_bytes == 307200:
global i
write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp',
self.data_buffer)
i += 1
else:
print 'Image lost: %s bytes missing.' % (307200 - n_bytes)

def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path

def producer(q):
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))
mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
q.put(sock.recv(512))

def consumer(q):
packet_stream = PacketStream('D:/bmpdump/')
while True:
data = q.get()
packet_stream.process(data)
i = 0
if __name__ == '__main__':
q = Queue()
t1 = Process(target=producer, args=(q,))
t1.daemon = True  # so they stop when the main prog stops
t1.start()
t2 = Process(target=consumer, args=(q,))
t2.daemon = True
t2.start()
time.sleep(10.0)
print 'Program finished.'

编辑

谢谢你的建议。

1) 我已经尝试了线程+队列,还有'.join(),似乎没有太大区别。我确信现在的问题是生产者线程没有得到足够的优先级。我找不到如何使用Python来增加这一点?这可能吗?

2) 使用下面的代码,我只损失了大约10%。处理器处于~25%(在树莓π上)关键是当数据包流中出现暂停时,即当最后一个数据包到达时,消耗数据

import time
import socket
import struct
from PIL import Image

def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def consume(data_buffer):
img_id = ord(data_buffer[0][1])
real_data_buffer = [data[6:] for data in data_buffer]
data_string = ''.join(real_data_buffer)
global i
write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string)
i += 1
def producer(sock):
print 'Producer start'
data_buffer = []
while True:
data = sock.recvfrom(512)[0]
if data[0] == 'x01':
data_buffer = []
else:
data_buffer.append(data)
if len(data_buffer) == 612:
consume(data_buffer)

# image counter
i = 0
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((MCAST_GRP, MCAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)
producer(sock)

一些改进代码的建议,但首先要问一个问题:你衡量过什么可能会减慢速度吗?例如,您查看过系统的CPU使用情况吗。如果你达到100%,那很可能是数据包丢失的原因。如果它大部分是空闲的,那么就有其他事情发生,并且问题与代码的性能无关。

现在,一些改进代码的建议:

  • 在处理UDP套接字时使用socket.recvfrom而不是sock.recv
  • 不要在进程中使用多处理,如果我们谈论的是每秒9000次调用,那么将数据从一个进程发送到另一个进程所必须进行的序列化很可能会成为性能瓶颈。请尝试使用线程(threading+queue模块)。但由于你没有提供任何观察到的数字,所以很难说真的
  • 在接收数据包时,不要使用字符串串联来建立接收器的缓冲区。这会创建大量新的临时字符串对象,并一直复制数据。相反,将每个数据包附加在一个列表中,当您收到所有数据时,在最后将它们"".join(packets)一起

最新更新