muduo网络库的使用

1. echo服务的实现

muduo的使用非常简单,不需要从指定的类派生,也不用覆写虚函数,只需要注册几个回调函数去处理"三个半的事件"就行了。

以经典的echo回显服务为例,定义EchoServer class,不需要派生自任何基类。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef MUDUO_EXAMPLES_SIMPLE_ECHO_ECHO_H
#define MUDUO_EXAMPLES_SIMPLE_ECHO_ECHO_H

#include "TcpServer.h"

// RFC 862
class EchoServer {
public:
  EchoServer(muduo::net::EventLoop *loop,
             const muduo::net::InetAddress &listenAddr);

  void start(); // calls server_.start();

private:
  void onConnection(const muduo::net::TcpConnectionPtr &conn);

  void onMessage(const muduo::net::TcpConnectionPtr &conn,
                 muduo::net::Buffer *buf, muduo::Timestamp time);

  muduo::net::TcpServer server_;
};

#endif // MUDUO_EXAMPLES_SIMPLE_ECHO_ECHO_H
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include "echo.h"

#include "Logging.h"

using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;

// using namespace muduo;
// using namespace muduo::net;

EchoServer::EchoServer(muduo::net::EventLoop *loop,
                       const muduo::net::InetAddress &listenAddr)
    : server_(loop, listenAddr, "EchoServer") {
  server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}

void EchoServer::start() { server_.start(); }

void EchoServer::onConnection(const muduo::net::TcpConnectionPtr &conn) {
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
}

void EchoServer::onMessage(const muduo::net::TcpConnectionPtr &conn,
                           muduo::net::Buffer *buf, muduo::Timestamp time) {
  muduo::string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
           << "data received at " << time.toFormattedString();
  conn->send(msg);
}

首先EchoServer会在构造函数内注册回调函数:

1
2
3
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
    std::bind(&EchoServer::onMessage, this, _1, _2, _3));

EchoServer::onMessage函数是echo服务的"业务逻辑":把收到的数据原封不动地发回客户端。注意我们不用担心conn->send(msg)是否完整地发送了数据,因为muduo网络库会帮我们管理发送缓冲区。

EchoServer::onConnection和EchoServer::onMessage这两个函数体现了"基于事件编程"的经典做法,即程序主体是被动等待事件发生,事件发生之后网络库会调用(回调的方式)事先注册的事件处理函数(event handler)。

在EchoServer::onConnection函数中,conn参数是TcpConnection对象的shared_ptr,TcpConnection::connected函数返回一个bool值,表明目前连接是建立还是断开,TcpConnection 的 peerAddress() 和 localAddress()成员函数分别返回对方和本地的地址(以InetAddress 对象表示的IP和port)。

在onMessage()函数中,conn 参数是收到数据的那个TCP 连接;buf 是已经收到的数据,buf 的数据会累积,直到用户从中取走 (retrieve )数。注意 buf是指针,表明用户代码可以修改 (消费) buffer; time 是收到数据的确切时间,即epoll_wait(2)返回的时间,注意这个时间通常比 read(2)发生的时间略早,可以用于正确测量程序的消息处理延迟。另外,Timestamp 对象采用 pass-by-value,而不是pass-by-(const)reference,这是有意的,因为在 x86-64 上可以直接通过寄存器传参

在main() 里用 EventLoop 让整个程序跑起来:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#include "echo.h"

#include "EventLoop.h"
#include "Logging.h"

#include <unistd.h>

// using namespace muduo;
// using namespace muduo::net;

int main() {
  LOG_INFO << "pid = " << getpid();
  muduo::net::EventLoop loop;
  muduo::net::InetAddress listenAddr(2007);
  EchoServer server(&loop, listenAddr);
  server.start();
  loop.loop();
}

2. 七步实现finger服务

2.1 拒绝连接

什么都不做,程序空等。

1
2
3
4
5
6
7
8
9
#include "EventLoop.h"

using namespace muduo;
using namespace muduo::net;

int main() {
  EventLoop loop;
  loop.loop();
}

2.2 接受新连接

在1079端口侦听新连接,接受连接之后什么都不做,程序空muduo会自动丢弃收到的数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#include "EventLoop.h"
#include "TcpServer.h"

using namespace muduo;
using namespace muduo::net;

int main() {
  EventLoop loop;
  TcpServer server(&loop, InetAddress(1079), "Finger");
  server.start();
  loop.loop();
}

2.3 主动断开连接

接受新连接之后主动断开。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#include "EventLoop.h"
#include "TcpServer.h"

using namespace muduo;
using namespace muduo::net;

void onConnection(const TcpConnectionPtr &conn) {
  if (conn->connected()) {
    conn->shutdown();
  }
}

int main() {
  EventLoop loop;
  TcpServer server(&loop, InetAddress(1079), "Finger");
  server.setConnectionCallback(onConnection);
  server.start();
  loop.loop();
}

2.4 读取用户名,然后断开连接

如果读到一行以\r\n 结尾的消息,就开连接。注意这段代码有安全问题,如果恶意客户端不断发送数据而不换行,会撑爆服务端的内存。另外,Buffer::findCRLF()是线性查找,如果客户端每次发一个字节,服务端的时间复杂度为 O(N2),会消耗 CPU资源。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
#include "EventLoop.h"
#include "TcpServer.h"

using namespace muduo;
using namespace muduo::net;

void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
               Timestamp receiveTime) {
  if (buf->findCRLF()) {
    conn->shutdown();
  }
}

int main() {
  EventLoop loop;
  TcpServer server(&loop, InetAddress(1079), "Finger");
  server.setMessageCallback(onMessage);
  server.start();
  loop.loop();
}

2.5 读取用户名、输出错误信息,然后断开连接

如果读到一行以\r\n 结尾的消息,就发送一条出错信息,然后断开连接。安全问题同上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#include "EventLoop.h"
#include "TcpServer.h"

using namespace muduo;
using namespace muduo::net;

void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
               Timestamp receiveTime) {
  if (buf->findCRLF()) {
    conn->send("No such user\r\n");
    conn->shutdown();
  }
}

int main() {
  EventLoop loop;
  TcpServer server(&loop, InetAddress(1079), "Finger");
  server.setMessageCallback(onMessage);
  server.start();
  loop.loop();
}

