Data-MessagePack-Stream

 view release on metacpan or  search on metacpan

msgpack-3.3.0/example/x3/stream_unpack.cpp  view on Meta::CPAN

#include <thread>

// MSGPACK_USE_X3_PARSE should be defined before including msgpack.hpp
// It usually defined as a compiler option as -DMSGPACK_USE_X3_PARSE.

//#define MSGPACK_USE_X3_PARSE

#include <msgpack.hpp>

#include <boost/asio.hpp>
#include <boost/coroutine2/all.hpp>

#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif // defined(__clang__)

#include <boost/spirit/home/support/multi_pass.hpp>

#if defined(__clang__)
#pragma GCC diagnostic pop
#endif // defined(__clang__)

namespace as = boost::asio;
namespace x3 = boost::spirit::x3;
namespace coro2 = boost::coroutines2;

using pull_type = coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::pull_type;

// iterator fetching data from coroutine2.
class buffered_iterator : public std::iterator<std::input_iterator_tag, char> {
public:
    using pointer_t = typename iterator::pointer;
    using reference_t = typename iterator::reference;

    explicit buffered_iterator(pull_type& source) noexcept
        : source_{ &source } {
        fetch_();
    }
    buffered_iterator() = default;

msgpack-3.3.0/example/x3/stream_unpack.cpp  view on Meta::CPAN

};

// session class that corresponding to each client
class session : public std::enable_shared_from_this<session> {
public:
    session(as::ip::tcp::socket socket)
        : socket_(std::move(socket)) {
    }

    void start() {
        sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
            [&, this](pull_type& source) {
                // *1 is started when the first sink is called.

                std::cout << "session started" << std::endl;
                do_read();
                source();

                // use buffered_iterator here
                // b is incremented in msgpack::unpack() and fetch data from sink
                // via coroutine2 mechanism
                auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
                auto e = boost::spirit::make_default_multi_pass(buffered_iterator());

                // This is usually an infinity look, but for test, loop is finished when
                // two message pack data is processed.
                for (int i = 0; i != 2; ++i) {
                    auto oh = msgpack::unpack(b, e);
                    std::cout << oh.get() << std::endl;
                }
            }

msgpack-3.3.0/example/x3/stream_unpack.cpp  view on Meta::CPAN

                    data->resize(length);
                    (*sink_)(data);
                    do_read();
                }
            }
        );
    }

    as::ip::tcp::socket socket_;
    static constexpr std::size_t const max_length = 1024;
    std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
};

class server {
public:
    server(
        as::io_service& ios,
        std::uint16_t port)
        : acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
          socket_(ios) {
        do_accept();



( run in 0.294 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )