卷2:调试与案例分析
第一章 并发与同步
画了两张简图,方便理解,如下:
针对并发源的问题,我接触的项目中都是SMP系统,目前大部分也都是SMP系统;
对于SMP系统,情况会更复杂。
□ 同一类型的中断处理程序不会并发执行,但是不同类型的中断可能送达不同的CPU, 因此不同类型的中断处理程序可能会并发执行。
□ 同一类型的软中断会在不同的CPU上并发执行。
□ 同一类型的tasklet是串行执行的,不会在多个CPU上并发执行。
□ 不同CPU上的进程上下文会并发执行。
一般实际项目开发中,需要考虑的问题,书中有写到几种场景,也是比较容易忽略的:
- 进程呈下文在操作某个临界区中的资源时发生了中断, 恰巧在对应中断处理程序中也访问
了这个资源。如果不使用内核同步机制来保护,那么可能会发生并发访问的bug。 - 如果进程上下文正在访问和修改临界区中的资源时发生了抢占调度,可能会发生并发访问的bug。
- 如果在自旋锁的临界区中主动睡眠以让出CPU,那这也可能是一个并发访问的bug。
- 如果两个CPU同时修改临界区中的一个资源,那这也可能是一个bug。
- 对临界区数据来说需要考虑从中断处理程序、工作线程(worker)处理程序、tasklet处理程序、软中断处理程序等有没有可能并发访问?
- 若从当前内核代码路径访问该数据时发生被抢占,被调度、执行的进程会不会访问该数据?
- 进程会不会进入睡眠状态以等待该数据?
1.1 原子操作
原子操作是指“原子地"(不间断地)完成’'读-修改-回写”机制,中间不能被打断,保证数据的有效修改;
举个例子:
static int i =0;
//线程A函数void thread A func()
{i++;
}
//线程B函数void thread B func()
{i++;
}
存在数据段中的变量i的结果是多少?有人说是2,有人说不是2;
代码执行过程如下:
从上面的代码执行过程来看,最终结果可能等于1。因为变量i是临界区的一个,CPU0和CPU1
可能同时访问,发生并发访问。
有的读者认为可以使用加锁的方式,如使用自旋锁来保证i++操作的原子性,但是加锁操作会导致比较大的开销,用在这里有些浪费。杀鸡焉用牛刀!
Linux内核提供了 atomic类型的原子变量,atomic_t类型的具体定义为如下。
<include/linux/types.h>typedef struct ( int counter; } atomic_t;
1. 基本原子操作函数
Linux内核提供最基本的原子操作函数包括atomic_read()函数和atomic_set()函数。
<include/asm-generic/atomic.h>#define ATOMIC_INIT (i) //原子变量初始化为 i
#define atomic_read (v) //读取原子变量的值
#define atomic_set (v, i) //设置变量 v 的值为 i
上述两个函数直接调用READ_ONCE()宏或者WRITE_ONCE()宏来实现,不包括“读-修改-回写”机制,直接使用上述函数容易引发并发访问。
2. 不带返回值的原子操作函数
atomic_inc(v):原子地给v 加 1
atomic_dec(v):原子地给 v 减 1
atomic_add(i,v):原子地给 v 加 i
atomic_and(i,v):原子地给v和i做“与”操作。
atomic_or(i,v):原子地给v和i做"或”操作。
atomic_xor(i,v):原子地给v和i做“异或”操作。
在这里我不对所有的api接口展开说,有兴趣可以看下下面路径头文件中的API, 讲述了所有关于原子操作的API接口;
kernel/linux/linux-5.15.73/include/linux/atomic/atomic-instrumented.h
接下来我们试图探一下原理:
我们以atomic_add为例
//kernel/linux/linux-5.15.73/include/linux/atomic/atomic-instrumented.h
atomic_add(int i, atomic_t *v)
{instrument_atomic_read_write(v, sizeof(*v));arch_atomic_add(i, v);
}
kernel/linux/linux-5.15.73/include/asm-generic/atomic.h
#define arch_atomic_add_return generic_atomic_add_return
#define arch_atomic_sub_return generic_atomic_sub_return#define arch_atomic_fetch_add generic_atomic_fetch_add
#define arch_atomic_fetch_sub generic_atomic_fetch_sub
#define arch_atomic_fetch_and generic_atomic_fetch_and
#define arch_atomic_fetch_or generic_atomic_fetch_or
#define arch_atomic_fetch_xor generic_atomic_fetch_xor#define arch_atomic_add generic_atomic_add
#define arch_atomic_sub generic_atomic_sub
#define arch_atomic_and generic_atomic_and
#define arch_atomic_or generic_atomic_or
#define arch_atomic_xor generic_atomic_xor
kernel/linux/linux-5.15.73/include/asm-generic/atomic.h
#ifdef CONFIG_SMP //多核
#define ATOMIC_OP(op, c_op) \
static inline void generic_atomic_##op(int i, atomic_t *v) \
{ \int c, old; \\c = v->counter; \while ((old = arch_cmpxchg(&v->counter, c, c c_op i)) != c) \c = old; \
}
#else
#define ATOMIC_OP(op, c_op) \
static inline void generic_atomic_##op(int i, atomic_t *v) \
{ \unsigned long flags; \\raw_local_irq_save(flags); \v->counter = v->counter c_op i; \raw_local_irq_restore(flags); \
}
#endif
看到如果是多核,最终会调到arch_cmpxchg函数,如果是单核,就调用raw_local_irq_save,保存并禁用本地中断,防止抢占;
继续分析多核调用;
//kernel/linux/linux-5.15.73/arch/arm64/include/asm/cmpxchg.h #define arch_cmpxchg(...) __cmpxchg_wrapper( _mb, __VA_ARGS__) //展开__cmpxchg_wrapper( _mb, &v->counter, c, c c_op i)
#define __cmpxchg_wrapper(sfx, ptr, o, n) \
({ \__typeof__(*(ptr)) __ret; \__ret = (__typeof__(*(ptr))) \__cmpxchg##sfx((ptr), (unsigned long)(o), \(unsigned long)(n), sizeof(*(ptr))); \__ret; \
})
// 展开__cmpxchg_mb(&v->counter, c, c c_op i, sizeof(&v->counter)), 64位系统sizeof(&v->counter)=8#define __CMPXCHG_GEN(sfx) \
static __always_inline unsigned long __cmpxchg##sfx(volatile void *ptr, \unsigned long old, \unsigned long new, \int size) \
{ \switch (size) { \case 1: \return __cmpxchg_case##sfx##_8(ptr, old, new); \case 2: \return __cmpxchg_case##sfx##_16(ptr, old, new); \case 4: \return __cmpxchg_case##sfx##_32(ptr, old, new); \case 8: \return __cmpxchg_case##sfx##_64(ptr, old, new); \default: \BUILD_BUG(); \} \\unreachable(); \
}//展开__cmpxchg_case_mb_64(&v->counter, c, c c_op i)#define __CMPXCHG_CASE(name, sz) \
static inline u##sz __cmpxchg_case_##name##sz(volatile void *ptr, \u##sz old, \u##sz new) \
{ \return __lse_ll_sc_body(_cmpxchg_case_##name##sz, \ptr, old, new); \
}//展开 __lse_ll_sc_body(__cmpxchg_case_mb_64, &v->counter, c, c c_op i)
根据上述逐级展开,可以追寻到下面定义,坚持下快找到真相了。
kernel/linux/linux-5.15.73/arch/arm64/include/asm/lse.h
#ifdef CONFIG_ARM64_LSE_ATOMICS...#define __lse_ll_sc_body(op, ...) \
({ \system_uses_lse_atomics() ? \__lse_##op(__VA_ARGS__) : \__ll_sc_##op(__VA_ARGS__); \
})...#else /* CONFIG_ARM64_LSE_ATOMICS */#define __lse_ll_sc_body(op, ...) __ll_sc_##op(__VA_ARGS__)...
#endif /* CONFIG_ARM64_LSE_ATOMICS */
可以看到如果定义CONFIG_ARM64_LSE_ATOMICS这个宏,走LSE(是否支持大系统扩展),若定义了这个宏走下面LSE流程实现
kernel/linux/linux-5.15.73/arch/arm64/include/asm/atomic_lse.h
#define __CMPXCHG_CASE(w, sfx, name, sz, mb, cl...) \
static __always_inline u##sz \
__lse__cmpxchg_case_##name##sz(volatile void *ptr, \u##sz old, \u##sz new) \
{ \register unsigned long x0 asm ("x0") = (unsigned long)ptr; \register u##sz x1 asm ("x1") = old; \register u##sz x2 asm ("x2") = new; \unsigned long tmp; \\asm volatile( \__LSE_PREAMBLE \" mov %" #w "[tmp], %" #w "[old]\n" \" cas" #mb #sfx "\t%" #w "[tmp], %" #w "[new], %[v]\n" \" mov %" #w "[ret], %" #w "[tmp]" \: [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr), \[tmp] "=&r" (tmp) \: [old] "r" (x1), [new] "r" (x2) \: cl); \\return x0; \
}
w:表示位宽,支持8位、16位、32位以及64位。
sfx: cas指令的位宽后缀,8位宽使用b后缀,16位宽使用h后缀。
name: 表示内存屏障类型,如“acq_”表示支持加载-获取内存屏障原语,“rel_”表示支持存储-释放内存屏障原语,“mb_”表示同时支持加载-获取和存储-释放内存屏障原语。
sz:位宽大小。
mb:组成cas指令的内存屏障后缀,"a"表示加载-获取内存屏障原语,“l” 表示存储释放内存屏障原语,“al"表示同时支持加载-获取和存储-释放内存屏障原语。
c1:内嵌汇编的损坏部。
从上述内嵌汇编可以看到这段内嵌汇编实现了一个包含原子比较并交换(CAS)指令的操作(比较晦涩难懂),看个大概吧;
若没有定义这个宏走下面流程实现
//kernel/linux/linux-5.15.73/arch/arm64/include/asm/atomic_ll_sc.h #define __CMPXCHG_CASE(w, sfx, name, sz, mb, acq, rel, cl, constraint) \
static inline u##sz \
__ll_sc__cmpxchg_case_##name##sz(volatile void *ptr, \unsigned long old, \u##sz new) \
{ \unsigned long tmp; \u##sz oldval; \\/* \* Sub-word sizes require explicit casting so that the compare \* part of the cmpxchg doesn't end up interpreting non-zero \* upper bits of the register containing "old". \*/ \if (sz < 32) \old = (u##sz)old; \\asm volatile( \__LL_SC_FALLBACK( \" prfm pstl1strm, %[v]\n" \"1: ld" #acq "xr" #sfx "\t%" #w "[oldval], %[v]\n" \" eor %" #w "[tmp], %" #w "[oldval], %" #w "[old]\n" \" cbnz %" #w "[tmp], 2f\n" \" st" #rel "xr" #sfx "\t%w[tmp], %" #w "[new], %[v]\n" \" cbnz %w[tmp], 1b\n" \" " #mb "\n" \"2:") \: [tmp] "=&r" (tmp), [oldval] "=&r" (oldval), \[v] "+Q" (*(u##sz *)ptr) \: [old] __stringify(constraint) "r" (old), [new] "r" (new) \: cl); \\return oldval; \
}
从上述内嵌汇编中可以看到使用Idxr和stxr 独占访问指令的组合;
====================================================================================================
接下来我们简单学习下CAS指令和LDXR/STXR独占访问指令
CAS指令:
ARM64处理器提供了比较并交换指令— cas指令。cas指令根据不同的内存屏障属性分成4类:
□ 隐含了加载-获取内存屏障原语。
□ 隐含了存储-释放内存屏障原语。
□ 同时隐含了加载-获取和存储-释放内存屏障原语。
□ 不隐含内存屏障原语。
LDXR/STXR独占访问指令:
ARMv8体系结构都提供了独占访问的的指令,在A64指令集中,LDXR指令尝试在内存总线中申请一个独占访问的锁,然后访问一个内存地址。STXR指令会往刚才LDXR指令已经申请独占访问的内存地址中写一个新内容,通常组合使用LDXR和STXR指令来完成一些同步操作;
它们的操作格式如下:
LDXR 指令:
LDXR <Wt>, [<Xn|SP>{, #<imm>}]<Wt>:目标寄存器,用于存储从内存中加载的值。
[<Xn|SP>{, #<imm>}]:源内存地址,可以通过基址寄存器 <Xn> 或栈指针寄存器 SP 加上可选的偏移量 <imm> 来指定。
STXR 指令:
STXR <Ws>, <Wt>, [<Xn|SP>{, #<imm>}]<Ws>:用于指示是否成功执行存储操作的结果寄存器。
<Wt>:要存储到内存中的值的寄存器。
[<Xn|SP>{, #<imm>}]:目标内存地址,可以通过基址寄存器 <Xn> 或栈指针寄存器 SP 加上可选的偏移量 <imm> 来指定。
在这两个指令中,Xn 和 SP 分别代表基址寄存器和栈指针寄存器,imm 为可选的偏移量。这些指令允许对内存进行原子操作,并且通常与并发控制相关的算法一起使用,以确保对共享内存的原子性访问。
还有多字节(16字节)独占访问的扩展指令LDXP和STXP,有兴趣可以自行学习;