第九章 线程(二) 多线程轮转调度
本文是对《操作系统真象还原》第九章(二)学习的笔记,欢迎大家一起交流,目前所有代码已托管至 fdx-xdf/MyTinyOS 。
上一节中成功创建了线程并运行,这一节要实现的是多线程轮转调度。
链表数据结构准备
我们要把线程组织起来,采取的是双向链表形式,将各个线程串联起来,达到如下效果。
list.h
位于lib/kernel/list.h
链表节点定义如下:
/********** 定义链表结点成员结构 ***********
*结点中不需要数据成元,只要求前驱和后继结点指针*/
struct list_elem {struct list_elem* prev; // 前躯结点struct list_elem* next; // 后继结点
};
链表结构如下:
/* 链表结构,用来实现队列 */
struct list {
/* head是队首,是固定不变的,不是第1个元素,第1个元素为head.next */struct list_elem head;
/* tail是队尾,同样是固定不变的 */struct list_elem tail;
};
定义好的操作函数如下:
/* 自定义函数类型function,用于在list_traversal中做回调函数 */
typedef bool (function)(struct list_elem*, int arg);void list_init (struct list*);
void list_insert_before(struct list_elem* before, struct list_elem* elem);
void list_push(struct list* plist, struct list_elem* elem);
void list_iterate(struct list* plist);
void list_append(struct list* plist, struct list_elem* elem);
void list_remove(struct list_elem* pelem);
struct list_elem* list_pop(struct list* plist);
bool list_empty(struct list* plist);
uint32_t list_len(struct list* plist);
struct list_elem* list_traversal(struct list* plist, function func, int arg);
bool elem_find(struct list* plist, struct list_elem* obj_elem);
list.c
lib/kernel/list.c
实现操作函数,如下:
#include "list.h"
#include "interrupt.h"/* 初始化双向链表list */
void list_init (struct list* list) {list->head.prev = NULL;list->head.next = &list->tail;list->tail.prev = &list->head;list->tail.next = NULL;
}/* 把链表元素elem插入在元素before之前 */
void list_insert_before(struct list_elem* before, struct list_elem* elem) { enum intr_status old_status = intr_disable();/* 将before前驱元素的后继元素更新为elem, 暂时使before脱离链表*/ before->prev->next = elem; /* 更新elem自己的前驱结点为before的前驱,* 更新elem自己的后继结点为before, 于是before又回到链表 */elem->prev = before->prev;elem->next = before;/* 更新before的前驱结点为elem */before->prev = elem;intr_set_status(old_status);
}/* 添加元素到列表队首,类似栈push操作 */
void list_push(struct list* plist, struct list_elem* elem) {list_insert_before(plist->head.next, elem); // 在队头插入elem
}/* 追加元素到链表队尾,类似队列的先进先出操作 */
void list_append(struct list* plist, struct list_elem* elem) {list_insert_before(&plist->tail, elem); // 在队尾的前面插入
}/* 使元素pelem脱离链表 */
void list_remove(struct list_elem* pelem) {enum intr_status old_status = intr_disable();pelem->prev->next = pelem->next;pelem->next->prev = pelem->prev;intr_set_status(old_status);
}/* 将链表第一个元素弹出并返回,类似栈的pop操作 */
struct list_elem* list_pop(struct list* plist) {struct list_elem* elem = plist->head.next;list_remove(elem);return elem;
} /* 从链表中查找元素obj_elem,成功时返回true,失败时返回false */
bool elem_find(struct list* plist, struct list_elem* obj_elem) {struct list_elem* elem = plist->head.next;while (elem != &plist->tail) {if (elem == obj_elem) {return true;}elem = elem->next;}return false;
}/* 把列表plist中的每个元素elem和arg传给回调函数func,* arg给func用来判断elem是否符合条件.* 本函数的功能是遍历列表内所有元素,逐个判断是否有符合条件的元素。* 找到符合条件的元素返回元素指针,否则返回NULL. */
struct list_elem* list_traversal(struct list* plist, function func, int arg) {struct list_elem* elem = plist->head.next;
/* 如果队列为空,就必然没有符合条件的结点,故直接返回NULL */if (list_empty(plist)) { return NULL;}while (elem != &plist->tail) {if (func(elem, arg)) { // func返回ture则认为该元素在回调函数中符合条件,命中,故停止继续遍历return elem;} // 若回调函数func返回true,则继续遍历elem = elem->next; }return NULL;
}/* 返回链表长度 */
uint32_t list_len(struct list* plist) {struct list_elem* elem = plist->head.next;uint32_t length = 0;while (elem != &plist->tail) {length++; elem = elem->next;}return length;
}/* 判断链表是否为空,空时返回true,否则返回false */
bool list_empty(struct list* plist) { // 判断队列是否为空return (plist->head.next == &plist->tail ? true : false);
}
较简单,不多解释,唯一要注意的是对链表上数据进行修改时要保持原子操作,即修改前关中断,修改完再置为原来的状态。
多线程轮转调度
数据结构准备
我们主要通过时钟中断进行线程调度,其主要过程如下:
- 每个线程在运行之前都被分配一个时间片,这个时间片其实就是PCB中的优先级
priority
- 假如
thread_work_a
这个线程被分配的时间片为31个时钟周期,那么每当线程thread_work_a
运行一个时钟周期(也就是没发生一次时钟中断)之后,时间片就减一- 因此我们需要有一个变量来记录线程可运行的剩余时间,也就是PCB当中的
ticks
- 当ticks的值减为0的时候,就表示该线程的时间片用完了
- 此时调度函数便将该线程的状态从运行态修改为就绪态,然后将其插入到就绪态的末尾,并从就绪队列的头部弹出一个新的线程上cpu运行,然后继续上述过程,每运行一个时钟就减去一个时钟,直到可用的时钟用完继续切换
经过以上描述,我们需要为线程的PCB新增一些数据成员:
- 用以表示该线程可运行的剩余时间(时钟数):
ticks
- 用以表示线程从运行开始到当前时间一共运行的时间:
elapsed_ticks
- 用以将所有处于就绪状态的PCB连接起来,使其成为一个就绪队列的节点成员:
general_tag
- 用以将所有线程PCB连接起来(只要是线程就连接,不管其处于什么状态),使其成为一个记录所有线程的队列的节点成员:
all_list_tag
如下:
/* 进程或线程的pcb,程序控制块, 此结构体用于存储线程的管理信息*/
struct task_struct {uint32_t* self_kstack; // 用于存储线程的栈顶位置,栈顶放着线程要用到的运行信息enum task_status status;uint8_t priority; // 线程优先级char name[16]; //用于存储自己的线程的名字uint8_t ticks; //线程允许上处理器运行还剩下的滴答值,因为priority不能改变,所以要在其之外另行定义一个值来倒计时uint32_t elapsed_ticks; //此任务自上cpu运行后至今占用了多少cpu嘀嗒数, 也就是此任务执行了多久*/struct list_elem general_tag; //general_tag的作用是用于线程在一般的队列(如就绪队列或者等待队列)中的结点struct list_elem all_list_tag; //all_list_tag的作用是用于线程队列thread_all_list(这个队列用于管理所有线程)中的结点uint32_t* pgdir; // 进程自己页表的虚拟地址uint32_t stack_magic; //如果线程的栈无限生长,总会覆盖地pcb的信息,那么需要定义个边界数来检测是否栈已经到了PCB的边界};
然后定义一些线程队列和其他数据结构,下面会看到他们的作用。
struct task_struct *main_thread; // 主线程PCB
struct list thread_ready_list; // 就绪队列
struct list thread_all_list; // 所有任务队列
static struct list_elem *thread_tag; // 用于保存队列中的线程结点
轮转调度前的准备
pcb初始化的改变
由于上面pcb结构体改变了,所以pcb初始化函数也要改变
/* 初始化线程基本信息 , pcb中存储的是线程的管理信息,此函数用于根据传入的pcb的地址,线程的名字等来初始化线程的管理信息*/
void init_thread(struct task_struct *pthread, char *name, int prio)
{memset(pthread, 0, sizeof(*pthread)); // 把pcb初始化为0if (pthread == main_thread)pthread->status = TASK_RUNNING;elsepthread->status = TASK_READY;strcpy(pthread->name, name);pthread->priority = prio;/* self_kstack是线程自己在内核态下使用的栈顶地址 */pthread->self_kstack = (uint32_t *)((uint32_t)pthread + PG_SIZE); // 本操作系统比较简单,线程不会太大,就将线程栈顶定义为pcb地址//+4096的地方,这样就留了一页给线程的信息(包含管理信息与运行信息)空间pthread->ticks = prio;pthread->elapsed_ticks = 0;pthread->pgdir = NULL;pthread->stack_magic = 0x19870916; // /定义的边界数字,随便选的数字来判断线程的栈是否已经生长到覆盖pcb信息了
}
5-8行可以看到,如果新线程是主线程,则是TASK_RUNNING状态,否则置为TASK_READY状态。
然后14-16行又加了时钟数相关东西,由于控制在cpu运行时间,另外,PCB
中新增的pgdir变量,表示进程自己的页表虚拟地址,该信息是给将来的进程使用的,线程共享进程的虚拟地址空间,因此该值在这里应该为NULL
。
线程创建初始化的改变
当PCB和线程栈的运行信息初始化后,就需要开始着手启动线程,但这是之前的逻辑,但在这里,我们需要统一调度逻辑,因此,我们需要将准备好的PCB插入就绪队列和全队列中,将来由调度器从就绪队列中选择线程然后上CPU运行
/* 创建一优先级为prio的线程,线程名为name,线程所执行的函数是function(func_arg) */
struct task_struct *thread_start(char *name, int prio, thread_func function, void *func_arg)
{/* pcb都位于内核空间,包括用户进程的pcb也是在内核空间 */struct task_struct *thread = get_kernel_pages(1); // 为线程的pcb申请4K空间的起始地址init_thread(thread, name, prio); // 初始化线程的pcbthread_create(thread, function, func_arg); // 初始化线程的线程栈// 确保之前不在就绪队列ASSERT(!elem_find(&thread_ready_list, &thread->general_tag));// 加入就绪队列list_append(&thread_ready_list, &thread->general_tag);// 确保不在队列ASSERT(!elem_find(&thread_all_list, &thread->all_list_tag));// 加入队列list_append(&thread_all_list, &thread->all_list_tag);return thread;
}
kernel_thread函数也有变化,我们需要先开启中断,否则后面的线程无法执行
// 线程启动器
/* 由kernel_thread去执行function(func_arg) , 这个函数就是线程中去开启我们要运行的函数*/
static void kernel_thread(thread_func *function, void *func_arg)
{/* 执行function前要开中断,避免后面的时钟中断被屏蔽,而无法调度其它线程 */intr_enable();function(func_arg);
}
main线程的初始化
其实从BIOS开始到MBR,到loader,到kernel,主线程一直都在运行,但是现在需要多线程轮转了,我们需要对其进行初始化,如下:
/* 将kernel中的main函数完善为主线程 */
static void make_main_thread(void)
{/* 因为main线程早已运行,咱们在loader.S中进入内核时的mov esp,0xc009f000,就是为其预留了tcb,地址为0xc009e000,因此不需要通过get_kernel_page另分配一页*/main_thread = running_thread();init_thread(main_thread, "main", 31);/* main函数是当前线程,当前线程不在thread_ready_list中,* 所以只将其加在thread_all_list中. */ASSERT(!elem_find(&thread_all_list, &main_thread->all_list_tag));list_append(&thread_all_list, &main_thread->all_list_tag);
}
由于main现在正在运行,所以不需要加入就绪队列,加入全队列即可。,running_thread定义如下,用于获取当前线程PCB
struct task_struct *running_thread(void)
{uint32_t esp;asm("mov %%esp, %0" : "=g"(esp));/* 取esp整数部分即pcb起始地址 */return (struct task_struct *)(esp & 0xfffff000);
}
该函数的原理也很简单,我们指定pcb在一个自然页的起始位置,线程栈在高地址,所以将16进制的栈顶的低3位直接清0即可。
初始化总结
我们的总初始化函数如下:
- 初始化PCB
- 初始化线程栈的运行信息
- 将初始化所有信息的线程PCB插入就绪队列和全队列,将来由调度统一选择调度
- 初始化main线程的PCB
其中在加载kernel时完成的应是第4步,如下:
/* 初始化线程环境 */
void thread_init(void)
{put_str("thread_init start\n");list_init(&thread_ready_list);list_init(&thread_all_list);/* 将当前main函数创建为线程 */make_main_thread();put_str("thread_init done\n");
}
轮转调度的实现
时钟中断函数
我们知道每次时钟到达时都要进行中断,然后当前线程时间片减1,减为0时就要进行调度
uint32_t ticks; // ticks是内核自中断开启以来总共的嘀嗒数/* 时钟的中断处理函数 */
static void intr_timer_handler(void)
{struct task_struct *cur_thread = running_thread();ASSERT(cur_thread->stack_magic == 0x19870916); // 检查栈是否溢出cur_thread->elapsed_ticks++;ticks++; // 从内核第一次处理时间中断后开始至今的滴哒数,内核态和用户态总共的嘀哒数if (cur_thread->ticks == 0)schedule();elsecur_thread->ticks--;
}
然后在初始化的时候对该中断函数注册
/* 初始化PIT8253 */
void timer_init()
{put_str("timer_init start\n");/* 设置8253的定时周期,也就是发中断的周期 */frequency_set(CONTRER0_PORT, COUNTER0_NO, READ_WRITE_LATCH, COUNTER_MODE, COUNTER0_VALUE);register_handler(0x20, intr_timer_handler);put_str("timer_init done\n");
}
其中register_handler
注册函数在interrupt.c
中定义如下:
/* 在中断处理程序数组第vector_no个元素中注册安装中断处理程序function */
void register_handler(uint8_t vector_no, intr_handler function)
{idt_table[vector_no] = function;
}
就是直接把c语言版本的中断处理函数指向自定义的处理函数
调度实现
schedule
我们在时钟中断中进行判断,如果当前时间片为0,就执行schedule
函数,定义在thread.c
中
/* 实现任务调度 */
void schedule(void)
{ASSERT(intr_get_status() == INTR_OFF);struct task_struct* cur = running_thread(); if (cur->status == TASK_RUNNING) { // 若此线程只是cpu时间片到了,将其加入到就绪队列尾ASSERT(!elem_find(&thread_ready_list, &cur->general_tag));list_append(&thread_ready_list, &cur->general_tag);cur->ticks = cur->priority; // 重新将当前线程的ticks再重置为其priority;cur->status = TASK_READY;} else { /* 若此线程需要某事件发生后才能继续上cpu运行,不需要将其加入队列,因为当前线程不在就绪队列中。*/}ASSERT(!list_empty(&thread_ready_list));thread_tag = NULL; // thread_tag清空/* 将thread_ready_list队列中的第一个就绪线程弹出,准备将其调度上cpu. */thread_tag = list_pop(&thread_ready_list); struct task_struct* next = elem2entry(struct task_struct, general_tag, thread_tag);next->status = TASK_RUNNING;switch_to(cur, next); }
首先先判断当前中断状态是不是关闭的,由于是从时钟中断过来的,处于关中断状态
然后获取当前线程的pcb,看是由于时钟到期还是别的原因下处理机,如果是由于时钟的问题就放到就绪队列。
然后从就绪列表中取出来下一个,获取其PCB,然后进行调度,这里用elem2entry获取其PCB,elem2entry声明如下:
#define offset(struct_type,member) (int)(&((struct_type*)0)->member)
#define elem2entry(struct_type, struct_member_name, elem_ptr) \(struct_type*)((int)elem_ptr - offset(struct_type, struct_member_name))
之前我们说过低3位清0获取PCB的方法,这里用的是PCB基地址=真实地址-偏移地址
的方法,因为每一项成员的偏移都是固定的。
switch_to
switch_to函数的声明如下:
[bits 32]
section .text
global switch_to
switch_to:;栈中此处是返回地址 push esi ;这4条就是对应压入线程栈中预留的ABI标准要求保存的,esp会保存在其他地方push edipush ebxpush ebpmov eax, [esp + 20] ; 得到栈中的参数cur, cur = [esp+20]mov [eax], esp ; 保存栈顶指针esp. task_struct的self_kstack字段,; self_kstack在task_struct中的偏移为0,; 所以直接往thread开头处存4字节便可。;------------------ 以上是备份当前线程的环境,下面是恢复下一个线程的环境 ----------------mov eax, [esp + 24] ; 得到栈中的参数next, next = [esp+24]mov esp, [eax] ; pcb的第一个成员是self_kstack成员,用来记录0级栈顶指针,; 用来上cpu时恢复0级栈,0级栈中保存了进程或线程所有信息,包括3级栈指针pop ebppop ebxpop edipop esiret ; 返回到上面switch_to下面的那句注释的返回地址,; 未由中断进入,第一次执行时会返回到kernel_thread
首先push一些寄存器,这是对应ABI标准(这里可以理解为c与汇编的约定)的,这些寄存器的顺序与thread_stack
结构体也是对应的,此时栈中布局如下
11-12行将 cur对应pcb的地址放到eax中,然后把esp放到cur的self_kstack
中,即cur.self_kstack
指向栈顶
16-17行恢复next的栈顶,然后pop寄存器,此时pop的都是next之前保存的,可不要与cur的搞混,因为我们的栈已经换了,然后一步步ret,直到去执行函数
结果
实验现象
main.c如下:
#include "print.h"
#include "init.h"
#include "thread.h"
#include "interrupt.h"void k_thread_a(void*);
void k_thread_b(void*);
int main(void) {put_str("I am kernel\n");init_all();thread_start("k_thread_a", 1, k_thread_a, "argA ");thread_start("k_thread_b", 1, k_thread_b, "argB ");intr_enable(); // 打开中断,使时钟中断起作用while(1) {put_str("Main ");};return 0;
}/* 在线程中运行的函数 */
void k_thread_a(void* arg) {
/* 用void*来通用表示参数,被调用的函数知道自己需要什么类型的参数,自己转换再用 */char* para = arg;while(1) {put_str(para);}
}/* 在线程中运行的函数 */
void k_thread_b(void* arg) {
/* 用void*来通用表示参数,被调用的函数知道自己需要什么类型的参数,自己转换再用 */char* para = arg;while(1) {put_str(para);}
}
我们可以看到交替打印的现象,证明调转没错
需要思考的三个问题
这是我在看b站时一个博主提出来的,他自己也进行了解答,链接:https://www.bilibili.com/video/BV1Pg4y1K7TS?t=5217.1
- 初始化的过程
- main->A的过程,A还没有上过处理机
- B->main的过程,main已经上过处理机
初始化的过程
这里我们只讨论线程初始化,其实还是比较简单的
main->A的过程,A还没有上过处理机
在switch_to函数中,首先是对cur的寄存器进行保存,此时PCB布局如下:
然后对next进行处理,由上节可知,新建完一个线程之后其PCB布局如下
紧接着对其进行pop操作,直到PCB如下布局,此时ret去执行函数
B->main的过程,main已经上过处理机
此时与上一种情况不同的是,main已经上过处理及运行,所以上图执行函数的函数指针
也不复存在,在相应位置应是schedule
函数的ret,然后继续退回到时钟中断函数,继续ret到kernel.s中汇编版本的中断处理函数,如下:
%macro VECTOR 2 ; 声明一个宏, 名字VECTOR 接受两个参数
section .text
intr%1entry: ; 每个中断处理程序都要压入中断向量号,所以一个中断类型一个中断处理程序,自己知道自己的中断向量号是多少,此标号来表示中断处理程序的入口%2 ; 这一步是根据宏传入参数的变化而变化的push ds ; 以下是保存上下文环境push espush fspush gspushad; 如果是从片上进入的中断,除了往从片上发送EOI外,还要往主片上发送EOI mov al, 0x20 ; 中断结束命令EOIout 0xa0, al ; 向从片发送out 0x20, al ; 向主片发送push %1call [idt_table+%1*4]jmp intr_exitsection .data ; 这个段就是存的此中断处理函数的地址dd intr%1entry ; 存储各个中断入口程序的地址,形成intr_entry_table数组,定义的地址是4字节,32位
%endmacro ; 宏结束section .text
global intr_exit
intr_exit: ; 以下是恢复上下文环境add esp, 4 ; 跳过中断号popadpop gspop fspop espop dsadd esp, 4 ;对于会压入错误码的中断会抛弃错误码(这个错误码是执行中断处理函数之前CPU自动压入的),对于不会压入错误码的中断,就会抛弃上面push的0 iretd ; 从中断返回,32位下iret等同指令iretd
会继续pop我们之前保存的寄存器,恢复main之前的堆栈布局,继续执行,如此反复以往