本文介绍 Modern C++
标准库提供的关于多线程编程的工具。如标题所示,本文定位是入门,不讨论内存序、无锁编程等并发编程的高级主题。本文内容主要参考《C++
并发编程实战(第二版)》一书,对并发编程感兴趣推荐阅读原书。
1. 线程
C++ 中使用线程的方式非常简单,只需要包含 thread
头文件,然后向 thread
构造函数传入可调用函数和实参即可。一些示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <functional> #include <iostream> #include <thread> void hello () { std::cout << "Hello Concurrent World!\n" ; }struct HelloThread { void operator () () const { std::cout << "Hello Thread!" << std::endl; } };void bind_sample (int a, int b) { std::cout << a << " + " << b << " = " << a + b << std::endl; }void increase (int &i) { ++i; }class AddTen { public : void add (int value) { std::cout << value << " + " << ten << " = " << value + ten << std::endl; } private : int ten{10 }; };int main () { std::thread t1 (hello) ; t1.join (); std::thread t2 (HelloThread{}) ; t2.join (); std::thread t3 ([]() { std::cout << "Hello Lambda!" << std::endl; }) ; t3.join (); std::thread t4 (bind_sample, 1 , 2 ) ; t4.join (); auto f = std::bind (bind_sample, std::placeholders::_1, 4 ); std::thread t5 (f, 5 ) ; t5.join (); int i = 0 ; std::thread t6 (increase, std::ref(i)) ; t6.join (); std::cout << "i = " << i << std::endl; AddTen obj; std::thread t7 (&AddTen::add, &obj, 1 ) ; t7.join (); return 0 ; }
运行结果如下:
1 2 3 4 5 6 7 Hello Concurrent World! Hello Thread! Hello Lambda! 1 + 2 = 3 5 + 4 = 9 i = 1 1 + 10 = 11
上面的代码示例了如何传入各种类型的函数、可调用对象等构造
thread
,以在单独的线程中运行任务。需要注意的是,传入的参数默认是进行拷贝的,因此,如果函数形参类型是引用,需要使用
std::ref()
以指定引用语义,如上述代码 41
行所示。
线程构造完成后,即自动在新线程中运行指定的任务。此时需要使用
t.join()
进行汇合,表示等待线程执行完成。也可以使用
t.detach()
表示不等待线程执行完成。同时 thread
对象是不可拷贝的,只具有移动语义。可以使用 vector
等容器管理线程对象。
2. 锁
当多个线程并发读写某个数据时,可能会出现错误。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <iostream> #include <thread> int n = 0 ;constexpr int total = 100000 ;void add () { for (int i = 0 ; i < total; ++i) { ++n; } }int main () { std::thread t1 (add) ; std::thread t2 (add) ; t1.join (); t2.join (); std::cout << n << std::endl; return 0 ; }
使用 g++ xxx.cpp
编译上述源文件(不使用任何编译优化选项)并多次运行可执行文件,可以发现每次程序的输出都不确定:
1 2 3 4 5 6 7 8 $ ./a.out 112924$ ./a.out 188328$ ./a.out 135646$ ./a.out 131277
输出并不是预期的 200000
,而似乎是随机数。这是因为多个线程并发读写同一个变量(数据竞争),导致了不合理的数据状态。解决这一问题的最简单方法就是加锁。
C++ 中锁的使用非常简单,只需要包含 mutex
头文件,然后在进入临界区之前调用 lock
函数;在离开临界区时调用 unlock
函数,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #include <iostream> #include <mutex> #include <thread> int n = 0 ; std::mutex m;constexpr int total = 100000 ;void add () { for (int i = 0 ; i < total; ++i) { m.lock (); ++n; m.unlock (); } }
除了 mutex
类外,C++ 还提供了
shared_mutex
,也即读写锁,可用于读多写少的场景,以提高并发度。
上述基于锁的写法虽然解决了数据竞争的问题,但是必须保证对
lock
和 unlock
方法配对调用,否则就会导致锁一直被某个线程占用,阻塞其它线程。C++
中提倡的写法是基于 RAII 的锁管理,包括 lock_guard
、unique_lock
、shared_lock
。这些类是一个包装类,可以指定这些类在构造函数中调用
lock
方法,在析构函数中自动调用 unlock
方法对锁进行管理。这样,程序员就不需要在每个可能的退出分支中调用
unlock
函数,降低出现 bug 的概率。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <iostream> #include <mutex> #include <thread> int n = 0 ; std::mutex m;constexpr int total = 100000 ;void add () { for (int i = 0 ; i < total; ++i) { std::lock_guard lock (m) ; ++n; } }
当需要对多个锁进行加锁时,为了防止死锁,需要按照相同的顺序进行加锁。C++
提供了 std::lock
函数以解决这一问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <mutex> #include <thread> int n = 0 ; std::mutex m1; std::mutex m2;constexpr int total = 100000 ;void add () { for (int i = 0 ; i < total; ++i) { std::unique_lock lock1 (m1, std::defer_lock) ; std::unique_lock lock2 (m2, std::defer_lock) ; std::lock (lock1, lock2); ++n; } }
3. call_once
在实际项目中有一个常见的需求:某个对象只需要构造一次,但是其消耗的资源较大,我们希望只有在真的需要用到它时在进行构造。这一需求类似于线程安全的单例模式。在此我们不考虑其究竟有几种并发安全的写法,而只介绍一种实用的、C++
语言也支持的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <chrono> #include <iostream> #include <mutex> #include <thread> #include <vector> void heavyInit () { std::this_thread::sleep_for (std::chrono::seconds (1 )); std::cout << "Resource prepared!" << std::endl; } std::once_flag work_once_flag;void work () { std::call_once (work_once_flag, heavyInit); }int main () { int workers = 10 ; std::vector<std::thread> threads; for (int i = 0 ; i < workers; ++i) { threads.emplace_back (work); } for (auto &t : threads) { t.join (); } return 0 ; }
代码非常简单,只需要包含 mutex
头文件,定义
std::call_once_flag
类型变量,并使用
std::call_once
方法,传入对应的
std::call_once_flag
和方法名和参数,即可保证某方法只被多个线程调用一次,且保证并发安全。
4. 条件变量
在实际编程中,我们经常遇到一种情况:一个线程准备数据;另一个线程等待数据,待数据准备好后对其进行处理。如果仅使用上述已经介绍的知识,我们可以写出类似下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <chrono> #include <iostream> #include <mutex> #include <thread> bool ok = false ; std::mutex m;void provide () { std::this_thread::sleep_for (std::chrono::milliseconds (500 )); std::unique_lock lock (m) ; ok = true ; lock.unlock (); }void consume () { while (true ) { std::unique_lock lock (m) ; if (ok) { break ; } lock.unlock (); std::cout << "comsume thread waked without data..." << std::endl; std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } }int main () { std::thread t1 (provide) ; std::thread t2 (consume) ; t1.join (); t2.join (); return 0 ; }
核心思路是:使用一个标记变量,和一个保护该变量的锁。生产数据的线程完成数据生产后,将标记变量赋值为
true
;消费线程使用轮询+休眠策略,直到标记变量变为
true
后,进行数据处理。
这么写虽然能够解决问题,但是不够优雅:消费线程不应该使用轮询+休眠策略,而是应该一直处于休眠中,待生产线程准备好数据后,通知消费进程即可,消费线程唤醒进行数据处理工作。实际上,条件变量就是为了解决这个问题,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <chrono> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> std::condition_variable cv;bool ok = false ; std::mutex m;void provide () { std::this_thread::sleep_for (std::chrono::milliseconds (500 )); std::unique_lock lock (m) ; ok = true ; cv.notify_one (); lock.unlock (); }void consume () { std::unique_lock lock (m) ; cv.wait (lock, [&]() { if (!ok) { std::cout << "comsume thread waked without data..." << std::endl; } return ok; }); }int main () { std::thread t1 (provide) ; std::thread t2 (consume) ; t1.join (); t2.join (); return 0 ; }
使用条件变量时,仍需要锁对临界区进行保护。cv.wait
函数传入锁和线程等待的条件,以避免假唤醒。此外,条件变量还有
wait_for
、wait_until
等接口,能够实现线程的限时等待。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include <chrono> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> using namespace std::chrono_literals;auto one_day = 24 h;auto half_an_hour = 30 min;auto max_time_between_messages = 30 ms; std::condition_variable cv;bool done; std::mutex m;bool wait_loop () { auto const timeout = std::chrono::steady_clock::now () + std::chrono::milliseconds (500 ); std::cout << "timer begin..." << std::endl; std::unique_lock<std::mutex> lk (m) ; while (!done) { if (cv.wait_until (lk, timeout) == std::cv_status::timeout) { break ; } std::cout << "wake up while timer not out..." << std::endl; } std::cout << "done = " << done << std::endl; return done; }void notify () { for (int i = 0 ; i < 10 ; ++i) { cv.notify_one (); std::this_thread::sleep_for (std::chrono::milliseconds (10 )); } }int main () { auto start = std::chrono::high_resolution_clock::now (); std::thread t (notify) ; wait_loop (); t.join (); auto stop = std::chrono::high_resolution_clock::now (); std::cout << "do_something() took " << std::chrono::duration <double >(stop - start).count () << " seconds" << std::endl; return 0 ; }
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 $ ./a.out timer begin... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... wake up while timer not out... done = 0 do_something() took 0.500422 seconds
使用条件变量,我们还可以实现并发安全的队列(本段代码主要来自《C++
并发编程实战(第二版)》一书):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 #include <condition_variable> #include <iostream> #include <memory> #include <mutex> #include <queue> #include <thread> #include <vector> template <typename T>class threadsafe_queue { private : mutable std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond; public : threadsafe_queue () {} threadsafe_queue (threadsafe_queue const &other) { std::lock_guard<std::mutex> lk (other.mut) ; data_queue = other.data_queue; } void push (T new_value) { std::lock_guard<std::mutex> lk (mut) ; data_queue.push (new_value); data_cond.notify_one (); } void wait_and_pop (T &value) { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] { return !data_queue.empty (); }); value = data_queue.front (); data_queue.pop (); } std::shared_ptr<T> wait_and_pop () { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] { return !data_queue.empty (); }); std::shared_ptr<T> res (std::make_shared<T>(data_queue.front())) ; data_queue.pop (); return res; } bool try_pop (T &value) { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return false ; value = data_queue.front (); data_queue.pop (); return true ; } std::shared_ptr<T> try_pop () { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return std::shared_ptr <T>(); std::shared_ptr<T> res (std::make_shared<T>(data_queue.front())) ; data_queue.pop (); return res; } bool empty () const { std::lock_guard<std::mutex> lk (mut) ; return data_queue.empty (); } };int main () { std::mutex cout_mutex; threadsafe_queue<int > q; std::vector<std::thread> threads; int n = 10 ; for (int i = 0 ; i < n; ++i) { if (i % 2 == 0 ) { threads.emplace_back (std::thread ([&]() { for (int i = 0 ; i < 10 ; ++i) { q.push (i); } })); } else { threads.emplace_back (std::thread ([&]() { int value; for (int i = 0 ; i < 10 ; ++i) { q.wait_and_pop (value); } })); } } for (auto &t : threads) { t.join (); } std::cout << "q.empty() = " << q.empty () << std::endl; }
5. 异步任务
除了上述提供的较为低级的并发编程工具,C++
也提供了一些较为高级的接口,能让我们迅速的使用多线程技术以充分利用多核
CPU 的能力。
C++ 的 future
头文件中提供了 std::async
方法,能够直接让系统开启一个线程执行指定的任务:
1 2 3 4 5 6 7 8 9 10 11 12 #include <future> #include <iostream> int find_the_answer_to_ltuae () { return 42 ; }void do_other_stuff () {}int main () { std::future<int > the_answer = std::async (find_the_answer_to_ltuae); do_other_stuff (); std::cout << "The answer is " << the_answer.get () << std::endl; }
运行结果如下:
1 2 $ ./a.out The answer is 42
async
可以看作时 thread
的封装,提供了更简单的使用方法供用户使用。因此,调用 async
方法时,和 thread
的构造函数类似,参数传递为拷贝语义,具体说明见下面代码(本段代码主要来自《C++
并发编程实战(第二版)》一书):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <future> #include <string> struct X { void foo (int , std::string const &) ; std::string bar (std::string const &) ; }; X x;auto f1 = std::async (&X::foo, &x, 42 , "hello" ); auto f2 = std::async (&X::bar, x, "goodbye" ); struct Y { double operator () (double ) ; }; Y y;auto f3 = std::async (Y (), 3.141 ); auto f4 = std::async (std::ref (y), 2.718 ); X baz (X &) ;auto f6 = std::async (baz, std::ref (x)); class move_only { public : move_only (); move_only (move_only &&); move_only (move_only const &) = delete ; move_only &operator =(move_only &&); move_only &operator =(move_only const &) = delete ; void operator () () ; };auto f5 = std::async (move_only ());
利用 async
接口,我们能够方便地实现一个多线程版本的快速排序算法(本段代码主要来自《C++
并发编程实战(第二版)》一书):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <algorithm> #include <future> #include <iostream> #include <list> #include <thread> template <typename T>std::list<T> parallel_quick_sort (std::list<T> input) { if (input.size () <= 1 ) { return input; } std::list<T> result; result.splice (result.begin (), input, input.begin ()); T const &pivot = *result.begin (); auto divide_point = std::partition (input.begin (), input.end (), [&](T const &t) { return t < pivot; }); std::list<T> lower_part; lower_part.splice (lower_part.end (), input, input.begin (), divide_point); std::future<std::list<T> > new_lower (std::async (¶llel_quick_sort<T>, std::move (lower_part))); auto new_higher (parallel_quick_sort(std::move(input))) ; result.splice (result.end (), new_higher); result.splice (result.begin (), new_lower.get ()); return result; }int main () { std::list<int > lst{-1 , 12 , 3 , 3 , 9 , 32 , 0 , -9 }; auto sorted_lst = parallel_quick_sort (lst); for (auto &elem : sorted_lst) { std::cout << elem << " " ; } std::cout << std::endl; return 0 ; }
除了 async
,future
头文件还提供了
std::promise
、std::packaged_task
等高级编程接口,这里不具体介绍,可参考 https://zh.cppreference.com/w/cpp/thread#future 。
6. 原子类型
除了上述工具,C++
还提供了相对而言轻量级、高性能的原子类型。对应本文开始的代码,使用原子类型能够在基本不改变原有代码的情况下得到预期的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #include <atomic> #include <iostream> #include <thread> std::atomic<int > n = 0 ; constexpr int total = 100000 ;void add () { for (int i = 0 ; i < total; ++i) { ++n; } }int main () { std::thread t1 (add) ; std::thread t2 (add) ; t1.join (); t2.join (); std::cout << n << std::endl; return 0 ; }
运行结果:
使用原子类型,也可以方便地解决前文提到的生产者消费者问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include <atomic> #include <chrono> #include <iostream> #include <thread> #include <vector> std::vector<int > data;std::atomic_bool data_ready (false ) ; void reader_thread () { while (!data_ready.load ()) { std::cout << "not prepared yet" << std::endl; std::this_thread::sleep_for (std::chrono::milliseconds (1 )); } std::cout << "The answer=" << data[0 ] << "\n" ; }void writer_thread () { data.push_back (42 ); data_ready = true ; }int main () { std::thread t1 (reader_thread) ; std::thread t2 (writer_thread) ; t1.join (); t2.join (); return 0 ; }
关于原子类型更多信息可参考 https://zh.cppreference.com/w/cpp/atomic/atomic
。
7. 总结
本文仅仅是关于 C++ 并发编程的最简单介绍,介绍了线程、锁、RAII
管理锁、条件变量、原子类型、异步任务等的使用方法。内存模型、内存序、并行算法库等内容将在以后介绍。