C++11 线程、锁和条件变量

jopen 11年前

std::thread类代表了一个可执行的线程,它来自头文件<thread>。与其它创建线程的API(比如 Windows API中的CreateThread)不同的是, 它可以使用普通函数、lambda函数以及仿函数(实现了operator()函数的类)。另外,它还允许向线程函数传递任意数量的参数。

#include <thread> void func()  { // do some work } int main()  {     std::thread t(func);     t.join(); return 0;  }

在上面的例子中,t是一个线程对象,函数func()运行于该线程之中。调用join函数后,该调用线程(本例中指的就是主线程)就会在join进来进行执行的线程t结束执行之前,一直处于阻塞状态。如果该线程函数执行结束后返回了一个值,该值也将被忽略。不过,该函数可以接受任意数量的参数。

void func(int i, double d, const std::string& s)  {      std::cout << i << ", " << d << ", " << s << std::endl;  } int main()  {     std::thread t(func, 1, 12.50, "sample");     t.join(); return 0;  }
尽管我们可以向线程函数传递任意数量的参数,但是,所有的参数都是按值传递的。如果需要将参数按引用进行传递,那么就一定要象下例所示一样,把该参数封装到 std::ref或者std::cref之中。
void func(int& a)  {     a++;  } int main()  { int a = 42;     std::thread t(func, std::ref(a));     t.join();        std::cout << a << std::endl; return 0;  }

上面程序打印结果为43,但要不是将a封装到std::ref之中的话,输出的将是42。

除join方法之外,这个线程类还提供了另外几个方法:

  • swap: 将两个线程对象的底层句柄进行交换
  • detatch: 允许执行该方法的线程独立于本线程对象的执行而继续执行。脱离后的线程就再也不能执行join了(你不能等待到它执行结束了)
    int main() {      std::thread t(funct);      t.detach(); return 0;  }
有一点非常重要,值得注意:线程函数中要是抛出了异常的话,使用通常的try-catch方式是捕获不到该异常的。换句话说,下面这种做法行不通:
try {      std::thread t1(func);      std::thread t2(func);         t1.join();      t2.join();  } catch(const std::exception& ex)  {      std::cout << ex.what() << std::endl;  }

要在线程间传递异常,你可以先在线程函数中捕获它们,然后再将它们保存到一个合适的地方,随后再让另外一个线程从这个地方取得这些异常。

std::vector<std::exception_ptr>  g_exceptions; void throw_function()  { throw std::exception("something wrong happened");  } void func()  { try {        throw_function();     } catch(...)     {        std::lock_guard<std::mutex> lock(g_mutex);        g_exceptions.push_back(std::current_exception());     }  } int main()  {     g_exceptions.clear();       std::thread t(func);     t.join(); for(auto& e : g_exceptions)     { try { if(e != nullptr)           {              std::rethrow_exception(e);           }        } catch(const std::exception& e)        {           std::cout << e.what() << std::endl;        }     } return 0;  }

要获得更多关于捕获并传递异常的知识,你可以阅读在主线程中处理工作线程抛出的C++异常以及怎样才能在线程间传递异常?

在深入讨论之前还有一点值得注意,头文件<thread>里还在命名空间std::this_thread中提供了一些辅助函数:

  • get_id: 返回胆怯线程的id
  • yield: 让调度器先运行其它的线程,这在忙于等待状态时很有用
  • sleep_for: 将当前线程置于阻塞状态,时间不少于参数所指定的时间段
  • sleep_util: 在指定的时刻来临前,一直将当前的线程置于阻塞状态

在上一个例子中,我需要对g_exceptions这个vector进行同步访问,以确保同一个时刻只能有一个线程向其中压入新元素。为了实现同步,我使用了一个互斥量,并在该互斥量上进行了锁定。互斥量是一个核心的同步原语,C++11的<mutex>头文件中包含了四种不同的互斥量。

以下所列就是一个使用std::mutex(注意其中get_id()和sleep_for()这两个前文所述的辅助函数的用法)的例子。

#include <iostream>  #include <thread>  #include <mutex>  #include <chrono>     std::mutex g_lock; void func()  {      g_lock.lock();         std::cout << "entered thread " << std::this_thread::get_id() << std::endl;      std::this_thread::sleep_for(std::chrono::seconds(rand() % 10));      std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;         g_lock.unlock();  } int main()  {      srand((unsigned int)time(0));         std::thread t1(func);      std::thread t2(func);      std::thread t3(func);         t1.join();      t2.join();      t3.join(); return 0;  }