2.6 从空的UserMap 里查找用户

从一行消息中拿到用户名(user)在 UserMap里查找,然后返回结果。安全问题同上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include "EventLoop.h"
#include "TcpServer.h"

#include <map>

using namespace muduo;
using namespace muduo::net;

typedef std::map<string, string> UserMap;
UserMap users;

string getUser(const string &user) {
  string result = "No such user";
  UserMap::iterator it = users.find(user);
  if (it != users.end()) {
    result = it->second;
  }
  return result;
}

void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
               Timestamp receiveTime) {
  const char *crlf = buf->findCRLF();
  if (crlf) {
    string user(buf->peek(), crlf);
    conn->send(getUser(user) + "\r\n");
    buf->retrieveUntil(crlf + 2);
    conn->shutdown();
  }
}

int main() {
  EventLoop loop;
  TcpServer server(&loop, InetAddress(1079), "Finger");
  server.setMessageCallback(onMessage);
  server.start();
  loop.loop();
}

2.7 往UserMap里添加一个用户

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include "EventLoop.h"
#include "TcpServer.h"

#include <map>

using namespace muduo;
using namespace muduo::net;

typedef std::map<string, string> UserMap;
UserMap users;

string getUser(const string &user) {
  string result = "No such user";
  UserMap::iterator it = users.find(user);
  if (it != users.end()) {
    result = it->second;
  }
  return result;
}

void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
               Timestamp receiveTime) {
  const char *crlf = buf->findCRLF();
  if (crlf) {
    string user(buf->peek(), crlf);
    conn->send(getUser(user) + "\r\n");
    buf->retrieveUntil(crlf + 2);
    conn->shutdown();
  }
}

int main() {
  users.insert({"cjt", "super vip"});

  EventLoop loop;
  TcpServer server(&loop, InetAddress(1079), "Finger");
  server.setMessageCallback(onMessage);
  server.start();
  loop.loop();
}

3. 五个简单TCP示例

本节将介绍五个简单TCP 网络服务程序,包括 echo(RFC 862)、discard (RFC863)、chargen (RFC 864)、daytime (RFC 867)、time (RFC 868)这五个协议,以及 time 协议的客户端。各程序的协议简介如下:

  • discard:丢弃所有收到的数据。
  • daytime:服务端accept 连接之后,以字符串形式发送当前时间,然后主动断开连接。
  • time:服务端accept 连接之后,以二进制形式发送当前时间 (从Epoch 到现在的秒数),然后主动断开连接;我们需要一个客户程序来把收到的时间转换为字符串。
  • echo:回显服务,把收到的数据发回客户端。
  • chargen:服务端accept 连接之后,不停地发送测试数据。

以上五个协议使用不同的端口,可以放到同一个进程中实现,且不必使用多线程。

3.1 discard

discard 恐怕算是最简单的长连接 TCP 应用层协议,它只需要关注"三个半事件"中的"消息/数据到达"事件,事件处理函数如下:

1
2
3
4
5
6
void DiscardServer::onMessage(const TcpConnectionPtr &conn, Buffer *buf,
                              Timestamp time) {
  string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " discards " << msg.size()
           << " bytes received at " << time.toString();
}

3.2 daytime

daytime 是短连接协议,在发送完当前时间后,由服务端主动断开连接。它只需要关注"三个半事件"中的"连接已建立"事件,事件处理函数如下:

1
2
3
4
5
6
7
8
9
void DaytimeServer::onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << "DaytimeServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected()) {
    conn->send(Timestamp::now().toFormattedString() + "\n");
    conn->shutdown();
  }
}

如果连接已建立,发送时间字符串,主动断开连接。

用netcat扮演客户端,运行结果如下:

1
2
$ nc 127.0.0.1 2013
20221224 15:43:38.226748    # 服务器返回的事件字符串,UTC 时区

3.3 time

time 协议与 daytime极为类似,只不过它返回的不是日期时间字符串,而是一个32-bit 整数,表示从1970-01-01 00:00:00Z 到现在的秒数。当然,这个协议有"2038年问题"。服务端只需要关注"三个半事件"中的"连接已建立"事件,事件处理函数如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void TimeServer::onConnection(const muduo::net::TcpConnectionPtr &conn) {
  LOG_INFO << "TimeServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected()) {
    time_t now = ::time(NULL);
    int32_t be32 = sockets::hostToNetwork32(static_cast<int32_t>(now));
    conn->send(&be32, sizeof be32);
    conn->shutdown();
  }
}

当连接已建立,取当前时间并转换为网络字节序(Big Endian),发送32-bit整数后主动断开连接。

安装hexdump:

1
2
3
4
5
# Step 1: Update system
sudo apt-get update

# Step 2: Install: libdata-hexdump-perl Architecture: all Version: 0.02-1
sudo apt-get install libdata-hexdump-perl

可以使用netcat 扮演客户端,并用hexdump来打印二进制数据:

1
2
# 暂不清楚什么原因,没效果
nc 127.0.0.1 2037 | hexdump -C

time客户端:

因为 time 服务端发送的是二进制数据,不便直接阅读,我们编写一个客户端来解析并打印收到的4个字节数据。这个程序只需要关注"三个半事件"中的"消息/数据到达"事件,事件处理函数如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
                Timestamp receiveTime) {
  if (buf->readableBytes() >= sizeof(int32_t)) {
    const void *data = buf->peek();
    int32_t be32 = *static_cast<const int32_t *>(data);
    buf->retrieve(sizeof(int32_t));
    time_t time = sockets::networkToHost32(be32);
    Timestamp ts(implicit_cast<uint64_t>(time) *
                  Timestamp::kMicroSecondsPerSecond);
    LOG_INFO << "Server time = " << time << ", " << ts.toFormattedString();
  } else {
    LOG_INFO << conn->name() << " no enough data " << buf->readableBytes()
              << " at " << receiveTime.toFormattedString();
  }
}

注意其中考虑到了如果数据没有一次性收全,已经收到的数据会累积在 Buffer里(在else 分支里没有调用 Buffer::retrieve* 系列函数),以等待后续数据到达程序也不会阻寨。这样即便服务器一个字节一个字节地发送数据,代码还是能正常工作,这也是非阻塞网络编程必须在用户态使用接收缓冲的主要原因。

