Écrire une version parallèle d'une algorithme standard

Ce texte présume chez vous une familiarité avec les algorithmes standards, les conteneurs standards et le langage C++, en particulier le standard C++ 11.

Vous remarquerez que nous appliquerons quelques techniques contemporaines de programmation. Si vous ne les connaissez pas, je vous invite à lire un peu sur les traits, sur std::enable_if et sur std::function pour vous familiariser avec elles. Ceci pourrait vous aider à mieux tirer profit de ce qui suit.

Notez que, bien que le code proposé ci-dessous soit fonctionnel et correct, il ne se veut pas de qualité industrielle et n'a pas été testé rigoureusement sur plusieurs plateformes. Si vous avez accès à de véritables implémentations commerciales d'algorithmes tels que ceux décrits ici, privilégiez-les.

Supposons que nous souhaitions rédiger une version parallèle d'algorithmes standards tels que for_each(), transform() ou accumulate()... Comment pourrait-on y arriver? Est-ce que le jeu en vaudrait la chandelle?

Ce qui suit donne un exemple simple de programme comparant une version séquentielle des algorithmes transform() et accumulate() (celles logées dans l'espace nommé std, livrées avec votre compilateur) et une version parallèle « maison » de chacun. Des comparatifs de vitesse d'exécution sommaires sont ensuite proposés. Ces comparatifs résultent de tests faits sur mon ordinateur personnel, et ne doivent donc être considérés que comme des illustrations, sans plus.

Survol des versions standards (séquentielles)

Pour en savoir plus sur les algorithmes numériques et leur relation avec la notation mathématique usuelle, voir ceci.

Les versions séquentielles des algorithmes pour lesquels nous proposerons ici des versions parallèles sont décrites dans le standard du langage sous la forme de consignes : préconditions, postconditions, invariants, complexité algorithmique, et quelques mots pour décrire le rôle de chacun (parfois en pseudocode, pour ne pas dire en pseudo-C++).

Ce qui suit décrit sommairement une implémentation possible de chacun de ces algorithmes. Les implémentations effectives dépendront du compilateur.

Implémentation possible de std::transform()

Cet algorithme parcourt une séquence source, décrite par l'intervalle à demi-ouvert ci-dessous, appliquant à chaque élément de cette séquence une fonction et déposant le résultat dans une séquence de destination débutant à .

Précondition : la capacité de la séquence de destination doit être au moins aussi grande que la longueur de la séquence source.

template <class InIt, class OutItt, class F>
   void transform(InIt ds, InIt fs, OutItt dd, F fct) {
      for(; ds != fs; ++ds, (void) ++dd)
         *dd = fct(*ds);
   }

Complexité algorithmique : est le type des éléments de la séquence .

Implémentation possible de std::accumulate()

Cet algorithme parcourt une séquence source, décrite par l'intervalle à demi-ouvert ci-dessous, accumulant l'application d'une fonction à chaque valeur de cette séquence. La valeur initiale du est suppléée à l'appel par le code client, et il en va de même pour le type de cette valeur. Par défaut, dans sa version à trois paramètres, cet algorithme réalise une somme d'éléments.

template <class It, class T, class F>
   T accumulate(It debut, It fin, T init, F fct) {
      for(; debut != fin; ++debut)
         init = fct(init, *debut);
      return init;
   }
template <class It, class T>
   T accumulate(It debut, It fin, T init) {
      return accumulate(debut, fin, init, [](T cur, T val) { return cur + val; });
   }

Complexité algorithmique : est le type du cumul et est le type des éléments de la séquence .

Survol de l'approche pour les versions parallèles

L'idée générale derrière l'approche que j'ai suivie ici est simple :

Une esquisse de cet en-tête suit :

#ifndef PARALLEL_ALGORITHM_H
#define PARALLEL_ALGORITHM_H
#include <thread>
#include <future>
#include <vector>
#include <algorithm>
#include <numeric>
namespace parallel {
   // fonction transform()
   // fonctions accumulate()
}
#endif

Chacun des algorithmes susmentionné est présenté ci-dessous. Vous remarquerez que, dans chaque cas, la version parallèle découpe la tâche en blocs puis les traite en parallèle par des appels asynchrones à la version séquentielle.

Implémentation parallèle de transform()

Pour parallel::transform, j'ai procédé comme suit :

Le code est :

// ...
   template <class InIt, class OutItt, class F>
      void transform(InIt ds, InIt fs, OutItt dd, F fct) {
         using namespace std;
         auto n = thread::hardware_concurrency();
         auto nelems = distance(ds, fs);
         auto step = nelems / n;
         vector<future<void>> futures;
         for (; --n > 0; advance(ds, step), advance(dd, step)) {
            auto p = ds;
            advance(p, step);
            futures.emplace_back(
               async([=]{ std::transform(ds, p, dd, fct); })
            );
         }
         std::transform(ds, fs, dd, fct);
         for(auto &fut : futures) fut.wait();
      }
// ...

Pour être fiable, au sens de « pour mener au même résultat observable que l'application du même algorithme dans sa forme séquentielle », ce code a pour précondition que fct n'ait pas d'effet de bord :

Si cette précondition n'est pas respectée, alors le comportement de cet algorithme est indéfini.

Implémentation parallèle d'accumulate()

Pour parallel::accumulate, j'ai procédé comme suit :

Le code est :

// ...
   template <class It, class F, class T>
      T accumulate(It debut, It fin, T init, F fct) {
         using namespace std;
         auto n = thread::hardware_concurrency();
         auto nelems = distance(debut, fin);
         auto step = nelems / n;
         vector<future<T>> futures;
         for (; --n > 0; advance(debut, step)) {
            auto p = debut;
            advance(p, step);
            futures.emplace_back(
               async([=]{ return std::accumulate(debut, p, init, fct); })
            );
         }
         T result = std::accumulate(debut, fin, init, fct);
         return std::accumulate(begin(futures), end(futures), result, [](T cur, future<T> &fut) { return cur + fut.get(); });
      }
   template <class It, class T>
      T accumulate(It debut, It fin, T init) {
         return parallel::accumulate(debut, fin, init, [](T cur, T val) { return cur + val; });
      }
// ...

Pour être fiable, au sens de « pour mener au même résultat observable que l'application du même algorithme dans sa forme séquentielle », ce code a pour précondition que fct n'ait pas d'effet de bord :

Si cette précondition n'est pas respectée, alors le comportement de cet algorithme est indéfini.

Puisque cet algorithme traite en parallèle plusieurs accumulations distinctes à partir d'une même valeur initiale, il est important que le calcul qui lui est confié se prête à ce comportement. Ce serait le cas d'une somme avec valeur initiale qui serait le static_cast<T>(0), par exemple, ou encore d'un produit arithmétique avec valeur initiale de static_cast<T>(1), ou encore dans le cumul de la valeur minimale ou maximale des éléments d'une séquence, mais il n'est pas clair que ce serait le cas de tous les calculs possibles.

Notons enfin que les résultats de l'application de parallel::accumulate() peuvent différer de ceux d'une application de std::accumulate() sur la même séquence de valeurs, par exemple dans le cas où l'accumulation se fait sur des nombres à virgule flottante, du fait que les erreurs s'accumuleraient différemment de part et d'autre.

Programme de test

Pour tester la vitesse d'exécution des deux versions de chaque algorithme, il nous faudra mettre en place un programme de test, idéalement simple et qui comparera les deux saveurs d'un même algorithme sur les mêmes substrats.

Pour les besoins de la cause, nous inclurons un certain nombre d'en-têtes standards de même que notre en-tête maison, parallel_algorithm.h.

#include "parallel_algorithm.h"
#include <chrono>
#include <thread>
#include <iostream>
#include <algorithm>
#include <vector>
#include <numeric>
#include <iterator>
#include <functional>
#include <cmath>
#include <utility>
#include <type_traits>

Puisque notre code de test se situera dans un fichier source, un .cpp, nous pouvons nous permettre certaines déclarations using globales car celles-ci n'interféreront pas avec les pratiques d'un éventuel code client.

using namespace std;
using namespace std::chrono;

J'ai défini un trait resultat_de<F>::type, qui permet de déduire le type résultant de l'exécution d'un hypothétique appel à une instance de F en tant qu'opération nullaire – par nullaire, j'entend « sans paramètres ». L'idée va comme suit :

  • On suppose F le type d'une opération nullaire
  • On déclare sans la définir une hypothétique méthode privée de classe resultat_de<F>::test(Fct) qui, si on l'appelait en lui passant une instance f de Fct (ce que nous ne ferons pas!), aurait pour type de retour le type retourné par f(). Ce type est déduit à la compilation par l'opérateur statique decltype
  • Le type interne et public resultat_de<F>::type correspond au type retourné par un hypothétique appel à la méthode resultat_de<F>::test(F) à laquelle on aurait passé une instance de F. Puisque nous ne savons pas comment construire un F, nous utilisons declval<F>(), autre fonction hypothétique qui retournerait un F&& si nous l'avions appelée (ce que nous ne ferons pas!)
template <class F>
   struct resultat_de {
   private:
      template <class Fct>
         static auto test(Fct f) -> decltype(f());
   public:
      using type = decltype(test(std::declval<F>()));
   };

Nous avons ensuite deux versions de la fonction tester(F), soit :

  • Une qui prend en paramètre une opération nullaire f de type F dont le type de retour est void, et qui retourne simplement le temps écoulé lors de l'appel de f(), et
  • Une autre qui prend en paramètre une opération nullaire f de type F dont le type de retour n'est pas void, et qui capture la valeur retournée dans une variable locale res pour ensuite retourner une paire contenant cette valeur et le temps écoulé lors de l'appel à f()

Le trait enable_if permet au compilateur d'évacuer, à chaque occurrence de tester() dans le code source, la version qui n'est pas à propos : on ne cherchera pas à placer la valeur retournée par f() dans une variable si f() est void, après tout.

Armés de ces outils, nous pourrons plus aisément tester accumulate() (dont le type de retour est pertinent) et transform() (dont le type de retour est void).

template <class F>
   typename enable_if<
      !is_void<typename resultat_de<F>::type>::value,
      pair<system_clock::duration, typename resultat_de<F>::type>
   >::type
      tester(F && f)
   {
      auto avant = system_clock::now();
      auto res = f();
      return make_pair(system_clock::now() - avant, res);
   }
template <class F>
   typename enable_if<
      is_void<typename resultat_de<F>::type>::value,
      system_clock::duration
   >::type
       tester(F && f)
   {
      auto avant = system_clock::now();
      f();
      return system_clock::now() - avant;
   }

Nos valeurs de tests seront des nombres à virgule flottante approximant les valeurs , donc Pour les générer plus efficacement, j'aurai recours à la fonction génératrice generateur_iota(T) proposée à droite, qui utilise une λ pour retenir la prochaine valeur à retourner suite à chaque appel.

template <class T>
   function<T()> generateur_iota(T val) {
      return [=]() mutable { return val++; };
   }

Le programme de test en soi opérera sur BEAUCOUP de nombres à virgule flottante. La valeur de BEAUCOUP dépend bien entendu de la qualité du matériel à votre disposition.

int main() {
   const int BEAUCOUP = 50'000'000;
   using namespace std::this_thread;

Pour les fins des tests, dans le cas de transform(), je calculerai de vulgaires racines carrées. Pour les appels à accumulate(), dans le but de varier, j'utiliserai la λ nommée count_bits (à droite) qui prendra un float, le transtypera en int et y comptera le nombre de bits de valeur 1.

L'idée ici est d'avoir un calcul qui prenne un peu de temps pour que les temps calculés soient plus pertinents. évidemment, il est essentiel que les versions parallèle et séquentielle de l'algorithme accumulate() donnent les mêmes résultats aux erreurs normales de calcul près; utiliser des entiers nous permet d'opérer sur des valeurs exactes et facilite la validation du code.

   auto count_bits = [](float f) -> int {
      int val = static_cast<int>(f);
      int nbits = 0;
      unsigned int mask = 1;
      for(int i = 0; i < sizeof(int) * 8; ++i) {
         nbits += (val & mask)? 1 : 0;
         mask <<= 1;
      }
      return nbits;
   };

Le test séquentiel applique les algorithmes transform() et accumulate() de l'espace nommé std sur une séquence de valeurs choisies avec soin, et affiche les résultats obtenus.

   {
      vector<float> v;
      generate_n(back_inserter(v), BEAUCOUP, generateur_iota(0.5f));
      auto res_trans = tester(
         [&]() {
            std::transform(
               begin(v), end(v), begin(v), [](float f) -> float {
                  return std::sqrt(f);
               }
            );
         }
      );
      cout << "Temps d'execution total pour " << BEAUCOUP
           << " racines carrees (sequentiel) : "
           << duration_cast<milliseconds>(res_trans).count()
           << " ms." << endl;
      cout << "Temps d'execution total pour un cumul de " << BEAUCOUP
           << " reels (sequentiel) : ";
      auto res_accum = tester(
         [&]() {
            return std::accumulate(
               begin(v), end(v), 0, [&](int cur, float f) {
                  return cur + count_bits(f);
               }
            );
         }
      );
      cout << duration_cast<milliseconds>(res_accum.first).count()
           << " ms. (valeur: " << res_accum.second << ")" << endl;
   }

Le test séquentiel applique les algorithmes transform() et accumulate() de l'espace nommé parallel sur la même séquence de valeurs (c'est important pour que le test soit pertinent!), et affiche les résultats obtenus.

    {
      vector<float> v;
      generate_n(back_inserter(v), BEAUCOUP, generateur_iota{0.5f});
      auto res_trans = tester(
         [&]() {
            parallel::transform(
               begin(v), end(v), begin(v), [](float f) -> float {
                  return std::sqrt(f);
               }
            );
         }
      );
      cout << "Temps d'execution total pour " << BEAUCOUP
           << " racines carrees (parallele) : "
           << duration_cast<milliseconds>(res_trans).count()
           << " ms." << endl;
      cout << "Temps d'execution total pour un cumul de " << BEAUCOUP
           << " reels (parallele) : ";
      auto res_accum = tester(
         [&]() {
            return parallel::accumulate(
               begin(v), end(v), 0, [&](int cur, float f) {
                  return cur + count_bits(f);
               }
            );
         }
      );
      cout << duration_cast<milliseconds>(res_accum.first).count()
          << " ms. (valeur: " << res_accum.second << ")" << endl;
   }
}

