Linux下asio如何利用reactor来模拟proactor模式

高并发网络有reactor和proactor两种模式,reactor基于非阻塞io多路复用,本质上属于同步io,而proactor模式则基于异步io. 类unix平台一般都采用reactor模式,如epoll和kqueue等.而Windows平台因为对于reactor模式的支持不好,但是对于异步io有IOCP非常高效. 作为有可能在c++20中进入标准的网络库,asio需要对不同平台都提供支持,这样就造成了只能用一种模式去模拟另一种模式,asio选择在linux下面使用reactor模拟proactor模式.

言归正传,本文就一个主题:在linux下面asio到底是如何借助reactor如epoll来模拟proactor异步模式的.

1. reactor和proactor模式之间到底有什么区别

首先reactor模式依赖于epoll这样的io多路复用机制,能监听大量文件描述符的读写,在有可读写时发生超时或收到信号时从阻塞状态退出,用户程序根据发生的事件进行相应的处理. 本质上来看reactor模式还是同步模式,当前进程一直监控一个事件集合,可读就读,可写就写.

但是proactor模式就不一样了,本质上是异步模式,只要注册了读事件处理函数,就可以继续做其它事情了, 也就是说不需要当前进程再处理读和写操作了,只需要在注册事件处理函数的时候提供一个缓冲区,操作系统就会在可读或可写时将读写完成,然后将数据拷贝到注册的缓冲区,然后通知用户程序事情干完了, windows下的iocp能提供很好的异步io支持.

2. proactor模式需要提供什么接口,满足什么特点

首先可以看下使用asio异步读写的例子, 首先声明asio中最核心的对象io_context,之后依据io_context声明一个socket,然后注册异步写提供缓冲区,最后调用io_context.run()等待事件执行.

asio::io_context io_context;

tcp::socket socket_(io_context);
asio::async_write(socket_, asio::buffer(data_, length),
    [this, self](std::error_code ec, std::size_t /*length*/) { });

io_context.run();

这一套流程使用同步io的思路很容易理解,注册写处理函数,然后run的时候等待事件发生(这里即可写),事件发生后执行事件处理函数,事件完成,退出run循环. 如果从异步的角度来看的话,对于上述流程居然是一样的,就asio而言,即使是异步模式,也必须在io_context.run()执行之后才会开始处理注册的io事件.

那么如何让事件在注册之后尽可能早的执行呢,可以看如下示例,如果有两个读写事件,可以在注册第一个写事件之后立刻启动一个线程来调用 io_context.run() 来处理事件,这样就可以同时等待执行第一个事件,而当前线程继续注册后续任务了. 这样thread1和当前线程在asio中会组成线程池,如果有事件发生了,可以分别获取任务进行执行.

asio::io_context io_context;