完整代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class TimeClient : boost::noncopyable {
public:
  TimeClient(EventLoop *loop, const InetAddress &serverAddr)
      : loop_(loop), client_(loop, serverAddr, "TimeClient") {
    client_.setConnectionCallback(
        std::bind(&TimeClient::onConnection, this, _1));
    client_.setMessageCallback(
        std::bind(&TimeClient::onMessage, this, _1, _2, _3));
    // client_.enableRetry();
  }

  void connect() { client_.connect(); }

private:
  EventLoop *loop_;
  TcpClient client_;

  void onConnection(const TcpConnectionPtr &conn) {
    LOG_INFO << conn->localAddress().toIpPort() << " -> "
             << conn->peerAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");

    if (!conn->connected()) {
      loop_->quit();
    }
  }

  void onMessage(const TcpConnectionPtr &conn, Buffer *buf,
                 Timestamp receiveTime) {
    if (buf->readableBytes() >= sizeof(int32_t)) {
      const void *data = buf->peek();
      int32_t be32 = *static_cast<const int32_t *>(data);
      buf->retrieve(sizeof(int32_t));
      time_t time = sockets::networkToHost32(be32);
      Timestamp ts(implicit_cast<uint64_t>(time) *
                   Timestamp::kMicroSecondsPerSecond);
      LOG_INFO << "Server time = " << time << ", " << ts.toFormattedString();
    } else {
      LOG_INFO << conn->name() << " no enough data " << buf->readableBytes()
               << " at " << receiveTime.toFormattedString();
    }
  }
};

int main(int argc, char *argv[]) {
  LOG_INFO << "pid = " << getpid();
  if (argc > 1) {
    EventLoop loop;
    InetAddress serverAddr(argv[1], 2037);

    TimeClient timeClient(&loop, serverAddr);
    timeClient.connect();
    loop.loop();
  } else {
    printf("Usage: %s host_ip\n", argv[0]);
  }
}

如果连接断开了,就退出事件循环,程序也就结束了。

注意 TcpConnection 对象表示"一次"TCP 连接,连接断开之后不能重建Tcpclient 重试之后新建的连接会是另一个 TcpConnection 对象。

3.4 echo

前面几个协议都是单向接收或发送数据,echo 是我们遇到的第一个双向的协议:服务端把客户端发过来的数据原封不动地传回去。它只需要关注"三个半事件"中的"消息/数据到达"事件,事件处理函数已在前面列出,这里复制一遍。

1
2
3
4
5
6
7
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr &conn,
                           muduo::net::Buffer *buf, muduo::Timestamp time) {
  muduo::string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
           << "data received at " << time.toFormattedString();
  conn->send(msg);
}

这段代码实现的不是行回显 (line echo)服务,而是有一点数据就发送一点数据。这样可以避免客户端恶意地不发送换行字符,而服务端又必须缓存已经收到的数据,导致服务器内存暴涨。但这个程序还是有一个安全漏洞,即如果客户端故意不断发送数据,但从不接收,那么服务端的发送缓冲区会一直堆积,导致内存暴涨。解决办法可以参考下面的 chargen 协议,或者在发送缓冲区累积到一定大小时主动断开连接。一般来说,非阻塞网络编程中正确处理数据发送比接收数据要困难,因为要应对对方接收缓慢的情况。

3.5 chargen

Chargen 协议很特殊,它只发送数据,不接收数据。而且,它发送数据的速度不能快过客户端接收的速度,因此需要关注"三个半事件"中的半个"消息/数据发送完毕"事件(onWriteComplete),事件处理函数如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
void ChargenServer::onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << "ChargenServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected()) {
    conn->setTcpNoDelay(true);
    conn->send(message_);
  }
}

void ChargenServer::onMessage(const TcpConnectionPtr &conn, Buffer *buf,
                              Timestamp time) {
  string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " discards " << msg.size()
           << " bytes received at " << time.toString();
}

void ChargenServer::onWriteComplete(const TcpConnectionPtr &conn) {
  transferred_ += message_.size();
  conn->send(message_);
}

在连接建立时发送一次数据,在数据发送完成事件触发时再发送一次数据。

3.6 五合一

前面五个程序都用到了 EventLoop。这其实是个 Reactor,用于注册和分发IO事件。muduo 遵循 one loop per thread 模型,多个服务端(TcpServer)和客户端(Tcpclient)可以共享同一个EventLoop,也可以分配到多个EventLoop 上以发挥多核多线程的好处。这里我们把五个服务端用同一个EventLoop 跑起来,程序还是单线程的,功能却强大了很多:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
int main() {
  LOG_INFO << "pid = " << getpid();
  EventLoop loop; // one loop shared by multiple servers

  ChargenServer chargenServer(&loop, InetAddress(2019));
  chargenServer.start();

  DaytimeServer daytimeServer(&loop, InetAddress(2013));
  daytimeServer.start();

  DiscardServer discardServer(&loop, InetAddress(2009));
  discardServer.start();

  EchoServer echoServer(&loop, InetAddress(2007));
  echoServer.start();

  TimeServer timeServer(&loop, InetAddress(2037));
  timeServer.start();

  loop.loop();
}

这个例子充分展示了 Reactor 模式复用线程的能力,让一个单线程程序同时具备多个网络服务功能。一个容易想到的例子是 httpd 同时侦听 80 端口和43 端口,另一个例子是程序中有多个 Tcpclient,分别和数据库、Redis、Sudoku Solver 等后台服务打交道。对于初次接触这种编程模型的读者,值得跟踪代码运行的详细过程,弄清楚每个事件每个回调发生的时机与条件。

以上几个协议的消息格式都非常简单,没有涉及 TCP 网络编程中常见的分包处理,在后文 Boost.Asio 的聊天服务器时我们再来讨论这个问题。

4. 文件传输

本节用发送文件的例子来说明 TcpConnection::send()的使用。到目前为止我们用到了 TcpConnection::send() 的两个重载,分别是 send(const string&)和send(const void* message, size_t len)。

TcpConnection 目前提供了三个 send() 重载函数,原型如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
///
/// TCP connection, for both client and server usage.
///
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection> {
public:
  void send(const void *message, size_t len);
  void send(const StringPiece &message);
  void send(Buffer *message); // this one will swap data without copying
  // void send(string&& message); // C++11
  // void send(Buffer&& message); // C++11
}

