通过Twisted FTP流式传输HTTP正文



我有一个自定义的FTP服务器,它通过一个API获取文件夹列表等信息,而文件则由该API以URL的形式返回。我试图打开指向这些URL的HTTP流,并通过FTP把数据(以非阻塞的方式)转发给客户端,但我不知道该如何把两者连接起来。

我试着用一个最小的例子来更好地解释我的问题。在该示例中,它在端口2121上启动本地FTP服务器,该服务器列出了本地文件系统,但在下载文件时,它返回www.yahoo.com的内容主体,而不是文件数据。

我尝试通过io.BytesIO对象缓冲数据,但没有数据发送。我想知道这是不是正确的方法,或者可能是因为读取指针总是在文件对象的末尾?

下面是示例代码:

import io
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell, _FileReader
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess, FilePasswordDB
from twisted.internet import defer
# Module-level HTTP client shared by all requests in this example: a plain
# Agent wrapped in ContentDecoderAgent with GzipDecoder registered for 'gzip'.
agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
class StreamWriter(Protocol):
    def __init__(self, finished, stream):
        self.finished = finished
        self.stream = stream
    def dataReceived(self, bytes):
        self.stream.write(bytes)
    def connectionLost(self, reason):
        print 'Finished receiving body:', reason.type, reason.value
        self.finished.callback(None)

def streamBody(response, stream):
    """Deliver the body of ``response`` into ``stream``.

    Creates a StreamWriter bound to ``stream`` and hands it to
    ``response.deliverBody``. Returns the Deferred that the StreamWriter
    fires with ``None`` once the body connection is lost.
    """
    done = Deferred()
    writer = StreamWriter(done, stream)
    response.deliverBody(writer)
    return done
def openForReading(self, path):
    """Replacement for ``FTPAnonymousShell.openForReading``.

    ``path`` is ignored: a GET for a fixed URL is started and its body is
    written into an in-memory buffer. The function does not wait for that
    request; it immediately returns an already-fired Deferred holding a
    ``_FileReader`` over the (still empty) buffer.
    """
    stream = io.BytesIO()

    def deliver(resp):
        return streamBody(resp, stream)

    d = agent.request("GET", "http://www.yahoo.com")
    d.addCallback(deliver)
    d.addErrback(log.err)
    # The reader is handed back before any body bytes have been written.
    return defer.succeed(_FileReader(stream))
def main():
    """Start an anonymous FTP server for './' on TCP port 2121 and run the reactor."""
    # Patch the anonymous shell so downloads go through openForReading above.
    FTPAnonymousShell.openForReading = openForReading
    realm = FTPRealm('./')
    checkers = [AllowAnonymousAccess()]
    portal = Portal(realm, checkers)
    reactor.listenTCP(2121, FTPFactory(portal))
    reactor.run()


if __name__ == "__main__":
    main()

编辑

from twisted.internet.protocol import ConsumerToProtocolAdapter


class FinishNotifier(ConsumerToProtocolAdapter):
    """Protocol that forwards received body bytes to a consumer.

    ``original`` is the consumer to write to. ConsumerToProtocolAdapter is
    the adapter whose ``dataReceived`` calls ``original.write``; the
    protocol-to-consumer adapter goes the other way and would leave
    ``dataReceived`` as a no-op, dropping the body.

    ``finished`` is a Deferred that fires with ``None`` when the body ends
    cleanly and errbacks with the failure otherwise.
    """

    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        # ConnectionDone / ResponseDone mean the body was fully delivered.
        if reason.check(ConnectionDone, ResponseDone):
            self.finished.callback(None)
        else:
            # Pass any other reason on so that whoever waits on ``finished``
            # is notified instead of being left pending forever.
            self.finished.errback(reason)
class HTTP2FTP(object):
    """Wrap an HTTP response so its body can be sent to a consumer.

    ``response`` must offer ``deliverBody(protocol)``.
    """

    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        """Deliver the response body to ``consumer``.

        Returns the ``finished`` Deferred of the FinishNotifier created for
        this transfer.
        """
        notifier = FinishNotifier(consumer)
        self.response.deliverBody(notifier)
        return notifier.finished
