io_uring.c
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>// 定义事件类型
#define EVENT_ACCEPT 0 // 接受新连接事件
#define EVENT_READ 1 // 读事件
#define EVENT_WRITE 2 // 写事件// 外部声明的协议处理函数
extern int kvs_protocol(char *msg, int length, char *response);// 定义连接信息结构体
struct conn_info {int fd; // 文件描述符int event; // 事件类型
};// 初始化服务器并监听指定端口
int p_init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建TCP套接字if (sockfd < 0) {perror("socket creation failed");return -1;}struct sockaddr_in serveraddr;memset(&serveraddr, 0, sizeof(struct sockaddr_in)); // 初始化地址结构serveraddr.sin_family = AF_INET; // IPv4serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定到所有网络接口serveraddr.sin_port = htons(port); // 设置监听端口if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {perror("bind"); // 绑定失败时打印错误信息return -1;}listen(sockfd, 10); // 开始监听,最大连接队列长度为10return sockfd; // 返回监听套接字
}// 定义常量
#define ENTRIES_LENGTH 1024 // io_uring队列的大小
#define BUFFER_LENGTH 1024 // 数据缓冲区大小// 设置接收事件
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取一个SQE(提交队列条目)if (!sqe) {return -1; // 如果队列已满,返回错误}struct conn_info accept_info = {.fd = sockfd,.event = EVENT_READ,};io_uring_prep_recv(sqe, sockfd, buf, len, flags); // 准备接收操作memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); // 将连接信息存储到user_data中return 0;
}// 设置发送事件
int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取一个SQEif (!sqe) {return -1; // 如果队列已满,返回错误}struct conn_info accept_info = {.fd = sockfd,.event = EVENT_WRITE,};io_uring_prep_send(sqe, sockfd, buf, len, flags); // 准备发送操作memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); // 将连接信息存储到user_data中return 0;
}// 设置接受新连接事件
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取一个SQEif (!sqe) {return -1; // 如果队列已满,返回错误}struct conn_info accept_info = {.fd = sockfd,.event = EVENT_ACCEPT,};io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags); // 准备接受新连接操作memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); // 将连接信息存储到user_data中return 0;
}// 定义消息处理函数类型
typedef int (*msg_handler)(char *msg, int length, char *response);
static msg_handler kvs_handler; // 全局消息处理函数指针// 启动Proactor模型的服务器
int proactor_start(unsigned short port, msg_handler handler) {int sockfd = p_init_server(port); // 初始化服务器并监听端口if (sockfd < 0) {return -1; // 初始化失败时返回错误}kvs_handler = handler; // 设置消息处理函数// 初始化io_uring参数struct io_uring_params params;memset(¶ms, 0, sizeof(params));// 初始化io_uring实例struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);// 定义客户端地址结构struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);// 注册接受新连接事件set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);// 定义数据缓冲区和响应缓冲区char buffer[BUFFER_LENGTH] = {0};char response[BUFFER_LENGTH] = {0};// 主循环while (1) {io_uring_submit(&ring); // 提交SQE到内核// 等待CQE(完成队列条目)struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);// 提取多个CQEstruct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // 获取批量完成事件for (int i = 0; i < nready; i++) {struct io_uring_cqe *entries = cqes[i]; // 当前完成事件struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info)); // 提取连接信息if (result.event == EVENT_ACCEPT) { // 处理接受新连接事件set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0); // 注册新的接受事件int connfd = entries->res; // 获取新连接的文件描述符set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); // 注册读事件} else if (result.event == EVENT_READ) { // 处理读事件int ret = entries->res; // 获取读取的字节数if (ret == 0) { // 客户端关闭连接close(result.fd); // 关闭文件描述符} else if (ret > 0) { // 成功读取数据// 调用消息处理函数ret = kvs_handler(buffer, ret, response);// 注册写事件set_event_send(&ring, result.fd, response, ret, 0);}} else if (result.event == EVENT_WRITE) { // 处理写事件int ret = entries->res; // 获取写入的字节数set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); // 注册读事件}}io_uring_cq_advance(&ring, nready); // 提交处理完成的CQE}
}