其输出将类似如下所示:

entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424 

lock()和unlock()这两个方法顾名思义,头一个方法用来对互斥量进行加锁,如果互斥量不可得便会处于阻塞状态;第二个方法用来对互斥量进行解锁。

接下来的这个例子演示的是一个简单的线程安全的容器(内部使用的是std::vector)。这个容器具有添加单个元素的add()方法以及添加一批元素的addrange()方法,addrange()方法内只是简单的调用了add()方法。 

template <typename T> class container   {      std::mutex _lock;      std::vector<T> _elements; public: void add(T element)       {          _lock.lock();          _elements.push_back(element);          _lock.unlock();      } void addrange(int num, ...)      {          va_list arguments;             va_start(arguments, num); for (int i = 0; i < num; i++)          {              _lock.lock();              add(va_arg(arguments, T));              _lock.unlock();          }             va_end(arguments);       } void dump()      {          _lock.lock(); for(auto e : _elements)              std::cout << e << std::endl;          _lock.unlock();      }  }; void func(container<int>& cont)  {      cont.addrange(3, rand(), rand(), rand());  } int main()  {      srand((unsigned int)time(0));         container<int> cont;         std::thread t1(func, std::ref(cont));      std::thread t2(func, std::ref(cont));      std::thread t3(func, std::ref(cont));         t1.join();      t2.join();      t3.join();         cont.dump(); return 0;  }

这个程序执行起来会进入死锁状态。其原因在于,该容器多次尝试获取同一个互斥量而之前却并没有释放该互斥量,这么做是行不通的。这正是std::recursive_mutex的用武之地,它允许同一个线程多次获得同一个互斥量,可重复获得的最大次数并未具体说明,但一旦查过一定次数,再对lock进行调用就会抛出std::system错误。为了修复上面所列代码的死锁问题(不通过修改addrange方法的实现,让它不对lock和unlock方法进行调用),我们可以将互斥量改为std::recursive_mutex

template <typename T> class container   {      std::recursive_mutex _lock; // ...   };

经过修改之后,该程序的输出会同如下所示类似:

6334 18467 41 6334 18467 41 6334 18467 41 

明眼的读者可能已经发现了,每次调用func()所产生的数字序列都完全相同。这是因为对srad的初始化是要分线程进行的,对srand()的调用只是在主线程中进行了初始化。在其它的工作线程中,srand并没有得到初始化,所以每次产生的数字序列就是完全相同的了。

显式的加锁和解锁可能会导致一定的问题,比如忘了解锁或者加锁的顺序不对都有可能导致死锁。本标准提供了几个类和函数用于帮助解决这类问题。使用这些封装类就能够以相互一致的、RAII风格的方式使用互斥量了,它们可以在相应的代码块的范围内进行自动的加锁和解锁动作。这些封装类包括:

  • lock_guard: 该类的对象在构造之时会试图获得互斥量的拥有权(通过调用lock()实现),而在析构之时会自动释放它所获得的互斥量(通过调用unlock()实现)。这是一个不可复制的类。
  • unique_lock: 是一个通用的互斥量封装类。与lock_quard不同,它还支持延迟加锁、时间锁、递归锁、锁所有权的转移并且还支持使用条件变量。这也是一个不可复制的类,但它是可以移动的类。

使用这些封装类,我们可以象这样来改写我们的容器:

template <typename T> class container   {      std::recursive_mutex _lock;      std::vector<T> _elements; public: void add(T element)       {          std::lock_guard<std::recursive_mutex> locker(_lock);          _elements.push_back(element);      } void addrange(int num, ...)      {          va_list arguments;             va_start(arguments, num); for (int i = 0; i < num; i++)          {              std::lock_guard<std::recursive_mutex> locker(_lock);              add(va_arg(arguments, T));          }             va_end(arguments);       } void dump()      {          std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements)              std::cout << e << std::endl;      }  };

有人会说,既然dump()方法并不会对容器的状态做出任何修改,所以它应该定义为congst的方法。但要是你真的这么改了之后,编译器就会报告出如下的错误:

‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'
互斥量(无论使用的是哪一种实现)必须要获得和释放,这就意味着要调用非常量型的lock()和unlock()方法。所以,从逻辑上讲,lock_guard不能在定义中添加const(因为该方法定义为const的话,互斥量也就必需是const的了)这个问题有个解决办法,可以让mutex变为mutable的。成为 mutable之后就可以在const函数中对状态进行修改了。不过,这种用法应该只用于隐藏的或者“元”状态(比如,对计算结果或者查询到的数据进行缓存,以供下次调用时直接使用而无需再次计算或查询;再比如,对 只是对对象的实际状态起着辅助作用的互斥量中的位进行修改)。
template <typename T> class container   {     mutable std::recursive_mutex _lock;     std::vector<T> _elements; public: void dump() const {        std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements)           std::cout << e << std::endl;     }  };

这些封装类都具有可以接受一个用来指导加锁策略的参数的构造器,可用的加锁策略有:

  • defer_lockof typedefer_lock_t: 不要取得互斥量的拥有权
  • try_to_lockof typetry_to_lock_t: 在不会被阻塞的情况下尝试获得互斥量的拥有权
  • adopt_lockof typeadopt_lock_t: 假设调用线程已经获得了互斥量的拥有权

这些策略的定义如下所示:

struct defer_lock_t { };   struct try_to_lock_t { };   struct adopt_lock_t { };   constexpr std::defer_lock_t defer_lock = std::defer_lock_t();   constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();   constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();

除了这些互斥量的封装类,本标准还提供了几个用来对一个或多个互斥量进行加锁的方法。
  • lock: 使用一种可避免死锁的算法对互斥量进行加锁(通过调用tolock()、try_lock()以及unlock())。
  • try_lock: 通过调用try_lock()i按照参数里指定的互斥量的顺序对多个互斥量进行加锁。

这里举一个造成死锁的例子:我们有一个保存元素的容器,还有一个叫做exchange()的方法,用来将一个元素从一个容器中取出来放入另外一个容器。为了成为线程安全的函数,这个函数通过获得每个容器的互斥量,对两个容器的访问进行了同步处理。

template <typename T> class container   { public:      std::mutex _lock;      std::set<T> _elements; void add(T element)       {          _elements.insert(element);      } void remove(T element)       {          _elements.erase(element);      }  }; void exchange(container<int>& cont1, container<int>& cont2, int value)  {      cont1._lock.lock();      std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock  cont2._lock.lock();             cont1.remove(value);      cont2.add(value);         cont1._lock.unlock();      cont2._lock.unlock();  }

假设这个函数是从两个不同的线程中进行调用的,在第一个线程中有一个元素从第一个容器中取出来,放到了第二个容器中,在第二个线程中该元素又从第二个容器中取出来放回到了第一个容器中。这样会导致死锁(如果线程上下文正好在获得第一个锁的时候从一个线程切换到了另一个线程的时候就会发生死锁)。

int main()  {      srand((unsigned int)time(NULL));         container<int> cont1;       cont1.add(1);      cont1.add(2);      cont1.add(3);         container<int> cont2;       cont2.add(4);      cont2.add(5);      cont2.add(6);         std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3);      std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6);         t1.join();      t2.join(); return 0;  }

要解决该问题,你可以使用以能够避免死锁的方式获得锁的std::lock:

void exchange(container<int>& cont1, container<int>& cont2, int value)  {      std::lock(cont1._lock, cont2._lock);          cont1.remove(value);      cont2.add(value);         cont1._lock.unlock();      cont2._lock.unlock();  }

条件变量

C++11还提供了对另外一个同步原语的支持,这个原语就是条件变量。使用条件变量可以将一个或多个线程进入阻塞状态,直到收到另外一个线程的通知,或者超时或者发生了虚假唤醒,才能退出阻塞状态。头文件<condition_variable>中包含的条件变量有两种实现:

  • condition_variable: 要求任何想等待该条件变量的线程必需先获得std::unique_lock锁。
  • condition_variable_any: 该实现更加通用,它可以用于任何满足基本条件的锁(只要实现了lock()和unlock()方法即可)。因为它使用起来代价要更高一些(从性能和操作系统的字样的角度讲),所以,应该在只有它所提供的额外的灵活性是必不可少的情况下才会选用它。
