LoopJump's Blog

Cwinux源码解析1

2014-01-06

Cwinux源码解析(一)

Cwinux源码结构

Cwinux源码包含三个部分:公共代码库cmn,同步网络库net,应用架构库app。

公共代码库cmn:cmn是Cwinux的基础代码库,实现了Cwinux平台的最基础的公共代码,包括线程池、TaskBoard(异步任务中使用)、Commander(负责将消息包发送到对应的处理句柄)、锁、内存池、package(对数据打包解包)、字符过滤、字符集定义等。

同步网络包net:net封装了网络层与传输层的TCP,UDP,Unix-Domain的协议。包括各自的网络地址对象、网络连接对象、网络连接建立对象、网络连接accept对象。net依赖cmn。

应用架构库app:app是Cwinux的Framework库,包含Framework,Channel等。app是Cwinux最终端的库,Cwinux使用者(网络服务开发者)继承app里的相关类,重载所关心的接口函数,实现自己的网络服务业务逻辑。app依赖cmn和net。

公共代码库cmn

1. CwxBitMap位图工具类

CwxBitMap实现了位图的基本操作:setBit将某一位设置为1,clrBit将某一位清零,检查某一位是否为1。CwxBitMap本身没有保存位图的数据,只提供操作函数。位图数据作为参数传递给操作函数。例如:

1
2
3
4
inline static void setBit(void* bitMap, CWX_UINT32 bit)
{
(((char*)bitMap)[bit>>3]) |= (1<<(bit&0x07));
}

CwxBitMap用char数组来保存位图,每个char可以存放8位。置位时,首先跳到第 (bit>>3) 个char,然后将这个char的第 (bit&0x07) 个位置1。另外,参数为void*类型,因此需要强转一次。

ps:淘宝的tbsys的BitMap类是将char数组作为类的成员变量保存在类中,再加上一些统计变量,这样可以快速提供已有多少位被置位,多少位清零。

2. CwxLogger日志类

CwxLogger提供日志功能。

CwxLogger使用了懒汉模式的单实例模式。 CwxLogger日志类的几个主要成员变量包括日志输出级别m_uiLevel(info、debug、warning、error四个级别,可以设置输出全部或某几个级别的日志),日志文件大小和数量参数。CwxLogger还实现了日志文件的切换的循环使用(当前日志文件超过一定大小,切换到新的日志文件,日志文件数目超过一定数目,会重复使用最旧的日志文件)。

不少日志类(CwxLogger,tbsys.log)都使用内置宏__FILE__、__LINE__等来添加更丰富的信息。其中__FILE__指当前文件,__LINE__指当前行号。有了这些信息,使用日志文件时,能更清楚地知道某条日志是哪个源码文件的哪一行打印的。

3. CwxMutexLock,CwxMutexGuard等互斥锁相关类

CwxMutexLock是互斥锁(排它锁)类。这个类主要对pthread库中的互斥锁类型pthread_mutex_t进行了封装,提供了申请锁、释放锁、尝试释放锁等。

CwxMutexGuard(Cwinux注释中称之为策略锁)封装了临界区的进入和退出。这是一个模板类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <class LOCK>
class CwxMutexGuard
{
public:
///构造函数,若lock不为空,则加锁
CwxMutexGuard (LOCK * lock):m_pLock(lock)
{
if (m_pLock) m_pLock->acquire();
}
///析构函数,若锁对象不为空,则解锁
~CwxMutexGuard ()
{
if (m_pLock) m_pLock->release();
}
private:
///禁止不带锁对象的默认构造函数.
CwxMutexGuard (): m_pLock (0) {}
///锁对象的指针
LOCK* m_pLock;
};

使用方式为:

1
2
3
4
5
6
7
8
9
inline void append(CwxAppForkEnv* pForkEnv)
{
CwxMutexGuard<CwxMutexLock> lock(&m_lock);
if (pForkEnv)
{
m_forks.push_back(pForkEnv);
m_bEvent = true;
}
}

