Chap1,2 Hello Concurrency与线程管理
参考书目:《C++ Concurrency in action》- By Anthony Willians, Second Edition
读书笔记
Contents
- 定义并发,多线程
- 使用并发,多线程
- 简单的C++多线程
1.1 什么是并发
1.1.1 计算机并发
- 多核心:真正并行
- 单核心:时间片切换。
1.1.2 并发方式
- 多进程并发
应用程序分成多个独立的进程同时进行。通过进程间通信渠道传递讯息(信号,套接字,文件,管道)。
缺点:1. 通信复杂速度缓慢,操作系统进行进程保护。2. 多个进程固定开销,操作系统需要资源。
- 多线程并发
单进程中有运行多个线程,共享地址空间,访问大部分数据。数据可以在线程中相互传递。
多线程需要保证每个线程访问得到数据相同一致。线程通信需要管控。
1.1.3 并发与并行
- 硬件提高数据处理:并行性。
- 任务分离和响应:并发性。
1.2 使用并发原因
-
分离关注点:将相关代码和无关代码分离。
-
提升性能。
-
两种并发:
-
单一任务分成多部分运行,降低运行总时间(任务并行)
-
每个线程在不同数据块上执行相同操作(数据并行)
-
不适应并发:收益>成本。线程资源有限,会使得操作系统变慢。需要线程池优化,采用硬件并发。
1.3 历史
jump
1.4 入门
Code 1.1
#include<iostream>
#include<thread> // 头文件的多线程
void hello()
{// 打印函数与主函数分离。std::cout << "Hello concurrent\n";
}
int main()
{std::thread t(hello); // 创建线程t.join(); // 等待std::thread创建线程。
}
Contents for Chap2
- 启动新线程
- 等待,分离
- 唯一标识符
2.1 基本操作
2.1.1 启动线程
-
构造
std::thread
对象启动线程。可以通过有函数操作符的实例进行构造。 -
传入线程构造函数时需要注意语法解析问题。
-
Most vexing problem
-
考虑以下函数:
struct Timer{};struct TimeKeeper
{explicit TimeKeeper(Timer t);int get_time();
}int main()
{TimeKeeper time_keeper((Timer())); // ?return time_keeper.get_time();
}
?处具有歧义,可以被解释为:
- 初始化time_keeper成为TimeKeeper类的一个变量,采用Timer类的匿名实例初始化。
- 声明:声明函数time_keeper,这个函数返回一个TimerKeeper,参数类型是一个(指向)返回timer对象的函数(或指针)。
C采用解释1,但是C++采用解释2.
解决方法:采用大括号初始化。
TimeKeeper time_keeper{Timer{}}
// 初始化匿名实例Timer后,用其来初始化TimeKeeper类变量time_keeper.TimeKeeper time_keeper((Timer()));
TimeKeeper time_keeper = TimeKeeper(Timer());
因此创建函数对象线程时采用大括号或多组括号避免问题。
std::thread my_thread((f()));
std::thread my_thread{f()};
// 或者采用lambda表达式
std::thread my_thread([]{do_a();do_b();
})
- 注意线程生命周期。在函数退出时,线程函数仍然持有函数局部的变量/指针的引用。
#include<iostream>
#include<thread>
void do_something(int i)
{printf("%d\n", i);
}
struct func
{int &i;func(int& i_):i(i_){}void operator()()//函数操作符重载,对象在调用时执行。不需要传入参数。{for (unsigned j=0; j<10000; j++)do_something(i);}
};
int main()
{int some_local = 4999;// 将some_local作为变量引用初始化结构体func的成员func my_func(some_local);std::thread my_thread(my_func);my_thread.detach(); // 不等待进程结束
}
这样会导致没有任何输出,因为线程访问了被销毁的变量。改成my_thread.join()
即可。
2.1.2 等待线程完成
join()
函数等待线程完成。一旦线程汇入,则不能再次汇入了。使用joinable()
返回false。而且还有其他的等待方式,会在后面引入。join()
函数执行后,当前线程的存储将被清空。
2.1.3 特殊情况下的等待
- 需要保证在使用
join()
或detach()
前,进程还没有被销毁。 - 分离线程一般位于线程开始后,因此一般不会有异常出现。但是等待线程执行则需要注意
join()
函数的调用位置。 - 当在调用join()之前进程已经抛出异常终止时,程序会因为没有进程(进程生命周期已结束)而终止。
例如前面的线程涉及到了对局部变量的访问。如果在进程抛出异常结束前线程没有结束,则会因为主进程结束而被迫结束。
#include<iostream>
#include<thread>
#include<stdexcept>
struct func
{int &i;func(int &i_): i(i_){}void operator()(){for(;i<100;i++){std::cout << i << ' ';}}
};
int main()
{int i=1;func my_func(i);std::thread t(my_func);throw std::invalid_argument("Error!\n");t.join();return 0;
}
这会导致线程没有办法完成它的任务。
加上try_catch块后
#include<iostream>
#include<thread>
#include<stdexcept>
struct func
{int &i;func(int &i_): i(i_){}void operator()(){for(;i<100;i++){std::cout << i << ' ';}std::cout << '\n';}
};
int main()
{int i=1;func my_func(i);std::thread t(my_func);try{throw std::invalid_argument("Error!\n");}catch(...){t.join();std::cout << "Waiting after the thread over\n";return -1;}t.join();return 0;
}
正常执行完线程才销毁进程。
但是try_catch过于冗杂和繁琐,并且我们很难判断哪里容易出现错误。我们还可以用RAII方式来在析构函数中来等待线程结束。
- RAII: Resource Acquisition Is Initialization. 在对象的生命周期内分配和释放(内存)资源。
这样构造一个安全线程类来管理线程的创造和析构。采用exlplicit来避免隐式转换出现。
class thread_guard
{private:std::thread & t;public:explicit thread_guard(std::thread& t_): t(t_){}~thread_guard(){if(t.joinable())t.join();}// 禁止复制构造函数。thread_guard(thread_guard const &) = delete;thread_guard & operator=(thread_guard const &) = delete;
};
struct func;
int main()
{int some_local_state=0;func my_func(some_local_state);std::thread t(my_func);thread_guard g(t);for(int i=0;i < 20; i++){std::cout << i << ' ';}
}
这样在main函数执行结束之后,析构函数自下而上调用,先进行析构的应该是thread_guard类的对象g。在进行析构时,需要等待g的成员t线程执行结束才进行析构。同样在main抛出异常时,也会等待析构函数运行后才销毁对象g。
在thread_guard中,先判断线程是否可以被汇入,再进行汇入,因为线程只能汇入一次。
复制构造函数和复制分配运算符=被标记成delete
防止编译器自动调用他们。复制或者分配这样的一个对象很危险,因为他们可能会存留在正在汇入的线程的生命周期之外。
不需要等待线程结束时调用detach()
即可。
2.1.4 在背后运行线程
对一个std::thread对象调用detach()
会使得他们在背后运行,没有直接的方法与之沟通。分离后的线程会在背后真正运行,所有权和控制权移交给C++运行时库,保证资源与线程的关联。
分离后的线程一般称为daemon threads(守护线程),运行在幕后执行长生命周期的任务例如监视屏幕,监视文件系统等等。另一个方面守护进程还可以监视进程是否完成。
分离后的进程无法汇入,因为他们已经不再与std::thread类的对象相互关联。同样也不能再被分离。
当然,需要注意的是,进程结束后,其唤起的线程也会结束。
2.2 向线程传递参数
假设我们调用一个线程回显我们的输入和输入的长度,我们可以有如下的程序:
void f(size_t length, ssize_t buffer_len,std::string const & s)
{std::cout << "The length of buffer is " << length << std::endl;std::cout << "Allocated buffer size is " << buffer_len << std::endl;std::cout << s << std::endl;
}
void foo()
{char *buffer;size_t len=0;ssize_t length;length=getline(&buffer, &len, stdin);std::thread t(f,length,len,buffer); // std::string(buffer)显式构造buffer。t.detach();
}
但是这个程序可能会存在传入空指针的问题,因为有可能转换还没有完成就分离了。因此我们需要显式构造string,避免悬垂指针影响。
但是反过来,传递一个非常量的引用对象给thread是不被允许的。这是因为std::thread构造函数不知道这个参数需要传入引用。构造函数只会盲目地复制提供的值而忽视掉函数的真正需要的参数类型,以右值的方式传递给函数。函数接收到了右值类型的变量,但他需要一个左值。因此导致错误。
这时候我们只需要调用std::ref()
将其包裹起来。
void f(size_t length, ssize_t buffer_len,std::string & s);
void foo()
{std::string buffer = "This is buffer!\n";size_t length = buffer.length();ssize_t len = 120;std::thread t(f,length,len,std::ref(buffer));
}
这样传递的就是一个切实的引用而非一个临时值。
在新线程中调用类里的成员函数,则需要
- 将成员函数在类内的地址传入第一个参数。
- 将类的一个对象作为地址传入thread构造函数的第二个参数。
- 剩下的将成为成员函数的参数。
例如:
#include<iostream>
#include<thread>
#include<functional>
class A
{public:void print_the_buffer(std::string & s){std::cout << s << '\n';}
};int main()
{A my_a;std::string s = "Hello Concurrency!";std::thread t(&A::print_the_buffer, &my_a, std::ref(s));//等同于 1auto newCallable = std::bind(&A::print_the_buffer, my_a, std::string("Hello Concurrency!"));std::thread t(newCallable);//等同于 2auto newCallable = std::bind(&A::print_the_buffer, my_a, s);std::thread t(newCallable);//等同于 3auto newCallable = std::bind(&A::print_the_buffer, my_a, std::placeholders::_1);std::thread t(newCallable, std::ref(s));t.join();return 0;
}
也就是参数绑定的产物传入std::thread的构造函数中。在std::bind()
中,首先传入指向成员函数的指针,接着传入对象调用该函数(也就是成为成员函数的*this),最后传入s参数。
另一个有意思的场景是:参数不可以被复制,仅可以被移动。存储在一个对象中的数据被传输给另一个另一个,使得原来拥有的对象成为了空的对象。例如std::unique_ptr
,提供给一个给定的对象自动化的内存管理机制。
只有一个std::unique_ptr
可以在同一时间内指向一个给定的对象。当实例被销毁时,指向的对象被删除。移动构造函数和移动复制函数允许一个对象的所有权在std::unique_ptr
的实例中传输。这样的传输会导致源对象成为一个空的指针。这样对数值的移动允许这样类型的对象成为函数参数或者被其他函数返回。当源对象是暂时变量时,移动会自动完成。但是当对象是一个已经命名的变量时,需要显式调用std::move()
。接下来的实例给予了我们将一个动态对象的所有权移交到线程中:
#include<iostream>
#include<thread>
#include<functional>
class big_object
{public:int a;std::string b;explicit big_object(int a_, std::string b_):a(a_),b(b_){}big_object(big_object &) = delete;big_object(const big_object &) = delete;
};
void print_big_project(std::unique_ptr<big_object> a)
{std::cout << "The value of the object: " << a -> a << std::endl;std::cout << "The name of the project: " << a -> b << std::endl;
}
int main()
{std::unique_ptr<big_object> p(new big_object(5,"Hello world!"));std::thread t(print_big_project, std::move(p)); // 显式移动已命名的变量。return 0;
}
所有权从:指针p指向->t线程内部存储->函数print_big_project。
我们可以看一下指针的情况。
std::unique_ptr<big_object> p(new big_object(5,"Hello world!"));std::cout << &*p << std::endl;std::thread t(print_big_project, std::move(p));if(t.joinable())t.join();std::cout << &*p << std::endl;return 0;
指针指向的地方已经成为空,在线程执行后。
其实线程和std::unique_ptr
具有相类似的所有权情况。尽管线程不像std::unique_ptr
一样具有对一个动态对象的所有权,但是他们确实拥有资源:每个实例都对管理线程的执行有责任。这样的归属权可以在实例之间进行传递,因为std::thread
确实也是可移动但不可复制的。这样确保了一个对象只能与一个特定的线程在同一时间内有关。
2.3 向线程传递所有权
- 一个方程创建一个新线程在幕后运行,传递这个新线程给调用函数而不是等待其结束。
- 创建一个线程并且传递其所有权给其他的一些需要等待其运行结束的函数
这些都涉及到了线程的传递。正如上所说,线程是可移动但不可复制的,允许了对于特定线程所有权的传递。如下例子所示:
int main()
{void (big_object::*a)() = &big_object::print_the_obj; // 定义指向类方法的函数指针。big_object my_object(5,"Hello");auto caller1 = std::bind(a, &my_object);big_object other_object(7,"hi");auto caller2 = std::bind(a, &other_object);std::thread t1(caller1);std::thread t2=std::move(t1); // 移交t1到t2;t1 = std::thread(caller2);std::thread t3=std::move(t2); // 程序崩溃(aborted)t1 = std::move(t3);
}
首先,新的线程创立,与t1关联。随后,将线程从t1移交给t2,随后,可以重新创建并且关联线程给t1,然后将t2线程所有权转交给t3。但是因为t1已经关联线程,所以最后一步会导致程序异常退出。
你不能暴力地扔掉一个线程,仅仅只是通过给他一个新的线程。
同样,线程也可以传入传出一个函数。
线程的可移动性可以使得先前我们的thread_guard类获得线程的所有权,防止任何这个类的对象生活周期长于其所关联的线程,保证线程在范围内完成。
#include<iostream>
#include<thread>
#include<functional>class scoped_thread
{std::thread t;public:// 线程采用移动语义。explicit scoped_thread(std::thread t_): // 移动构造函数t(std::move(t_)){if(!t.joinable())throw std::logic_error("No thread");}~scoped_thread(){t.join();}scoped_thread(scoped_thread const&) = delete; // 不允许常量复制函数。scoped_thread& operator=(scoped_thread const&) = delete; // 不允许右值引用构造。
};struct func
{int &i;func(int &i_): i(i_){}void operator()(){for(;i<100;i++){std::cout << i << ' ';}std::cout << '\n';}
};int main()
{int some_local_state = 5;scoped_thread t{std::thread{func{some_local_state}}};for(int i = 0; i < 30; i++)std::cout << "This main process: " << i << "\n";return 0;
}
这样,创建进程后线程就牢牢控制在scope_thread类中。遵循RAII原则。并且在构造函数中就进行了判断线程是否可以汇入。
C++17中的建议:创造一个joining_thread类型,如同scoped_thread那样在析构函数中自动汇入,其余和std::thread()一样。目前还没有形成共识。以下是一个可能的实现。
class joining_thread
{std::thread t;
public:// 允许你为默认构造函数、拷贝构造函数、移动构造函数、拷贝赋值运算符和移动赋值运算符指定异常规格,而不需要显式地定义这些函数。joining_thread() noexcept=default;template<typename Callable,typename ... Args> // 接受任何类型课调用变量以及任意数量的参数。explicit joining_thread(Callable&& func,Args&& ... args): // 接收右值引用(即将销毁的对象)和左值引用(存在名字的变量),并且实现完美转发(保持原有值的类别)t(std::forward<Callable>(func),std::forward<Args>(args)...){}explicit joining_thread(std::thread t_) noexcept:t(std::move(t_)){}joining_thread(joining_thread&& other) noexcept:t(std::move(other.t)){}joining_thread& operator=(joining_thread&& other) noexcept{if(joinable())join();t=std::move(other.t);return *this;}joining_thread& operator=(std::thread other) noexcept{if(joinable())join();t=std::move(other);return *this;}~joining_thread() noexcept{if(joinable())join();}void swap(joining_thread& other) noexcept{t.swap(other.t);}std::thread::id get_id() const noexcept{return t.get_id();}bool joinable() const noexcept{return t.joinable();}void join() {t.join(); }void detach(){t.detach();}std::thread& as_thread() noexcept{return t; }const std::thread& as_thread() const noexcept{return t; }
};
同样可以用std::vector<std::thread>
这样类型的允许移动的容器来生成一系列的线程。这样可以对线程进行自动化管理,而不是分开地创建各个变量再手动进行汇入。
2.4 在运行时选择线程的数量
采用std::thread::hardware_concurrency()
可以返回当前最多线程的数量。
以下是一个展示并行版本的std::accumulate
,将任务划分给多个线程,每个线程具有最少数量的元素防止线程过多。假设了操作不会抛出错误。
#include<iostream>
#include<numeric>
#include<thread>
#include<vector>
#include<chrono>#define ull unsigned long long
#define ul unsigned longtemplate<class Iterator, class T>
struct accumulate_block
{void operator()(Iterator begin, Iterator end, T& res){res = std::accumulate(begin, end,res);}
};template<class Iterator, class T>
T parallel_accumulate(Iterator begin, Iterator end, T init)
{ul const length = std::distance(begin,end);if(!length)return init;ul const min_per_thread=25;ul const max_threads = (length + min_per_thread-1) / min_per_thread;ul const hardware_requires = std::thread::hardware_concurrency();ul const num_threads = std::min(hardware_requires != 0 ? hardware_requires : 2, max_threads);ul const block_size = length / num_threads;std::vector<T> res(num_threads);std::vector<std::thread> threads(num_threads - 1);Iterator block_start = begin;for(ul i=0;i<(num_threads-1);i++){Iterator block_end = block_start;std::advance(block_end, block_size); // 迭代器向前/向后移动指定步数,// 分段计算部分和。比如:1 2 3 4 // 5 6 7 8 // 9 10 11 12threads[i] = std::thread(accumulate_block<Iterator, T>(),block_start, block_end, std::ref(res[i]));block_start = block_end;}accumulate_block<Iterator, T>()(block_start, end, res[num_threads-1]);for(auto &entry: threads)entry.join();return std::accumulate(res.begin(),res.end(),init);
}int main()
{std::vector<int> numbers(500000000);for (int i = 0; i < numbers.size(); i++)numbers[i]=i;int res = 0;auto start = std::chrono::high_resolution_clock::now();std::cout << parallel_accumulate<std::vector<int>::iterator, int>(numbers.begin(),numbers.end(),0) << std::endl;auto end = std::chrono::high_resolution_clock::now();std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms" << std::endl;auto start2 = std::chrono::high_resolution_clock::now();std::cout << std::accumulate(numbers.begin(),numbers.end(),res) << std::endl;auto end2 = std::chrono::high_resolution_clock::now();std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end2 - start2).count() << "ms" << std::endl;}
具有接近一倍的加速比。
以上的函数尽管很长但是十分直接。
- 迭代器的范围为空时,直接返回初始值。
- 否则,寻找到最大的线程数,并将元素均等地分给每个线程进行处理。
- 最后函数处理剩余的元素和将多个进程的结果进行汇总。
线程的数量是计算的最大线程需要量与自身机器允许线程量之间的最小值。我们不想让每个线程都很清闲,也不想让单个线程很忙,更不想超过机器自己所拥有的最大线程数量。因此我们进行了权衡。让每个线程都有一个最少的工作量。这样对于大量的数据运算可以减少许多时间。我们的上例就接近减少了一倍。
这样,当我们了解到需要多少线程之后,我们创造了一个存储各个线程运算结果的中间vector,以及一个线程容器std::vector<std::thread>
用于线程。创建线程数量要注意-1,因为我们的主程序也算一个线程。
需要指出的是,对于迭代器的要求需要稍微严格。因为迭代器至少是前向迭代器,尽管std::accumulate
可以处理单次输入迭代器。并且,T必须是具有默认构造函数的类型。这样我们才可以构造结果向量。要注意并行算法必须不能互相干扰。再加上我们的结果也是无法直接从线程中返回的,因此我们需要传入对于结果的引用。相关的其他细节会在后面进行讨论。
这样,在上例中我们为线程提供了所有必要的信息,包括需要进行操作的数据,存储结果的位置。但是,如果需要标识符的函数在调用栈中位于较深的层次,并且可能从任何线程被调用,那么以这种方式进行就不方便了。这时候我们就需要获得能够识别线程的能力。万幸,C++提供了这样的库。
2.5 分辨线程
- 通过对象调用
get_id()
成员函数。没有与线程关联时,会返回一个默认构造的std::thread::id
类对象,代表没有任何线程。 std::thread::id
对象可以自由复制和比较。除了标识意外应该也没有什么用了www。相等时要么两个相关联的线程相同,要么都没有关联线程。std::thread::id
提供了完整的偏序比较关系,这样可以用于hash集合的键值。可以用于任何有序或无序的键值集合容器中。
例如以下的函数可以用于执行主线程和其他线程:
std::thread::id master_thread;
void foo()
{if(std::this_thread::get_id()==master_thread)do_master_work();elsedo_other_work();
}
同样,线程的标识符可以用于关联数据到特定的线程上,以及通过容器来控制线程存储数据或传递信息。
接下来新的一章将会介绍书中关于数据共享的部分。