Socket编程(六)

I/O复用

Posted by Gavin on December 8, 2019

直道相思了无益

未妨惆怅是清狂

概念

前言

I/O复用能够使程序同时监听多个文件描述符,提高程序性能,一般来说下列情况需要使用I/O复用:

  • 客户端程序需要同时处理多个socket,比如非阻塞connect技术
  • 客户端程序需要同时处理用户输入和网络连接,比如聊天室程序
  • TCP服务器要同时处理监听socket和连接socket,这是I/O复用使用最多的场合
  • 服务器要同时处理TCP和UDP请求,比如回射服务器
  • 服务器要同时监听多个端口或处理多个服务,比如xinetd服务器

PS:I/O复用虽然能同时监听多个文件描述符,但是它本身是阻塞的,多个文件描述符准备完毕后,如果不采取措施,也只能按顺序执行,要实现并发,只能引入多进程或多线程等其他手段

select系统调用

select系统调用是在一定指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, timeval *timeout)

nfds指定被监听的文件描述符总数,通常为监听的所有文件描述符最大值加1,因为文件描述符从0开始计数

文件描述符就绪条件:

  • 可读
    • socket内核接收缓冲区中的字节数大于或等于SO_RCVLOWWAT,此时无阻塞读,并且读操作返回字节数大于0
    • socket通信对方关闭连接,此时读操作返回0
    • 监听socket上有新请求
    • socket上有未处理的错误,使用getsockopt来读取和清除该错误
  • 可写
    • socket内核发送缓冲区中的字节数大于或等于SO_SNDLOWWAT,此时无阻塞写,并且写操作返回字节数大于0
    • socket写操作被关闭,对写操作被关闭的socket执行写操作将触发SIGPIPE信号
    • socket使用非阻塞connect连接成功或者失败(超时)
    • socket上有未处理的错误,使用getsockopt来读取和清除该错误
#include <iostream>
#include <string>

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <assert.h>
using namespace std;

int main(int argc, char const *argv[])
{
	int sock = socket(PF_INET, SOCK_STREAM, 0);
	assert(sock != -1);

	sockaddr_in server;
	server.sin_family = PF_INET;
	server.sin_port = 8080;
	server.sin_addr.s_addr = inet_addr("127.0.0.1");

	int bind_status = bind(sock,(sockaddr*)&server,sizeof(server));
	assert(bind_status == 0);

	int listen_status = listen(sock, 5);
	assert(listen_status == 0);

	socklen_t len = sizeof(server);

	string buf;
	fd_set read_fds;
	fd_set excepton_fds;
	FD_ZERO(&read_fds);
	FD_ZERO(&excepton_fds);

	while (true) {
		buf = string(512,'\0');

		int client = accept(sock, (sockaddr*)&server, &len);
		assert(client != -1);

		FD_SET(client,&read_fds);
		FD_SET(client,&excepton_fds);
		
		int ret = select(client+1, &read_fds, NULL, &excepton_fds, NULL);
		if (ret < 0) cout << "select failed!" << endl;
		if (FD_ISSET(client,&read_fds)) {
			ret = recv(client, const_cast<char*>(buf.c_str()), buf.length()-1, 0);
			assert(ret != -1);
			cout << buf << endl;
			FD_ZERO(&read_fds);
		}
		else if (FD_ISSET(client,&excepton_fds)) {
			ret = recv(client, const_cast<char*>(buf.c_str()), buf.length()-1, MSG_OOB);
			FD_ZERO(&excepton_fds);
		}
		close(client);
	}

	close(sock);
	return 0;
}

poll系统调用

poll系统调用和select作用类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout)

epoll系统调用

epoll是Linux特有的I/O复用函数,epoll采用一组函数来实现功能,而非单个函数。epoll把用户关心的文件描述符上的事件放在内核的一个事件表中,从而无需像select、poll那样每次调用都需要重复传入文件描述符集或事件集。但epoll需要一个额外的文件描述符来唯一标识内核中的这个事件表,这个文件描述符利用如下函数创建:

#include <sys/epoll.h>
int epoll_create(int size)

int epoll_ctl(int epfd, int op, in fd, struct epoll_event *event)