下面说说条件变量的工作原理:
  • 必须至少要有一个等待条件变为true的线程。等待中的线程必须首先获得一个unique_lock锁。 该锁将会传递给wait()方法,然后wait()方法会释放互斥量并将该线程暂停,直到条件变量得到相应的信号。当接受到信号,线程被唤醒后,该锁就又被重新获得了。
  • 必须至少要有一个线程发送信号使得条件变为true。信号可以通过调用notify_one()来发送,发用这个方法发送后就会将处于阻塞状态的等待该条件获得信号的线程中的某一个线程(任意一个线程)恢复执行;还可以通过调用notify_all()将等待该条件的所以线程唤醒。
  • 因为在多处理器的环境下,要让条件唤醒成为完全可预测会有一些复杂情况难以克服,所以就会出现一些虚假唤醒。也就是说,线程甚至在没有人向条件变量发送信号的情况下就有可能会被唤醒。因此,在线程唤醒后,仍然需要检测条件是不是还为true。而且因为虚假唤醒可能会多次发生,所以该检测必须用一个循环来进行。

以下代码给出了一个利用状态变量来同步线程的例子:几个工作线程可能在他们运行的时候产生错误并且他们把这些错误放到队列里面。一个记录线程会通过从队列得到并输出错误来处理这些错误代码。当有错误发生的时候,工作线程会发信号给记录线程。记录线程一直在等待着状态变量接收信号。为了防止虚假的唤醒,所以记录线程的等待是发生在一个以检测布尔值(boolean)的循环之中的。

#include <thread>  #include <mutex>  #include <condition_variable>  #include <iostream>  #include <queue>  #include <random>    std::mutex              g_lockprint;  std::mutex              g_lockqueue;  std::condition_variable g_queuecheck;  std::queue<int>         g_codes; bool g_done; bool g_notified; void workerfunc(int id, std::mt19937& generator)  { // print a starting message  {          std::unique_lock<std::mutex> locker(g_lockprint);          std::cout << "[worker " << id << "]\trunning..." << std::endl;      } // simulate work  std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error  int errorcode = id*100+1;      {          std::unique_lock<std::mutex> locker(g_lockprint);          std::cout  << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;      } // notify error to be logged  {          std::unique_lock<std::mutex> locker(g_lockqueue);          g_codes.push(errorcode);          g_notified = true;          g_queuecheck.notify_one();      }  } void loggerfunc()  { // print a starting message  {          std::unique_lock<std::mutex> locker(g_lockprint);          std::cout << "[logger]\trunning..." << std::endl;      } // loop until end is signaled  while(!g_done)      {          std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) // used to avoid spurious wakeups  {              g_queuecheck.wait(locker);          } // if there are error codes in the queue process them  while(!g_codes.empty())          {              std::unique_lock<std::mutex> locker(g_lockprint);              std::cout << "[logger]\tprocessing error:  " << g_codes.front()  << std::endl;              g_codes.pop();          }            g_notified = false;      }  } int main()  { // initialize a random generator  std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); // start the logger  std::thread loggerthread(loggerfunc); // start the working threads  std::vector<std::thread> threads; for(int i = 0; i < 5; ++i)      {          threads.push_back(std::thread(workerfunc, i+1, std::ref(generator)));      } // work for the workers to finish  for(auto& t : threads)          t.join(); // notify the logger to finish and wait for it  g_done = true;      loggerthread.join(); return 0;  }  Running this code produces an output that looks like this (notice this output is different with each run because each worker thread works, i.e. sleeps, for a random interval):  [logger]        running...  [worker 1]      running...  [worker 2]      running...  [worker 3]      running...  [worker 4]      running...  [worker 5]      running...  [worker 1]      an error occurred: 101 [worker 2]      an error occurred: 201 [logger]        processing error: 101 [logger]        processing error: 201 [worker 5]      an error occurred: 501 [logger]        processing error: 501 [worker 3]      an error occurred: 301 [worker 4]      an error occurred: 401 [logger]        processing error: 301 [logger]        processing error: 401 
如上所示的wait()方法有两个重载:

1.一个是只有一个唯一锁;这个重载释放锁,封锁线程和把线程加入都是等待这一个状态变量的线程队列里面;当状态变量被信号通知后或者是一个假唤醒发生,这些线程就会被唤醒。但他们中任何一个发生时,锁就被重新获得然后函数返回。

2.另外一个是对于唯一锁的添加,它也是使用一个循环的谓语直到它返回false;这个重载可以用来防止假式唤醒。它基本上是与以下是等价的:

while(!predicate())      wait(lock);
因此在上例中,通过使用重载的wait函数以及一个验证队列状态(空或不空)的断言,就可以避免使用布尔变量g_notified了:
void workerfunc(int id, std::mt19937& generator)  { // print a starting message  {          std::unique_lock<std::mutex> locker(g_lockprint);          std::cout << "[worker " << id << "]\trunning..." << std::endl;      } // simulate work  std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error  int errorcode = id*100+1;      {          std::unique_lock<std::mutex> locker(g_lockprint);          std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;      } // notify error to be logged  {          std::unique_lock<std::mutex> locker(g_lockqueue);          g_codes.push(errorcode);          g_queuecheck.notify_one();      }  } void loggerfunc()  { // print a starting message  {          std::unique_lock<std::mutex> locker(g_lockprint);          std::cout << "[logger]\trunning..." << std::endl;      } // loop until end is signaled  while(!g_done)      {          std::unique_lock<std::mutex> locker(g_lockqueue);            g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); // if there are error codes in the queue process them  while(!g_codes.empty())          {              std::unique_lock<std::mutex> locker(g_lockprint);              std::cout << "[logger]\tprocessing error:  " << g_codes.front() << std::endl;              g_codes.pop();          }      }  }

除了这个wait()重载方法,还有另外两个进行类似重载的等待方法,都有用了一个用来避免虚假唤醒的断言:

  • wait_for: 在条件变量收到信号或者指定的超时发生前,一直都将线程置于阻塞状态。
  • wait_until: 在条件变量收到信号或者指定的时刻到来前,一直都将线程处于阻塞状态。

这两个函数不带断言的重载函数会返回一个cv_status状态,该状态用来表明线程被唤醒了到底是因为发生了超时还是因为条件变量收到了信号抑或是发生了虚假唤醒。

本标准还提供了一个叫做notified_all_at_thread_exit的函数,它实现了一种机制,在该机制下,我们可以通知其它线程,某个给定的线程执行结束了,并销毁了所有的thread_local对象。之所以引入该函数,是因为如果使用了thread_local后,采用join()之外的机制等待线程可能会导致不正确甚至是致命的行为,出现这样的问题是因为thread_local的析构函数甚至可能会在原本处于等待中的线程继续执行后被执行了而且还可能已经执行完成了。(有关这方面更多的情况可参见N3070N2880)。 一般情况下,notified_all_at_thread_exitTypically必须正好在线程生成前调用。下面给出一个例子,演示一下notify_all_at_thread_exit是如何同condition_variable一起使用来对两个线程进行同步处理的:

std::mutex              g_lockprint;  std::mutex              g_lock;  std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator)  {     {        std::unique_lock<std::mutex> locker(g_lockprint);        std::cout << "worker running..." << std::endl;     }       std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));       {        std::unique_lock<std::mutex> locker(g_lockprint);        std::cout << "worker finished..." << std::endl;     }       std::unique_lock<std::mutex> lock(g_lock);     g_done = true;     std::notify_all_at_thread_exit(g_signal, std::move(lock));  } int main()  { // initialize a random generator  std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());       std::cout << "main running..." << std::endl;       std::thread worker(workerfunc, std::ref(generator));     worker.detach();       std::cout << "main crunching..." << std::endl;       std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));       {        std::unique_lock<std::mutex> locker(g_lockprint);        std::cout << "main waiting for worker..." << std::endl;     }       std::unique_lock<std::mutex> lock(g_lock); while(!g_done) // avoid spurious wake-ups  g_signal.wait(lock);       std::cout << "main finished..." << std::endl; return 0;  }

如果工作线程是在主线程结束之前结束的,输出将会是如下所示:

main running...  worker running...  main crunching...  worker finished...  main waiting for worker...  main finished...

如果是主线程在工作线程结束之前结束的,输出将会是如下所示:

main running...  worker running...  main crunching...  main waiting for worker...  worker finished...  main finished...

结束语

C++11标准使得C++开发人员能够以一种标准的和平台独立的方式来编写多线程代码。本文一一讲述了标准所支持的线程和同步机制。<thread>头文件提供了名为thread的类(另外还包含了一些辅助类或方法),该类代表了一个执行线程。头文件<mutex>提供了几种互斥量的实现,以及对线程进行同步访问的封装类。头文件<condition_variable>为条件变量提供了两种实现,利用这些实现可以让一个或多个线程进入阻塞状态,直到从收到来自另外一个或多个线程的通知、或者发生超时或虚假唤醒为止才会被唤醒。推荐在这方面再阅读一些别的资料来获得更详细的信息。