找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 9440|回复: 2

thrift的asio实现

[复制链接]
发表于 2013-12-15 21:38:56 | 显示全部楼层 |阅读模式
thrift是目前能找到的轻量级,高性能,非常灵活的一个rpc。
官方的实现是用libevent2+多线程做为网络接口,网上也有人有了asio的实现。

写了个简单的
#pragma once
#include "Api.h"
#include <iostream>
#include <stdexcept>
#include <sstream>
#include "rpc_conn.h"

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

class RpcJgServer : virtual public jgrpc::ApiIf
{
public:
    explicit RpcJgServer(const std::string& address, int port,std::size_t thread_pool_size);

    void run();

    void stop();

private:
    void start_accept();
    void handle_accept(RpcConnectionPtr conn,const asio::error_code& e);

    std::size_t thread_pool_size_;
    asio::io_service io_service_;

    asio::ip::tcp::acceptor acceptor_;

    boost::shared_ptr<TProcessor> processor_;
    boost::shared_ptr<TProtocolFactory> input_protocol_factory_;
    boost::shared_ptr<TProtocolFactory> output_protocol_factory_;
    boost::shared_ptr<jgrpc::ApiIf> evt_If;

public:
    std::set<RpcConnectionPtr> clients_;//已登录的客户端
    asio::detail::mutex mutex_;//是一把锁

public:
    void jgrpc::ApiIf::ping(void){};
};
#include "stdafx.h"
#include "rpc_jg.h"

RpcJgServer::RpcJgServer(const std::string& address, int port,std::size_t thread_pool_size)
               : thread_pool_size_(thread_pool_size),
               acceptor_(io_service_, asio::ip::tcp::endpoint(asio::ip::address_v4::from_string(address), port))
{
    boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    input_protocol_factory_=protocolFactory;
    output_protocol_factory_=protocolFactory;
    evt_If.reset(this);
    processor_.reset(new jgrpc::ApiProcessor(evt_If));
    start_accept();
}

void RpcJgServer::start_accept()
{
    RpcConnectionPtr new_connection;
    new_connection.reset(new RpcConnection(io_service_, processor_,input_protocol_factory_, output_protocol_factory_));
    acceptor_.async_accept(new_connection->socket(),
        boost::bind(&RpcJgServer::handle_accept, this,
        new_connection,
        asio::placeholders::error));   
}
void RpcJgServer::run()
{
    std::vector<boost::shared_ptr<asio::thread> > threads;
    for (std::size_t i = 0; i < thread_pool_size_; ++i)
    {
        boost::shared_ptr<asio::thread> thread(new asio::thread(
            boost::bind(&asio::io_service::run, &io_service_)));
        threads.push_back(thread);
    }

    for (std::size_t i = 0; i < threads.size(); ++i) {
        threads->join();
    }
}

void RpcJgServer::stop()
{
    io_service_.stop();
}

void RpcJgServer::handle_accept(RpcConnectionPtr conn,const asio::error_code& e)
{
    if (!e)
    {
        //这里进行处理
        conn->start();
    }
    start_accept();   
}
  1. #pragma once
  2. using namespace apache::thrift;
  3. using namespace apache::thrift::protocol;
  4. using namespace apache::thrift::transport;
  5. class RpcConnection  : public boost::enable_shared_from_this<RpcConnection>,private boost::noncopyable
  6. {
  7. public:
  8.         explicit RpcConnection(asio::io_service& io_service,
  9.                 boost::shared_ptr<TProcessor> processor,
  10.                 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
  11.                 boost::shared_ptr<TProtocolFactory> outputProtocolFactory);
  12.         asio::ip::tcp::socket& socket()
  13.         {
  14.                 return socket_;
  15.         }
  16.         void start()
  17.         {
  18.                 asio::async_read(socket_, asio::buffer(length_),
  19.                         strand_.wrap(
  20.                         boost::bind(&RpcConnection::handle_read_length, shared_from_this(),
  21.                         asio::placeholders::error)));
  22.         }
  23.         static RpcConnection *get_current_conn();
  24. private:
  25.         void handle_read_length(const asio::error_code& e);
  26.         void handle_read_frame(int32_t sz, const asio::error_code& e);
  27.         void handle_write(const asio::error_code& e);
  28.         asio::io_service::strand strand_;
  29.         asio::ip::tcp::socket socket_;
  30.         boost::array<uint8_t, 4> length_;
  31.         boost::shared_ptr<TProcessor> processor_;
  32.         boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
  33.         boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
  34.         std::vector<uint8_t> frame_;
  35.         asio::io_service& io_service_;
  36.        
  37. };
  38. typedef boost::shared_ptr<RpcConnection> RpcConnectionPtr;