在非阻塞网络编程中,发送消息通常是由网络库完成的,用户代码不会直接调用write(2)或 send(2) 等系统调用。原因是"Tcponnection 必须要有 outputbuffer"。在使用 TpConnection::send() 时值得注意的有点:

  • send() 的返回类型是 void,意味着用户不必关心调用 send() 时成功发送了多少字节,muduo 库会保证把数据发送给对方。
  • send() 是非阻塞的。意味着客户代码只管把一条消息准备好,调用 send()来发送,即便 TCP 的发送窗口满了,也绝对不会阻塞当前调用线程。
  • send() 是线程安全、原子的。多个线程可以同时调用 send(),消息之间不会混叠或交织。但是多个线程同时发送的消息的先后顺序是不确定的,muduo只能保证每个消息本身的完整性。另外,send()在多线程下仍然是非阻塞的。
  • send(const void* message,size_t en)这个重载最平淡无奇,可以发送任意字节序列。
  • send(const StringPiece& message)这个重载可以发送std::string利constchar*,其中StringPiece是Google 发明的专门用于传递字符串参数的 class,这样程序里就不必为 const char* 和 const std::string& 提供两份重载了。
  • send(Buffer*) 有点特殊,它以指针为参数,而不是常见的 const 引用,因为函数中可能用 Buffer::swap()来高效地交换数据,避免内存拷贝,起到类似C++ 右值引用的效果。
  • 如果将来支持 C++11,那么可以增加对右值引用的重载,这样可以用 move 语义来避免内存拷贝。

下面我们来实现一个发送文件的命令行小工具,这个工具的协议很简单,在启动时通过命令行参数指定要发送的文件,然后在 2021 端口侦听,每当有新连接进来就把文件内容完整地发送给对方。

如果不考虑并发,那么这个功能用 netcat 加重定向就能实现。这里展示的版本更加健壮,比方说发送 100MB 的文件,支持上万个并发客户连接;内存消耗只与并发连接数有关,跟文件大小无关;任何连接可以在任何时候断开,程序不会有内存泄漏或崩溃。

一共写了三个版本:

  1. 一次性把文件读入内存,一次性调用 send(const string&)发送完毕。这个版本满足除了"内存消耗只与并发连接数有关,跟文件大小无关"之外的健壮性要求。
  2. 一块一块地发送文件,减少内存使用,用到了 writeCompleteCallback。这个版本满足了上述全部健壮性要求。
  3. 同2,但是采用 shared_ptr 来管理 FILE*,避免手动调用 ::fclose(3)。

4.1 版本1

在建立好连接之后,把文件的全部内容读入一个 string,一次性调用 TcpConnection::send() 发送。不用担心文件发送不完整。也不用担心 send() 之后立刻shutdown()会有什么问题,见下一节的说明。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
const char *g_file = NULL;

// FIXME: use FileUtil::readFile()
string readFile(const char *filename) {
  string content;
  FILE *fp = ::fopen(filename, "rb");
  if (fp) {
    // inefficient!!!
    const int kBufSize = 1024 * 1024;
    char iobuf[kBufSize];
    ::setbuffer(fp, iobuf, sizeof iobuf);

    char buf[kBufSize];
    size_t nread = 0;
    while ((nread = ::fread(buf, 1, sizeof buf, fp)) > 0) {
      content.append(buf, nread);
    }
    ::fclose(fp);
  }
  return content;
}

void onHighWaterMark(const TcpConnectionPtr &conn, size_t len) {
  LOG_INFO << "HighWaterMark " << len;
}

void onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << "FileServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected()) {
    LOG_INFO << "FileServer - Sending file " << g_file << " to "
             << conn->peerAddress().toIpPort();
    conn->setHighWaterMarkCallback(onHighWaterMark, 64 * 1024);
    string fileContent = readFile(g_file);
    conn->send(fileContent);
    conn->shutdown();
    LOG_INFO << "FileServer - done";
  }
}

int main(int argc, char *argv[]) {
  LOG_INFO << "pid = " << getpid();
  if (argc > 1) {
    g_file = argv[1];

    EventLoop loop;
    InetAddress listenAddr(2021);
    TcpServer server(&loop, listenAddr, "FileServer");
    server.setConnectionCallback(onConnection);
    server.start();
    loop.loop();
  } else {
    fprintf(stderr, "Usage: %s file_for_downloading\n", argv[0]);
  }
}

注意每次建立连接的时候我们都去重新读一遍文件,这是考虑到文件有可能被其他程序修改。如果文件是 immutable 的,整个程序就可以共享同一个 fileContent对象。

这个版本有一个明显的缺陷,即内存消耗与(并发连接数 文件大小)成正比文件越大内存消耗越多,如果文件大小上 GB,那几乎就是灾难了。只需要建立少量并发连接就能把服务器的内存耗尽,因此我们有了版本2。

4.2 版本2

为了解决版本一占用内存过多的问题,我们采用流水线的思路,当新建连接时,先发送文件的前 64KiB 数据,等这块数据发送完毕时再继续发送下64KiB 数据,如此往复直到文件内容全部发送完毕。代码中使用了 TcpConnection::setContext()和getContext()来保存 Tcponnection 的用户上下文(这里是 FILE*),因此不必使用额外的std::map<TcponnectionPtr,FILE*>来记住每个连接的当前文件位置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
void onHighWaterMark(const TcpConnectionPtr &conn, size_t len) {
  LOG_INFO << "HighWaterMark " << len;
}

const int kBufSize = 64 * 1024;
const char *g_file = NULL;

void onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << "FileServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected()) {
    LOG_INFO << "FileServer - Sending file " << g_file << " to "
             << conn->peerAddress().toIpPort();
    conn->setHighWaterMarkCallback(onHighWaterMark, kBufSize + 1);

    FILE *fp = ::fopen(g_file, "rb");
    if (fp) {
      conn->setContext(fp);
      char buf[kBufSize];
      size_t nread = ::fread(buf, 1, sizeof buf, fp);
      conn->send(buf, static_cast<int>(nread));
    } else {
      conn->shutdown();
      LOG_INFO << "FileServer - no such file";
    }
  } else {
    if (!conn->getContext().empty()) {
      FILE *fp = boost::any_cast<FILE *>(conn->getContext());
      if (fp) {
        ::fclose(fp);
      }
    }
  }
}