epoll系列函数的主要接口是epoll_wait函数,它在一段时间内等待一组文件描述符上的事件:

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *event, int maxevents, int timeout)

epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(epfd)中复制到第二个参数指向的数组中,这个数组只输出epoll_wait检测到的就绪事件,不像select和poll的数组那样,既传入用户注册的事件,又输出内核检测到的就绪事件,这提高了程序索引就绪文件描述符的效率。

epoll和poll使用差别:

//poll
int ret = poll(fds, MAX_EVENT_NUMBER,-1)
for (int i = 0; i < MAX_EVENT_NUMBER; ++i) {
	if (fds[i].revents & POLLIN) int sockfd = fds[i].fd;
}

//epoll
int ret = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
for (int i = 0; i < ret; ++i) int sockfd = events[i].data.fd;

epoll操作文件描述符的模式:

  • LT(水平触发)
    默认,相当于高效poll,epoll_wait检测到其上有事件发生并将此事通知应用程序后,应用程序可以不立即处理该事件,这样,应用程序下次调用epoll_wait时,应用程序还会再被通告此事件,直到被处理。
  • ET(边缘触发)
    检测到其上有事件发生并将此事通知应用程序后,应用程序必须立即处理该事件

比较

系统调用 select poll epoll
事件集合 用户分别传入感兴趣的可读、可写、异常事件,内核通过对此进行修改反馈就绪事件,因而,每次调用select都需要重置这三个参数 统一处理所有事件类型,用户通过pollfd.events注册感兴趣的事件,内核通过修改revents反馈就绪事件 内核通过一个事件表直接管理用户感兴趣的所有事件,每次调用epoll_wait的events仅反馈就绪事件
索引就绪文件时间复杂度 \(O(n)\) \(O(n)\) \(O(1)\)
最大支持文件描述符数目 一般有最大限制 65535 65535
工作模式 LT LT LT & ET
内核实现 轮询方式检测就绪事件 轮询方式检测就绪事件 回调方式检测就绪事件

实践

非阻塞connect

#include <iostream>
#include <string>

#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <errno.h>

using namespace std;

int main(int argc, char const *argv[])
{
	int sock = socket(PF_INET, SOCK_STREAM, 0);
	assert(sock != -1);

	sockaddr_in client;
	client.sin_family = PF_INET;
	client.sin_port = 8080;
	client.sin_addr.s_addr = inet_addr("127.0.0.1");

	int status = fcntl(sock, F_GETFL, 0);
	fcntl(sock, F_SETFL, status | O_NONBLOCK);

	int ret = connect(sock,(sockaddr*)&client,sizeof(client));
	if (ret == 0) {
		fcntl(sock, F_SETFL, status);
		string data = "Hello World";
		if (send(sock,data.c_str(),data.length(),0) == 0) cout << "Send success immediately!" << endl;
	}
	else {
		if (errno != EINPROGRESS) cout << "Connect failed!" << endl;
		fd_set read_fds, write_fds;
		//struct timeval timeout;

		FD_ZERO(&read_fds);
		FD_ZERO(&write_fds);

		FD_SET(sock,&read_fds);
		FD_SET(sock,&write_fds);

		//timeout.tv_sec = 100;
		//timeout.tv_usec = 100;

		ret = select(sock+1, &read_fds, &write_fds, nullptr, NULL);
		if (ret < 0) cout << "Timeout!!" << endl;

		if (FD_ISSET(sock,&write_fds)) {
			if (FD_ISSET(sock,&read_fds)) {
				if (connect(sock,(sockaddr*)&client,sizeof(client)) != 0) {
					int error = 0;
					socklen_t len = sizeof(errno);
					if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len)) {
						cout << "clear errors" << endl;
						close(sock);
					}
					if (errno != EISCONN) {
						cout << "connect error!" << endl;
						close(sock);
					}
				}
			}
			fcntl(sock, F_SETFL, status);
			string data = "Hello World";
			if (send(sock,data.c_str(),data.length(),0) == 0) cout << "Send success immediately!" << endl;
		}
		else {
			cout << "connect failed!" << endl;
			close(sock);
		}
	}

	return 0;
}