Data-MessagePack-Stream

 view release on metacpan or  search on metacpan

msgpack-3.3.0/example/x3/stream_unpack.cpp  view on Meta::CPAN

public:
    session(as::ip::tcp::socket socket)
        : socket_(std::move(socket)) {
    }

    void start() {
        sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
            [&, this](pull_type& source) {
                // *1 is started when the first sink is called.

                std::cout << "session started" << std::endl;
                do_read();
                source();

                // use buffered_iterator here
                // b is incremented in msgpack::unpack() and fetch data from sink
                // via coroutine2 mechanism
                auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
                auto e = boost::spirit::make_default_multi_pass(buffered_iterator());

                // This is usually an infinity look, but for test, loop is finished when
                // two message pack data is processed.
                for (int i = 0; i != 2; ++i) {
                    auto oh = msgpack::unpack(b, e);
                    std::cout << oh.get() << std::endl;
                }
            }
        );
        // send dummy data to start *1
        (*sink_)({});
    }

private:
    /// Starts one asynchronous read and feeds the received bytes to the
    /// parsing coroutine through sink_.
    ///
    /// On a successful read the buffer is shrunk to the number of bytes
    /// actually received, pushed into sink_, and the next read is issued.
    /// The read chain stops on the first read error, and also as soon as the
    /// coroutine has finished: pushing into a completed coroutine is not
    /// allowed, so no further data is read once nobody can consume it.
    void do_read() {
        std::cout << "session do_read() is called" << std::endl;
        // Captured by the handler so the session outlives the pending read.
        auto self(shared_from_this());
        auto data = std::make_shared<std::vector<char>>(static_cast<std::size_t>(max_length));
        socket_.async_read_some(
            boost::asio::buffer(*data),
            [this, self, data]
            (boost::system::error_code ec, std::size_t length) {
                if (ec) {
                    // Read failed (including end of stream): end the chain.
                    return;
                }
                if (!(*sink_)) {
                    // The coroutine already returned; resuming it would be
                    // undefined behavior, so drop the data and stop reading.
                    return;
                }
                data->resize(length);
                (*sink_)(data);
                // Only keep reading while the coroutine can still accept data.
                if (*sink_) {
                    do_read();
                }
            }
        );
    }

    // Socket handed to the constructor; do_read() reads from it.
    as::ip::tcp::socket socket_;
    // Number of chars in the buffer that do_read() allocates for each read.
    static constexpr std::size_t const max_length = 1024;
    // Push side of the coroutine created in start(); do_read() pushes every
    // received buffer into it.
    std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
};

/// Minimal TCP server used by the example: accepts a single connection and
/// hands it to a session.
class server {
public:
    /// Opens an IPv4 acceptor on `port`, queues the first accept and then
    /// runs `ios`, so construction does not return until ios.run() returns.
    ///
    /// @param ios  io_service that drives the acceptor and the socket.
    /// @param port TCP port to listen on.
    server(
        as::io_service& ios,
        std::uint16_t port)
        : acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
          socket_(ios) {
        do_accept();
        std::cout << "server start accept" << std::endl;
        ios.run();
    }

private:
    /// Queues one asynchronous accept into socket_. On success the accepted
    /// socket is moved into a new session, which is started immediately.
    void do_accept() {
        auto on_accept = [this](boost::system::error_code ec) {
            // For the test only one session is accepted, so do_accept() is
            // deliberately not called again from here.
            if (ec) {
                return;
            }
            auto accepted = std::make_shared<session>(std::move(socket_));
            accepted->start();
        };
        acceptor_.async_accept(socket_, std::move(on_accept));
    }

    // Listening acceptor bound in the constructor.
    as::ip::tcp::acceptor acceptor_;
    // Socket that receives the accepted connection before it is moved into a session.
    as::ip::tcp::socket socket_;
};

int main() {
    std::thread srv(
        []{
            boost::asio::io_service ios;
            server s(ios, 12345);
        }
    );

    std::thread cli(
        []{
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "client start" << std::endl;

            std::stringstream ss;
            std::map<std::string, std::vector<int>> v1 {
                { "ABC", { 1, 2, 3 } },
                { "DEFG", { 4, 5 } }
            };
            std::vector<std::string> v2 {
                "HIJ", "KLM", "NOP"
                    };
            msgpack::pack(ss, v1);
            msgpack::pack(ss, v2);

            auto send_data = ss.str();

            boost::asio::io_service ios;
            as::ip::tcp::resolver::query q("127.0.0.1", "12345");
            as::ip::tcp::resolver r(ios);
            auto it = r.resolve(q);

            std::cout << "client connect" << std::endl;
            as::ip::tcp::socket s(ios);
            as::connect(s, it);




( run in 1.417 second using v1.01-cache-2.11-cpan-524268b4103 )