复制代码
  1. #include "stdafx.h"
  2. #include "rpc_conn.h"
  3. #include <boost/thread/tss.hpp>
  4. static boost::thread_specific_ptr<RpcConnection> tls_sessiondata;
  5. static boost::shared_ptr<boost::detail::tss_cleanup_function> tls_cleanup;
  6. RpcConnection *RpcConnection::get_current_conn()
  7. {
  8.         RpcConnection *cursession=tls_sessiondata.get();
  9.         return cursession;
  10. }
  11. RpcConnection::RpcConnection(asio::io_service& io_service,
  12.                                            boost::shared_ptr<TProcessor> processor,
  13.                                            boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
  14.                                            boost::shared_ptr<TProtocolFactory> outputProtocolFactory
  15.                                            )
  16.                                            : strand_(io_service),
  17.                                            socket_(io_service),
  18.                                            processor_(processor),
  19.                                            inputProtocolFactory_(inputProtocolFactory),
  20.                                            outputProtocolFactory_(outputProtocolFactory),
  21.                                            io_service_(io_service)
  22. {
  23. }
  24. void RpcConnection::handle_read_length(const asio::error_code& e)
  25. {
  26.         if (!e)
  27.         {
  28.                 int32_t sz =
  29.                         ((length_[0] & 0xff) << 24) |
  30.                         ((length_[1] & 0xff) << 16) |
  31.                         ((length_[2] & 0xff) <<  8) |
  32.                         ((length_[3] & 0xff));
  33.                 if (sz < 0)
  34.                 {
  35.                         std::cerr << "Frame length is negative. Are you sure your client uses a TFramedTransport?" << std::endl;
  36.                         delete this;
  37.                 }
  38.                 else if (sz == 0)
  39.                 {
  40.                         std::cerr << "Frame length is zero, weird" << std::endl;
  41.                         delete this;
  42.                 }
  43.                 else
  44.                 {
  45.                         frame_.resize(sz);
  46.                         asio::async_read(socket_,
  47.                                 asio::buffer(frame_, sz),
  48.                                 strand_.wrap(
  49.                                 boost::bind(&RpcConnection::handle_read_frame, shared_from_this(), sz,asio::placeholders::error)));
  50.                 }
  51.         }
  52. }
  53. //放到tls里面
  54. void RpcConnection::handle_read_frame(int32_t sz, const asio::error_code& e)
  55. {
  56.         if (!e)
  57.         {
  58.                 boost::detail::set_tss_data(&tls_sessiondata,tls_cleanup,this,false);
  59.                 boost::shared_ptr<TMemoryBuffer> inputTransport(new TMemoryBuffer(&frame_[0], sz));
  60.                 boost::shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
  61.                 boost::shared_ptr<TMemoryBuffer> outputTransport(new TMemoryBuffer);
  62.                 boost::shared_ptr<TFramedTransport> outputFramedTransport(new TFramedTransport(outputTransport));
  63.                 boost::shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputFramedTransport);
  64.                 processor_->process(inputProtocol, outputProtocol,NULL);
  65.                 uint32_t outframeSize;
  66.                 uint8_t *outframe;
  67.                 outputTransport->getBuffer(&outframe, &outframeSize);
  68.                 asio::async_write(socket_,
  69.                         asio::buffer(outframe, outframeSize),
  70.                         strand_.wrap(boost::bind(&RpcConnection::handle_write, shared_from_this(), asio::placeholders::error)));
  71.         }
  72. }
  73. void RpcConnection::handle_write(const asio::error_code& e)
  74. {
  75.         if (!e)
  76.         {
  77.                 asio::async_read(socket_, asio::buffer(length_),
  78.                         strand_.wrap(boost::bind(&RpcConnection::handle_read_length, shared_from_this(), asio::placeholders::error)));
  79.         }
  80. }
复制代码
 楼主| 发表于 2013-12-15 21:39:43 | 显示全部楼层
没有实现servercontext接口,因为在这个框架下你可以直接通过tls直接获取connection。
本代码没有经过哪怕简单的测试。
 楼主| 发表于 2013-12-15 21:43:09 | 显示全部楼层
注:TFramedTransport这个东东其实就是在传输前加上一个长度。所以代码中读的时候先读一个整数,写的时候就直接调用的
boost::shared_ptr<TFramedTransport> outputFramedTransport(new TFramedTransport(outputTransport));
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-21 17:02 , Processed in 0.017574 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表