void onWriteComplete(const TcpConnectionPtr &conn) {
  FILE *fp = boost::any_cast<FILE *>(conn->getContext());
  char buf[kBufSize];
  size_t nread = ::fread(buf, 1, sizeof buf, fp);
  if (nread > 0) {
    // 会触发onWriteComplete回调函数
    conn->send(buf, static_cast<int>(nread));
  } else {
    ::fclose(fp);
    fp = NULL;
    conn->setContext(fp);
    conn->shutdown();
    LOG_INFO << "FileServer - done";
  }
}

int main(int argc, char *argv[]) {
  LOG_INFO << "pid = " << getpid();
  if (argc > 1) {
    g_file = argv[1];

    EventLoop loop;
    InetAddress listenAddr(2021);
    TcpServer server(&loop, listenAddr, "FileServer");
    server.setConnectionCallback(onConnection);
    server.setWriteCompleteCallback(onWriteComplete);
    server.start();
    loop.loop();
  } else {
    fprintf(stderr, "Usage: %s file_for_downloading\n", argv[0]);
  }
}

在onWriteComplete()回调函数中读取下一块文件数据,继续发送。

注意每次建立连接的时候我们都去重新打开那个文件,使得程序中文件描述符的数量翻倍(每个连接占一个 socket fd 和一个 file fd ),这是考到文件有可能被其他程序修改。如果文件是immutable 的,一种改进措施是:整个程序可以共享同一个文件描述符,然后每个连接记住自已当前的偏移量,在onWriteComplete() 回调函数里用 pread(2)来读取数据。

这个版本也存在一个问题,如果客户端故意只发起连接,不接收数据,那么要么把服务器进程的文件描述符耗尽,要么占用很多服务端内存(因为每个连接有 64KiB的发送缓冲区)。解决办法可参考后文"限制服器的最并发连接数"和"用timing wheel踢掉空闲连接"。必须说明的是,muduo并不是设计来编写面向公网的网络服务程序,这种服务程序需要在安全性方面下很多工夫,我个人对此不在行,我更关心实现内网(不一定是局域网 )的高效服务程序。

4.3 版本3

用 shared_ptr的custom deleter 来减轻资源管理负担,使得 FILE* 的生命期和TcpConnection 一样长,代码也更简单了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void onHighWaterMark(const TcpConnectionPtr &conn, size_t len) {
  LOG_INFO << "HighWaterMark " << len;
}

const int kBufSize = 64 * 1024;
const char *g_file = NULL;
typedef std::shared_ptr<FILE> FilePtr;

void onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << "FileServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected()) {
    LOG_INFO << "FileServer - Sending file " << g_file << " to "
             << conn->peerAddress().toIpPort();
    conn->setHighWaterMarkCallback(onHighWaterMark, kBufSize + 1);

    FILE *fp = ::fopen(g_file, "rb");
    if (fp) {
      FilePtr ctx(fp, ::fclose);
      conn->setContext(ctx);
      char buf[kBufSize];
      size_t nread = ::fread(buf, 1, sizeof buf, fp);
      conn->send(buf, static_cast<int>(nread));
    } else {
      conn->shutdown();
      LOG_INFO << "FileServer - no such file";
    }
  }
}

void onWriteComplete(const TcpConnectionPtr &conn) {
  const FilePtr &fp = boost::any_cast<const FilePtr &>(conn->getContext());
  char buf[kBufSize];
  size_t nread = ::fread(buf, 1, sizeof buf, boost::get_pointer(fp));
  if (nread > 0) {
    conn->send(buf, static_cast<int>(nread));
  } else {
    conn->shutdown();
    LOG_INFO << "FileServer - done";
  }
}

int main(int argc, char *argv[]) {
  LOG_INFO << "pid = " << getpid();
  if (argc > 1) {
    g_file = argv[1];

    EventLoop loop;
    InetAddress listenAddr(2021);
    TcpServer server(&loop, listenAddr, "FileServer");
    server.setConnectionCallback(onConnection);
    server.setWriteCompleteCallback(onWriteComplete);
    server.start();
    loop.loop();
  } else {
    fprintf(stderr, "Usage: %s file_for_downloading\n", argv[0]);
  }
}

以上代码体现了现代 C++ 的资源管理思路,即无须手动释放资源,而是通过将资源与对象生命期绑定,在对象析构的时候自动释放资源,从而把资源管理转换为对象生命期管理,而后者是早已解决了的问题。这正是 C++ 最重要的编程技法:RAII。

4.4 为什么TcpConnection::shutdown()没有直接关闭TCP连接?

“在simple的daytime示例中,服务端主动关闭时调用的是如下函数序列,这不是只是关闭了连接上的写操作吗,怎么是关闭了整个连接?”

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void DaytimeServer::onConnection(const TcpConnectionPtr &conn) {
  if (conn->connected()) {
    conn->send(Timestamp::now().toFormattedString() + "\n");
    conn->shutdown();
  }
}

