在描述了Reactor模式后,本文介绍Cwinux中关于Reactor模式的具体实现。
CwxAppHandler4Base CwxAppHandler4Base 是Reactor模式中的EventHandler的实现,EventHandle是对CwxAppHandler4Base的封装。其代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 class EventHandle { public : EventHandle() { m_mask = 0 ; m_handler = NULL ; } inline bool isReg () { return (m_mask & CwxAppHandler4Base::RW_MASK) != 0 ; } public : int m_mask; CwxAppHandler4Base* m_handler; };
代码中的m_mask是事件掩码(不同位表示读/写/超时/信号等事件)。CwxAppHandler4Base是具体的事件处理者(Handler)。CwxAppHandler4Base的最主要的函数是handle_event函数,该函数用来处理事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class CWX_API CwxAppHandler4Base { public: enum { TIMEOUT_MASK = 0x01 , READ_MASK = 0x02 , ... ALL_EVENTS_MASK = TIMEOUT_MASK| READ_MASK| WRITE_MASK| SIGNAL_MASK| PERSIST_MASK }; public : virtual int handle_event (int event, CWX_HANDLE handle=CWX_INVALID_HANDLE) =0 ; private: CwxAppReactor * m_reactor; int m_regType; CWX_HANDLE m_handler; int m_type; CWX_UINT64 m_ullTimeout; int m_index; };
CwxAppHandler4Base有一个字段m_handle,即文件描述符fd。其中open函数和close函数用于打开和关闭这个fd。handle_event函数,该函数用来处理事件。
CwxAppHandler4Msg CwxAppHandler4Base是抽象的EventHandler。Reactor模式中的ConcreateEventHandler 包括CwxAppHandler4Msg等。CwxAppHandler4Msg都是从CwxAppHandler4Base继承的子类。该类能够从fd上读取数据,并生成消息对象,或者将消息从网络上发送出去(发送给fd对应的网络连接)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class CWX_API CwxAppHandler4Msg : public CwxAppHandler4Base { public: virtual int open (void * arg= 0 ) ; virtual int handle_event (int event, CWX_HANDLE handle=CWX_INVALID_HANDLE) ; virtual int close (CWX_HANDLE handle=CWX_INVALID_HANDLE) ; virtual int handle_output () ; virtual int handle_input () ; virtual void handle_timeout () ; private: inline int nonBlockSend () ; protected: CwxMsgHead m_header; CwxAppConnInfo m_conn; CwxMsgBlock* m_curSndingMsg; CwxMsgBlock* m_waitSendMsgHead; CwxMsgBlock* m_waitSendMsgTail; char m_szHeadBuf[CwxMsgHead::MSG_HEAD_LEN]; CwxMsgBlock* m_recvMsgData; };
CwxAppHandler4Msg的handle_event函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 /** int CwxAppHandler4Msg::handle_event(int event, CWX_HANDLE) { if (CwxAppConnInfo::ESTABLISHED == m_conn.getState()) { CWX_ASSERT((event & ~CwxAppHandler4Base::RW_MASK) == 0 ); if (CWX_CHECK_ATTR(event, CwxAppHandler4Base::WRITE_MASK)) { handle_output(); } if (CWX_CHECK_ATTR(event, CwxAppHandler4Base::READ_MASK)) { handle_input(); } } else if (CwxAppConnInfo::CONNECTING == m_conn.getState()) { ... } return 0 ; }
代码中m_conn表示连接的属性,其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 class CWX_API CwxAppConnInfo { CWX_UINT32 m_uiSvrId; CWX_UINT32 m_uiHostId; CWX_UINT32 m_uiConnId; CWX_UINT32 m_uiListenId; CWX_UINT16 m_unState; bool m_bActiveConn; bool m_bRawData; void * m_pUserData; CwxAppHandler4Msg* m_pHandler; ... };
m_conn.getState()返回该连接的状态。handle_event函数中,如果连接的状态是ESTABLISHED,则判断事件掩码,针对写事件和读事件分别调用handle_output()函数和handle_input()函数。
以handle_input()为例,其实现如下(其中删除了错误检查/更新统计量等部分代码):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 int CwxAppHandler4Msg::handle_input() { bool bSuspend = false ; if (getApp()->isStopped()) return 0 ; ssize_t recv_size = 0 ; ssize_t need_size = 0 ; int result = 0 ; if (this->m_conn.isRawData()) { bSuspend = false ; result = this->getApp()->onRecvMsg(*this, bSuspend); return result >= 0 ? 0 : -1 ; } //接收到的数据实际是消息对象 while (1 ) { need_size = CwxMsgHead::MSG_HEAD_LEN - this->m_uiRecvHeadLen; if (need_size > 0 ) { recv_size = CwxSocket::recv(getHandle(), this->m_szHeadBuf + this->m_uiRecvHeadLen, need_size); this->m_uiRecvHeadLen += recv_size; if (recv_size < need_size) { m_conn.setContinueRecvNum(0 ); return 0 ; } this->m_szHeadBuf[this->m_uiRecvHeadLen] = 0x00 ; if (!m_header.fromNet(this->m_szHeadBuf)) { CWX_ERROR(("Msg header is error." )); return -1 ; } this->m_recvMsgData = CwxMsgBlockAlloc::malloc(m_header.getDataLen()); } //接收消息内容数据 need_size = m_header.getDataLen() - this->m_uiRecvDataLen; if (need_size > 0 ) { recv_size = CwxSocket::recv(getHandle(), this->m_recvMsgData->wr_ptr(), need_size); //move write pointer this->m_recvMsgData->wr_ptr(recv_size); this->m_uiRecvDataLen += recv_size; }http: //notice recieving a msg. if (!this->m_recvMsgData) this->m_recvMsgData = CwxMsgBlockAlloc::malloc(0 ); bSuspend = false ; result = this->getApp()->recvMessage(m_header, this->m_recvMsgData, *this, bSuspend); } return result; }
Cwinux中建立的网络连接上的数据有可能是消息 或者是raw data 。前者(消息)会由Cwinux来进行接收数据并打包成消息,用户得到的就是一个接收好的消息对象。后者(raw data)指的是这个连接上的数据不是消息,用户要自己读取所有的字节数据并处理。这是连接的两种模式,具体使用哪种模式是在用户建立连接时通过参数指定的。另外,代码中**this->getApp()**相关的代码是AppFramework相关的内容,我们会在后面解释这些代码。
handle_input实现过程:首先判断该连接是消息模式还是raw data模式。如果是raw data模式,则调用**result = this->getApp()->onRecvMsg(*this, bSuspend);**函数。该函数会通知用户接收数据(后面详解)。如果是消息模式,则先读取数据,形成一个消息头,然后根据消息头中记录的长度,读取一个完整的消息,并将这个消息放在m_recvMsgData对象中,最后通知用户接收该消息。
handle_output()函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int CwxAppHandler4Msg::handle_output() { int result = 0 ; CWX_UINT32 uiNum = 0 ; bool bCloseConn = false ; bool bReconn = false ; CWX_UINT32 uiReconnDelay = 0 ; while (1 ) { while (1 ) { result = this ->getNextMsg(); if (0 == result) return this ->cancelWakeup(); if (CWX_CHECK_ATTR(this ->m_curSndingMsg->send_ctrl().getMsgAttr(), CwxMsgSendCtrl::BEGIN_NOTICE)) { if (-1 == this ->getApp()->onStartSendMsg(m_curSndingMsg, *this )) { this ->m_conn.setWaitingMsgNum(this ->m_conn.getWaitingMsgNum() - 1 ); continue ; } } break ; } result = this ->nonBlockSend(); } return result; }
首先从以保存的消息列表中取出一个消息,然后调用nonBlockSend函数发送该消息。nonBlockSend调用CwxSocket::write将消息发送出去。
好,先小结一下前面所说的内容:Cwinux中,CwxAppHandler4Base是Reactor模式中的EventHandler,是个父类,它定义了handle_event这样的一个接口。CwxAppHandler4Base派生的子类如CwxAppHandler4Msg在其覆盖(override)的handle_event函数中处理具体的事件。
CwxAppEpoll CwxAppEpoll是Cwinux中的epoll引擎的封装,同时CwxAppEpoll封装了fd到EventHandle(mask+CwxAppHandler4Base)的映射。
CwxAppEpoll的代码如下(部分):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class CWX_API CwxAppEpoll { public: CwxAppEpoll(bool bEnableSignal = true ); ~CwxAppEpoll(); public: int init () ; int registerHandler (CWX_HANDLE handle, CwxAppHandler4Base *event_handler, int mask, CWX_UINT32 uiMillSecond = 0 ) ; CwxAppHandler4Base* removeHandler(CWX_HANDLE handle); int suspendHandler (CWX_HANDLE handle, int suspend_mask) ; int resumeHandler (CWX_HANDLE handle, int resume_mask) ; int registerSignal (int signum, CwxAppHandler4Base *event_handler) ; int removeSignal (CwxAppHandler4Base *event_handler) ; CwxAppHandler4Base* removeSignal(int sig); int scheduleTimer (CwxAppHandler4Base *event_handler, CwxTimeValue const &interval) ; int cancelTimer (CwxAppHandler4Base *event_handler) ; int forkReinit () ; int poll (REACTOR_CALLBACK callback, void * arg, CWX_UINT32 uiMiliTimeout = 0 ) ; void stop () ; CwxTimeValue const & getCurTime() const ; private: EventHandle m_eHandler[CWX_APP_MAX_IO_NUM]; CwxMinHeap<CwxAppHandler4Base> m_timeHeap; };
m_eHandler描述了fd到EventHandle的映射。某个fd上有事件发生,可以查询m_eHandler数组来获得该fd上事件处理handler。registerHandler和resumeHandler等都是维护这个数据结构的函数。另外,CwxAppEpoll除了处理网络连接上的事件之外,还处理了signal的事件和超时事件。超时事件是由m_timeHeap时间堆来维护的。
CwxAppEpoll最重要的函数当然是执行多路分发的poll函数。
poll函数处理了网络连接上的读写等事件、信号signal事件和超时事件(代码经过了简化)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int CwxAppEpoll::poll(REACTOR_CALLBACK callback, void* arg, CWX_UINT32 uiMiliTimeout) { timeout(ullTimeout); num = epoll_wait(m_epfd, m_events, CWX_APP_MAX_IO_NUM, tv); if (num > 0 ) { for (i = 0 ; i < num; i++) { event = &m_events[i]; if (event->events & EPOLLIN) mask |= CwxAppHandler4Base::READ_MASK; if (event->events & EPOLLOUT) mask |= CwxAppHandler4Base::WRITE_MASK; handler = m_eHandler[event->data.fd].m_handler; callback(handler, mask, CWX_CHECK_ATTR(m_eHandler[event->data.fd].m_mask, CwxAppHandler4Base::PERSIST_MASK), arg); } } if (m_bSignal && m_bEnableSignal) { if (m_bStop) return 0 ; m_bSignal = 0 ; for (i = 0 ; i < CWX_APP_MAX_SIGNAL_ID; i++) { if (m_arrSignals[i]) { m_arrSignals[i] = 0 ; callback(m_sHandler[i], CwxAppHandler4Base::SIGNAL_MASK, true , arg); } } } while (!m_timeHeap.isEmpty() && (m_timeHeap.top()->getTimeout() < ullNow)) { if (m_timeHeap.top()->getHandle() != CWX_INVALID_HANDLE) { handler = removeHandler(m_timeHeap.top()->getHandle()); } else { handler = m_timeHeap.pop(); } callback(handler, CwxAppHandler4Base::TIMEOUT_MASK, false , arg); } return 0 ; }
代码首先处理计算超时时间,tv记录下次超时事件还有多长时间。
然后调用epoll_wait来查询网络连接上是否有事件发生。epoll_wait会等待tv时间,期间如果某些连接上有事件发生,则epoll_wait返回,num记录了有多少连接上有事件发生。当有事件发生时,针对这些连接上的事件执行对应的handle。具体通过callback来执行。
信号处理类似,某些信号发生时,对应位置被置位。检查所有位置,发生信号的话,就执行对应的处理handler。
超时事件通过最小堆来完成,不断地从堆中取出已经超时的handler,执行对应的处理handler。
好,先小结一下这段内容:CwxAppEpoll是对epoll的封装,同时维护了fd到EventHandle的映射关系。执行poll函数后,能够检测到网络连接上的事件、信号事件和超时事件,并分别处理。
CwxAppReactor 下面看一下Reactor类。
Cwinux中的CwxAppReactor是Reactor模式中Reactor的具体实现。
主要成员变量:
*AppEpoll m_engine;**即一个epoll引擎,通过这个引擎,Reactor监测事件的发生并处理事件。
**m_connId[];**该数组维护了文件描述符fd到 连接ID(connId,整形)的映射。关于connId,当新的连接建立时,由Cwinux指定一个新的整形数字作为新连接的id。参见代码CwxAppTcpAcceptor::makeHander函数。这样Cwinux就可以从fd直接找到对应的connId,同时已知connId能够通过m_connMap这一数据结构得到fd。
**hash_map<ConnIdType,Handler4Base*> m_connMap;*维护了connId到Handler4Base 的映射关系。给定一个connId,Cwinux能够得到对应的Handler4Base*,通过Handler4Base的getHandle()函数可以得到对应的文件描述符fd。
主要成员函数包括run和callback函数。
run函数代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 /** int CwxAppReactor::run(REACTOR_EVENT_HOOK hook, void * arg, bool bOnce, CWX_UINT32 uiMiliTimeout) { int ret = 0 ; if (!bOnce) { if (!m_bStop || !m_engine) { CWX_ERROR(("CwxAppReactor::open() must be invoke before CwxAppReactor::run()" )); return -1 ; } } m_bStop = false ; do { { CwxMutexGuard<CwxMutexLock> lock(&m_lock); ret = m_engine->poll(CwxAppReactor::callback, this , uiMiliTimeout); } if (m_bStop) { CWX_DEBUG(("Stop running for stop" )); break ; } if (0 != ret) { if ((-1 == ret) && (EINTR != errno)) { CWX_ERROR(("Failure to running epoll with -1, errno=%d" , errno)); break ; } } if (hook) { if (0 != hook(arg)) { CWX_DEBUG(("Stop running for hook() without 0" )); break ; } } m_rwLock.acquire_write(); m_rwLock.release(); }while (!m_bStop && !bOnce); return ret; }
简言之,run函数的代码结构是这样的:
1 2 3 4 5 6 7 run() { do { m_engine->poll(callback); }while (!stop); }
run函数的所有核心工作都是有AppEpoll的poll来完成的。
callback函数也很简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void CwxAppReactor::callback(CwxAppHandler4Base* handler, int mask, bool bPersist, void *arg) { CwxAppReactor* reactor = (CwxAppReactor*)arg; if (!bPersist) { switch (handler->getRegType()) { case REG_TYPE_IO: reactor->m_connId[handler->getHandle()] = CWX_APP_INVALID_CONN_ID; break ; default : break ; } } int ret = handler->handle_event(mask, handler->getHandle()); if (-1 == ret) { handler->close(handler->getHandle()); } }
简言之,就是执行参数中handler的具体动作(调用handle_event函数)。
总结:
对照Reactor模式,看Cwinux中相应的实现:CwxAppHandler4Base 是Reactor模式中的EventHandler的实现,EventHandle是对CwxAppHandler4Base的封装;CwxAppEpoll是多路复用引擎;CwxAppReactor是对应于Reactor模式中的Reactor的实现。此外,CwxAppReactor通过两个数据结构,维护了fd到connId的映射。