Libcurl + rapidjson for Streaming Deserialization?



我一直在扯头发(虽然一开始并不多),试图找出一种方法来访问 Web 服务,然后在块中解析和反序列化 json,同时解析到我的对象中而不存储整个文档 (500mb+)。 实际上,我正在尝试使用 SAX 风格的解析将 libcurl 和 rapidjson 直接连接到我的类中。

也许我想错了,但我目前的想法是,在curl_writeback期间将接收到的数据添加到 json 文本流中。 我正在努力解决的是如何启动然后让 rapidjson 解析引擎等待下一个数据块从 curl 写回中传入。 在处理未完成的令牌之前停止,直到收到新的流缓冲区。 我不想附加缓冲区,因为我想释放以前的缓冲区。

以前有人这样做过吗?

这里有一些代码来说明这个概念...但显然不完整(可能全错了!

下面是将信息传递给curl_write的结构,以便它知道如何对解析器进行回调。

struct CurlHandler {
    stringstream *data;
    Reader *reader;
    void *parsehandler;
};

然后在启动请求之前,我按如下方式填充结构:

rapidjson::Reader reader;
CurlHandler handler;
ParseHandler parser;
handler.reader = &reader;
handler.parsehandler = &parser;
SetOption(CURLOPT_WRITEDATA, (void *) &handler);

ParseHandler是rapidjson进行SAX解析所需的类。 这个想法是在curl_write期间调用 rapidjson 解析器(以某种方式):

static size_t curl_write(void *ptr, size_t size, size_t nmemb, void *CurlHandlerPtr) {
    CurlHandler *handler = (CurlHandler *) CurlHandlerPtr;
//  handler->reader.Parse(handler->data, handler->parsehandler);   ????????
    return 1;
}

我当然希望有比这更简单的答案。

以下是上下文的完整代码:

#pragma once
#include <string>
#include <iostream>
#include "curlcurl.h"
#include "rapidjsonrapidjson.h"
#include "rapidjsonreader.h"
using namespace std;
using namespace rapidjson;
struct CurlHandler {
    stringstream *data;
    Reader *reader;
    void *parsehandler;
};
static size_t curl_write(void *ptr, size_t size, size_t nmemb, void *CurlHandlerPtr) {
    CurlHandler *handler = (CurlHandler *) CurlHandlerPtr;
//  handler->reader.Parse(handler->data, handler->parsehandler);   ????????
    return 1;
}

class ParseHandler {
    bool Null() { cout << "Null()" << endl; return true; }
    bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
    bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
    bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
    bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
    bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
    bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
    bool String(const char* str, SizeType length, bool copy) {
        cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
        return true;
    }
    bool StartObject() { cout << "StartObject()" << endl; return true; }
    bool Key(const char* str, SizeType length, bool copy) {
        cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
        return true;
    }
    bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
    bool StartArray() { cout << "StartArray()" << endl; return true; }
    bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
};
class StreamTest {
private:
    string _username;
    string _password;
    CURL *_curl;
    CURLcode _error;
public:
    StreamTest() {
        curl_global_init(CURL_GLOBAL_ALL);
        _curl = curl_easy_init();
    };
    ~StreamTest() {
        curl_global_cleanup();
    };
    int Initialize(string username, string password) {
        _username = username;
        _password = password;
        return 0;
    }
    int Get(std::string url) {
        CURLcode result;
        rapidjson::Reader reader;
        CurlHandler handler;
        ParseHandler parser;
        handler.reader = &reader;
        handler.parsehandler = &parser;
        std::string s = _username; s += ":"; s += _password;
        SetOption(CURLOPT_USERPWD, s.c_str());
        SetOption(CURLOPT_URL, url.c_str());
        SetOption(CURLOPT_WRITEFUNCTION, curl_write);
        SetOption(CURLOPT_WRITEDATA, (void *) &handler);
        result = curl_easy_perform(_curl);
        return 0;
    }
    template <typename ValueType>
    StreamTest& SetOption(CURLoption opt, ValueType val) {
        if(_curl && _error == CURLE_OK) {
            _error = curl_easy_setopt(_curl, opt, val);
        }
        return *this;
    }
};
int _tmain(int argc, _TCHAR* argv[]) {
    StreamTest http;
    http.Initialize("guest", "passwd");
    http.Get("http://localhost/GetData");
    cin.get();
}

编辑:

理想情况下是 http。_tmain中的 get 函数将传入对要反序列化的对象的引用。 该对象将以各种回调方式接收来自 rapidjson 的所有 SAX 事件。

RapidJSON 的设计本身不支持在解析过程中暂停/恢复。因为这会增加备份/还原解析状态的大量开销。

但是,上述需求可以通过使用 Libcurl 的多接口来解决,该接口实际上使用多线程。

libcurl 中的一个例子 fopen.c 展示了如何模拟 HTTP GET 作为标准文件 I/O API。使用此示例应该可以方便地修改rapidjson::FileReadStream以支持从 HTTP 解析 JSON。

一般来说,开发人员在使用 RapidJSON 时可能会使用多线程来解决此类问题。

下面是一个概念:创建一个具有读取和写入端的匿名管道。在传递给 WRITEFUNCTION 的回调中,将从 curl 接收的字节写入管道的写入端。然后在另一个线程中,不是将rapidjson::StringStream对象传递给reader.Parse<>(),而是传递从管道读取端创建的rapidjson::FileReadStream对象。当回调写入管道时,SAX 读取器会读取和分析管道中的数据。

相关内容

  • 没有找到相关文章

最新更新