在描述了Reactor模式后,本文介绍Cwinux中关于Reactor模式的具体实现。
CwxAppHandler4Base CwxAppHandler4Base 是Reactor模式中的EventHandler的实现,EventHandle是对CwxAppHandler4Base的封装。其代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 class EventHandle { public: EventHandle() { m_mask = 0; m_handler = NULL; } inline bool isReg() { return (m_mask & CwxAppHandler4Base::RW_MASK) != 0; } public: int m_mask; CwxAppHandler4Base* m_handler; };
代码中的m_mask是事件掩码(不同位表示读/写/超时/信号等事件)。CwxAppHandler4Base是具体的事件处理者(Handler)。CwxAppHandler4Base的最主要的函数是handle_event函数,该函数用来处理事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class CWX_API CwxAppHandler4Base{ public: ///定义事件类型 enum{ TIMEOUT_MASK = 0x01, READ_MASK = 0x02, ... ALL_EVENTS_MASK = TIMEOUT_MASK| READ_MASK| WRITE_MASK| SIGNAL_MASK| PERSIST_MASK }; public : /** @brief Handler的事件通知回调。 @param [in] event 发生的事件类型,为TIMEOUT_MASK、READ_MASK、WRITE_MASK、SIGNAL_MASK的组合。 @param [in] handle 发生的事件的handle。 @return -1:失败,reactor会主动调用close; 0:成功; */ virtual int handle_event(int event, CWX_HANDLE handle=CWX_INVALID_HANDLE)=0; private: CwxAppReactor * m_reactor; ///<reactor对象的指针 int m_regType; ///<handler的reactor注册类型 CWX_HANDLE m_handler; ///<事件的io handle int m_type; ///<event handle type; CWX_UINT64 m_ullTimeout; ///<超时的时刻 int m_index; };
CwxAppHandler4Base有一个字段m_handle,即文件描述符fd。其中open函数和close函数用于打开和关闭这个fd。handle_event函数,该函数用来处理事件。
CwxAppHandler4Msg CwxAppHandler4Base是抽象的EventHandler。Reactor模式中的ConcreateEventHandler 包括CwxAppHandler4Msg等。CwxAppHandler4Msg都是从CwxAppHandler4Base继承的子类。该类能够从fd上读取数据,并生成消息对象,或者将消息从网络上发送出去(发送给fd对应的网络连接)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class CWX_API CwxAppHandler4Msg : public CwxAppHandler4Base { public: /** @brief 初始化建立的连接,并往Reactor注册连接 @param [in] arg 建立连接的acceptor或为NULL @return -1:放弃建立的连接; 0:连接建立成功 */ virtual int open (void * arg= 0); /** @brief 接受连接上的事件 @param [in] event 连接的handle上的事件 @param [in] handle 发生的事件的handle。 @return -1:处理失败; 0:处理成功 */ virtual int handle_event(int event, CWX_HANDLE handle=CWX_INVALID_HANDLE); ///handle close virtual int close(CWX_HANDLE handle=CWX_INVALID_HANDLE); ///发送消息 virtual int handle_output(); ///接收消息 virtual int handle_input(); ///超时 virtual void handle_timeout(); private: ///以非阻塞的方式,发送消息。返回值,-1: failure; 0: not send all;1:send a msg inline int nonBlockSend(); protected: CwxMsgHead m_header; CwxAppConnInfo m_conn;///<connection information CwxMsgBlock* m_curSndingMsg; ///<current sending msg; CwxMsgBlock* m_waitSendMsgHead; ///<The header for wait to be sent msg. CwxMsgBlock* m_waitSendMsgTail; ///<The tail for wait to be sent msg. char m_szHeadBuf[CwxMsgHead::MSG_HEAD_LEN];///<the buf for header CwxMsgBlock* m_recvMsgData; ///<the recieved msg data };
CwxAppHandler4Msg的handle_event函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 /** @brief 接受连接上的到达数据 @param [in] handle 连接的handle @return -1:关闭的连接; 0:接收数据成功 */ int CwxAppHandler4Msg::handle_event(int event, CWX_HANDLE) { if (CwxAppConnInfo::ESTABLISHED == m_conn.getState()) ///通信状态 { CWX_ASSERT((event & ~CwxAppHandler4Base::RW_MASK) == 0); if (CWX_CHECK_ATTR(event, CwxAppHandler4Base::WRITE_MASK)) { handle_output(); } if (CWX_CHECK_ATTR(event, CwxAppHandler4Base::READ_MASK)) { handle_input(); } } else if (CwxAppConnInfo::CONNECTING == m_conn.getState()) ///等待连接状态 { ... } return 0; }
代码中m_conn表示连接的属性,其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 class CWX_API CwxAppConnInfo { CWX_UINT32 m_uiSvrId; ///<svr id CWX_UINT32 m_uiHostId; ///<host id CWX_UINT32 m_uiConnId; ///<connection id CWX_UINT32 m_uiListenId; ///<accept connection's acceptor ID. for passive conn. CWX_UINT16 m_unState; ///<connection state. bool m_bActiveConn; ///< sign for active connection. bool m_bRawData; ///< sign for raw data connection void* m_pUserData; ///<user dat for connection CwxAppHandler4Msg* m_pHandler; ///<连接对应的Handler ... };
m_conn.getState()返回该连接的状态。handle_event函数中,如果连接的状态是ESTABLISHED,则判断事件掩码,针对写事件和读事件分别调用handle_output()函数和handle_input()函数。
以handle_input()为例,其实现如下(其中删除了错误检查/更新统计量等部分代码):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 int CwxAppHandler4Msg::handle_input() { bool bSuspend = false; if (getApp()->isStopped()) return 0; ssize_t recv_size = 0; ssize_t need_size = 0; int result = 0; if (this->m_conn.isRawData()) //recv raw data(该数据流是普通的字节流,不是消息对象) { bSuspend = false; result = this->getApp()->onRecvMsg(*this, bSuspend); return result >= 0 ? 0 : -1; } //接收到的数据实际是消息对象 while (1) { need_size = CwxMsgHead::MSG_HEAD_LEN - this->m_uiRecvHeadLen; if (need_size > 0) //接收到的数据还不是一个完整的包 { recv_size = CwxSocket::recv(getHandle(), this->m_szHeadBuf + this->m_uiRecvHeadLen, need_size); this->m_uiRecvHeadLen += recv_size; if (recv_size < need_size) { m_conn.setContinueRecvNum(0); return 0; } this->m_szHeadBuf[this->m_uiRecvHeadLen] = 0x00; if (!m_header.fromNet(this->m_szHeadBuf)) { CWX_ERROR(("Msg header is error.")); return -1; } this->m_recvMsgData = CwxMsgBlockAlloc::malloc(m_header.getDataLen()); } //end if (need_size > 0) //接收消息内容数据 need_size = m_header.getDataLen() - this->m_uiRecvDataLen; if (need_size > 0) //还没有接收到该消息内容的完整数据 { recv_size = CwxSocket::recv(getHandle(), this->m_recvMsgData->wr_ptr(), need_size); //move write pointer this->m_recvMsgData->wr_ptr(recv_size); this->m_uiRecvDataLen += recv_size; }http://loopjump.com/wp-admin/post-new.php# //notice recieving a msg. if (!this->m_recvMsgData) this->m_recvMsgData = CwxMsgBlockAlloc::malloc(0); bSuspend = false; result = this->getApp()->recvMessage(m_header, this->m_recvMsgData, *this, bSuspend); } return result; }
Cwinux中建立的网络连接上的数据有可能是消息 或者是raw data 。前者(消息)会由Cwinux来进行接收数据并打包成消息,用户得到的就是一个接收好的消息对象。后者(raw data)指的是这个连接上的数据不是消息,用户要自己读取所有的字节数据并处理。这是连接的两种模式,具体使用哪种模式是在用户建立连接时通过参数指定的。另外,代码中**this->getApp()**相关的代码是AppFramework相关的内容,我们会在后面解释这些代码。
handle_input实现过程:首先判断该连接是消息模式还是raw data模式。如果是raw data模式,则调用**result = this->getApp()->onRecvMsg(*this, bSuspend);**函数。该函数会通知用户接收数据(后面详解)。如果是消息模式,则先读取数据,形成一个消息头,然后根据消息头中记录的长度,读取一个完整的消息,并将这个消息放在m_recvMsgData对象中,最后通知用户接收该消息。
handle_output()函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int CwxAppHandler4Msg::handle_output() { int result = 0; CWX_UINT32 uiNum = 0; bool bCloseConn = false; bool bReconn = false; CWX_UINT32 uiReconnDelay = 0; // The list had better not be empty, otherwise there's a bug! while (1) { while (1) { result = this->getNextMsg(); if (0 == result) return this->cancelWakeup(); if (CWX_CHECK_ATTR(this->m_curSndingMsg->send_ctrl().getMsgAttr(), CwxMsgSendCtrl::BEGIN_NOTICE)) { if (-1 == this->getApp()->onStartSendMsg(m_curSndingMsg, *this)) { this->m_conn.setWaitingMsgNum(this->m_conn.getWaitingMsgNum() - 1); continue; //next msg; } } //end if (this->m_curSndingMsg->IsBeginNotice()) //it's a msg which need be sent. break; //break while } //end while result = this->nonBlockSend(); } return result; }
首先从以保存的消息列表中取出一个消息,然后调用nonBlockSend函数发送该消息。nonBlockSend调用CwxSocket::write将消息发送出去。
好,先小结一下前面所说的内容:Cwinux中,CwxAppHandler4Base是Reactor模式中的EventHandler,是个父类,它定义了handle_event这样的一个接口。CwxAppHandler4Base派生的子类如CwxAppHandler4Msg在其覆盖(override)的handle_event函数中处理具体的事件。
CwxAppEpoll CwxAppEpoll是Cwinux中的epoll引擎的封装,同时CwxAppEpoll封装了fd到EventHandle(mask+CwxAppHandler4Base)的映射。
CwxAppEpoll的代码如下(部分):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class CWX_API CwxAppEpoll { public: CwxAppEpoll(bool bEnableSignal = true); ~CwxAppEpoll(); public: int init(); /** @brief 注册IO事件处理handle。 @param [in] handle 监测的IO handle @param [in] event_handler io handle对应的event handler。 @param [in] mask 注册的事件掩码,为READ_MASK、WRITE_MASK、PERSIST_MASK、TIMEOUT_MASK组合 @param [in] uiMillSecond 多少毫秒超时。0表示没有超时设置。 @param [in] bForkAdd 是否是fork后重新添加。 @return -1:失败;0:成功; */ int registerHandler(CWX_HANDLE handle, CwxAppHandler4Base *event_handler, int mask, CWX_UINT32 uiMillSecond = 0); CwxAppHandler4Base* removeHandler(CWX_HANDLE handle); int suspendHandler(CWX_HANDLE handle, int suspend_mask); int resumeHandler(CWX_HANDLE handle, int resume_mask); int registerSignal(int signum, CwxAppHandler4Base *event_handler); int removeSignal(CwxAppHandler4Base *event_handler); CwxAppHandler4Base* removeSignal(int sig); int scheduleTimer(CwxAppHandler4Base *event_handler, CwxTimeValue const &interval); int cancelTimer(CwxAppHandler4Base *event_handler); int forkReinit(); /** @brief 检测事件。 @param [in] callback 事件的回调函数 @param [in] arg 回调函数的参数 @param [in] uiMiliTimeout 超时的毫秒数,0表示一直阻塞到事件发生。 @return -1:失败;0:成功 */ int poll(REACTOR_CALLBACK callback, void* arg, CWX_UINT32 uiMiliTimeout = 0); void stop(); CwxTimeValue const& getCurTime() const; private: EventHandle m_eHandler[CWX_APP_MAX_IO_NUM]; ///<epoll的event handler CwxMinHeap<CwxAppHandler4Base> m_timeHeap; ///<时间堆 };
m_eHandler描述了fd到EventHandle的映射。某个fd上有事件发生,可以查询m_eHandler数组来获得该fd上事件处理handler。registerHandler和resumeHandler等都是维护这个数据结构的函数。另外,CwxAppEpoll除了处理网络连接上的事件之外,还处理了signal的事件和超时事件。超时事件是由m_timeHeap时间堆来维护的。
CwxAppEpoll最重要的函数当然是执行多路分发的poll函数。
poll函数处理了网络连接上的读写等事件、信号signal事件和超时事件(代码经过了简化)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int CwxAppEpoll::poll(REACTOR_CALLBACK callback, void* arg, CWX_UINT32 uiMiliTimeout) { /*计算最近一次超时事件还有多长时间发生。*/ timeout(ullTimeout); //从时间堆里获取堆顶元素的最小值。 //计算下次超时还有多长时间,记录在tv中。 num = epoll_wait(m_epfd, m_events, CWX_APP_MAX_IO_NUM, tv); if (num > 0) { for (i = 0; i < num; i++) //epoll的通用写法 { event = &m_events[i]; if (event->events & EPOLLIN) mask |= CwxAppHandler4Base::READ_MASK; if (event->events & EPOLLOUT) mask |= CwxAppHandler4Base::WRITE_MASK; handler = m_eHandler[event->data.fd].m_handler; //根据fd得到对应的EventHandle callback(handler, mask, CWX_CHECK_ATTR(m_eHandler[event->data.fd].m_mask, CwxAppHandler4Base::PERSIST_MASK), arg); //执行回调函数 } } //检测处理信号signal事件 if (m_bSignal && m_bEnableSignal) { if (m_bStop) return 0; m_bSignal = 0; for (i = 0; i < CWX_APP_MAX_SIGNAL_ID; i++) { if (m_arrSignals[i]) //id为i的信号发生信号事件。 { m_arrSignals[i] = 0; //清空 callback(m_sHandler[i], CwxAppHandler4Base::SIGNAL_MASK, true, arg); //执行回调函数 } } } //检测超时事件 while (!m_timeHeap.isEmpty() && (m_timeHeap.top()->getTimeout() < ullNow)) { if (m_timeHeap.top()->getHandle() != CWX_INVALID_HANDLE) { handler = removeHandler(m_timeHeap.top()->getHandle()); } else { handler = m_timeHeap.pop(); } callback(handler, CwxAppHandler4Base::TIMEOUT_MASK, false, arg); } return 0; }
代码首先处理计算超时时间,tv记录下次超时事件还有多长时间。
然后调用epoll_wait来查询网络连接上是否有事件发生。epoll_wait会等待tv时间,期间如果某些连接上有事件发生,则epoll_wait返回,num记录了有多少连接上有事件发生。当有事件发生时,针对这些连接上的事件执行对应的handle。具体通过callback来执行。
信号处理类似,某些信号发生时,对应位置被置位。检查所有位置,发生信号的话,就执行对应的处理handler。
超时事件通过最小堆来完成,不断地从堆中取出已经超时的handler,执行对应的处理handler。
好,先小结一下这段内容:CwxAppEpoll是对epoll的封装,同时维护了fd到EventHandle的映射关系。执行poll函数后,能够检测到网络连接上的事件、信号事件和超时事件,并分别处理。
CwxAppReactor 下面看一下Reactor类。
Cwinux中的CwxAppReactor是Reactor模式中Reactor的具体实现。
主要成员变量:
*AppEpoll m_engine;**即一个epoll引擎,通过这个引擎,Reactor监测事件的发生并处理事件。
**m_connId[];**该数组维护了文件描述符fd到 连接ID(connId,整形)的映射。关于connId,当新的连接建立时,由Cwinux指定一个新的整形数字作为新连接的id。参见代码CwxAppTcpAcceptor::makeHander函数。这样Cwinux就可以从fd直接找到对应的connId,同时已知connId能够通过m_connMap这一数据结构得到fd。
**hash_map<ConnIdType,Handler4Base*> m_connMap;*维护了connId到Handler4Base 的映射关系。给定一个connId,Cwinux能够得到对应的Handler4Base*,通过Handler4Base的getHandle()函数可以得到对应的文件描述符fd。
主要成员函数包括run和callback函数。
run函数代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 /** @brief 架构事件的循环处理API,实现消息的分发。 @return -1:失败;0:正常退出 */ int CwxAppReactor::run(REACTOR_EVENT_HOOK hook, void* arg, bool bOnce, CWX_UINT32 uiMiliTimeout) { int ret = 0; if (!bOnce) { if (!m_bStop || !m_engine) { CWX_ERROR(("CwxAppReactor::open() must be invoke before CwxAppReactor::run()")); return -1; } } m_bStop = false; do { { ///带锁执行event-loop CwxMutexGuard<CwxMutexLock> lock(&m_lock); ret = m_engine->poll(CwxAppReactor::callback, this, uiMiliTimeout); } if (m_bStop) { CWX_DEBUG(("Stop running for stop")); break; } if (0 != ret) { if ((-1 == ret) && (EINTR != errno)) { CWX_ERROR(("Failure to running epoll with -1, errno=%d", errno)); break; } } ///调用hook if (hook) { if (0 != hook(arg)) { CWX_DEBUG(("Stop running for hook() without 0")); break; } } ///等待其他的线程执行各种操作。 m_rwLock.acquire_write(); m_rwLock.release(); }while(!m_bStop && !bOnce); return ret; }
简言之,run函数的代码结构是这样的:
1 2 3 4 5 6 7 run() { do { m_engine->poll(callback); }while(!stop); }
run函数的所有核心工作都是有AppEpoll的poll来完成的。
callback函数也很简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void CwxAppReactor::callback(CwxAppHandler4Base* handler, int mask, bool bPersist, void *arg) { CwxAppReactor* reactor = (CwxAppReactor*)arg; if (!bPersist) { switch(handler->getRegType()) { case REG_TYPE_IO: reactor->m_connId[handler->getHandle()] = CWX_APP_INVALID_CONN_ID; break; default: break; } } int ret = handler->handle_event(mask, handler->getHandle()); if (-1 == ret) { handler->close(handler->getHandle()); } }
简言之,就是执行参数中handler的具体动作(调用handle_event函数)。
总结:
对照Reactor模式,看Cwinux中相应的实现:CwxAppHandler4Base 是Reactor模式中的EventHandler的实现,EventHandle是对CwxAppHandler4Base的封装;CwxAppEpoll是多路复用引擎;CwxAppReactor是对应于Reactor模式中的Reactor的实现。此外,CwxAppReactor通过两个数据结构,维护了fd到connId的映射。