1. echo服务的实现
muduo的使用非常简单,不需要从指定的类派生,也不用覆写虚函数,只需要注册几个回调函数去处理"三个半的事件"就行了。
以经典的echo回显服务为例,定义EchoServer class,不需要派生自任何基类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#ifndef MUDUO_EXAMPLES_SIMPLE_ECHO_ECHO_H
#define MUDUO_EXAMPLES_SIMPLE_ECHO_ECHO_H
#include "TcpServer.h"
// RFC 862
class EchoServer {
public:
EchoServer(muduo::net::EventLoop *loop,
const muduo::net::InetAddress &listenAddr);
void start(); // calls server_.start();
private:
void onConnection(const muduo::net::TcpConnectionPtr &conn);
void onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp time);
muduo::net::TcpServer server_;
};
#endif // MUDUO_EXAMPLES_SIMPLE_ECHO_ECHO_H
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#include "echo.h"
#include "Logging.h"
using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
// using namespace muduo;
// using namespace muduo::net;
EchoServer::EchoServer(muduo::net::EventLoop *loop,
const muduo::net::InetAddress &listenAddr)
: server_(loop, listenAddr, "EchoServer") {
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}
void EchoServer::start() { server_.start(); }
void EchoServer::onConnection(const muduo::net::TcpConnectionPtr &conn) {
LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp time) {
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
<< "data received at " << time.toFormattedString();
conn->send(msg);
}
|
首先EchoServer会在构造函数内注册回调函数:
1
2
3
|
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, _1, _2, _3));
|
EchoServer::onMessage函数是echo服务的"业务逻辑":把收到的数据原封不动地发回客户端。注意我们不用担心conn->send(msg)是否完整地发送了数据,因为muduo网络库会帮我们管理发送缓冲区。
EchoServer::onConnection和EchoServer::onMessage这两个函数体现了"基于事件编程"的经典做法,即程序主体是被动等待事件发生,事件发生之后网络库会调用(回调的方式)事先注册的事件处理函数(event handler)。
在EchoServer::onConnection函数中,conn参数是TcpConnection对象的shared_ptr,TcpConnection::connected函数返回一个bool值,表明目前连接是建立还是断开,TcpConnection 的 peerAddress() 和 localAddress()成员函数分别返回对方和本地的地址(以InetAddress 对象表示的IP和port)。
在onMessage()函数中,conn 参数是收到数据的那个TCP 连接;buf 是已经收到的数据,buf 的数据会累积,直到用户从中取走 (retrieve )数。注意 buf是指针,表明用户代码可以修改 (消费) buffer; time 是收到数据的确切时间,即epoll_wait(2)返回的时间,注意这个时间通常比 read(2)发生的时间略早,可以用于正确测量程序的消息处理延迟。另外,Timestamp 对象采用 pass-by-value,而不是pass-by-(const)reference,这是有意的,因为在 x86-64 上可以直接通过寄存器传参。
在main() 里用 EventLoop 让整个程序跑起来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#include "echo.h"
#include "EventLoop.h"
#include "Logging.h"
#include <unistd.h>
// using namespace muduo;
// using namespace muduo::net;
int main() {
LOG_INFO << "pid = " << getpid();
muduo::net::EventLoop loop;
muduo::net::InetAddress listenAddr(2007);
EchoServer server(&loop, listenAddr);
server.start();
loop.loop();
}
|
2. 七步实现finger服务
2.1 拒绝连接
什么都不做,程序空等。
1
2
3
4
5
6
7
8
9
|
#include "EventLoop.h"
using namespace muduo;
using namespace muduo::net;
int main() {
EventLoop loop;
loop.loop();
}
|
2.2 接受新连接
在1079端口侦听新连接,接受连接之后什么都不做,程序空muduo会自动丢弃收到的数据。
1
2
3
4
5
6
7
8
9
10
11
12
|
#include "EventLoop.h"
#include "TcpServer.h"
using namespace muduo;
using namespace muduo::net;
int main() {
EventLoop loop;
TcpServer server(&loop, InetAddress(1079), "Finger");
server.start();
loop.loop();
}
|
2.3 主动断开连接
接受新连接之后主动断开。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#include "EventLoop.h"
#include "TcpServer.h"
using namespace muduo;
using namespace muduo::net;
void onConnection(const TcpConnectionPtr &conn) {
if (conn->connected()) {
conn->shutdown();
}
}
int main() {
EventLoop loop;
TcpServer server(&loop, InetAddress(1079), "Finger");
server.setConnectionCallback(onConnection);
server.start();
loop.loop();
}
|
2.4 读取用户名,然后断开连接
如果读到一行以\r\n 结尾的消息,就开连接。注意这段代码有安全问题,如果恶意客户端不断发送数据而不换行,会撑爆服务端的内存。另外,Buffer::findCRLF()是线性查找,如果客户端每次发一个字节,服务端的时间复杂度为 O(N2),会消耗 CPU资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#include "EventLoop.h"
#include "TcpServer.h"
using namespace muduo;
using namespace muduo::net;
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp receiveTime) {
if (buf->findCRLF()) {
conn->shutdown();
}
}
int main() {
EventLoop loop;
TcpServer server(&loop, InetAddress(1079), "Finger");
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
|
2.5 读取用户名、输出错误信息,然后断开连接
如果读到一行以\r\n 结尾的消息,就发送一条出错信息,然后断开连接。安全问题同上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#include "EventLoop.h"
#include "TcpServer.h"
using namespace muduo;
using namespace muduo::net;
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp receiveTime) {
if (buf->findCRLF()) {
conn->send("No such user\r\n");
conn->shutdown();
}
}
int main() {
EventLoop loop;
TcpServer server(&loop, InetAddress(1079), "Finger");
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
|
2.6 从空的UserMap 里查找用户
从一行消息中拿到用户名(user)在 UserMap里查找,然后返回结果。安全问题同上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
#include "EventLoop.h"
#include "TcpServer.h"
#include <map>
using namespace muduo;
using namespace muduo::net;
typedef std::map<string, string> UserMap;
UserMap users;
string getUser(const string &user) {
string result = "No such user";
UserMap::iterator it = users.find(user);
if (it != users.end()) {
result = it->second;
}
return result;
}
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp receiveTime) {
const char *crlf = buf->findCRLF();
if (crlf) {
string user(buf->peek(), crlf);
conn->send(getUser(user) + "\r\n");
buf->retrieveUntil(crlf + 2);
conn->shutdown();
}
}
int main() {
EventLoop loop;
TcpServer server(&loop, InetAddress(1079), "Finger");
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
|
2.7 往UserMap里添加一个用户
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
#include "EventLoop.h"
#include "TcpServer.h"
#include <map>
using namespace muduo;
using namespace muduo::net;
typedef std::map<string, string> UserMap;
UserMap users;
string getUser(const string &user) {
string result = "No such user";
UserMap::iterator it = users.find(user);
if (it != users.end()) {
result = it->second;
}
return result;
}
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp receiveTime) {
const char *crlf = buf->findCRLF();
if (crlf) {
string user(buf->peek(), crlf);
conn->send(getUser(user) + "\r\n");
buf->retrieveUntil(crlf + 2);
conn->shutdown();
}
}
int main() {
users.insert({"cjt", "super vip"});
EventLoop loop;
TcpServer server(&loop, InetAddress(1079), "Finger");
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
|
3. 五个简单TCP示例
本节将介绍五个简单TCP 网络服务程序,包括 echo(RFC 862)、discard (RFC863)、chargen (RFC 864)、daytime (RFC 867)、time (RFC 868)这五个协议,以及 time 协议的客户端。各程序的协议简介如下:
- discard:丢弃所有收到的数据。
- daytime:服务端accept 连接之后,以字符串形式发送当前时间,然后主动断开连接。
- time:服务端accept 连接之后,以二进制形式发送当前时间 (从Epoch 到现在的秒数),然后主动断开连接;我们需要一个客户程序来把收到的时间转换为字符串。
- echo:回显服务,把收到的数据发回客户端。
- chargen:服务端accept 连接之后,不停地发送测试数据。
以上五个协议使用不同的端口,可以放到同一个进程中实现,且不必使用多线程。
3.1 discard
discard 恐怕算是最简单的长连接 TCP 应用层协议,它只需要关注"三个半事件"中的"消息/数据到达"事件,事件处理函数如下:
1
2
3
4
5
6
|
void DiscardServer::onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp time) {
string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " discards " << msg.size()
<< " bytes received at " << time.toString();
}
|
3.2 daytime
daytime 是短连接协议,在发送完当前时间后,由服务端主动断开连接。它只需要关注"三个半事件"中的"连接已建立"事件,事件处理函数如下:
1
2
3
4
5
6
7
8
9
|
void DaytimeServer::onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << "DaytimeServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
conn->send(Timestamp::now().toFormattedString() + "\n");
conn->shutdown();
}
}
|
如果连接已建立,发送时间字符串,主动断开连接。
用netcat扮演客户端,运行结果如下:
1
2
|
$ nc 127.0.0.1 2013
20221224 15:43:38.226748 # 服务器返回的事件字符串,UTC 时区
|
3.3 time
time 协议与 daytime极为类似,只不过它返回的不是日期时间字符串,而是一个32-bit 整数,表示从1970-01-01 00:00:00Z 到现在的秒数。当然,这个协议有"2038年问题"。服务端只需要关注"三个半事件"中的"连接已建立"事件,事件处理函数如下:
1
2
3
4
5
6
7
8
9
10
11
|
void TimeServer::onConnection(const muduo::net::TcpConnectionPtr &conn) {
LOG_INFO << "TimeServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
time_t now = ::time(NULL);
int32_t be32 = sockets::hostToNetwork32(static_cast<int32_t>(now));
conn->send(&be32, sizeof be32);
conn->shutdown();
}
}
|
当连接已建立,取当前时间并转换为网络字节序(Big Endian),发送32-bit整数后主动断开连接。
安装hexdump:
1
2
3
4
5
|
# Step 1: Update system
sudo apt-get update
# Step 2: Install: libdata-hexdump-perl Architecture: all Version: 0.02-1
sudo apt-get install libdata-hexdump-perl
|
可以使用netcat 扮演客户端,并用hexdump来打印二进制数据:
1
2
|
# 暂不清楚什么原因,没效果
nc 127.0.0.1 2037 | hexdump -C
|
time客户端:
因为 time 服务端发送的是二进制数据,不便直接阅读,我们编写一个客户端来解析并打印收到的4个字节数据。这个程序只需要关注"三个半事件"中的"消息/数据到达"事件,事件处理函数如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp receiveTime) {
if (buf->readableBytes() >= sizeof(int32_t)) {
const void *data = buf->peek();
int32_t be32 = *static_cast<const int32_t *>(data);
buf->retrieve(sizeof(int32_t));
time_t time = sockets::networkToHost32(be32);
Timestamp ts(implicit_cast<uint64_t>(time) *
Timestamp::kMicroSecondsPerSecond);
LOG_INFO << "Server time = " << time << ", " << ts.toFormattedString();
} else {
LOG_INFO << conn->name() << " no enough data " << buf->readableBytes()
<< " at " << receiveTime.toFormattedString();
}
}
|
注意其中考虑到了如果数据没有一次性收全,已经收到的数据会累积在 Buffer里(在else 分支里没有调用 Buffer::retrieve* 系列函数),以等待后续数据到达程序也不会阻寨。这样即便服务器一个字节一个字节地发送数据,代码还是能正常工作,这也是非阻塞网络编程必须在用户态使用接收缓冲的主要原因。
完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
class TimeClient : boost::noncopyable {
public:
TimeClient(EventLoop *loop, const InetAddress &serverAddr)
: loop_(loop), client_(loop, serverAddr, "TimeClient") {
client_.setConnectionCallback(
std::bind(&TimeClient::onConnection, this, _1));
client_.setMessageCallback(
std::bind(&TimeClient::onMessage, this, _1, _2, _3));
// client_.enableRetry();
}
void connect() { client_.connect(); }
private:
EventLoop *loop_;
TcpClient client_;
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (!conn->connected()) {
loop_->quit();
}
}
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp receiveTime) {
if (buf->readableBytes() >= sizeof(int32_t)) {
const void *data = buf->peek();
int32_t be32 = *static_cast<const int32_t *>(data);
buf->retrieve(sizeof(int32_t));
time_t time = sockets::networkToHost32(be32);
Timestamp ts(implicit_cast<uint64_t>(time) *
Timestamp::kMicroSecondsPerSecond);
LOG_INFO << "Server time = " << time << ", " << ts.toFormattedString();
} else {
LOG_INFO << conn->name() << " no enough data " << buf->readableBytes()
<< " at " << receiveTime.toFormattedString();
}
}
};
int main(int argc, char *argv[]) {
LOG_INFO << "pid = " << getpid();
if (argc > 1) {
EventLoop loop;
InetAddress serverAddr(argv[1], 2037);
TimeClient timeClient(&loop, serverAddr);
timeClient.connect();
loop.loop();
} else {
printf("Usage: %s host_ip\n", argv[0]);
}
}
|
如果连接断开了,就退出事件循环,程序也就结束了。
注意 TcpConnection 对象表示"一次"TCP 连接,连接断开之后不能重建Tcpclient 重试之后新建的连接会是另一个 TcpConnection 对象。
3.4 echo
前面几个协议都是单向接收或发送数据,echo 是我们遇到的第一个双向的协议:服务端把客户端发过来的数据原封不动地传回去。它只需要关注"三个半事件"中的"消息/数据到达"事件,事件处理函数已在前面列出,这里复制一遍。
1
2
3
4
5
6
7
|
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp time) {
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
<< "data received at " << time.toFormattedString();
conn->send(msg);
}
|
这段代码实现的不是行回显 (line echo)服务,而是有一点数据就发送一点数据。这样可以避免客户端恶意地不发送换行字符,而服务端又必须缓存已经收到的数据,导致服务器内存暴涨。但这个程序还是有一个安全漏洞,即如果客户端故意不断发送数据,但从不接收,那么服务端的发送缓冲区会一直堆积,导致内存暴涨。解决办法可以参考下面的 chargen 协议,或者在发送缓冲区累积到一定大小时主动断开连接。一般来说,非阻塞网络编程中正确处理数据发送比接收数据要困难,因为要应对对方接收缓慢的情况。
3.5 chargen
Chargen 协议很特殊,它只发送数据,不接收数据。而且,它发送数据的速度不能快过客户端接收的速度,因此需要关注"三个半事件"中的半个"消息/数据发送完毕"事件(onWriteComplete),事件处理函数如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
void ChargenServer::onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << "ChargenServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
conn->setTcpNoDelay(true);
conn->send(message_);
}
}
void ChargenServer::onMessage(const TcpConnectionPtr &conn, Buffer *buf,
Timestamp time) {
string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " discards " << msg.size()
<< " bytes received at " << time.toString();
}
void ChargenServer::onWriteComplete(const TcpConnectionPtr &conn) {
transferred_ += message_.size();
conn->send(message_);
}
|
在连接建立时发送一次数据,在数据发送完成事件触发时再发送一次数据。
3.6 五合一
前面五个程序都用到了 EventLoop。这其实是个 Reactor,用于注册和分发IO事件。muduo 遵循 one loop per thread 模型,多个服务端(TcpServer)和客户端(Tcpclient)可以共享同一个EventLoop,也可以分配到多个EventLoop 上以发挥多核多线程的好处。这里我们把五个服务端用同一个EventLoop 跑起来,程序还是单线程的,功能却强大了很多:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
int main() {
LOG_INFO << "pid = " << getpid();
EventLoop loop; // one loop shared by multiple servers
ChargenServer chargenServer(&loop, InetAddress(2019));
chargenServer.start();
DaytimeServer daytimeServer(&loop, InetAddress(2013));
daytimeServer.start();
DiscardServer discardServer(&loop, InetAddress(2009));
discardServer.start();
EchoServer echoServer(&loop, InetAddress(2007));
echoServer.start();
TimeServer timeServer(&loop, InetAddress(2037));
timeServer.start();
loop.loop();
}
|
这个例子充分展示了 Reactor 模式复用线程的能力,让一个单线程程序同时具备多个网络服务功能。一个容易想到的例子是 httpd 同时侦听 80 端口和43 端口,另一个例子是程序中有多个 Tcpclient,分别和数据库、Redis、Sudoku Solver 等后台服务打交道。对于初次接触这种编程模型的读者,值得跟踪代码运行的详细过程,弄清楚每个事件每个回调发生的时机与条件。
以上几个协议的消息格式都非常简单,没有涉及 TCP 网络编程中常见的分包处理,在后文 Boost.Asio 的聊天服务器时我们再来讨论这个问题。
4. 文件传输
本节用发送文件的例子来说明 TcpConnection::send()的使用。到目前为止我们用到了 TcpConnection::send() 的两个重载,分别是 send(const string&)和send(const void* message, size_t len)。
TcpConnection 目前提供了三个 send() 重载函数,原型如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
///
/// TCP connection, for both client and server usage.
///
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection> {
public:
void send(const void *message, size_t len);
void send(const StringPiece &message);
void send(Buffer *message); // this one will swap data without copying
// void send(string&& message); // C++11
// void send(Buffer&& message); // C++11
}
|
在非阻塞网络编程中,发送消息通常是由网络库完成的,用户代码不会直接调用write(2)或 send(2) 等系统调用。原因是"Tcponnection 必须要有 outputbuffer"。在使用 TpConnection::send() 时值得注意的有点:
- send() 的返回类型是 void,意味着用户不必关心调用 send() 时成功发送了多少字节,muduo 库会保证把数据发送给对方。
- send() 是非阻塞的。意味着客户代码只管把一条消息准备好,调用 send()来发送,即便 TCP 的发送窗口满了,也绝对不会阻塞当前调用线程。
- send() 是线程安全、原子的。多个线程可以同时调用 send(),消息之间不会混叠或交织。但是多个线程同时发送的消息的先后顺序是不确定的,muduo只能保证每个消息本身的完整性。另外,send()在多线程下仍然是非阻塞的。
- send(const void* message,size_t en)这个重载最平淡无奇,可以发送任意字节序列。
- send(const StringPiece& message)这个重载可以发送std::string利constchar*,其中StringPiece是Google 发明的专门用于传递字符串参数的 class,这样程序里就不必为 const char* 和 const std::string& 提供两份重载了。
- send(Buffer*) 有点特殊,它以指针为参数,而不是常见的 const 引用,因为函数中可能用 Buffer::swap()来高效地交换数据,避免内存拷贝,起到类似C++ 右值引用的效果。
- 如果将来支持 C++11,那么可以增加对右值引用的重载,这样可以用 move 语义来避免内存拷贝。
下面我们来实现一个发送文件的命令行小工具,这个工具的协议很简单,在启动时通过命令行参数指定要发送的文件,然后在 2021 端口侦听,每当有新连接进来就把文件内容完整地发送给对方。
如果不考虑并发,那么这个功能用 netcat 加重定向就能实现。这里展示的版本更加健壮,比方说发送 100MB 的文件,支持上万个并发客户连接;内存消耗只与并发连接数有关,跟文件大小无关;任何连接可以在任何时候断开,程序不会有内存泄漏或崩溃。
一共写了三个版本:
- 一次性把文件读入内存,一次性调用 send(const string&)发送完毕。这个版本满足除了"内存消耗只与并发连接数有关,跟文件大小无关"之外的健壮性要求。
- 一块一块地发送文件,减少内存使用,用到了 writeCompleteCallback。这个版本满足了上述全部健壮性要求。
- 同2,但是采用 shared_ptr 来管理 FILE*,避免手动调用 ::fclose(3)。
4.1 版本1
在建立好连接之后,把文件的全部内容读入一个 string,一次性调用 TcpConnection::send() 发送。不用担心文件发送不完整。也不用担心 send() 之后立刻shutdown()会有什么问题,见下一节的说明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
const char *g_file = NULL;
// FIXME: use FileUtil::readFile()
string readFile(const char *filename) {
string content;
FILE *fp = ::fopen(filename, "rb");
if (fp) {
// inefficient!!!
const int kBufSize = 1024 * 1024;
char iobuf[kBufSize];
::setbuffer(fp, iobuf, sizeof iobuf);
char buf[kBufSize];
size_t nread = 0;
while ((nread = ::fread(buf, 1, sizeof buf, fp)) > 0) {
content.append(buf, nread);
}
::fclose(fp);
}
return content;
}
void onHighWaterMark(const TcpConnectionPtr &conn, size_t len) {
LOG_INFO << "HighWaterMark " << len;
}
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << "FileServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
LOG_INFO << "FileServer - Sending file " << g_file << " to "
<< conn->peerAddress().toIpPort();
conn->setHighWaterMarkCallback(onHighWaterMark, 64 * 1024);
string fileContent = readFile(g_file);
conn->send(fileContent);
conn->shutdown();
LOG_INFO << "FileServer - done";
}
}
int main(int argc, char *argv[]) {
LOG_INFO << "pid = " << getpid();
if (argc > 1) {
g_file = argv[1];
EventLoop loop;
InetAddress listenAddr(2021);
TcpServer server(&loop, listenAddr, "FileServer");
server.setConnectionCallback(onConnection);
server.start();
loop.loop();
} else {
fprintf(stderr, "Usage: %s file_for_downloading\n", argv[0]);
}
}
|
注意每次建立连接的时候我们都去重新读一遍文件,这是考虑到文件有可能被其他程序修改。如果文件是 immutable 的,整个程序就可以共享同一个 fileContent对象。
这个版本有一个明显的缺陷,即内存消耗与(并发连接数 文件大小)成正比文件越大内存消耗越多,如果文件大小上 GB,那几乎就是灾难了。只需要建立少量并发连接就能把服务器的内存耗尽,因此我们有了版本2。
4.2 版本2
为了解决版本一占用内存过多的问题,我们采用流水线的思路,当新建连接时,先发送文件的前 64KiB 数据,等这块数据发送完毕时再继续发送下64KiB 数据,如此往复直到文件内容全部发送完毕。代码中使用了 TcpConnection::setContext()和getContext()来保存 Tcponnection 的用户上下文(这里是 FILE*),因此不必使用额外的std::map<TcponnectionPtr,FILE*>来记住每个连接的当前文件位置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
void onHighWaterMark(const TcpConnectionPtr &conn, size_t len) {
LOG_INFO << "HighWaterMark " << len;
}
const int kBufSize = 64 * 1024;
const char *g_file = NULL;
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << "FileServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
LOG_INFO << "FileServer - Sending file " << g_file << " to "
<< conn->peerAddress().toIpPort();
conn->setHighWaterMarkCallback(onHighWaterMark, kBufSize + 1);
FILE *fp = ::fopen(g_file, "rb");
if (fp) {
conn->setContext(fp);
char buf[kBufSize];
size_t nread = ::fread(buf, 1, sizeof buf, fp);
conn->send(buf, static_cast<int>(nread));
} else {
conn->shutdown();
LOG_INFO << "FileServer - no such file";
}
} else {
if (!conn->getContext().empty()) {
FILE *fp = boost::any_cast<FILE *>(conn->getContext());
if (fp) {
::fclose(fp);
}
}
}
}
void onWriteComplete(const TcpConnectionPtr &conn) {
FILE *fp = boost::any_cast<FILE *>(conn->getContext());
char buf[kBufSize];
size_t nread = ::fread(buf, 1, sizeof buf, fp);
if (nread > 0) {
// 会触发onWriteComplete回调函数
conn->send(buf, static_cast<int>(nread));
} else {
::fclose(fp);
fp = NULL;
conn->setContext(fp);
conn->shutdown();
LOG_INFO << "FileServer - done";
}
}
int main(int argc, char *argv[]) {
LOG_INFO << "pid = " << getpid();
if (argc > 1) {
g_file = argv[1];
EventLoop loop;
InetAddress listenAddr(2021);
TcpServer server(&loop, listenAddr, "FileServer");
server.setConnectionCallback(onConnection);
server.setWriteCompleteCallback(onWriteComplete);
server.start();
loop.loop();
} else {
fprintf(stderr, "Usage: %s file_for_downloading\n", argv[0]);
}
}
|
在onWriteComplete()回调函数中读取下一块文件数据,继续发送。
注意每次建立连接的时候我们都去重新打开那个文件,使得程序中文件描述符的数量翻倍(每个连接占一个 socket fd 和一个 file fd ),这是考到文件有可能被其他程序修改。如果文件是immutable 的,一种改进措施是:整个程序可以共享同一个文件描述符,然后每个连接记住自已当前的偏移量,在onWriteComplete() 回调函数里用 pread(2)来读取数据。
这个版本也存在一个问题,如果客户端故意只发起连接,不接收数据,那么要么把服务器进程的文件描述符耗尽,要么占用很多服务端内存(因为每个连接有 64KiB的发送缓冲区)。解决办法可参考后文"限制服器的最并发连接数"和"用timing wheel踢掉空闲连接"。必须说明的是,muduo并不是设计来编写面向公网的网络服务程序,这种服务程序需要在安全性方面下很多工夫,我个人对此不在行,我更关心实现内网(不一定是局域网 )的高效服务程序。
4.3 版本3
用 shared_ptr的custom deleter 来减轻资源管理负担,使得 FILE* 的生命期和TcpConnection 一样长,代码也更简单了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
void onHighWaterMark(const TcpConnectionPtr &conn, size_t len) {
LOG_INFO << "HighWaterMark " << len;
}
const int kBufSize = 64 * 1024;
const char *g_file = NULL;
typedef std::shared_ptr<FILE> FilePtr;
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << "FileServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
LOG_INFO << "FileServer - Sending file " << g_file << " to "
<< conn->peerAddress().toIpPort();
conn->setHighWaterMarkCallback(onHighWaterMark, kBufSize + 1);
FILE *fp = ::fopen(g_file, "rb");
if (fp) {
FilePtr ctx(fp, ::fclose);
conn->setContext(ctx);
char buf[kBufSize];
size_t nread = ::fread(buf, 1, sizeof buf, fp);
conn->send(buf, static_cast<int>(nread));
} else {
conn->shutdown();
LOG_INFO << "FileServer - no such file";
}
}
}
void onWriteComplete(const TcpConnectionPtr &conn) {
const FilePtr &fp = boost::any_cast<const FilePtr &>(conn->getContext());
char buf[kBufSize];
size_t nread = ::fread(buf, 1, sizeof buf, boost::get_pointer(fp));
if (nread > 0) {
conn->send(buf, static_cast<int>(nread));
} else {
conn->shutdown();
LOG_INFO << "FileServer - done";
}
}
int main(int argc, char *argv[]) {
LOG_INFO << "pid = " << getpid();
if (argc > 1) {
g_file = argv[1];
EventLoop loop;
InetAddress listenAddr(2021);
TcpServer server(&loop, listenAddr, "FileServer");
server.setConnectionCallback(onConnection);
server.setWriteCompleteCallback(onWriteComplete);
server.start();
loop.loop();
} else {
fprintf(stderr, "Usage: %s file_for_downloading\n", argv[0]);
}
}
|
以上代码体现了现代 C++ 的资源管理思路,即无须手动释放资源,而是通过将资源与对象生命期绑定,在对象析构的时候自动释放资源,从而把资源管理转换为对象生命期管理,而后者是早已解决了的问题。这正是 C++ 最重要的编程技法:RAII。
4.4 为什么TcpConnection::shutdown()没有直接关闭TCP连接?
“在simple的daytime示例中,服务端主动关闭时调用的是如下函数序列,这不是只是关闭了连接上的写操作吗,怎么是关闭了整个连接?”
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
void DaytimeServer::onConnection(const TcpConnectionPtr &conn) {
if (conn->connected()) {
conn->send(Timestamp::now().toFormattedString() + "\n");
conn->shutdown();
}
}
void TcpConnection::shutdown() {
// FIXME: use compare and swap
if (state_ == kConnected) {
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}
void TcpConnection::shutdownInLoop() {
loop_->assertInLoopThread();
// 如果当前没有发送数据
if (!channel_->isWriting()) {
// we are not writing
socket_->shutdownWrite();
}
}
void Socket::shutdownWrite() { sockets::shutdownWrite(sockfd_); }
// 只关闭写的这一半,进入半关闭状态(close SHUT_WR)
void sockets::shutdownWrite(int sockfd) {
if (::shutdown(sockfd, SHUT_WR) < 0) {
LOG_SYSERR << "sockets::shutdownWrite";
}
}
|
muduo TcpConnection 没有提供 close(),而只提供 shutdown(),这么做是为了收发数据的完整性。
TCP 是一个全双工协议,同一个文件描述符既可读又可写,shutdownWrite()关闭了"写"方向的连接,保留了"读"方向,这称为 TCP half-close。如果直接close(socket_fd),那么 socket_fd 就不能读或写了。
用 shutdown 而不用close 的效果是,如果对方已经发送了数据,这些数据还"在路上",那么muduo 不会漏收这些数据。换句话说,muduo 在 TCP这一层面解决了"当你打算关闭网络连接的时候,如何得知对方是否发了一些数据而你还没有收到?这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据就可以直接断开连接。
也就是说 muduo 把"主动关闭连接"这件事情分成两步来做,如果要主动关闭连接,它会先关本地"写"端,等对方关闭之后,再关本地"读"端。
另外,如果当前 output buffer 里还有数据尚未发出的话,muduo 也不会立刻调用 shutdownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// 内核发送缓冲区有空间了,回调该函数
void TcpConnection::handleWrite() {
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) // 发送缓冲区已清空
{
channel_->disableWriting(); // 停止关注POLLOUT事件,以免出现busy loop
if (writeCompleteCallback_) // 回调writeCompleteCallback_
{
// 应用层发送缓冲区被清空,就回调用writeCompleteCallback_
loop_->queueInLoop(
boost::bind(writeCompleteCallback_, shared_from_this()));
}
// 发送缓冲区已清空并且连接状态是kDisconnecting,要关闭连接
if (state_ == kDisconnecting) {
shutdownInLoop(); // 只关闭写的这一半,进入半关闭状态(close SHUT_WR)
}
} else {
LOG_TRACE << "I am going to write more data";
}
} else {
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
} else {
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
|
思考题:为什么sockets::write只写一次?
这是因为当应用层发送缓冲区的数据没全部写到内核发送缓冲区中,就不会停止关注POLLOUT事件;所以当内核发送缓冲区有空间了,回调TcpConnection::handleWrite()函数,直至应用层发送缓冲区的数据全都写入到了内核发送缓冲区里。
muduo这种关闭连接的方式对对方也有要求,那就是对方 read()到0字节之后会主动关闭连接(无论 shutdownWrite()还是close()),一般的网络序都会这样不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不关闭连接,那么muduo 的连接就一直半开着,消耗系统资源。必要时可以调用 TcpConnection::handleClose()来强行关闭连接,这需要将 handleClose() 改为 public 成员函数。
完整的流程见下图。我们发完了数据,于是 shutdownWrite,发送TCP FIN 分节,对方会读到0字节,然后对方通常会关闭连接。这样 muduo 会读到0字节,然后muduo 关闭连接。
思考题:在 shutdown()之后,muduo 回调 connection callback的时间间隔大约是一个round-trip time,为什么?
如果有必要,对方可以在 read() 返回 0之后继续发送数据,这是直接利用了half-close TCP连接。muduo 不会漏收这些数据。
那么muduo什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection持有一个Socket 对象,Socket 是一个RAIIhandler,它的析构函数会close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从/proc/pid/fd/就能找到没有关闭的文件描述符,便于查错。
muduo 在 read() 返回0的时候会回调 connection callback,TcpServer 或 Tcp-client 把 TcpConnection 的引用计数减一。如果引用计数降到零,则表明用户代码也不持有 TcpConnection,它就会析构了。
5. Boost.Asio的聊天服务器
本节将介绍一个与 Boost.Asio 的示例代码中的聊天服务器功能类似的网络服务程序,包括客户端与服务端的 muduo 实现。这个例子的主要目的是介绍如何处理分包,并初步涉及muduo的多线程功能。
5.1 TCP分包
“五个简单TCP示例"中处理的协议没有涉及分包,在TCP这种字节流协议上做应用层分包是网络编程的基本需求。分包指的是在发生一个消息 (message或一帧 (frame) 数据时,通过一定的处理,让接收方能从字节流中识别并截取(还原)出一个个消息。“粘包问题"是个伪问题。
对于短连接的 TCP 服务,分包不是一个问题,只要发送方主动关闭连接,就表示一条消息发送完毕,接收方 read() 返回0从而知道消息的结尾。例如daytime 和 time 协议。
对于长连接的 TCP 服务,分包有四种方法:
- 消息长度固定,比如 muduo 的roundtrip 示例就采用了固定的 16 字节消息。
- 使用特殊的字符或字符串作为消息的边界,例如 HTTP 协议的 headers 以"r\n"为字段的分隔符。
- 在每条消息的头部加一个长度字段,这恐怕是最常见的做法,本文的聊天协议也采用这一办法。
- 利用消息本身的格式来分包,例如XML 格式的消息中 …的配对,或者JSON 格式中的{ … }的配对。解析这种消息格式通常会用到状态机 ( state machine )
在后文的代码讲解中还会仔细讨论用长度字段分包的常见陷阱。
聊天服务
本节实现的聊天服务非常简单,由服务端程序和客户端程序组成,协议如下:
- 服务端程序在某个端口侦听(listen)新的连接。
- 客户端向服务端发起连接。
- 连接建立之后,客户端随时准备接收服务端的消息并在屏幕上显示出来.
- 客户端接受键盘输入,以回车为界,把消息发送给服务端。
- 服务端接收到消息之后,依次发送给每个连接到它的客户端;原来发送消息的客户端进程也会收到这条消息。
- 一个服务端进程可以同时服务多个客户端进程。当有消息到达服务端后,每个客户端进程都会收到同一条消息,服务端广播发送消息的顺序是任意的,不一定哪个客户端会先收到这条消息。
- (可选)如果消息A 先于消息 B 到达服务端,那每个客户端都会先收到A再收到 B。
这实际上是一个简单的基于 TCP 的应用层广播协议,由服务端负责把消息发送给每个连接到它的客户端。参与"聊天"的既可以是人,也可以是程序。在后文,我将介绍一个稍微复杂一点的例子 hub,它有"聊天室"的功能,客户端可以注册特定的 topic(s),并往某个 topic 发送消息,这样代码更有意思。
我在"谈一谈网络编程学习经验”(附录A)中把聊天服务列为"最主要的三个例子"之一,其与前面的"五个简单 TCP 协议"不同,聊天服务的特点是"连接之间的数据有交流,从a 连接收到的数据要发给 b 连接。这样对连接管理提出了更高的要求:如何用一个程序同时处理多个连接?fork()-per-connection 似乎是不行的。如何防止串话?b 有可能随时断开连接,而新建立的连接 可能恰好复用了 b的文件描述符,那么 a 会不会错误地把消息发给 c?“muduo 的这个例子充分展示了解决以上问题的手法。
5.2 消息格式
本聊天服务的消息格式非常简单,“消息"本身是一个字符串,每条消息有一个4 字节的头部,以网络序存放字符串的长度。消息之间没有间隙,字符串也不要求以’结尾。比方说有两条消息"hello"和"chenshuo”,那么打包后的字节流共有21字节:
1
2
|
0x00, 0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o',
0x00, 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
|
打包的代码 这段代码把 string message 打包为muduo::net::Buffer,并通过conn发送。
1
2
3
4
5
6
7
8
9
10
|
// codec.h
void send(muduo::net::TcpConnection *conn,
const muduo::StringPiece &message) {
muduo::net::Buffer buf;
buf.append(message.data(), message.size());
int32_t len = static_cast<int32_t>(message.size());
int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
buf.prepend(&be32, sizeof be32);
conn->send(&buf);
}
|
muduo Buffer 有一个很好的功能,它在头部预留了8 个字节的空间,这样prepend()操作就不需要移动已有的数据,效率较高。
分包的代码 解析数据往往比生成数据更复杂,分包、打包也不例外。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// codec.h
void onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp receiveTime) {
while (buf->readableBytes() >= kHeaderLen) // kHeaderLen == 4
{
// FIXME: use Buffer::peekInt32()
const void *data = buf->peek();
int32_t be32 = *static_cast<const int32_t *>(data); // SIGBUS
const int32_t len = muduo::net::sockets::networkToHost32(be32);
if (len > 65536 || len < 0) {
LOG_ERROR << "Invalid length " << len;
conn->shutdown(); // FIXME: disable reading
break;
}
// 代表一组消息已到达
else if (buf->readableBytes() >= len + kHeaderLen) {
buf->retrieve(kHeaderLen); //移动4字节
muduo::string message(buf->peek(), len);
messageCallback_(conn, message, receiveTime); //处理读取的消息
buf->retrieve(len);
} else {
break;
}
}
}
|
onMessage()中构造完整的消息,通过 messageCallback_回调用户代码。有潜在的问题,在某些不支持非对齐内存访问的体系结构上会造成SIGBUScore dump,读取消息长度应该改用 Buffer::peekInt32()。上面这段代码用了 while 循环来反复读取数据,直到 Buffer 中的数据不够一条完整的消息。请读者思考,(如果换成 if (buf->readableBytes() >= kHeaderLen) 会有什么后果。
以前面提到的两条消息的字节流为例:
1
2
|
0x00, 0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o',
0x00, 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
|
假设数据最终都全部到达,onMessage()至少要能正确处理以下各种数据到达的次序每种情况下messageCallback_都应该被调用两次:
- 每次收到一个字节的数据,onMessage() 被调用21次;
- 数据分两次到达,第一次收到 2个字节,不足消息的度字段;
- 数据分两次到达,第一次收到4 个字节,刚好够长度字段,但是没有 body;
- 数据分两次到达,第一次收到8 个字节,长度完整,但 body 不完整;
- 数据分两次到达,第一次收到9 个字节,长度完整,body 也完整;
- 数据分两次到达,第一次收到 10 个字节,第一条消息的长度完整、body 也完整,第二条消息长度不完整;
- 请自行移动和增加分点,验证各种情况:一共有超过 100 万种可能(2^21-1)。
- 数据一次就全部到达,这时必须用 while 循环来读出两条消息,否则消息会堆积在Buffer中。
请读者验证 onMessage() 是否做到了以上几点。这个例子充分说明了 nonblocking read 必须和input buffer 一起使用。而且在写 decoder 的时候一定要在收到完整的消息之后再 retrieve 整条消息,除非接收方使用复杂的状态机来解码。
5.3 编解码器
有人评论 muduo 的接收缓冲区不能设置回调函数的触发条件,确实如此。每当 socket 可读时,muduo 的TpConnection 会读取数据并存 input buffer,然后回调用户的函数。不过,一个简单的间接层就能解决问题,让用户代码只关心"消息到达"而不是"数据到达”,如本例中的 LengthHeaderCodec 所展示的那样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// codec.h
class LengthHeaderCodec : boost::noncopyable {
public:
typedef std::function<void(const muduo::net::TcpConnectionPtr &,
const muduo::string &message, muduo::Timestamp)>
StringMessageCallback;
explicit LengthHeaderCodec(const StringMessageCallback &cb)
: messageCallback_(cb) {}
void onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp receiveTime) {
while (buf->readableBytes() >= kHeaderLen) // kHeaderLen == 4
{
// FIXME: use Buffer::peekInt32()
const void *data = buf->peek();
int32_t be32 = *static_cast<const int32_t *>(data); // SIGBUS
const int32_t len = muduo::net::sockets::networkToHost32(be32);
if (len > 65536 || len < 0) {
LOG_ERROR << "Invalid length " << len;
conn->shutdown(); // FIXME: disable reading
break;
}
// 代表一组消息已到达
else if (buf->readableBytes() >= len + kHeaderLen) {
buf->retrieve(kHeaderLen); //移动4字节
muduo::string message(buf->peek(), len);
messageCallback_(conn, message, receiveTime); //处理读取的消息
buf->retrieve(len);
} else {
break;
}
}
}
// FIXME: TcpConnectionPtr
void send(muduo::net::TcpConnection *conn,
const muduo::StringPiece &message) {
muduo::net::Buffer buf;
buf.append(message.data(), message.size());
int32_t len = static_cast<int32_t>(message.size());
int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
buf.prepend(&be32, sizeof be32);
conn->send(&buf);
}
private:
StringMessageCallback messageCallback_;
const static size_t kHeaderLen = sizeof(int32_t);
};
|
这段代码把以 Buffer* 为参数的 MessageCallback 转换成了以const string&为参数的 StringMessageCallback,让用户代码不必关心分包操作。如果编程语言相同,客户端和服务端可以(应该)共享同一个codec,这样既节省工作量,又避免因对协议理解不一致而导致的错误。
5.4 服务端的实现
聊天服务器的服务端代码小于 100 行,不到 asio 的一半。
除了经常见到的 EventLoop 和TcpServer,ChatServer 还定义了 codec_和 connections_作为成员,后者存放目前已建立的客户连接。在收到消息之后,服务器会遍历整个容器,把消息广播给其中的每一个TCP连接(onStringMessage())。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
class ChatServer : boost::noncopyable {
public:
ChatServer(EventLoop *loop, const InetAddress &listenAddr)
: server_(loop, listenAddr, "ChatServer"),
codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3)) {
server_.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}
void start() { server_.start(); }
private:
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
connections_.insert(conn);
} else {
connections_.erase(conn);
}
}
void onStringMessage(const TcpConnectionPtr &, const string &message,
Timestamp) {
for (ConnectionList::iterator it = connections_.begin();
it != connections_.end(); ++it) {
codec_.send(get_pointer(*it), message);
}
}
typedef std::set<TcpConnectionPtr> ConnectionList;
TcpServer server_;
LengthHeaderCodec codec_;
ConnectionList connections_;
};
|
首先,在构造函数里注册回调;
这里有儿点值得注意,在以往的代码里是直接把本 class 的 onMessage() 注册给 server_; 这里我们把 LengthHeaderCodec::onMessage()注册给 server_,然后向codec_注册了 ChatServer::onStringMessage(),等于说让 codec_负责解析消息,然后把完整的消息回调给 chatServer。这正是我前面提到的"一个简单的间接层”,在不增加 muduo 库的复杂度的前提下,提供了足够的灵活性让我们在用户代码里完成
需要的工作。
另外,server_.start() 绝对不能在构造函数里调用,这么做将来会有线程安全的问题。
以下是处理连接的建立和断开的代码,注意它把新建的连接加人到 connections_容器中,把已断开的连接从容器中删除。这么做是为了避免内存和资源泄漏,TcpConnectionPtr是boost::shared_ptr,是muduo 里唯一一个默认采用 shared_ptr 来管理生命期的对象。
1
2
3
4
5
6
7
8
9
10
11
|
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected()) {
connections_.insert(conn);
} else {
connections_.erase(conn);
}
}
|
以下是服务端处理消息的代码,它遍历整个 connections_容器,把消息打包发送给各个客户连接。
1
2
3
4
5
6
7
|
void onStringMessage(const TcpConnectionPtr &, const string &message,
Timestamp) {
for (ConnectionList::iterator it = connections_.begin();
it != connections_.end(); ++it) {
codec_.send(get_pointer(*it), message);
}
}
|
5.5 客户端的实现
我有时觉得服务端的程序常常比客户端的更容易写,聊天服务器再次验证了我的看法。客户端的复杂性来自于它要读取键盘输人,而 EventLoop 是独占线程的,所以我用了两个线程:main()函数所在的线负责读键盘,另外用一个EventLoopThread来处理网络IO。
现在来看代码,首先,在构造函数里注册回调,并使用了跟前面一样的 LengthHeaderCodec 作为中间层,负责打包、分包。
write()会由 main 线程调用,所以要加锁,这个锁不是为了保护TcpConnection而是为了保护shared_ptr。
1
2
3
4
5
6
|
void write(const StringPiece &message) {
MutexLockGuard lock(mutex_);
if (connection_) {
codec_.send(get_pointer(connection_), message);
}
}
|
onConnection()会由 EventLoop 线程调用,所以要加锁以保护 shared_ptr。
1
2
3
4
5
6
7
8
9
10
11
12
|
void onConnection(const TcpConnectionPtr &conn) {
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
MutexLockGuard lock(mutex_);
if (conn->connected()) {
connection_ = conn;
} else {
connection_.reset();
}
}
|
把收到的消息打印到屏幕,这个函数由 EventLoop 线程调用,但是不用加锁,因为 printf() 是线程安全的。注意这里不能用 std::cout«,它不是线程安全的。
1
2
3
4
|
void onStringMessage(const TcpConnectionPtr &, const string &message,
Timestamp) {
printf("<<< %s\n", message.c_str());
}
|
main()函数里除了例行公事,还要启动 EventLoop 线程和读取键盘输人。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
int main(int argc, char *argv[]) {
LOG_INFO << "pid = " << getpid();
if (argc > 2) {
EventLoopThread loopThread;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port);
ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
std::string line;
while (std::getline(std::cin, line)) {
client.write(line); // 发送数据行
}
client.disconnect();
CurrentThread::sleepUsec(
1000 * 1000); // wait for disconnect, see ace/logging/client.cc
} else {
printf("Usage: %s host_ip port\n", argv[0]);
}
}
|
Chatclient 使用 EventLoopThread 的 EventLoop,而不是通常的主线程的EventLoop。
简单测试
打开三个命令行窗口,在第一个窗口运行:
1
|
$ ./asio_chat_server 3000
|
在第二个窗口运行:
1
|
$ ./asio_chat_client 127.0.0.1 3000
|
在第三个窗口运行同样的命令:
1
|
$ ./asio_chat_client 127.0.0.1 3000
|
这样就有两个客户端进程参与聊天。在第二个窗口里输人一些字符并回车,字符会出现在本窗口和第三个窗口中。
代码示例中还有另外三个 server 程序,都是多线程的,详细介绍在 p.260。
- server_threaded.cc 使用多线程 TcpServer,并用mutex 来保护共享数据。
- server_threaded_eficient.cc 对共享数据以"借 shared_ptr 实现 copy-on-write"的手法来降低锁竞争。
- server_threaded_highperformance.cc 采用thread local变量,实现多线程高效转发,这个例子值得仔细阅读理解。