void TcpConnection::shutdown() {
  // FIXME: use compare and swap
  if (state_ == kConnected) {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop() {
  loop_->assertInLoopThread();

  // 如果当前没有发送数据
  if (!channel_->isWriting()) {
    // we are not writing
    socket_->shutdownWrite();
  }
}

void Socket::shutdownWrite() { sockets::shutdownWrite(sockfd_); }

// 只关闭写的这一半,进入半关闭状态(close SHUT_WR)
void sockets::shutdownWrite(int sockfd) {
  if (::shutdown(sockfd, SHUT_WR) < 0) {
    LOG_SYSERR << "sockets::shutdownWrite";
  }
}

muduo TcpConnection 没有提供 close(),而只提供 shutdown(),这么做是为了收发数据的完整性

TCP 是一个全双工协议,同一个文件描述符既可读又可写,shutdownWrite()关闭了"写"方向的连接,保留了"读"方向,这称为 TCP half-close。如果直接close(socket_fd),那么 socket_fd 就不能读或写了。

用 shutdown 而不用close 的效果是,如果对方已经发送了数据,这些数据还"在路上",那么muduo 不会漏收这些数据。换句话说,muduo 在 TCP这一层面解决了"当你打算关闭网络连接的时候,如何得知对方是否发了一些数据而你还没有收到?这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据就可以直接断开连接。

也就是说 muduo 把"主动关闭连接"这件事情分成两步来做,如果要主动关闭连接,它会先关本地"写"端,等对方关闭之后,再关本地"读"端。

另外,如果当前 output buffer 里还有数据尚未发出的话,muduo 也不会立刻调用 shutdownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 内核发送缓冲区有空间了,回调该函数
void TcpConnection::handleWrite() {
  loop_->assertInLoopThread();
  if (channel_->isWriting()) {
    ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0) {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0) // 发送缓冲区已清空
      {
        channel_->disableWriting(); // 停止关注POLLOUT事件,以免出现busy loop
        if (writeCompleteCallback_) // 回调writeCompleteCallback_
        {
          // 应用层发送缓冲区被清空,就回调用writeCompleteCallback_
          loop_->queueInLoop(
              boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        // 发送缓冲区已清空并且连接状态是kDisconnecting,要关闭连接
        if (state_ == kDisconnecting) {
          shutdownInLoop(); // 只关闭写的这一半,进入半关闭状态(close SHUT_WR)
        }
      } else {
        LOG_TRACE << "I am going to write more data";
      }
    } else {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  } else {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

思考题:为什么sockets::write只写一次?

这是因为当应用层发送缓冲区的数据没全部写到内核发送缓冲区中,就不会停止关注POLLOUT事件;所以当内核发送缓冲区有空间了,回调TcpConnection::handleWrite()函数,直至应用层发送缓冲区的数据全都写入到了内核发送缓冲区里。

muduo这种关闭连接的方式对对方也有要求,那就是对方 read()到0字节之后会主动关闭连接(无论 shutdownWrite()还是close()),一般的网络序都会这样不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不关闭连接,那么muduo 的连接就一直半开着,消耗系统资源。必要时可以调用 TcpConnection::handleClose()来强行关闭连接,这需要将 handleClose() 改为 public 成员函数。

完整的流程见下图。我们发完了数据,于是 shutdownWrite,发送TCP FIN 分节,对方会读到0字节,然后对方通常会关闭连接。这样 muduo 会读到0字节,然后muduo 关闭连接。

思考题:在 shutdown()之后,muduo 回调 connection callback的时间间隔大约是一个round-trip time,为什么?

如果有必要,对方可以在 read() 返回 0之后继续发送数据,这是直接利用了half-close TCP连接。muduo 不会漏收这些数据。

那么muduo什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection持有一个Socket 对象,Socket 是一个RAIIhandler,它的析构函数会close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从/proc/pid/fd/就能找到没有关闭的文件描述符,便于查错。

muduo 在 read() 返回0的时候会回调 connection callback,TcpServer 或 Tcp-client 把 TcpConnection 的引用计数减一。如果引用计数降到零,则表明用户代码也不持有 TcpConnection,它就会析构了。

5. Boost.Asio的聊天服务器

本节将介绍一个与 Boost.Asio 的示例代码中的聊天服务器功能类似的网络服务程序,包括客户端与服务端的 muduo 实现。这个例子的主要目的是介绍如何处理分包,并初步涉及muduo的多线程功能。

5.1 TCP分包

“五个简单TCP示例"中处理的协议没有涉及分包,在TCP这种字节流协议上做应用层分包是网络编程的基本需求。分包指的是在发生一个消息 (message或一帧 (frame) 数据时,通过一定的处理,让接收方能从字节流中识别并截取(还原)出一个个消息。“粘包问题"是个伪问题。

对于短连接的 TCP 服务,分包不是一个问题,只要发送方主动关闭连接,就表示一条消息发送完毕,接收方 read() 返回0从而知道消息的结尾。例如daytime 和 time 协议。

对于长连接的 TCP 服务,分包有四种方法:

  1. 消息长度固定,比如 muduo 的roundtrip 示例就采用了固定的 16 字节消息。
  2. 使用特殊的字符或字符串作为消息的边界,例如 HTTP 协议的 headers 以"r\n"为字段的分隔符。
  3. 在每条消息的头部加一个长度字段,这恐怕是最常见的做法,本文的聊天协议也采用这一办法。
  4. 利用消息本身的格式来分包,例如XML 格式的消息中 的配对,或者JSON 格式中的{ … }的配对。解析这种消息格式通常会用到状态机 ( state machine )

在后文的代码讲解中还会仔细讨论用长度字段分包的常见陷阱。

聊天服务

本节实现的聊天服务非常简单,由服务端程序和客户端程序组成,协议如下:

  • 服务端程序在某个端口侦听(listen)新的连接。
  • 客户端向服务端发起连接。
  • 连接建立之后,客户端随时准备接收服务端的消息并在屏幕上显示出来.
  • 客户端接受键盘输入,以回车为界,把消息发送给服务端。
  • 服务端接收到消息之后,依次发送给每个连接到它的客户端;原来发送消息的客户端进程也会收到这条消息。
  • 一个服务端进程可以同时服务多个客户端进程。当有消息到达服务端后,每个客户端进程都会收到同一条消息,服务端广播发送消息的顺序是任意的,不一定哪个客户端会先收到这条消息。
  • (可选)如果消息A 先于消息 B 到达服务端,那每个客户端都会先收到A再收到 B。

这实际上是一个简单的基于 TCP 的应用层广播协议,由服务端负责把消息发送给每个连接到它的客户端。参与"聊天"的既可以是人,也可以是程序。在后文,我将介绍一个稍微复杂一点的例子 hub,它有"聊天室"的功能,客户端可以注册特定的 topic(s),并往某个 topic 发送消息,这样代码更有意思。

我在"谈一谈网络编程学习经验”(附录A)中把聊天服务列为"最主要的三个例子"之一,其与前面的"五个简单 TCP 协议"不同,聊天服务的特点是"连接之间的数据有交流,从a 连接收到的数据要发给 b 连接。这样对连接管理提出了更高的要求:如何用一个程序同时处理多个连接?fork()-per-connection 似乎是不行的。如何防止串话?b 有可能随时断开连接,而新建立的连接 可能恰好复用了 b的文件描述符,那么 a 会不会错误地把消息发给 c?“muduo 的这个例子充分展示了解决以上问题的手法。

5.2 消息格式

本聊天服务的消息格式非常简单,“消息"本身是一个字符串,每条消息有一个4 字节的头部,以网络序存放字符串的长度。消息之间没有间隙,字符串也不要求以’结尾。比方说有两条消息"hello"和"chenshuo”,那么打包后的字节流共有21字节:

1
2
0x00, 0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o',
0x00, 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'

打包的代码 这段代码把 string message 打包为muduo::net::Buffer,并通过conn发送。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// codec.h
void send(muduo::net::TcpConnection *conn,
            const muduo::StringPiece &message) {
    muduo::net::Buffer buf;
    buf.append(message.data(), message.size());
    int32_t len = static_cast<int32_t>(message.size());
    int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
    buf.prepend(&be32, sizeof be32);
    conn->send(&buf);
}

muduo Buffer 有一个很好的功能,它在头部预留了8 个字节的空间,这样prepend()操作就不需要移动已有的数据,效率较高。

分包的代码 解析数据往往比生成数据更复杂,分包、打包也不例外。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// codec.h
void onMessage(const muduo::net::TcpConnectionPtr &conn,
                 muduo::net::Buffer *buf, muduo::Timestamp receiveTime) {
  while (buf->readableBytes() >= kHeaderLen) // kHeaderLen == 4
  {
    // FIXME: use Buffer::peekInt32()
    const void *data = buf->peek();
    int32_t be32 = *static_cast<const int32_t *>(data); // SIGBUS
    const int32_t len = muduo::net::sockets::networkToHost32(be32);
    if (len > 65536 || len < 0) {
      LOG_ERROR << "Invalid length " << len;
      conn->shutdown(); // FIXME: disable reading
      break;
    }
    // 代表一组消息已到达
    else if (buf->readableBytes() >= len + kHeaderLen) {
      buf->retrieve(kHeaderLen); //移动4字节
      muduo::string message(buf->peek(), len);
      messageCallback_(conn, message, receiveTime); //处理读取的消息
      buf->retrieve(len);
    } else {
      break;
    }
  }
}

onMessage()中构造完整的消息,通过 messageCallback_回调用户代码。有潜在的问题,在某些不支持非对齐内存访问的体系结构上会造成SIGBUScore dump,读取消息长度应该改用 Buffer::peekInt32()。上面这段代码用了 while 循环来反复读取数据,直到 Buffer 中的数据不够一条完整的消息。请读者思考,(如果换成 if (buf->readableBytes() >= kHeaderLen) 会有什么后果。

以前面提到的两条消息的字节流为例:

1
2
0x00, 0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o',
0x00, 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'

假设数据最终都全部到达,onMessage()至少要能正确处理以下各种数据到达的次序每种情况下messageCallback_都应该被调用两次:

  1. 每次收到一个字节的数据,onMessage() 被调用21次;
  2. 数据分两次到达,第一次收到 2个字节,不足消息的度字段;
  3. 数据分两次到达,第一次收到4 个字节,刚好够长度字段,但是没有 body;
  4. 数据分两次到达,第一次收到8 个字节,长度完整,但 body 不完整;
  5. 数据分两次到达,第一次收到9 个字节,长度完整,body 也完整;
  6. 数据分两次到达,第一次收到 10 个字节,第一条消息的长度完整、body 也完整,第二条消息长度不完整;
  7. 请自行移动和增加分点,验证各种情况:一共有超过 100 万种可能(2^21-1)。
  8. 数据一次就全部到达,这时必须用 while 循环来读出两条消息,否则消息会堆积在Buffer中

请读者验证 onMessage() 是否做到了以上几点。这个例子充分说明了 nonblocking read 必须和input buffer 一起使用。而且在写 decoder 的时候一定要在收到完整的消息之后再 retrieve 整条消息,除非接收方使用复杂的状态机来解码。

5.3 编解码器

有人评论 muduo 的接收缓冲区不能设置回调函数的触发条件,确实如此。每当 socket 可读时,muduo 的TpConnection 会读取数据并存 input buffer,然后回调用户的函数。不过,一个简单的间接层就能解决问题,让用户代码只关心"消息到达"而不是"数据到达”,如本例中的 LengthHeaderCodec 所展示的那样。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// codec.h
class LengthHeaderCodec : boost::noncopyable {
public:
  typedef std::function<void(const muduo::net::TcpConnectionPtr &,
                             const muduo::string &message, muduo::Timestamp)>
      StringMessageCallback;

  explicit LengthHeaderCodec(const StringMessageCallback &cb)
      : messageCallback_(cb) {}

  void onMessage(const muduo::net::TcpConnectionPtr &conn,
                 muduo::net::Buffer *buf, muduo::Timestamp receiveTime) {
    while (buf->readableBytes() >= kHeaderLen) // kHeaderLen == 4
    {
      // FIXME: use Buffer::peekInt32()
      const void *data = buf->peek();
      int32_t be32 = *static_cast<const int32_t *>(data); // SIGBUS
      const int32_t len = muduo::net::sockets::networkToHost32(be32);
      if (len > 65536 || len < 0) {
        LOG_ERROR << "Invalid length " << len;
        conn->shutdown(); // FIXME: disable reading
        break;
      }
      // 代表一组消息已到达
      else if (buf->readableBytes() >= len + kHeaderLen) {
        buf->retrieve(kHeaderLen); //移动4字节
        muduo::string message(buf->peek(), len);
        messageCallback_(conn, message, receiveTime); //处理读取的消息
        buf->retrieve(len);
      } else {
        break;
      }
    }
  }

  // FIXME: TcpConnectionPtr
  void send(muduo::net::TcpConnection *conn,
            const muduo::StringPiece &message) {
    muduo::net::Buffer buf;
    buf.append(message.data(), message.size());
    int32_t len = static_cast<int32_t>(message.size());
    int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
    buf.prepend(&be32, sizeof be32);
    conn->send(&buf);
  }

private:
  StringMessageCallback messageCallback_;
  const static size_t kHeaderLen = sizeof(int32_t);
};

这段代码把以 Buffer* 为参数的 MessageCallback 转换成了以const string&为参数的 StringMessageCallback,让用户代码不必关心分包操作。如果编程语言相同,客户端和服务端可以(应该)共享同一个codec,这样既节省工作量,又避免因对协议理解不一致而导致的错误。

5.4 服务端的实现

聊天服务器的服务端代码小于 100 行,不到 asio 的一半。

除了经常见到的 EventLoop 和TcpServer,ChatServer 还定义了 codec_和 connections_作为成员,后者存放目前已建立的客户连接。在收到消息之后,服务器会遍历整个容器,把消息广播给其中的每一个TCP连接(onStringMessage())。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ChatServer : boost::noncopyable {
public:
  ChatServer(EventLoop *loop, const InetAddress &listenAddr)
      : server_(loop, listenAddr, "ChatServer"),
        codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3)) {
    server_.setConnectionCallback(
        std::bind(&ChatServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
  }

  void start() { server_.start(); }

private:
  void onConnection(const TcpConnectionPtr &conn) {
    LOG_INFO << conn->peerAddress().toIpPort() << " -> "
             << conn->localAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");

    if (conn->connected()) {
      connections_.insert(conn);
    } else {
      connections_.erase(conn);
    }
  }

  void onStringMessage(const TcpConnectionPtr &, const string &message,
                       Timestamp) {
    for (ConnectionList::iterator it = connections_.begin();
         it != connections_.end(); ++it) {
      codec_.send(get_pointer(*it), message);
    }
  }

  typedef std::set<TcpConnectionPtr> ConnectionList;
  TcpServer server_;
  LengthHeaderCodec codec_;
  ConnectionList connections_;
};

首先,在构造函数里注册回调;

这里有儿点值得注意,在以往的代码里是直接把本 class 的 onMessage() 注册给 server_; 这里我们把 LengthHeaderCodec::onMessage()注册给 server_,然后向codec_注册了 ChatServer::onStringMessage(),等于说让 codec_负责解析消息,然后把完整的消息回调给 chatServer。这正是我前面提到的"一个简单的间接层”,在不增加 muduo 库的复杂度的前提下,提供了足够的灵活性让我们在用户代码里完成 需要的工作。

另外,server_.start() 绝对不能在构造函数里调用,这么做将来会有线程安全的问题。

以下是处理连接的建立和断开的代码,注意它把新建的连接加人到 connections_容器中,把已断开的连接从容器中删除。这么做是为了避免内存和资源泄漏,TcpConnectionPtr是boost::shared_ptr,是muduo 里唯一一个默认采用 shared_ptr 来管理生命期的对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << conn->peerAddress().toIpPort() << " -> "
            << conn->localAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");

  if (conn->connected()) {
    connections_.insert(conn);
  } else {
    connections_.erase(conn);
  }
}

以下是服务端处理消息的代码,它遍历整个 connections_容器,把消息打包发送给各个客户连接。

1
2
3
4
5
6
7
void onStringMessage(const TcpConnectionPtr &, const string &message,
                      Timestamp) {
  for (ConnectionList::iterator it = connections_.begin();
        it != connections_.end(); ++it) {
    codec_.send(get_pointer(*it), message);
  }
}

5.5 客户端的实现

我有时觉得服务端的程序常常比客户端的更容易写,聊天服务器再次验证了我的看法。客户端的复杂性来自于它要读取键盘输人,而 EventLoop 是独占线程的,所以我用了两个线程:main()函数所在的线负责读键盘,另外用一个EventLoopThread来处理网络IO。

现在来看代码,首先,在构造函数里注册回调,并使用了跟前面一样的 LengthHeaderCodec 作为中间层,负责打包、分包。

write()会由 main 线程调用,所以要加锁,这个锁不是为了保护TcpConnection而是为了保护shared_ptr。

1
2
3
4
5
6
void write(const StringPiece &message) {
  MutexLockGuard lock(mutex_);
  if (connection_) {
    codec_.send(get_pointer(connection_), message);
  }
}

onConnection()会由 EventLoop 线程调用,所以要加锁以保护 shared_ptr。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void onConnection(const TcpConnectionPtr &conn) {
  LOG_INFO << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");

  MutexLockGuard lock(mutex_);
  if (conn->connected()) {
    connection_ = conn;
  } else {
    connection_.reset();
  }
}

把收到的消息打印到屏幕,这个函数由 EventLoop 线程调用,但是不用加锁,因为 printf() 是线程安全的。注意这里不能用 std::cout«,它不是线程安全的。

1
2
3
4
void onStringMessage(const TcpConnectionPtr &, const string &message,
                      Timestamp) {
  printf("<<< %s\n", message.c_str());
}

main()函数里除了例行公事,还要启动 EventLoop 线程和读取键盘输人。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
int main(int argc, char *argv[]) {
  LOG_INFO << "pid = " << getpid();
  if (argc > 2) {
    EventLoopThread loopThread;
    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
    InetAddress serverAddr(argv[1], port);

    ChatClient client(loopThread.startLoop(), serverAddr);
    client.connect();
    std::string line;
    while (std::getline(std::cin, line)) {
      client.write(line); // 发送数据行
    }
    client.disconnect();
    CurrentThread::sleepUsec(
        1000 * 1000); // wait for disconnect, see ace/logging/client.cc
  } else {
    printf("Usage: %s host_ip port\n", argv[0]);
  }
}

Chatclient 使用 EventLoopThread 的 EventLoop,而不是通常的主线程的EventLoop。

简单测试

打开三个命令行窗口,在第一个窗口运行:

1
$ ./asio_chat_server 3000

在第二个窗口运行:

1
$ ./asio_chat_client 127.0.0.1 3000

在第三个窗口运行同样的命令:

1
$ ./asio_chat_client 127.0.0.1 3000

这样就有两个客户端进程参与聊天。在第二个窗口里输人一些字符并回车,字符会出现在本窗口和第三个窗口中。

代码示例中还有另外三个 server 程序,都是多线程的,详细介绍在 p.260。

  • server_threaded.cc 使用多线程 TcpServer,并用mutex 来保护共享数据。
  • server_threaded_eficient.cc 对共享数据以"借 shared_ptr 实现 copy-on-write"的手法来降低锁竞争。
  • server_threaded_highperformance.cc 采用thread local变量,实现多线程高效转发,这个例子值得仔细阅读理解。
0%