À l'exécution de ce programme, j'obtiens :

Temps d'execution total pour 50000000 racines carrees (sequentiel) : 239 ms.
Temps d'execution total pour un cumul de 50000000 reels (sequentiel) : 1468 ms. (valeur: 142272644)
Temps d'execution total pour 50000000 racines carrees (parallele) : 70 ms.
Temps d'execution total pour un cumul de 50000000 reels (parallele) : 384 ms. (valeur: 142272644)
Appuyez sur une touche pour continuer...

Visiblement, l'effort rapporte.

Algorithmes parallèles et C++ 17

Ce dont nous discutons ci-dessous fait partie de la spécification technique sur la concurrence de C++ telle que prévue pour expérimentation, probablement en vue d'une inclusion dans le standard à partir de C++ 17.

La spécification technique proposée en vue de C++ 17 met en place des versions parallèles de plusieurs algorithmes standards (environ 73 d'entre eux), pas tous.

Les versions parallèles des algorithmes ne garantissent pas un ordre spécifique d'accès des éléments. Une conséquence directe de ce constat est d'être prudent(e)s avec les conditions de course. Par exemple, ceci contient une course dû à l'accès à:

int tab[] = { 0,1 };
vector<int> v;
for_each(par, begin(tab), end(tab), [&v](int n) {
   v.push_back(n * 2 + 1);
});

Ceci dépend de l'ordre d'exécution des itérations, ce qui rend son comportement indéfini :

atomic<int> x = 0;
int tab[] = { 1,2 }; // peu importe
for_each(par, begin(tab), end(tab), [&x](int) {
   x.fetch_add(1, std::memory_order_relaxed);
   // on attend qu'une autre itération modifie la valeur de x
   while (x.load(std::memory_order_relaxed) == 1)
      ;
});

Ceci, par contre, est correct :

int x = 0;
mutex m;
int tab[] = { 1,2 }; // peu importe
for_each(par, begin(tab), end(tab), [&x](int) {
   lock_guard<mutex> _ { m };
   ++x;
});

Politiques d'exécution

En vue de C++ 17, plusieurs algorithmes standards peuvent être annotés, à l'appel, d'une politique d'exécution. En conformité avec les usages existants, par défaut, ces algorithmes sont exécutés de manière séquentielle, mais il est possible de passer à un mode d'exécution parallèle (ou autre) sur demande.

Par exemple :

vector<int> v = ...;
sort(begin(v), end(v));
// ...
execution_policy pol = seq;
if (v.size() > SEUIL)
   pol = par;
sort(pol, begin(v), end(v));
// alternatif
sort((v.size() > SEUIL? par : seq), begin(v), end(v));

Il existe d'autres politiques d'exécution que par (parallèle) et seq (séquentielle), en particulier par_vec (parallèle vectorielle). Formellement, les types de politiques d'exécution sur la table au moment d'écrire ceci sont :

Un trait is_execution_policy est défini pour les politiques connues. Le standard ne définit pas le comportement d'un programme qui le spécialiserait. Dans le cas d'une politique dynamique, l'algorithme testera les options dynamiquement au démarrage selon les options qu'il connaît,  ceci explique le côté indéfini de l'ajout de politiques à celles du standard.

Les objets constexpr globaux que sont par, seq et par_vec sont les objets typiquement utilisés pour choisir une  politique d'exécution. Les politiques d'exécution sont des souhaits, pas des obligations, car il peut advenir que le matériel ne puisse répondre à la demande faute de ressources.

Pour une exécution suivant la politique par_vec, il est possible qu'une même fonction soit appelée plusieurs fois en parallèle sur un même thread. Il faut noter que le standard ne permet traditionnellement pas cela (c'est une nouveauté, alors mieux vaut garder l'effet local).

Gestion des exceptions

Une exception levée lors d'un algorithme suivant une politique d'exécution par_vec entraînera un appel à terminate().

Une exception levée lors d'un algorithme par ou seq sera soit logée dans uncaught_exception, soit ajoutée dans une exception_list. Ceci peut être utile quand un algorithme travaille en parallèle et quand chacune des « branches » parallèles est susceptible de lever une exception. Dans un tel cas, la poursuite ou non du parallélisme de l'algorithme (s'il y a lieu) est indéfinie.

Si au moins une des exceptions levées est bad_alloc, il se peut que tout se termine violemment faute de pouvoir faire mieux.

Le type exception_list offre les services suivants :

Les itérateurs permettent de parcourir des exception_ptr, et sont de la catégorie ForwardIterator.

Algorithmes spécifiquement parallèles et numériques

Certains algorithmes proposés pour standardisation, bien qu'ils puissent être exécutés séquentiellement, sont d'abord et avant tout pensés pour fins d'exécution parallèle. Ces algorithmes sont les suivants (je ne donne que quelques-unes des signatures clés) :

Algorithme Détails
reduce(debut, fin) // implicite binop = plus<>{}
reduce(debut, fin, binop)

Se comporte comme accumulate(), mais son comportement peut être indéterministe si binop est non-commutatif

exclusive_scan(debut, fin, dest, init) // implicite binop = plus<>{}
exclusive_scan(debut, fin, dest, init, binop)

Prend une séquence source et génère une séquence de même taille. Son comportement peut être indéterministe si binop est non-associatif.

Un exemple séquentiel, présumant A,B : vector<int> et A.size()<=B.size(), serait :

B[0] = 0;
for (vector<int>::size_type i = 1; i < A.size(); ++i)
   B[i] = B[i-1] + A[i-1];

Dans cet exemple, si A vaut 1,1,4,2,1 alors B vaudra 0,1,2,6,8

inclusive_scan(debut, fin, dest)
inclusive_scan(debut, fin, dest, binop)
inclusive_scan(debut, fin, dest, binop, init)

Se comporte comme un exclusive_scan mais conserve le ième élément (donc la sortie est un élément plus grand que la source).

Son comportement peut aussi être indéterministe si binop est non-associatif

transform_reduce(debut, fin, unop, init, binop)

Ces fonctions ont en commun de ne pas appliquer unop() à init, mais appliquent par contre unop() sur chaque élément impliqué avant de faire chaque binop()

transform_exclusive_scan(debut, fin, dest, unop, init, binop)
transform_inclusive_scan(debut, fin, dest, unop, binop)
transform_inclusive_scan(debut, fin, dest, unop, binop, init)

Dans la table, unop identifie une opération unaire (à un seul paramètre) et binop identifie une opération binaire (à deux opérandes)

Lectures complémentaires

Quelques liens pour enrichir le propos.


Valid XHTML 1.0 Transitional

CSS Valide !