高并发网络有reactor和proactor两种模式,reactor基于非阻塞io多路复用,本质上属于同步io,而proactor模式则基于异步io. 类unix平台一般都采用reactor模式,如epoll和kqueue等.而Windows平台因为对于reactor模式的支持不好,但是对于异步io有IOCP非常高效. 作为有可能在c++20中进入标准的网络库,asio需要对不同平台都提供支持,这样就造成了只能用一种模式去模拟另一种模式,asio选择在linux下面使用reactor模拟proactor模式.
言归正传,本文就一个主题:在linux下面asio到底是如何借助reactor如epoll来模拟proactor异步模式的.
1. reactor和proactor模式之间到底有什么区别
首先reactor模式依赖于epoll这样的io多路复用机制,能监听大量文件描述符的读写,在有可读写时发生超时或收到信号时从阻塞状态退出,用户程序根据发生的事件进行相应的处理. 本质上来看reactor模式还是同步模式,当前进程一直监控一个事件集合,可读就读,可写就写.
但是proactor模式就不一样了,本质上是异步模式,只要注册了读事件处理函数,就可以继续做其它事情了, 也就是说不需要当前进程再处理读和写操作了,只需要在注册事件处理函数的时候提供一个缓冲区,操作系统就会在可读或可写时将读写完成,然后将数据拷贝到注册的缓冲区,然后通知用户程序事情干完了, windows下的iocp能提供很好的异步io支持.
2. proactor模式需要提供什么接口,满足什么特点
首先可以看下使用asio异步读写的例子, 首先声明asio中最核心的对象io_context,之后依据io_context声明一个socket,然后注册异步写提供缓冲区,最后调用io_context.run()等待事件执行.
asio::io_context io_context; tcp::socket socket_(io_context); asio::async_write(socket_, asio::buffer(data_, length), [this, self](std::error_code ec, std::size_t /*length*/) { }); io_context.run();
这一套流程使用同步io的思路很容易理解,注册写处理函数,然后run的时候等待事件发生(这里即可写),事件发生后执行事件处理函数,事件完成,退出run循环. 如果从异步的角度来看的话,对于上述流程居然是一样的,就asio而言,即使是异步模式,也必须在io_context.run()执行之后才会开始处理注册的io事件.
那么如何让事件在注册之后尽可能早的执行呢,可以看如下示例,如果有两个读写事件,可以在注册第一个写事件之后立刻启动一个线程来调用 io_context.run() 来处理事件,这样就可以同时等待执行第一个事件,而当前线程继续注册后续任务了. 这样thread1和当前线程在asio中会组成线程池,如果有事件发生了,可以分别获取任务进行执行.
asio::io_context io_context; tcp::socket socket_(io_context); asio::async_write(socket_, asio::buffer... asio::thread thread1([& io_context](){ io_context.run(); }); asio::async_read(socket_, asio::buffer... io_context.run(); thread1.join();
小结一下,proactor模式需要提供异步接口如 async_write 和 async_accept等,并且可以由一个对象io_context来管理io事件,但是不能像reactor一样由一个线程来管理事件并且处理事件.proactor模式要能够在注册完事件之后马上就能够开始等待处理,并且在底层完成读写操作将数据拷贝到提供的缓冲区.
3. 设想:如何通过reactor模拟proactor
假定我有reactor模式支持如epoll,我该如何实现一个proactor模式呢.首先对于异步接口可以设计为向reactor中添加事件,这是显而易见的.另外io_context.run()操作正好对应着reactor模式中的epoll_wait,正好用来监听事件.
但是最关键的还是没解决,还需要做到让刚注册的读写事件马上就可以在后台执行.虽说借助额外的线程可以直接以阻塞的模式来处理,但是这并不是proactor模式. 如果以非阻塞的方式处理,允许引入额外的线程执行epoll_wait好像可行.但是需要注意的是多个线程可以执行同时执行epoll_wait监控相同的描述符集合吗,应该是不行的.
这时就需要对多个线程进行调度了,需要有一个任务就绪队列,调度的线程中只有一个会执行epoll_wait来监控事件,所以的线程都可以来处理事件.如此,不就异步了吗.
4. linux下asio异步io接口实现
可以从类的继承关系找到异步接口的实现,以tcp::socket为例, 其继承关系如下, 祖父类basic_socket中定义了socket在linux下的类型为reactive_socket_service.
/// The TCP socket type. typedef basic_stream_socket< tcp> socket template < typename Protocol, typename Executor> class basic_stream_socket : public basic_socket< Protocol, Executor> template < typename Protocol, typename Executor> class basic_socket : public socket_base { typedef typename detail::reactive_socket_service< Protocol>::native_handle_type native_handle_type;
可以看reactive_socket_service中提供的操作,以async_accept为例,实例化了一个op来包裹了注册在async_accept上的handler,然后用start_accept_op来启动这个op,后续会继续调用start_op以及reactor也就是的start_op函数, 会将op加入到op_queue队列中等待处理, 同时会将对应的fd及事件通过epoll的接口加入到监听事件中.
template < typename Socket, typename Handler, typename IoExecutor> void async_accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, Handler& handler, const IoExecutor& io_ex) { bool is_continuation = asio_handler_cont_helpers::is_continuation(handler); // Allocate and construct an operation to wrap the handler. typedef reactive_socket_accept_op< Socket, Protocol, Handler, IoExecutor> op; typename op::ptr p = { asio::detail::addressof(handler), op::ptr::allocate(handler), 0 }; p.p = new (p.v) op(impl.socket_, impl.state_, peer, impl.protocol_, peer_endpoint, handler, io_ex); ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket", & impl, impl.socket_, "async_accept")); start_accept_op(impl, p.p, is_continuation, peer.is_open()); p.v = p.p = 0; }
5. 读写上下文io_context和任务调度器scheduler
作为asio中最核心的对象io_context承担着消除不同平台使用时的差异的功能,但是在linux下最核心的实现是在 scheduler中的. 可以看出来,在windows下的实现基于iocp,而没有iocp的话就采用scheduler,而scheduler又可以基于各种reactor模式,比如linux下的epoll.
namespace detail { #if defined(ASIO_HAS_IOCP) typedef class win_iocp_io_context io_context_impl; class win_iocp_overlapped_ptr; #else typedef class scheduler io_context_impl; #endif } // namespace detail class io_context : public execution_context { private: typedef detail::io_context_impl impl_type;
在io_context中的run,run_one等函数,最终都是调用scheduler中的对应操作,对于run函数的功能,可以看看asio的注释,从注释可以看出,run会一直阻塞,直到所有的任务都完成了,并且没有handlers要处理了,或者知道io_context被终止了. 而且多个线程可以调用同一个io_context的run()含税,这些线程会自动组成一个线程池共同处理所有的就绪任务.
/// event processing loop. /** * The run() function blocks until all work has finished and there are no * more handlers to be dispatched, or until the io_context has been stopped. * * Multiple threads may call the run() function to set up a pool of threads * from which the io_context may execute handlers. All threads that are * waiting in the pool are equivalent and the io_context may choose any one * of them to invoke a handler. * * A normal exit from the run() function implies that the io_context object * is stopped (the stopped() function returns @c true). Subsequent calls to * run(), run_one(), poll() or poll_one() will return immediately unless there * is a prior call to restart(). * * @param ec Set to indicate what error occurred, if any. * * @return The number of handlers that were executed. * * @note Calling the run() function from a thread that is currently calling * one of run(), run_one(), run_for(), run_until(), poll() or poll_one() on * the same io_context object may introduce the potential for deadlock. It is * the caller's reponsibility to avoid this. * * The poll() function may also be used to dispatch ready handlers, but * without blocking. */ ASIO_DECL count_type run(asio::error_code& ec);
6. scheduler如何调度任务
先来看看scheduler中比较重要的数据,outstanding_work_用来记录未完成的任务量,当outstanding_work_为0时表明没有额外任务了,scheduler会退出. 另外还有如stopped,shutdown,task_interrupted等标志来表明scheduler是否被终止了.另外这里task_保存了底层的 reactor模式,这里也就是epoll_reactor.
// The count of unfinished work. atomic_count outstanding_work_; // The queue of handlers that are ready to be delivered. op_queue< operation> op_queue_; // The task to be run by this service. reactor* task_
6.1 三条队列 descriptor_data->op_queue_[3] 与 private_op_queue 及 op_queue_
在asio的事件处理过程中,handler被封装为op, 但是实现的过程中出现了三条队列,非常容易混淆.
1. 首先第一条队列 descriptor_data->op_queue_[3] , 这里其实有三条队列,分别对应着读写操作.在start_op中会将descriptor_data的地址绑定到ev.data.ptr,这里ev是 epoll_event. 这样在事件发生的时候,就可以利用ev.data.ptr来取出注册的handler了.
reactor::per_descriptor_data reactor_data_; reactor_.start_op(op_type, impl.socket_, impl.reactor_data_, op, is_continuation, is_non_blocking); class epoll_reactor { class descriptor_state { op_queue< reactor_op> op_queue_[3]; }; void start_op(...) { ev.data.ptr = descriptor_data; descriptor_data->op_queue_[op_type].push(op); scheduler_.work_started(); } };
2. 第二条队列是由当前线程保存的 private_op_queue,还没有加入到全局的op_queue_中,用来暂存从epoll_wait中就绪的op
struct scheduler_thread_info : public thread_info_base { op_queue< scheduler_operation> private_op_queue; long private_outstanding_work; };
3. 最后也是最重要的一个是op_queue_, 这是由scheduler管理的全局队列,每个线程都可以访问,所以在访问的过程中基本上都涉及到锁. 第二条队列也就是private_op_queue中的op最终会加入到op_queue_中被处理.
class scheduler { // The queue of handlers that are ready to be delivered. op_queue< operation> op_queue_; };
6.2 线程的睡眠与唤醒
当多个线程组成线程池时,肯定不会一直运行,肯定会等待任务准备好才会尝试获取任务,这中间需要有锁的控制和线程的唤醒机制.在asio中这一切基于一个唤醒事件,底层实现基于posix_event,封装了pthread_cond_signal和pthread_cond_wait等POSIX接口.
// Event to wake up blocked threads. event wakeup_event_;
6.3 真正的run函数 do_run_one()
在asio使用时必须调用io_context.run(),继而会调用scheduler的run函数,后者只是不断的调用do_run_one,而真正的任务处理在do_run_one函数中. 那么do_run_one做了什么呢, 其实从实现中可以看出来do_run_one非常类似线程池的实现,即无限循环等待处理任务, 主要有如下几个特点:
1. 不断循环,检测到stop则退出
2. 判断准备队列op_queue_中是否为空,如果为空则wait,进入阻塞状态
3. 如果准备队列不为空, 准备处理queue中的handler,从队列中pop出一个op,先判断如果o==&task_operation_的话,说明需要调用epoll_wait来等待其它未就绪事件了. 而task_->run底层就是epoll_wait
4. 阻塞在task_->run被事件唤醒后,前面注册在epoll中的就绪事件会通过 events[i].data.ptr 来传递给this_thread.private_op_queue
5. this_thread.private_op_queue 中的就绪事件会在其他时候,主要是poll系列函数被调用的时候加入到 op_queue_ 队列中被处理
6. 再回到步骤3, 如果队列中取出来的op不是 &task_operation的话,说明可以直接处理,先判断一下队列是否为空,如果不空的话说明还有其他op,直接唤醒一个线程来继续处理
7. 当前进程则直接用o->complete来处理刚刚拿出来的op, 这样就完成了一次完整的异步事件处理了.
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock, scheduler::thread_info& this_thread, const asio::error_code& ec) { while (!stopped_) { if (!op_queue_.empty()) { // Prepare to execute first handler from queue. operation* o = op_queue_.front(); op_queue_.pop(); bool more_handlers = (!op_queue_.empty()); if (o == & task_operation_) { task_interrupted_ = more_handlers; if (more_handlers & & !one_thread_) wakeup_event_.unlock_and_signal_one(lock); else lock.unlock(); task_cleanup on_exit = { this, & lock, & this_thread }; (void)on_exit; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue); } else { std::size_t task_result = o->task_result_; if (more_handlers & & !one_thread_) wake_one_thread_and_unlock(lock); else lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. work_cleanup on_exit = { this, & lock, & this_thread }; (void)on_exit; // Complete the operation. May throw an exception. Deletes the object. o->complete(this, ec, task_result); return 1; } } else { wakeup_event_.clear(lock); wakeup_event_.wait(lock); } } return 0; }
至此,在Linux下asio如何利用epoll来模拟proactor模式算是分析完了,核心的思想与前面分析的差不多,主要就是利用reactor来统一监控文件描述符,利用自主加入的thread来组成线程池一起执行就绪事件,达到异步的目的.