LoopJump's Blog

Cwinux源码解析5

2014-01-09

在描述了Reactor模式后,本文介绍Cwinux中关于Reactor模式的具体实现。

CwxAppHandler4Base

CwxAppHandler4Base 是Reactor模式中的EventHandler的实现,EventHandle是对CwxAppHandler4Base的封装。其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
class EventHandle {
public:
EventHandle() {
m_mask = 0;
m_handler = NULL;
}
inline bool isReg() {
return (m_mask & CwxAppHandler4Base::RW_MASK) != 0;
}
public:
int m_mask;
CwxAppHandler4Base* m_handler;
};

代码中的m_mask是事件掩码(不同位表示读/写/超时/信号等事件)。CwxAppHandler4Base是具体的事件处理者(Handler)。CwxAppHandler4Base的最主要的函数是handle_event函数,该函数用来处理事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class CWX_API CwxAppHandler4Base{
public:
///定义事件类型
enum{
TIMEOUT_MASK = 0x01,
READ_MASK = 0x02,
...
ALL_EVENTS_MASK = TIMEOUT_MASK| READ_MASK| WRITE_MASK| SIGNAL_MASK| PERSIST_MASK
};
public :
/**
@brief Handler的事件通知回调。
@param [in] event 发生的事件类型,为TIMEOUT_MASK、READ_MASK、WRITE_MASK、SIGNAL_MASK的组合。
@param [in] handle 发生的事件的handle。
@return -1:失败,reactor会主动调用close; 0:成功;
*/
virtual int handle_event(int event, CWX_HANDLE handle=CWX_INVALID_HANDLE)=0;

private:
CwxAppReactor * m_reactor; ///<reactor对象的指针
int m_regType; ///<handler的reactor注册类型
CWX_HANDLE m_handler; ///<事件的io handle
int m_type; ///<event handle type;
CWX_UINT64 m_ullTimeout; ///<超时的时刻
int m_index;
};

CwxAppHandler4Base有一个字段m_handle,即文件描述符fd。其中open函数和close函数用于打开和关闭这个fd。handle_event函数,该函数用来处理事件。

CwxAppHandler4Msg

CwxAppHandler4Base是抽象的EventHandler。Reactor模式中的ConcreateEventHandler 包括CwxAppHandler4Msg等。CwxAppHandler4Msg都是从CwxAppHandler4Base继承的子类。该类能够从fd上读取数据,并生成消息对象,或者将消息从网络上发送出去(发送给fd对应的网络连接)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class CWX_API CwxAppHandler4Msg : public CwxAppHandler4Base
{
public:
/**
@brief 初始化建立的连接,并往Reactor注册连接
@param [in] arg 建立连接的acceptor或为NULL
@return -1:放弃建立的连接; 0:连接建立成功
*/
virtual int open (void * arg= 0);
/**
@brief 接受连接上的事件
@param [in] event 连接的handle上的事件
@param [in] handle 发生的事件的handle。
@return -1:处理失败; 0:处理成功
*/
virtual int handle_event(int event, CWX_HANDLE handle=CWX_INVALID_HANDLE);
///handle close
virtual int close(CWX_HANDLE handle=CWX_INVALID_HANDLE);
///发送消息
virtual int handle_output();
///接收消息
virtual int handle_input();
///超时
virtual void handle_timeout();
private:
///以非阻塞的方式,发送消息。返回值,-1: failure; 0: not send all;1:send a msg
inline int nonBlockSend();
protected:
CwxMsgHead m_header;
CwxAppConnInfo m_conn;///<connection information
CwxMsgBlock* m_curSndingMsg; ///<current sending msg;
CwxMsgBlock* m_waitSendMsgHead; ///<The header for wait to be sent msg.
CwxMsgBlock* m_waitSendMsgTail; ///<The tail for wait to be sent msg.
char m_szHeadBuf[CwxMsgHead::MSG_HEAD_LEN];///<the buf for header
CwxMsgBlock* m_recvMsgData; ///<the recieved msg data
};

