C++ 11 中新增了支持线程(thread )、互斥(mutual exclusion)、条件变量(condition variables)和 std::future(期货)。
本节是 C++ 11 并发的第一节,主要介绍多线程运行时的基本高级接口——std::async() 和 std::future 以及底层接口 std::thread 和 std::promise。
常见的并行编程有多重模型,比如共享内存,多线程,消息传递等。从实用性上讲,多线程模型往往具有较大的优势。多线程模型允许同一时间有多个处理器单元执行统一进程中的代码部分,而通过分离的栈空间和共享的数据区及堆空间,线程可以拥有独立的执行状态,还可以进行快速的数据共享。
C++ 11 之前多线程编程都需要系统的支持,在不同的系统下创建线程需要不同的 API 如 pthread_create(),Createthread(),beginthread() 等,使用起来都比较复杂。而 C++11 提供了新头文件 <thread>、<mutex>、<atomic>、<future> 等用于支持多线程。
在 C++ 11 中,标准的一个相当大的变化就是引入了多线程的支持。这使得 C++ 在进行线程编程的时候,不必依赖第三方库和标准。
std::async() 函数是更高层次上的多线程操作(相对于 std::thread),使我们不用关注线程创建内部细节,就能方便的获取异步执行状态和结果,还可以指定线程创建策略。新标准鼓励用 std::async() 替代线程的创建,让它作为异步操作的首选。
通常来说,对初学者最友好的都是那些封装得最好、最高级的接口。而 std::async() 和 std::future 构成的多线程高级接口就是入门的最好开始,官方文档请见 std::future 和 std::async()。
如果需要计算两个返回值操作数的和,寻常做法如下所示。但是无论调用顺序是什么(调用次序无法确定),需要的时间是调用 func1() 和 func2() 的时间再加上求和的时间。
func1() + func2()
但是,有了 std::async() 就可以做得更好。
int DoSomething(int i) { ... }
int func1(int i) { return DoSomething(i + 1); }
int func2(int j) { return DoSomething(j + 2); }
int main() {
std::future<int> result1 = std::async(func1, 1); // 异步调用 func1
int result2 = func2(2); // 因为后面马上要获取结果,func2() 调用一个新线程没有太大意义
int result = result1.get() + result2; // get() 函数获取 std::future 的返回值
}
在这里,std::async() 尝试将所获得的函数立刻异步启动与另一个线程中。需要注意的是,不论原函数是否具有返回值,也会返回一个 std::future 对象。
接下来是处理总和,就需要调用 get() 成员函数来获得 result1 的返回值。随着调用 get() 函数,以下三件事之一会发生:
以上三种情况,特别是第三种,保证了在单线程环境或者已经无法再开辟新线程的时候可以有效运行。如果当前有一个线程处于可用状态,那么它启动新线程;否则,这一个调用会延迟到明确需要结果(调用 get())或者明确要求线程运行(调用 wait())。
事实上,std::async() 启动只有两个可能:调用时立即建立新线程,否则直到明确表示需要结果才会运行。不会出现发现线程池有空缺了立即补上的情况。
为了获得最佳效果,尽量在调用 std::async() 和调用 get() 之间的距离最大化。如果主程序后面完全没有调用 get() 或者 wait(),那么线程也可能直接永远不启动。
可以使用 std::launch::async 枚举强迫 std::async() 绝不拖延开辟新线程:
std::future<int> result1 = std::async(std::launch::async, func1, 1);
如果异步调用在此处无法实现(单线程环境或者开辟线程失败),就会抛出 std::system_error 异常。而在声明这个 launch 策略后,后面不调用 get() 或者 wait(),程序也一定以执行完成再返回,即使主程序已经运行完毕,也会等待这个线程结束才会关闭应用程序;另外,如果这里没有声明 std::future
也可以使用 std::lauch::deffered 来声明线程延缓执行。
std::future<int> result1 = std::async(std::launch::deffered, func1, 1);
这样,程序绝不会在调用 get() 或者 wait() 之前运行,其实 std::lauch::deffered 与串行调用没有不同,在很多时候可以节约不必要的浪费。如下所示,就不需要两个值都计算一遍。
auto f1 = std::async(std::launch::deffered, task1);
auto f2 = std::async(std::launch::deffered, task2);
...
auto val = RunF1() ? f1.get() : f2.get();
除了调用 get() 或者 wait() 可以马上获得结果,有时需要一种东西可以帮助鉴别线程任务是否完成,如果完成,则可以获取结果,如果没有完成,主线程就继续执行其他任务或者休眠让出 CPU。有两个函数可以给线程运行设定一个时间然后查询是否运行完成,但这都是阻塞主线程换来的。
int main()
{
std::future<int> future = std::async(std::launch::async, []() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});
std::cout << "waiting...\n";
std::future_status status;
do {
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
}
else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
}
else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);
std::cout << "result is " << future.get() << '\n';
}
// 输出为
//waiting...
//timeout
//timeout
//ready!
//result is 8
在上面提到的,std::future 提供了得到未来结果的能力,但是,std::future 的结果只能处理一次。第二次调用 get() 会导致未定义行为。
有的时候,需要多次处理未来的结果。所以,C++ 提供了 std::shared_future,从而可以多次调用 get(),多次得到同样的结果或者同样的异常。比如下面的粒子:
int GetNumber() {
int i = 0;
std::cin >> i;
if (!std::cin)
throw std::runtime_error("no number read");
return i;
}
void DoSomething(char c, std::shared_future<int> f) {
try {
int num = f.get();
for (int i = 0; i < num; ++i) {
// 每 100 毫秒输出一个字符,输出 num 次
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout.put(c).flush();
}
}
catch (const std::exception &e) {
std::cerr << std::this_thread::get_id() << ": " << e.what() << std::endl;
}
}
int main() {
try {
std::shared_future<int> f = std::async(GetNumber);
// 由于我们构造 f 的时候并不知道开启下列线程的值是什么,所以可以构造成 std::shared_future 分发给不同的线程。
auto f1 = std::async(std::launch::async, DoSomething, '.', f);
auto f2 = std::async(std::launch::async, DoSomething, '+', f);
auto f3 = std::async(std::launch::async, DoSomething, '*', f);
f1.get();
f2.get();
f3.get();
}
catch (const std::exception &e) {
std::cout << e.what() << std::endl;
}
std::cout << std::endl;
}
在上面如果输入 5,可能输出:
除了高级接口 async() 和 (shared)future,C++ 标准库还提供了一个启动及处理线程的底层接口。
构造一个 std::thread 对象可以传入任何 callable object(function、member function、function object、lambda),还可以夹带任何可能的实参。这是相对 std::async() 更底层的接口,相比高级接口:
下面是一段示例代码:
void DoSomething(int num, char c) {
for(int i = 0; i < num; ++i) {
std::this_thread::sleep_for(chrono::milliseconds(100));
std::cout.put(c).flush();
}
}
int main() {
std::thread t1(DoSomething, 5, ',');
for(int i = 0; i < 5; ++i) {
thread t(DoSomething, 100, 'a' + i);
t.detach();
}
cin.get();
t1.join();
}
上面的代码中,第一个线程输出 5 个逗号,然后构建 5 个线程,马上 detach() 运行与后台。等待控制台输入一个字符,就马上退出程序,预计中后台程序会马上停止输出。不计算输入的控制字符,得到的字符串为:
而如果将 t.detach() 注释掉,由于 thread object 还没有结束就被析构掉了,线程会主动调用 std::terminate() 结束整个应用程序。注意,线程 detach() 是危险的举动,尽可能自己保存 thread object 达到控制的目的。
如果要在线程之间传递参数和处理异常,就必须要用到 std::promise。其实 std::async() 就是使用的 std::promise 来返回 std::future 对象。std::promise 和 std::future 对象一样都可以暂时持有一个对象。但是 std::future 是用 get() 来取回对象,std::promise 是用 set_…() 来设置对象。如下所示:
void DoSomething(std::promise<std::string> &p) {
try {
char c;
std::cin >> c;
if (c == 'x')
throw std::runtime_error(std::string("char ") + c);
std::string s = std::string("char ") + c;
p.set_value(std::move(s));
}
catch (...) {
p.set_exception(std::current_exception());
}
}
int main() {
try {
std::promise<std::string> p;
std::thread t(DoSomething, p);
t.detach();
std::future<std::string> f(p.get_future());
std::cout << "result " << f.get() << std::endl;
}
catch (const std::exception& e) {
std::cerr << "exception " << e.what() << std::endl;
}
}
在这里,必须对线程内传递引用或者指针,否则是无法取出结果的。然后在线程内调用 set_value() 或者 set_exception() 在 std::promise 中存储对象。一旦存有了值或者异常,状态就会转变为 ready,必须将 std::promise 转化为 std::future 才能获取存储的值。而 std::future 变量的 get() 仍是线程的节点,如果线程没有运行完,主线程会阻塞直到状态为 ready。另外需要注意,一旦状态为 ready,线程就会退出。如果线程中有需要释放的变量,比如堆变量,就应该调用 set_value_at_thread_exit() 或者 set_exception_at_thread_exit() 来让线程释放资源。
其实还有一个高级接口, std::packaged_task。std::async() 是一旦开启马上运行于后台,但有的时候就是希望某些线程过一会儿再建立(建立线程池),那么就可以使用 std::packaged_task。
原本可以这么写:
double compute(int x, int y);
std::future<double> f = std::async(compute, 1, 2);
...
double res = f.get();
现在可以这么写:
double compute(int x, int y);
std::packaged_task<double(int, int)> task(compute);
std::future<double> f = task.get_future();
...
task(7, 5);
...
double res = f.get();
转载请带上本文永久固定链接:http://www.gleam.graphics/multithread.html