该类包含一个LOCK模板对象m_pLock,具体可以是某个类型的锁。在CwxMutexGuard的构造函数中对m_pLock进行加锁,在析构函数中解锁。这样封装后,程序员不再手工调用lock和unlock函数,而是将其交给栈上对象的构造和析构函数负责。CwxMutexGuard的声明周期正好等于临界区。这样就能够保证在同一个函数中同一个scope中对某个互斥锁进行加锁和解锁。避免了在不同函数中、不同条件分支中、try-catch异常流程中加锁和解锁的麻烦。

4. CwxCondition条件变量类

CwxCondition条件变量类对pthread库的条件变量pthread_cond_t进行了封装。封装过程比较简单,主要是对参数和返回值进行了一些处理。

pthread库的条件变量pthread_cond_t用法简述如下。

pthread_cond_t用于在执行一些动作前等待一个条件成立。例如,在使用了pthread_cond_t实现的生产者消费者问题中,生产者放入一个元素前需要等待“环形缓冲区中有空位置(读写指针指向的位置相同)”这一条件成立。

pthread_cond_t使用时需要和一个互斥锁mutex配合使用。mutex用于对条件的读写进行互斥保护。条件变量pthread_cond_t的使用模式比较固定,基本代码结构如下(代码中用cond表示条件变量):

wait端:

1
2
3
4
5
6
mutex.lock();
while(waitCondition){
cond.wait(mutex);
}
mutex.unlock();
Do Something 1...

signal端:

1
2
3
4
mutex.lock();
cond.signal();
mutex.unlock();
Do Something 2...

注:cond.wait(mutex)中会对mutex进行unlock,然后睡眠,等被唤醒后,重新mutex.lock,并从wait函数返回。所以从配对上看,显式的mutex.lock和mutex.unlock不是一对,它们跟wait中的unlock和lock分别是一对。wait中while用于避免虚假唤醒,即被唤醒后其实条件仍不满足(例如其他线程抢到了执行机会,条件又变得不满足了),此时应当继续wait。

CwxCondition类中包含了一个mutex的引用,该引用在CwxCondition的构造函数中被初始化。

5. CwxMsgQueue消息队列类

CwxMsgQueue是一个阻塞的消息队列类,内部是一个CwxMsgBlock类型的消息对象的链表。在CwxMsgQueue中,没有消息为队列空条件,所有消息的大小超过一定预设值为队列满条件。CwxMsgQueue类包含了两个CwxCondition类型(条件变量类型)的成员变量m_notEmptyCond和m_notFullCond,用于表示队列非空和非满两个条件。

队列入队函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
inline int CwxMsgQueue::_waitNotFullCond(CwxTimeValue *timeout)
{
// Wait while the queue is full.
while (this->_isFull())
{
if (this->m_notFullCond.wait(timeout) == -1)
{
return -1;
}
if (this->m_state != ACTIVATED)
{
return -1;
}
}
return 0;
}

///将一个消息放到队列的头部。-1:失败;>=0:队列中的消息数量
int CwxMsgQueue::enqueue (CwxMsgBlock *msg,CwxTimeValue *timeout)
{
int queue_count = 0;
{
CwxMutexGuard<CwxMutexLock> guard(&this->m_lock);
if (DEACTIVATED == this->m_state)
{
return -1;
}
if (this->_waitNotFullCond(timeout) == -1)
return -1;
if (!msg) return -1;
m_curCount ++;
m_curLength += msg->length();
m_curMsgBytes += msg->capacity();
msg->m_next = m_head;
m_head = msg;
if (!m_tail) m_tail = m_head;
if (1 == m_curCount)
{
if (-1 == m_notEmptyCond.signal()) return -1;
}
queue_count = m_curCount;
}
return queue_count;
}

_waitNotFullCond中封装了条件变量m_notFullCond.wait操作。

