请求的 stream=True 选项如何一次流式传输一个块的数据?



我使用以下代码来测试HTTP连接可以保持活动的秒数:

start_time = time.time()
try:
r = requests.get(BIG_FILE_URL, stream=True)
total_length = r.headers['Content-length']
for chunk in r.iter_content(chunk_size=CHUNK_SIZE): 
time.sleep(1)
# ... except and more logic to report total time and percentage downloaded

为了确保Python不会一次下载所有内容并创建生成器,我使用了tcpdump。它确实每秒(大约(发送一个数据包,但我没有发现是什么让服务器一次发送一个块,以及请求库是如何做到这一点的。

我已经检查了几个SOF问题,并查看了请求库文档,但所有资源都解释了如何使用库下载大型文件,而没有一个资源解释stream=True选项的内部内容。

我的问题是:在tcp协议或HTTP请求头中,是什么让服务器一次发送一个块,而不是一次发送整个文件?

编辑+可能的答案:

在使用Wireshark之后,我发现Python使用TCP的滑动窗口来实现它。也就是说,在不调用下一个区块时,它不会发送ack。

这可能会导致一些意外的行为,因为滑动窗口可能比块大得多,并且代码中的块可能不代表实际的数据包
示例:如果您将块设置为1000字节,则默认的64K滑动窗口(我在Ubuntu 18上的默认窗口(将导致立即发送64个块。如果主体大小小于64K,则连接可能会立即关闭。因此,这不是保持在线连接的好主意。

用户文档中没有对此进行解释。通过查看requests的源代码,我发现如果我们在requests.get(...)中设置stream=True,那么在HTTP头中就设置了headers['Transfer-Encoding'] = 'chunked'。从而指定分块传输编码。在分块传输编码中,数据流被划分为一系列不重叠的"块"。块由服务器彼此独立地发送。希望这能回答问题。

这个问题引起了我的好奇心,所以我决定去这个研究兔子洞。以下是我的一些(有待更正!(发现:

  • 客户端到服务器的通信是由开放系统互连模型(OSI(标准化的

  • 数据传输由第4层(传输层(处理。TCP/IP总是将数据分成数据包。IP数据包的最大长度约为65.5K字节。

    现在,是什么阻止Python在返回原始文件之前将所有这些数据包重新组合到原始文件中

    请求iter_content方法有一个嵌套的生成器,它封装了一个urllib3生成器方法:class urllib3.response.HTTPResponse(...).stream(...)

    "chunk_size"参数似乎设置了一个缓冲区,用于在将数据写入文件系统之前,将多少数据从打开的套接字读取到内存中。

    以下是一份有用的iter_content方法:
def iter_content(self, chunk_size=1, decode_unicode=False):
"""Iterates over the response data.  When stream=True is set on the
request, this avoids reading the content at once into memory for
large responses.  The chunk size is the number of bytes it should
read into memory.  This is not necessarily the length of each item
returned as decoding can take place.
chunk_size must be of type int or None. A value of None will
function differently depending on the value of `stream`.
stream=True will read data as it arrives in whatever size the
chunks are received. If stream=False, data is returned as
a single chunk.
If decode_unicode is True, content will be decoded using the best
available encoding based on the response.
"""
def generate():
# Special case for urllib3.
if hasattr(self.raw, 'stream'):
try:
for chunk in self.raw.stream(chunk_size, decode_content=True):
yield chunk
except ProtocolError as e:
raise ChunkedEncodingError(e)
except DecodeError as e:
raise ContentDecodingError(e)
except ReadTimeoutError as e:
raise ConnectionError(e)
else:
# Standard file-like object.
while True:
chunk = self.raw.read(chunk_size)
if not chunk:
break
yield chunk
self._content_consumed = True
if self._content_consumed and isinstance(self._content, bool):
raise StreamConsumedError()
elif chunk_size is not None and not isinstance(chunk_size, int):
raise TypeError("chunk_size must be an int, it is instead a %s." % type(chunk_size))
# simulate reading small chunks of the content
reused_chunks = iter_slices(self._content, chunk_size)
stream_chunks = generate()
chunks = reused_chunks if self._content_consumed else stream_chunks
if decode_unicode:
chunks = stream_decode_response_unicode(chunks, self)

return chunks

最新更新