CwxAppHandler4Msg的handle_event函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
@brief 接受连接上的到达数据
@param [in] handle 连接的handle
@return -1:关闭的连接; 0:接收数据成功
*/
int CwxAppHandler4Msg::handle_event(int event, CWX_HANDLE)
{
if (CwxAppConnInfo::ESTABLISHED == m_conn.getState()) ///通信状态
{
CWX_ASSERT((event & ~CwxAppHandler4Base::RW_MASK) == 0);
if (CWX_CHECK_ATTR(event, CwxAppHandler4Base::WRITE_MASK))
{
handle_output();
}
if (CWX_CHECK_ATTR(event, CwxAppHandler4Base::READ_MASK))
{
handle_input();
}
}
else if (CwxAppConnInfo::CONNECTING == m_conn.getState()) ///等待连接状态
{
...
}
return 0;
}

代码中m_conn表示连接的属性,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
class CWX_API CwxAppConnInfo
{
CWX_UINT32 m_uiSvrId; ///<svr id
CWX_UINT32 m_uiHostId; ///<host id
CWX_UINT32 m_uiConnId; ///<connection id
CWX_UINT32 m_uiListenId; ///<accept connection's acceptor ID. for passive conn.
CWX_UINT16 m_unState; ///<connection state.
bool m_bActiveConn; ///< sign for active connection.
bool m_bRawData; ///< sign for raw data connection
void* m_pUserData; ///<user dat for connection
CwxAppHandler4Msg* m_pHandler; ///<连接对应的Handler
...
};

m_conn.getState()返回该连接的状态。handle_event函数中,如果连接的状态是ESTABLISHED,则判断事件掩码,针对写事件和读事件分别调用handle_output()函数和handle_input()函数。