enqueue函数中首先通过guard对this->m_lock加锁,然后调用_waitNotFullCond执行条件变量的wait。wait如果返回表示条件满足,开始执行具体的入队列动作,包括将新的消息对象串到链表中,修改统计信息等。如果这是第一个消息,也就是说队列从空变为不空,则调用m_notEmptyCond.signal()来唤醒等待队列不空这一条件的线程(这些线程在执行m_notEmptyCond.wait时,因为当时队列空而阻塞睡眠了)。

注意到,该代码没有显示表达出CwxCondition条件变量类中描述的“条件变量需要和一个互斥锁配合使用”的用法。这是因为CwxCondition类在内部包含了一个mutex的引用。例如,m_notFullCond操作在初始化时,它的成员变量mutex&被初始化,后续m_notFullCond执行wait时,wait实现中会对该mutex加锁。

队列出队函数实现与入队相似。

6. CwxThread线程类

CwxThread是对pthread线程的封装,注意到CwxThread类中包含一个CwxMsgQueue类型的对象。换言之,每个线程都能访问到某个消息队列。至于这个消息对象的来源,从CwxThread的构造函数可以看出,创建线程时可以指定一个已有的CwxMsgQueue对象作为该线程的消息队列,如果不指定,则构造函数负责创建一个新的消息队列。

spawn函数用来生成一个线程。该函数主要是进行pthread相关的设置,最后调用pthread_create函数创建线程。

start函数用来启动一个线程。start函数调用spawn函数,并将该线程将要执行的函数设置为threadFunc函数(函数指针作为参数传递给spawn函数)。threadFunc函数实际执行threadMain函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void CwxThread::threadMain() {
/*
* 注册并设置 tss的相关代码。(略)
*/
m_pTssEnv->getThreadInfo().setThreadNo(m_unThreadId);

/*
* 线程实际执行代码
*/
do {
if (m_func)
{
//指定了线程执行的函数,则执行该函数
m_func(m_pTssEnv, m_msgQueue, m_arg);
}
else if (m_commander)
{
//未指定线程执行的函数,则通过Commander执行。
while( (iRet = this->pop(block)) != -1) //block until has query message
{
if (!m_commander->dispatch(block, m_pTssEnv, iRet))
{
CWX_DEBUG("No handle to deal with event");
}
if (1 != iRet)
{
CWX_DEBUG("iRet = ???");
}
if (block)
CwxMsgBlockAlloc::free(block);
}
} else {
CWX_ERROR("Thread has neither commander nor own func, exit.");
}
}while(0);
}

threadMain首先进行必要的TSS设置(TSS指局部于线程的变量 thread specific storage),然后开始实际的工作。threadMain依据构造函数传递的参数不同,有两种工作方式。第一种是执行已指定的某个函数,这种方式比较简单,与未封装的pthread的用法基本一致。第二种是通过Commander这种机制来完成具体的功能。这种方式比较复杂,后面会有详细的介绍。这里先简单介绍一下。每个消息都有自己的类型, 每种类型的消息都有一个预设的操作对象来处理这个消息。Commander可以被视为类型到操作对象的映射关系管理者。当threadMain按第二种方式工作时,线程不断地从消息队列中取出消息,然后交给Commander处理。

7. CwxThreadPool,CwxThreadPoolEx线程池

线程池的大致思路是:在服务启动时预先创建一定数目的线程,这些线程循环执行一些具体的工作,这样就避免了在每次处理你不同请求时进行创建线程和销毁线程。

Cwinux提供了两种线程池CwxThreadPool和CwxThreadPoolEx。回顾CwxThread类的实现,每个线程都能访问到某个消息队列。CwxThreadPool和CwxThreadPoolEx的区别如图示。

tp1

CwxThreadPool的所有线程共享一个消息队列。新的消息会追加到该消息队列。CwxThreadPoolEx的每个线程都有自己的消息队列,每个线程都只能从自己的消息队列中取出消息来处理。新的消息追加时需要制定具体追加到哪一个消息队列中。

线程池在创建并启动后,池中的所有线程会并发执行,从消息队列中读取消息,并依据线程的配置执行(参见CwxThread类的两种执行方式)。

扫描二维码,分享此文章