tcp::socket socket_(io_context);
asio::async_write(socket_, asio::buffer...
asio::thread thread1([& io_context](){ io_context.run(); });

asio::async_read(socket_, asio::buffer...
io_context.run();

thread1.join();

小结一下,proactor模式需要提供异步接口如 async_write 和 async_accept等,并且可以由一个对象io_context来管理io事件,但是不能像reactor一样由一个线程来管理事件并且处理事件.proactor模式要能够在注册完事件之后马上就能够开始等待处理,并且在底层完成读写操作将数据拷贝到提供的缓冲区.

3. 设想:如何通过reactor模拟proactor

假定我有reactor模式支持如epoll,我该如何实现一个proactor模式呢.首先对于异步接口可以设计为向reactor中添加事件,这是显而易见的.另外io_context.run()操作正好对应着reactor模式中的epoll_wait,正好用来监听事件.

但是最关键的还是没解决,还需要做到让刚注册的读写事件马上就可以在后台执行.虽说借助额外的线程可以直接以阻塞的模式来处理,但是这并不是proactor模式. 如果以非阻塞的方式处理,允许引入额外的线程执行epoll_wait好像可行.但是需要注意的是多个线程可以执行同时执行epoll_wait监控相同的描述符集合吗,应该是不行的.

这时就需要对多个线程进行调度了,需要有一个任务就绪队列,调度的线程中只有一个会执行epoll_wait来监控事件,所以的线程都可以来处理事件.如此,不就异步了吗.

4. linux下asio异步io接口实现

可以从类的继承关系找到异步接口的实现,以tcp::socket为例, 其继承关系如下, 祖父类basic_socket中定义了socket在linux下的类型为reactive_socket_service.

  /// The TCP socket type.
  typedef basic_stream_socket< tcp> socket
  
  template < typename Protocol, typename Executor>
class basic_stream_socket
  : public basic_socket< Protocol, Executor>

template < typename Protocol, typename Executor>
class basic_socket
  : public socket_base
{
  typedef typename detail::reactive_socket_service< 
    Protocol>::native_handle_type native_handle_type;

可以看reactive_socket_service中提供的操作,以async_accept为例,实例化了一个op来包裹了注册在async_accept上的handler,然后用start_accept_op来启动这个op,后续会继续调用start_op以及reactor也就是的start_op函数, 会将op加入到op_queue队列中等待处理, 同时会将对应的fd及事件通过epoll的接口加入到监听事件中.

  template < typename Socket, typename Handler, typename IoExecutor>
  void async_accept(implementation_type&  impl, Socket&  peer,
      endpoint_type* peer_endpoint, Handler&  handler, const IoExecutor&  io_ex)
  {
    bool is_continuation =
      asio_handler_cont_helpers::is_continuation(handler);

    // Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_accept_op< Socket, Protocol, Handler, IoExecutor> op;
    typename op::ptr p = { asio::detail::addressof(handler),
      op::ptr::allocate(handler), 0 };
    p.p = new (p.v) op(impl.socket_, impl.state_, peer,
        impl.protocol_, peer_endpoint, handler, io_ex);

    ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
          & impl, impl.socket_, "async_accept"));

    start_accept_op(impl, p.p, is_continuation, peer.is_open());
    p.v = p.p = 0;
  }

5. 读写上下文io_context和任务调度器scheduler

作为asio中最核心的对象io_context承担着消除不同平台使用时的差异的功能,但是在linux下最核心的实现是在 scheduler中的. 可以看出来,在windows下的实现基于iocp,而没有iocp的话就采用scheduler,而scheduler又可以基于各种reactor模式,比如linux下的epoll.

namespace detail {
#if defined(ASIO_HAS_IOCP)
  typedef class win_iocp_io_context io_context_impl;
  class win_iocp_overlapped_ptr;
#else
  typedef class scheduler io_context_impl;
#endif
} // namespace detail

class io_context
  : public execution_context
{
private:
  typedef detail::io_context_impl impl_type;

在io_context中的run,run_one等函数,最终都是调用scheduler中的对应操作,对于run函数的功能,可以看看asio的注释,从注释可以看出,run会一直阻塞,直到所有的任务都完成了,并且没有handlers要处理了,或者知道io_context被终止了. 而且多个线程可以调用同一个io_context的run()含税,这些线程会自动组成一个线程池共同处理所有的就绪任务.

  /// event processing loop.
  /**
   * The run() function blocks until all work has finished and there are no
   * more handlers to be dispatched, or until the io_context has been stopped.
   *
   * Multiple threads may call the run() function to set up a pool of threads
   * from which the io_context may execute handlers. All threads that are
   * waiting in the pool are equivalent and the io_context may choose any one
   * of them to invoke a handler.
   *
   * A normal exit from the run() function implies that the io_context object
   * is stopped (the stopped() function returns @c true). Subsequent calls to
   * run(), run_one(), poll() or poll_one() will return immediately unless there
   * is a prior call to restart().
   *
   * @param ec Set to indicate what error occurred, if any.
   *
   * @return The number of handlers that were executed.
   *
   * @note Calling the run() function from a thread that is currently calling
   * one of run(), run_one(), run_for(), run_until(), poll() or poll_one() on
   * the same io_context object may introduce the potential for deadlock. It is
   * the caller's reponsibility to avoid this.
   *
   * The poll() function may also be used to dispatch ready handlers, but
   * without blocking.
   */
  ASIO_DECL count_type run(asio::error_code&  ec);

6. scheduler如何调度任务

先来看看scheduler中比较重要的数据,outstanding_work_用来记录未完成的任务量,当outstanding_work_为0时表明没有额外任务了,scheduler会退出. 另外还有如stopped,shutdown,task_interrupted等标志来表明scheduler是否被终止了.另外这里task_保存了底层的 reactor模式,这里也就是epoll_reactor.

  // The count of unfinished work.
  atomic_count outstanding_work_;

  // The queue of handlers that are ready to be delivered.
  op_queue< operation> op_queue_;

  // The task to be run by this service.
  reactor* task_

6.1 三条队列 descriptor_data->op_queue_[3] 与 private_op_queue 及 op_queue_

在asio的事件处理过程中,handler被封装为op, 但是实现的过程中出现了三条队列,非常容易混淆.

1. 首先第一条队列 descriptor_data->op_queue_[3] , 这里其实有三条队列,分别对应着读写操作.在start_op中会将descriptor_data的地址绑定到ev.data.ptr,这里ev是 epoll_event. 这样在事件发生的时候,就可以利用ev.data.ptr来取出注册的handler了.

reactor::per_descriptor_data reactor_data_;

reactor_.start_op(op_type, impl.socket_,
          impl.reactor_data_, op, is_continuation, is_non_blocking);

class epoll_reactor
{
class descriptor_state {
  op_queue< reactor_op> op_queue_[3];
};

void start_op(...) {
  ev.data.ptr = descriptor_data;
  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}
};

2. 第二条队列是由当前线程保存的 private_op_queue,还没有加入到全局的op_queue_中,用来暂存从epoll_wait中就绪的op

struct scheduler_thread_info : public thread_info_base
{
  op_queue< scheduler_operation> private_op_queue;
  long private_outstanding_work;
};

3. 最后也是最重要的一个是op_queue_, 这是由scheduler管理的全局队列,每个线程都可以访问,所以在访问的过程中基本上都涉及到锁. 第二条队列也就是private_op_queue中的op最终会加入到op_queue_中被处理.

class scheduler {
  // The queue of handlers that are ready to be delivered.
  op_queue< operation> op_queue_;
};

6.2 线程的睡眠与唤醒

当多个线程组成线程池时,肯定不会一直运行,肯定会等待任务准备好才会尝试获取任务,这中间需要有锁的控制和线程的唤醒机制.在asio中这一切基于一个唤醒事件,底层实现基于posix_event,封装了pthread_cond_signal和pthread_cond_wait等POSIX接口.

  // Event to wake up blocked threads.
  event wakeup_event_;

6.3 真正的run函数 do_run_one()

在asio使用时必须调用io_context.run(),继而会调用scheduler的run函数,后者只是不断的调用do_run_one,而真正的任务处理在do_run_one函数中. 那么do_run_one做了什么呢, 其实从实现中可以看出来do_run_one非常类似线程池的实现,即无限循环等待处理任务, 主要有如下几个特点:

1. 不断循环,检测到stop则退出

2. 判断准备队列op_queue_中是否为空,如果为空则wait,进入阻塞状态

3. 如果准备队列不为空, 准备处理queue中的handler,从队列中pop出一个op,先判断如果o==&task_operation_的话,说明需要调用epoll_wait来等待其它未就绪事件了. 而task_->run底层就是epoll_wait

4. 阻塞在task_->run被事件唤醒后,前面注册在epoll中的就绪事件会通过 events[i].data.ptr 来传递给this_thread.private_op_queue

5. this_thread.private_op_queue 中的就绪事件会在其他时候,主要是poll系列函数被调用的时候加入到 op_queue_ 队列中被处理

6. 再回到步骤3, 如果队列中取出来的op不是 &task_operation的话,说明可以直接处理,先判断一下队列是否为空,如果不空的话说明还有其他op,直接唤醒一个线程来继续处理

7. 当前进程则直接用o->complete来处理刚刚拿出来的op, 这样就完成了一次完整的异步事件处理了.

std::size_t scheduler::do_run_one(mutex::scoped_lock&  lock,
    scheduler::thread_info&  this_thread,
    const asio::error_code&  ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == & task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers & &  !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, & lock, & this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers & &  !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, & lock, & this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}

至此,在Linux下asio如何利用epoll来模拟proactor模式算是分析完了,核心的思想与前面分析的差不多,主要就是利用reactor来统一监控文件描述符,利用自主加入的thread来组成线程池一起执行就绪事件,达到异步的目的.

参考

1. https://github.com/chriskohlhoff/asio

2. boost.asio源码剖析

3. 浅谈 Boost.Asio 的多线程模型