def openForReading(self, path):
    """Replacement for ``FTPAnonymousShell.openForReading``.

    ``path`` is ignored; a GET for a fixed URL is issued instead. Returns a
    Deferred that fires with an ``HTTP2FTP`` wrapping the response, or fails
    with the request's failure.
    """
    def logAndPropagate(failure):
        # log.err alone returns None, which would turn a failed request into
        # a "successful" result of None for the FTP server. Return the
        # failure so the caller sees the error.
        log.err(failure, "HTTP request for FTP download failed")
        return failure

    d = agent.request("GET", "http://www.yahoo.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(logAndPropagate)
    return d

更新的可运行示例:

from twisted.python import log
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ConsumerToProtocolAdapter, connectionDone
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder, HTTPConnectionPool, HTTPClientFactory
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess
from twisted.internet import defer
from twisted.internet.error import ConnectionDone
from twisted.web._newclient import ResponseDone
# Module-level HTTP client shared by all requests in this example: a plain
# Agent wrapped in ContentDecoderAgent with GzipDecoder registered for 'gzip'.
agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
# Disabled alternative: an Agent using a persistent HTTPConnectionPool
# (maxPersistentPerHost = 2) and connectTimeout=5.
# pool= HTTPConnectionPool(reactor,persistent=True)
# pool.maxPersistentPerHost = 2
# agent= Agent(reactor,pool=pool,connectTimeout=5)
class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()
    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        print "finished"
        self.finished.callback(None)
class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response
    def send(self, consumer):
        print consumer
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished
def openForReading(self, path):
    """Replacement for ``FTPAnonymousShell.openForReading``.

    ``path`` is ignored; a GET for a fixed URL is issued instead. Returns a
    Deferred that fires with an ``HTTP2FTP`` wrapping the response, or fails
    with the request's failure.
    """
    def logAndPropagate(failure):
        # log.err alone returns None, which would turn a failed request into
        # a "successful" result of None for the FTP server. Return the
        # failure so the caller sees the error.
        log.err(failure, "HTTP request for FTP download failed")
        return failure

    d = agent.request("GET", "http://www.testtest.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(logAndPropagate)
    return d
def main():
    """Start an anonymous FTP server for './' on TCP port 2121 and run the reactor."""
    # Patch the anonymous shell so downloads go through openForReading above.
    FTPAnonymousShell.openForReading = openForReading
    realm = FTPRealm('./')
    checkers = [AllowAnonymousAccess()]
    portal = Portal(realm, checkers)
    reactor.listenTCP(2121, FTPFactory(portal))
    reactor.run()


if __name__ == "__main__":
    main()

或者,可能是因为读取指针总是在文件对象的末尾?

可能是这样。这里有两件事在同时发生:HTTP客户端正在向BytesIO实例写入,而FTP客户端正在从中读取。_FileReader(一个私有API,是Twisted的FTP库的实现细节,而不是您实际应该使用的东西)用于读取已经写完的文件,而不是在读取过程中仍在增长的文件。

幸运的是,不需要借助对异步不友好的file接口。看看openForReading应该返回的类型——一个IReadFile的提供者(provider)。IReadFile有一个send方法,它接受一个提供IConsumer的对象。

另一方面,您有接受IProtocol的deliverBody。HTTP响应正文会被传递给这个协议,而这正是您要交给传入IReadFile.send的那个IConsumer的数据。

因此,与其试图让这两个部分通过BytesIO协同工作,不如让它们通过所涉及的两个接口——IProtocol和IConsumer——协同工作。这是一个草图(里面有很多错误,但总体形状是正确的):

from twisted.internet.protocol import ConsumerToProtocolAdapter
from twisted.internet.interfaces import IPushProducer
from twisted.protocols.ftp import IReadFile
class FinishNotifier(ConsumerToProtocolAdapter):
    """Protocol that forwards received body bytes to a consumer.

    ``original`` is the consumer to write to. ``finished`` is a Deferred
    that fires with ``None`` when the body ends cleanly and errbacks with
    the failure otherwise.
    """

    def __init__(self, original):
        from twisted.internet.defer import Deferred

        ConsumerToProtocolAdapter.__init__(self, original)
        # Must exist before delivery starts: both connectionLost and the
        # code that creates this protocol read ``finished``.
        self.finished = Deferred()

    def connectionLost(self, reason):
        from twisted.internet.error import ConnectionDone
        from twisted.web.client import ResponseDone

        # An HTTP body delivered through deliverBody ends with ResponseDone,
        # so accept it alongside ConnectionDone as a clean finish.
        if reason.check(ConnectionDone, ResponseDone):
            self.finished.callback(None)
        else:
            # Pass any other reason on so that whoever waits on ``finished``
            # is notified instead of being left pending forever.
            self.finished.errback(reason)
@implementer(IReadFile, IPushProducer)
class HTTP2FTP(object):
    """Sketch of a readable "file" that streams an HTTP body to a consumer.

    Declares IReadFile and IPushProducer. The three producer methods only
    forward to the object that ``send`` stores in ``self._producer``.

    NOTE(review): ``response`` used in ``send`` is not defined in this
    class; presumably a constructor should store it -- confirm.
    """
    def send(self, consumer):
        """Start delivering the body to ``consumer``.

        Returns the ``finished`` Deferred of the FinishNotifier created here.
        """
        protocol = FinishNotifier(consumer)
        response.deliverBody(protocol)
        # Lazy hack.
        # This code probably belongs in protocol.connectionMade instead.
        # NOTE(review): assumes ``protocol.transport`` is already set right
        # after deliverBody returns -- confirm the adapter base class sets it.
        self._producer = protocol.transport
        consumer.registerProducer(self._producer, streaming=True)
        # Unregister the producer once ``finished`` fires successfully.
        protocol.finished.addCallback(
            lambda ignored: consumer.unregisterProducer()
        )
        return protocol.finished
    def pauseProducing(self):
        # Forward to the producer captured in send().
        self._producer.pauseProducing()
    def resumeProducing(self):
        # Forward to the producer captured in send().
        self._producer.resumeProducing()
    def stopProducing(self):
        # Forward to the producer captured in send().
        self._producer.stopProducing()

请注意,通过在这里实现IPushProducer,我们可以在HTTP和FTP连接之间进行流控制(这样,即使HTTP连接传输数据的速度比FTP连接快得多,服务器上的内存使用率也会受到限制)。这是一件非常酷的事情,很高兴它只需要几行额外的代码就可以实现。稍微不那么酷的是,必须在正确的时间进行unregisterProducer调用。FTP协议实现将此作为数据已完全传输的指示。这可能在Twisted中没有得到充分的记录(这是一个应该纠正的疏忽)。

最新更新