以handle_input()为例,其实现如下(其中删除了错误检查/更新统计量等部分代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
int CwxAppHandler4Msg::handle_input()
{
bool bSuspend = false;
if (getApp()->isStopped())
return 0;
ssize_t recv_size = 0;
ssize_t need_size = 0;
int result = 0;
if (this->m_conn.isRawData()) //recv raw data(该数据流是普通的字节流,不是消息对象)
{
bSuspend = false;
result = this->getApp()->onRecvMsg(*this, bSuspend);
return result >= 0 ? 0 : -1;
}
//接收到的数据实际是消息对象
while (1)
{
need_size = CwxMsgHead::MSG_HEAD_LEN - this->m_uiRecvHeadLen;
if (need_size > 0) //接收到的数据还不是一个完整的包
{
recv_size = CwxSocket::recv(getHandle(), this->m_szHeadBuf + this->m_uiRecvHeadLen, need_size);
this->m_uiRecvHeadLen += recv_size;
if (recv_size < need_size)
{
m_conn.setContinueRecvNum(0);
return 0;
}
this->m_szHeadBuf[this->m_uiRecvHeadLen] = 0x00;
if (!m_header.fromNet(this->m_szHeadBuf))
{
CWX_ERROR(("Msg header is error."));
return -1;
}
this->m_recvMsgData = CwxMsgBlockAlloc::malloc(m_header.getDataLen());
} //end if (need_size > 0)
//接收消息内容数据
need_size = m_header.getDataLen() - this->m_uiRecvDataLen;
if (need_size > 0) //还没有接收到该消息内容的完整数据
{
recv_size = CwxSocket::recv(getHandle(), this->m_recvMsgData->wr_ptr(), need_size);
//move write pointer
this->m_recvMsgData->wr_ptr(recv_size);
this->m_uiRecvDataLen += recv_size;
}http://loopjump.com/wp-admin/post-new.php#
//notice recieving a msg.
if (!this->m_recvMsgData)
this->m_recvMsgData = CwxMsgBlockAlloc::malloc(0);
bSuspend = false;
result = this->getApp()->recvMessage(m_header, this->m_recvMsgData, *this, bSuspend);
}
return result;
}

Cwinux中建立的网络连接上的数据有可能是消息或者是raw data。前者(消息)会由Cwinux来进行接收数据并打包成消息,用户得到的就是一个接收好的消息对象。后者(raw data)指的是这个连接上的数据不是消息,用户要自己读取所有的字节数据并处理。这是连接的两种模式,具体使用哪种模式是在用户建立连接时通过参数指定的。另外,代码中**this->getApp()**相关的代码是AppFramework相关的内容,我们会在后面解释这些代码。

handle_input实现过程:首先判断该连接是消息模式还是raw data模式。如果是raw data模式,则调用**result = this->getApp()->onRecvMsg(*this, bSuspend);**函数。该函数会通知用户接收数据(后面详解)。如果是消息模式,则先读取数据,形成一个消息头,然后根据消息头中记录的长度,读取一个完整的消息,并将这个消息放在m_recvMsgData对象中,最后通知用户接收该消息。

handle_output()函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int CwxAppHandler4Msg::handle_output()
{
int result = 0;
CWX_UINT32 uiNum = 0;
bool bCloseConn = false;
bool bReconn = false;
CWX_UINT32 uiReconnDelay = 0;
// The list had better not be empty, otherwise there's a bug!
while (1)
{
while (1)
{
result = this->getNextMsg();
if (0 == result)
return this->cancelWakeup();
if (CWX_CHECK_ATTR(this->m_curSndingMsg->send_ctrl().getMsgAttr(), CwxMsgSendCtrl::BEGIN_NOTICE))
{
if (-1 == this->getApp()->onStartSendMsg(m_curSndingMsg, *this))
{
this->m_conn.setWaitingMsgNum(this->m_conn.getWaitingMsgNum() - 1);
continue; //next msg;
}
} //end if (this->m_curSndingMsg->IsBeginNotice())
//it's a msg which need be sent.
break; //break while
} //end while
result = this->nonBlockSend();
}
return result;
}

首先从以保存的消息列表中取出一个消息,然后调用nonBlockSend函数发送该消息。nonBlockSend调用CwxSocket::write将消息发送出去。

好,先小结一下前面所说的内容:Cwinux中,CwxAppHandler4Base是Reactor模式中的EventHandler,是个父类,它定义了handle_event这样的一个接口。CwxAppHandler4Base派生的子类如CwxAppHandler4Msg在其覆盖(override)的handle_event函数中处理具体的事件。

CwxAppEpoll

CwxAppEpoll是Cwinux中的epoll引擎的封装,同时CwxAppEpoll封装了fd到EventHandle(mask+CwxAppHandler4Base)的映射。

CwxAppEpoll的代码如下(部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class CWX_API CwxAppEpoll
{
public:
CwxAppEpoll(bool bEnableSignal = true);
~CwxAppEpoll();
public:
int init();
/**
@brief 注册IO事件处理handle。
@param [in] handle 监测的IO handle
@param [in] event_handler io handle对应的event handler。
@param [in] mask 注册的事件掩码,为READ_MASK、WRITE_MASK、PERSIST_MASK、TIMEOUT_MASK组合
@param [in] uiMillSecond 多少毫秒超时。0表示没有超时设置。
@param [in] bForkAdd 是否是fork后重新添加。
@return -1:失败;0:成功;
*/
int registerHandler(CWX_HANDLE handle, CwxAppHandler4Base *event_handler, int mask, CWX_UINT32 uiMillSecond = 0);
CwxAppHandler4Base* removeHandler(CWX_HANDLE handle);
int suspendHandler(CWX_HANDLE handle, int suspend_mask);
int resumeHandler(CWX_HANDLE handle, int resume_mask);
int registerSignal(int signum, CwxAppHandler4Base *event_handler);
int removeSignal(CwxAppHandler4Base *event_handler);
CwxAppHandler4Base* removeSignal(int sig);
int scheduleTimer(CwxAppHandler4Base *event_handler, CwxTimeValue const &interval);
int cancelTimer(CwxAppHandler4Base *event_handler);
int forkReinit();
/**
@brief 检测事件。
@param [in] callback 事件的回调函数
@param [in] arg 回调函数的参数
@param [in] uiMiliTimeout 超时的毫秒数,0表示一直阻塞到事件发生。
@return -1:失败;0:成功
*/
int poll(REACTOR_CALLBACK callback, void* arg, CWX_UINT32 uiMiliTimeout = 0);
void stop();
CwxTimeValue const& getCurTime() const;
private:
EventHandle m_eHandler[CWX_APP_MAX_IO_NUM]; ///<epoll的event handler
CwxMinHeap<CwxAppHandler4Base> m_timeHeap; ///<时间堆
};

m_eHandler描述了fd到EventHandle的映射。某个fd上有事件发生,可以查询m_eHandler数组来获得该fd上事件处理handler。registerHandler和resumeHandler等都是维护这个数据结构的函数。另外,CwxAppEpoll除了处理网络连接上的事件之外,还处理了signal的事件和超时事件。超时事件是由m_timeHeap时间堆来维护的。

CwxAppEpoll最重要的函数当然是执行多路分发的poll函数。

poll函数处理了网络连接上的读写等事件、信号signal事件和超时事件(代码经过了简化)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int CwxAppEpoll::poll(REACTOR_CALLBACK callback, void* arg, CWX_UINT32 uiMiliTimeout)
{
/*计算最近一次超时事件还有多长时间发生。*/
timeout(ullTimeout); //从时间堆里获取堆顶元素的最小值。
//计算下次超时还有多长时间,记录在tv中。
num = epoll_wait(m_epfd, m_events, CWX_APP_MAX_IO_NUM, tv);
if (num > 0)
{
for (i = 0; i < num; i++) //epoll的通用写法
{
event = &m_events[i];
if (event->events & EPOLLIN)
mask |= CwxAppHandler4Base::READ_MASK;
if (event->events & EPOLLOUT)
mask |= CwxAppHandler4Base::WRITE_MASK;
handler = m_eHandler[event->data.fd].m_handler; //根据fd得到对应的EventHandle
callback(handler, mask, CWX_CHECK_ATTR(m_eHandler[event->data.fd].m_mask, CwxAppHandler4Base::PERSIST_MASK), arg); //执行回调函数
}
}
//检测处理信号signal事件
if (m_bSignal && m_bEnableSignal)
{
if (m_bStop)
return 0;
m_bSignal = 0;
for (i = 0; i < CWX_APP_MAX_SIGNAL_ID; i++)
{
if (m_arrSignals[i]) //id为i的信号发生信号事件。
{
m_arrSignals[i] = 0; //清空
callback(m_sHandler[i], CwxAppHandler4Base::SIGNAL_MASK, true, arg); //执行回调函数
}
}
}
//检测超时事件
while (!m_timeHeap.isEmpty() && (m_timeHeap.top()->getTimeout() < ullNow))
{
if (m_timeHeap.top()->getHandle() != CWX_INVALID_HANDLE)
{
handler = removeHandler(m_timeHeap.top()->getHandle());
}
else
{
handler = m_timeHeap.pop();
}
callback(handler, CwxAppHandler4Base::TIMEOUT_MASK, false, arg);
}
return 0;
}

代码首先处理计算超时时间,tv记录下次超时事件还有多长时间。

然后调用epoll_wait来查询网络连接上是否有事件发生。epoll_wait会等待tv时间,期间如果某些连接上有事件发生,则epoll_wait返回,num记录了有多少连接上有事件发生。当有事件发生时,针对这些连接上的事件执行对应的handle。具体通过callback来执行。

信号处理类似,某些信号发生时,对应位置被置位。检查所有位置,发生信号的话,就执行对应的处理handler。

超时事件通过最小堆来完成,不断地从堆中取出已经超时的handler,执行对应的处理handler。

好,先小结一下这段内容:CwxAppEpoll是对epoll的封装,同时维护了fd到EventHandle的映射关系。执行poll函数后,能够检测到网络连接上的事件、信号事件和超时事件,并分别处理。

CwxAppReactor

下面看一下Reactor类。

Cwinux中的CwxAppReactor是Reactor模式中Reactor的具体实现。

主要成员变量:

  1. *AppEpoll m_engine;**即一个epoll引擎,通过这个引擎,Reactor监测事件的发生并处理事件。
  2. **m_connId[];**该数组维护了文件描述符fd到 连接ID(connId,整形)的映射。关于connId,当新的连接建立时,由Cwinux指定一个新的整形数字作为新连接的id。参见代码CwxAppTcpAcceptor::makeHander函数。这样Cwinux就可以从fd直接找到对应的connId,同时已知connId能够通过m_connMap这一数据结构得到fd。
  3. **hash_map<ConnIdType,Handler4Base*> m_connMap;*维护了connId到Handler4Base的映射关系。给定一个connId,Cwinux能够得到对应的Handler4Base*,通过Handler4Base的getHandle()函数可以得到对应的文件描述符fd。

主要成员函数包括run和callback函数。

run函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
@brief 架构事件的循环处理API,实现消息的分发。
@return -1:失败;0:正常退出
*/
int CwxAppReactor::run(REACTOR_EVENT_HOOK hook, void* arg, bool bOnce, CWX_UINT32 uiMiliTimeout)
{
int ret = 0;
if (!bOnce)
{
if (!m_bStop || !m_engine)
{
CWX_ERROR(("CwxAppReactor::open() must be invoke before CwxAppReactor::run()"));
return -1;
}
}
m_bStop = false;

do
{
{
///带锁执行event-loop
CwxMutexGuard<CwxMutexLock> lock(&m_lock);
ret = m_engine->poll(CwxAppReactor::callback, this, uiMiliTimeout);
}
if (m_bStop)
{
CWX_DEBUG(("Stop running for stop"));
break;
}
if (0 != ret)
{
if ((-1 == ret) && (EINTR != errno)) {
CWX_ERROR(("Failure to running epoll with -1, errno=%d", errno));
break;
}
}
///调用hook
if (hook)
{
if (0 != hook(arg))
{
CWX_DEBUG(("Stop running for hook() without 0"));
break;
}
}
///等待其他的线程执行各种操作。
m_rwLock.acquire_write();
m_rwLock.release();
}while(!m_bStop && !bOnce);
return ret;
}

简言之,run函数的代码结构是这样的:

1
2
3
4
5
6
7
run()
{
do
{
m_engine->poll(callback);
}while(!stop);
}

run函数的所有核心工作都是有AppEpoll的poll来完成的。

callback函数也很简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void CwxAppReactor::callback(CwxAppHandler4Base* handler, int mask, bool bPersist, void *arg)
{
CwxAppReactor* reactor = (CwxAppReactor*)arg;
if (!bPersist)
{
switch(handler->getRegType())
{
case REG_TYPE_IO:
reactor->m_connId[handler->getHandle()] = CWX_APP_INVALID_CONN_ID;
break;
default:
break;
}
}

int ret = handler->handle_event(mask, handler->getHandle());

if (-1 == ret)
{
handler->close(handler->getHandle());
}
}

简言之,就是执行参数中handler的具体动作(调用handle_event函数)。

总结:

对照Reactor模式,看Cwinux中相应的实现:CwxAppHandler4Base 是Reactor模式中的EventHandler的实现,EventHandle是对CwxAppHandler4Base的封装;CwxAppEpoll是多路复用引擎;CwxAppReactor是对应于Reactor模式中的Reactor的实现。此外,CwxAppReactor通过两个数据结构,维护了fd到connId的映射。

扫描二维码,分享此文章