Fonctions wait_all() et wait_any()

Ce qui suit est d'intérêt général, mais pour l'essentiel, l'implémentation est de Bjarne Stroustrup, pas de moi (outre des retouches de styles et la présentation des idées), dans son excellent The C++ Programming Language 4e édition. Ce livre est rempli de bonnes idées.

Je vous recommande d'avoir lu et compris l'article sur les futures avant de lire ce qui suit.

Plusieurs API de multiprogrammation ou de synchronisation permettent à un thread de se mettre en attente de plusieurs ressources, débloquant lorsque toutes ces ressources deviennent disponibles – fonction que l'on pourrait nommer wait_all() – ou encore lorsqu'au moins une d'elles devient disponible – fonction wait_any(). Par exemple, sous Microsoft Windows, on trouve WaitForSingleObject() pour attendre une ressources, mais aussi WaitForMultipleObjects() pour attendre entre une et MAXIMUM_WAIT_OBJECTS ressources, avec un paramètre pour déterminer si la fonction doit débloquer une fois qu'une seule ressources est devenue disponible ou une fois qu'elles le sont toutes devenues.

Il n'existe pas, avec C++ 14, de fonctions standards pour réaliser ces deux opérations, mais il est possible de les implémenter à partir des autres outils mis à notre disposition. C'est ce que nous ferons ci-dessous, en mettant un thread en attente d'un groupe de std::future.

Implémenter un wait_all()

Une fonction wait_all() peut s'exprimer comme suit. Remarquez d'office qu'elle ne reposera que sur des outils de la bibliothèque standard.

#include <random>
#include <future>
#include <iostream>
#include <chrono>
#include <vector>
#include <algorithm>
using namespace std;
using namespace std::chrono;

Pour les besoins de la démonstration, nous utiliserons un foncteur muni d'un identifiant entier, pour les distinguer les uns des autres, et acceptant en paramètre une fonction qui lui retournera un entier utilisé à l'interne pour suspendre l'exécution du thread qui l'appellera.

struct F
{
   int i;
   template <class R>
      int operator()(R r)
      {
         this_thread::sleep_for(seconds{r()});
         return i;
      }
};

La fonction wait_all() se déclinera en deux versions :

  • Une variadique, acceptant en paramètre autant de paramètres que le code client lui en suppléera, et
  • Une autre acceptant en paramètre une séquence standard de ressources

Dans un cas comme dans l'autre, l'implémentation sera relativement simple : bloquer sur l'obtention de la première, puis poursuivre avec les autres.

template <class T>   void wait_all(T && arg)
   {
      arg.wait();
   }
template <class T, class ... Args>
   void wait_all(T && arg, Args && ... args)
   {
      wait_all(arg);
      wait_all(forward<Args>(args)...);
   }
template <class Itt>
   void wait_all(Itt debut, Itt fin)
   {
      for (; debut != fin; ++debut)
         wait_all(*debut);
   }

Le code va de soi.

void test_wait_all()
{
   random_device rd;
   mt19937 prng{ rd() };
   uniform_int_distribution<> d10{1, 10};
   vector<future<int>> v;
   for (int i = 0; i < 10; ++i)
      v.emplace_back(async(launch::async, F{i}, [&]() {
         return d10(prng);
      }));
   wait_all(begin(v), end(v));
   for (auto & fut : v)
      cout << fut.get() << " ";
   cout << endl;
}

Si vous en avez envie, deux défis :

Implémenter un wait_any()

Une fonction wait_any() peut s'exprimer comme suit. Remarquez d'office qu'elle ne reposera que sur des outils de la bibliothèque standard.

#include <random>
#include <future>
#include <iostream>
#include <chrono>
#include <vector>
#include <algorithm>
using namespace std;
using namespace std::chrono;

Pour les besoins de la démonstration, nous utiliserons un foncteur muni d'un identifiant entier, pour les distinguer les uns des autres, et acceptant en paramètre une fonction qui lui retournera un entier utilisé à l'interne pour suspendre l'exécution du thread qui l'appellera.

struct F
{
   int i;
   template <class R>
      int operator()(R r)
      {
         this_thread::sleep_for(seconds{r()});
         return i;
      }
};

