目录
- 同步
- 异步
- https 异步请求:
- 协程
- 1.为什么会要协程?
- 2.异步的运行流程是什么
- 3.协程的原语操作
- 4.协程的定义?
- 5.调度器的定义?
- 6.调度的策略?
- 7. api封装, hook
- 8.多核的模式?
- 9.协程的性能?
- 10.要有哪些案例?
- nty_server
- nty_ mysql_client.c
- nty_ mysql oper.c
- nty_ rediscli.c
同步
同步是在一个函数体内,
read,业务处理,等待IO可写,write是阻塞的可能返回-1;
异步
异步不好理解,流程不清晰:
在一个函数体内,
read,业务处理,要进行write就直接注册写事件(epoll_ctl),在回调函数中epoll_wait来处理事件。(主从reactor?)
https 异步请求:
client :发送http请求,不用等待http的reponse返回,把等待事件加到epoll(进行epoll_wait),客户端直接进行下一个http请求的发送
server :读完fd之后,把事件加入epoll中,判断是否可写,如果可写,就进行写操作,不可行进行下一个read
async_read(fd, );
if (poll(fd))//如果有就绪
read;
else//如果没有就绪
epoll ctl(epfd, fd);async_recv_from :
判断是否可读
可读:recv_from()
不可读:加入到epoll中,就返回(还需要一个callback(){epoll_wait})
协程
有异步的性能,同步的编程方式(发起请求,等待结果)
在一个函数内部commit
发送请求
等待结果
协程–> ntyco, libco:都是基于epoll做io调度。
1.为什么会要协程?
同步的编程方式,实行异步
2.异步的运行流程是什么
3.协程的原语操作
resume恢复
yield让出
switch()实现方式;
- longjmp/setjmp
- ucontext;
- 汇编实现
switch()协程切换:
1、协程切换只涉及基本的CPU上下文切换。
当前协程的 CPU 寄存器状态保存起来,然后将需要切换进来的协程的 CPU 寄存器状态加载的 CPU 寄存器上就 ok 了。
2、而且完全在用户态进行,
4.协程的定义?
5.调度器的定义?
管理所有协程
6.调度的策略?
如果两个时间一样,红黑树不支持同一个k有两个v,所以当插入的时候,我们先看红黑树是否有,有的话,可以+1微妙再插 入;
7. api封装, hook
Hook函数又可以叫做钩子函数,其本质就是一种函数劫持调用。简单来说,就是在用户调用系统函数的过程中,劫持用户的调用请求并注入一些自定义代码,转而再去调用目标系统函数。
也就是说,我们通过自定义系统同名函数,劫持到用户的调用,然后去执行自定义的操作。
通过Hook技术可以在调用方无感知的情况下进行了自定义的操作。
例如read函数,它的功能是从套接字中读取一定字节的数据,它的原型如下:
#include <unistd.h>ssize_t read(int fd, void *buf, size_t count);
然而read往往不能满足我们对网络性能的需求,因为它是同步的,也就是说,当调用read之后,我们就只能等待期返回结果,在这期间不能做任何事情了。
有没有方法hook系统的read函数,实现自定义的异步read呢。当然可以,结合协程的思想,我们可以这样设计hook的read函数:
当不满足可读条件时,先Yield当前协程,让出CPU。
当read条件就绪时,主协程唤醒read协程,read协程再去调用系统本身的read函数。
注意到这里有一点,主协程怎么知道IO条件是否就绪呢。幸运的是,Linux上有个东西可以帮助我们监听套接字的读写条件。这当然就是Epoll了(当然poll、select也可以)。
因此自定义的read可以这样实现:
dlsym把read函数劫持,然后执行read_f才是执行内核的read这个函数
ssize_t read(fd, buf, count) {1. 注册 fd 的可读事件到 epoll上2. yield当前协程3. 等待yield返回后,说明该协程被唤醒,那么可读条件一定满足。直接调用系统 read_f 函数即可。
}
整个hook过程中程序执行流如下图所示:
8.多核的模式?
9.协程的性能?
性能和非阻塞epoll差不多,只是业务逻辑按同步来写,好排查、好写
加入协程IO性能提高,只是因为异步性能高
管理io情况:协程(有上下文切换)< 非阻塞reactor/epoll
性能测试:
1.并发量,fd数量协程的数量
2.每秒接入量, fd --> coroutine create
3.断开连接,coroutine destroy
4. 1G数据, iperf
10.要有哪些案例?
nty_server
/** Author : WangBoJing , email : 1989wangbojing@gmail.com* * Copyright Statement:* --------------------* This software is protected by Copyright and the information contained* herein is confidential. The software may not be copied and the information* contained herein may not be used or disclosed except with the written* permission of Author. (C) 2017* ***** ***** ******** * ** ****** * * * *** ** * * ** *** ** * * ** ** ** * ** ** ** ** * *** *** ** * *********** ***** ***** ** ***** ** * ** ** ** ** ** *** ** * ** ** * ** * *** ** * ** * * ** ** *** ** * ** ** * ** * *** ** * ** * * ** ** *** ** * ** ** * ** ** *** ** * ** ** * ** ** *** ** * ** * * ** ** *** ** * ** ** * ** * ** *** *** ** * * ** * ** *** *** ** * ** * * ** *** ** ** * ** ** * ** *** ** ** * * ** * ** **
***** * **** * ***** *****************/#include "nty_coroutine.h"#include <arpa/inet.h>#define MAX_CLIENT_NUM 1000000
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)void server_reader(void *arg) {int fd = *(int *)arg;int ret = 0;struct pollfd fds;fds.fd = fd;fds.events = POLLIN;while (1) {char buf[1024] = {0};ret = nty_recv(fd, buf, 1024, 0);if (ret > 0) {if(fd > MAX_CLIENT_NUM) printf("read from server: %.*s\n", ret, buf);ret = nty_send(fd, buf, strlen(buf), 0);if (ret == -1) {nty_close(fd);break;}} else if (ret == 0) { nty_close(fd);break;}}
}void server(void *arg) {unsigned short port = *(unsigned short *)arg;free(arg);int fd = nty_socket(AF_INET, SOCK_STREAM, 0);if (fd < 0) return ;struct sockaddr_in local, remote;local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in));listen(fd, 20);printf("listen port : %d\n", port);struct timeval tv_begin;gettimeofday(&tv_begin, NULL);while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);if (cli_fd % 1000 == 999) {struct timeval tv_cur;memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));gettimeofday(&tv_begin, NULL);int time_used = TIME_SUB_MS(tv_begin, tv_cur);printf("client fd : %d, time_used: %d\n", cli_fd, time_used);}printf("new client comming\n");nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}}int main(int argc, char *argv[]) {nty_coroutine *co = NULL;int i = 0;unsigned short base_port = 9096;for (i = 0;i < 100;i ++) {unsigned short *port = calloc(1, sizeof(unsigned short));*port = base_port + i;nty_coroutine_create(&co, server, port); no run}nty_schedule_run(); //runreturn 0;
}
调度器单线程
nty_ mysql_client.c
#include "nty_coroutine.h"#include <stdio.h>
#include <string.h>
#include <mysql.h>void func (void *arg) {MYSQL* m_mysql = mysql_init(NULL);if (!m_mysql) {printf("mysql_init failed\n");return ;}if (!mysql_real_connect(m_mysql, "192.168.233.133", "abc", "123456","KING_DB", 3306,NULL, CLIENT_FOUND_ROWS)) {printf("mysql_real_connect failed: %s\n", mysql_error(m_mysql));return ;} else{printf("mysql_real_connect success\n");}}int main() {
#if 1init_hook();nty_coroutine *co = NULL;nty_coroutine_create(&co, func, NULL);nty_schedule_run(); //run
#elsefunc(NULL);#endif
}
协程使用:不用考虑用户层的协议,我们只用把io改为异步的;(放大象到冰箱,打开,放,关)
nty_ mysql oper.c
#include "nty_coroutine.h"
#include <mysql.h>#define KING_DB_SERVER_IP "192.168.233.133"
#define KING_DB_SERVER_PORT 3306#define KING_DB_USERNAME "king"
#define KING_DB_PASSWORD "123456"#define KING_DB_DEFAULTDB "KING_DB"#define SQL_INSERT_TBL_USER "INSERT TBL_USER(U_NAME, U_GENDER) VALUES('King', 'man');"
#define SQL_SELECT_TBL_USER "SELECT * FROM TBL_USER;"#define SQL_DELETE_TBL_USER "CALL PROC_DELETE_USER('King')"
#define SQL_INSERT_IMG_USER "INSERT TBL_USER(U_NAME, U_GENDER, U_IMG) VALUES('King', 'man', ?);"#define SQL_SELECT_IMG_USER "SELECT U_IMG FROM TBL_USER WHERE U_NAME='King';"#define FILE_IMAGE_LENGTH (64*1024)
// C U R D -->
// int king_mysql_select(MYSQL *handle) { //// mysql_real_query --> sqlif (mysql_real_query(handle, SQL_SELECT_TBL_USER, strlen(SQL_SELECT_TBL_USER))) {printf("mysql_real_query : %s\n", mysql_error(handle));return -1;}// store --> MYSQL_RES *res = mysql_store_result(handle);if (res == NULL) {printf("mysql_store_result : %s\n", mysql_error(handle));return -2;}// rows / fieldsint rows = mysql_num_rows(res);printf("rows: %d\n", rows);int fields = mysql_num_fields(res);printf("fields: %d\n", fields);// fetchMYSQL_ROW row;while ((row = mysql_fetch_row(res))) {int i = 0;for (i = 0;i < fields;i ++) {printf("%s\t", row[i]);}printf("\n");}mysql_free_result(res);return 0;
}// filename : path + file name
// buffer : store image dataint read_image(char *filename, char *buffer) {if (filename == NULL || buffer == NULL) return -1;FILE *fp = fopen(filename, "rb"); //if (fp == NULL) {printf("fopen failed\n");return -2;}// file sizefseek(fp, 0, SEEK_END);int length = ftell(fp); // file sizefseek(fp, 0, SEEK_SET);int size = fread(buffer, 1, length, fp);if (size != length) {printf("fread failed: %d\n", size);return -3;}fclose(fp);return size;}// filename :
// buffer :
// length :int write_image(char *filename, char *buffer, int length) {if (filename == NULL || buffer == NULL || length <= 0) return -1;FILE *fp = fopen(filename, "wb+"); //if (fp == NULL) {printf("fopen failed\n");return -2;}int size = fwrite(buffer, 1, length, fp);if (size != length) {printf("fwrite failed: %d\n", size);return -3;}fclose(fp);return size;
}int mysql_write(MYSQL *handle, char *buffer, int length) {if (handle == NULL || buffer == NULL || length <= 0) return -1;MYSQL_STMT *stmt = mysql_stmt_init(handle);int ret = mysql_stmt_prepare(stmt, SQL_INSERT_IMG_USER, strlen(SQL_INSERT_IMG_USER));if (ret) {printf("mysql_stmt_prepare : %s\n", mysql_error(handle));return -2;}MYSQL_BIND param = {0};param.buffer_type = MYSQL_TYPE_LONG_BLOB;param.buffer = NULL;param.is_null = 0;param.length = NULL;ret = mysql_stmt_bind_param(stmt, ¶m);if (ret) {printf("mysql_stmt_bind_param : %s\n", mysql_error(handle));return -3;}ret = mysql_stmt_send_long_data(stmt, 0, buffer, length);if (ret) {printf("mysql_stmt_send_long_data : %s\n", mysql_error(handle));return -4;}ret = mysql_stmt_execute(stmt);if (ret) {printf("mysql_stmt_execute : %s\n", mysql_error(handle));return -5;}ret = mysql_stmt_close(stmt);if (ret) {printf("mysql_stmt_close : %s\n", mysql_error(handle));return -6;}return ret;
}int mysql_read(MYSQL *handle, char *buffer, int length) {if (handle == NULL || buffer == NULL || length <= 0) return -1;MYSQL_STMT *stmt = mysql_stmt_init(handle);int ret = mysql_stmt_prepare(stmt, SQL_SELECT_IMG_USER, strlen(SQL_SELECT_IMG_USER));if (ret) {printf("mysql_stmt_prepare : %s\n", mysql_error(handle));return -2;}MYSQL_BIND result = {0};result.buffer_type = MYSQL_TYPE_LONG_BLOB;unsigned long total_length = 0;result.length = &total_length;ret = mysql_stmt_bind_result(stmt, &result);if (ret) {printf("mysql_stmt_bind_result : %s\n", mysql_error(handle));return -3;}ret = mysql_stmt_execute(stmt);if (ret) {printf("mysql_stmt_execute : %s\n", mysql_error(handle));return -4;}ret = mysql_stmt_store_result(stmt);if (ret) {printf("mysql_stmt_store_result : %s\n", mysql_error(handle));return -5;}while (1) {ret = mysql_stmt_fetch(stmt);if (ret != 0 && ret != MYSQL_DATA_TRUNCATED) break; // int start = 0;while (start < (int)total_length) {result.buffer = buffer + start;result.buffer_length = 1;mysql_stmt_fetch_column(stmt, &result, 0, start);start += result.buffer_length;}}mysql_stmt_close(stmt);return total_length;}void coroutine_func(void *arg) {MYSQL mysql;printf("coroutine_func\n");if (NULL == mysql_init(&mysql)) {printf("mysql_init : %s\n", mysql_error(&mysql));return ;}if (!mysql_real_connect(&mysql, KING_DB_SERVER_IP, KING_DB_USERNAME, KING_DB_PASSWORD, KING_DB_DEFAULTDB, KING_DB_SERVER_PORT, NULL, 0)) {printf("mysql_real_connect : %s\n", mysql_error(&mysql));goto Exit;}// mysql --> insert printf("case 1 : mysql --> insert \n");
#if 1if (mysql_real_query(&mysql, SQL_INSERT_TBL_USER, strlen(SQL_INSERT_TBL_USER))) {printf("mysql_real_query : %s\n", mysql_error(&mysql));goto Exit;}
#endifking_mysql_select(&mysql);// mysql --> delete printf("case 2 : mysql --> delete \n");
#if 1if (mysql_real_query(&mysql, SQL_DELETE_TBL_USER, strlen(SQL_DELETE_TBL_USER))) {printf("mysql_real_query : %s\n", mysql_error(&mysql));goto Exit;}
#endifking_mysql_select(&mysql);printf("case 3 : mysql --> read image and write mysql\n");char buffer[FILE_IMAGE_LENGTH] = {0};int length = read_image("0voice.jpg", buffer);if (length < 0) goto Exit;mysql_write(&mysql, buffer, length); /// printf("case 4 : mysql --> read mysql and write image\n");memset(buffer, 0, FILE_IMAGE_LENGTH);length = mysql_read(&mysql, buffer, FILE_IMAGE_LENGTH);write_image("a.jpg", buffer, length);Exit:mysql_close(&mysql);return ;}int main() {
#if 1//init_hook();nty_coroutine *co = NULL;nty_coroutine_create(&co, coroutine_func, NULL);nty_schedule_run(); //run
#elsecoroutine_func(NULL);#endif
}
nty_ rediscli.c
#include "nty_coroutine.h"#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis.h>void coroutine_func(void *arg) {unsigned int j, isunix = 0;redisContext *c;redisReply *reply;const char *hostname = "192.168.233.133";int port = 6379;struct timeval timeout = { 1, 500000 }; // 1.5 secondsif (isunix) {c = redisConnectUnixWithTimeout(hostname, timeout);} else {c = redisConnectWithTimeout(hostname, port, timeout);}if (c == NULL || c->err) {if (c) {printf("Connection error: %s\n", c->errstr);redisFree(c);} else {printf("Connection error: can't allocate redis context\n");}exit(1);}/* PING server */reply = redisCommand(c,"PING");printf("PING: %s\n", reply->str);freeReplyObject(reply);/* Set a key */reply = redisCommand(c,"SET %s %s", "foo", "hello world");printf("SET: %s\n", reply->str);freeReplyObject(reply);/* Set a key using binary safe API */reply = redisCommand(c,"SET %b %b", "bar", (size_t) 3, "hello", (size_t) 5);printf("SET (binary API): %s\n", reply->str);freeReplyObject(reply);/* Try a GET and two INCR */reply = redisCommand(c,"GET foo");printf("GET foo: %s\n", reply->str);freeReplyObject(reply);reply = redisCommand(c,"INCR counter");printf("INCR counter: %lld\n", reply->integer);freeReplyObject(reply);/* again ... */reply = redisCommand(c,"INCR counter");printf("INCR counter: %lld\n", reply->integer);freeReplyObject(reply);/* Create a list of numbers, from 0 to 9 */reply = redisCommand(c,"DEL mylist");freeReplyObject(reply);for (j = 0; j < 10; j++) {char buf[64];snprintf(buf,64,"%u",j);reply = redisCommand(c,"LPUSH mylist element-%s", buf);freeReplyObject(reply);}/* Let's check what we have inside the list */reply = redisCommand(c,"LRANGE mylist 0 -1");if (reply->type == REDIS_REPLY_ARRAY) {for (j = 0; j < reply->elements; j++) {printf("%u) %s\n", j, reply->element[j]->str);}}freeReplyObject(reply);/* Disconnects and frees the context */redisFree(c);return ;
}int main(int argc, char **argv) {
#if 1//init_hook();nty_coroutine *co = NULL;nty_coroutine_create(&co, coroutine_func, NULL);nty_schedule_run(); //run
#elsecoroutine_func(NULL);}