La fonction wait_any() acceptera une séquence standard de ressources, et reposera sur une version à un seul paramètre. La technique appliquée à un seul paramètre sera d'attendre « zéro milliseconde » (milliseconds{0} en C++ 11, ou plus simplement 0ms en C++ 14) et de retourner true seulement si la raison du déblocage est que la std::future sur laquelle nous avons bloqué est prête.

Pour la version implémentée sur une séquence standard, nous itérons jusqu'à ce qu'une des std::future sur laquelle nous avons bloqué soit prête, et nous retournons un entier indiquant la position, dans la séquence, de celle qui a été identifiée. Dans le cas dégénéré où la séquence serait vide, nous retournons -1.

template <class T>
   bool wait_any(T && arg)
   {
      return arg.valid() && arg.wait_for(0ms) == future_status::ready;
   }
template <class Itt>
   int wait_any(Itt debut, Itt fin)
   {
      if (debut != fin)
         for (;;)
         {
            int n = 0;
            for (auto p = debut; p != fin; ++p, ++n)
               if (wait_any(*p)) return n;
         }
      return -1;
   }

Le code de test lance plusieurs std::future et itère sur celles-ci. À chaque fois que la fin d'une std::future est détectée, celle-ci est retirée de la séquence.

void test_wait_any()
{
   random_device rd;
   mt19937 prng{ rd() };
   uniform_int_distribution<> d10{1, 10};
   vector<future<int>> v;
   for (int i = 0; i < 10; ++i)
      v.emplace_back(async(launch::async, F{i}, [&]() {
         return d10(prng);
      }));
   for (auto n = wait_any(begin(v), end(v)); !(n == -1 || v.empty()); n = wait_any(begin(v), end(v)))
   {
      cout << v[n].get() << ' ' << flush;
      v.erase(begin(v) + n);
   }
   cout << endl;
}

Si vous en avez envie, deux défis :

En vue de C++ 17 fonctions when_all() et when_any()

Ce dont nous discutons ci-dessous fait partie de la spécification technique sur la concurrence de C++ telle que prévue pour expérimentation, probablement en vue d'une inclusion dans le standard à partir de C++ 17.

Puisque ces fonctions devraient raisonnablement être standardisées, la spécification technique d'extensions pour programmation concurrente propose, entre autres outils, des fonction standards nommées respectivement when_all() et when_any().

Ces deux fonctions suivent des modalités analogues aux fonction wait_all() et wait_any() proposées plus haut. Évidemment, savoir comment les implémenter est une chose, le faire en est une autre; privilégiez les outils standards en autant que cela soit possible!

Fonction when_all()

La fonction when_all() bloquera jusqu'à la complétion de toutes les futures d'une séquence. Cette séquence peut être faite d'une paire d'itérateurs (séquence à demi ouverte), dans quel cas toutes les futures seront du même type, ou d'une séquence variadique, dans quel cas elles pourront avoir des types distincts.

Le type de retour de when_all() dépendra des modalités d'appel :

Dans un cas comme dans l'autre, l'ordre des paramètres à l'entrée correspondra à l'ordre des éléments dans la valeur retournée.

Fonction when_any()

La fonction when_any() bloquera jusqu'à la complétion d'au moins une future d'une séquence. Cette séquence peut être faite d'une paire d'itérateurs (séquence à demi ouverte), dans quel cas toutes les futures seront du même type, ou d'une séquence variadique, dans quel cas elles pourront avoir des types distincts.

Le type de retour de when_any() sera une instance de future<when_any_result<Sequence>> when_any_result aura la forme suivante :

template<class Sequence>
   struct when_any_result
   {
      size_t index;
      Sequence futures;
   };

... et où Sequence dépendra des modalités d'appel :

Dans un cas comme dans l'autre, l'ordre des paramètres à l'entrée correspondra à l'ordre des éléments dans la valeur retournée. Pour sa part, l'attribut index sera la position de la future ayant débloqué la mise en attente. La documentation existante laisse entendre que l'attribut index aura la valeur -1 dans le cas d'une séquence vide, mais étant donné que size_t est un type entier non-signé, il s'agit probablement d'une erreur.


Valid XHTML 1.0 Transitional

CSS Valide !