Exemple – implémentation canonique de l'approche Map/ Reduce

Ce texte présume chez vous une familiarité avec les algorithmes standards, les conteneurs standards et le langage C++, en particulier le standard C++ 11. Comprendre les expressions λ vous aidera à suivre certains des raisonnements ci-dessous.

Notez que, bien que le code proposé ci-dessous soit fonctionnel et correct, il ne se veut pas de qualité industrielle et n'a pas été testé rigoureusement sur plusieurs plateformes. Si vous avez accès à de véritables implémentations commerciales d'algorithmes tels que ceux décrits ici, privilégiez-les.

L'exemple canonique d'approche Map/ Reduce est le calcul en parallèle du nombre de mots dans un ensemble de fichiers. Ce qui suit montre une implémentation de ce calcul, avec comparatifs pour la vitesse d'exécution séquentielle et pour la vitesse d'exécution parallèle, en utilisant divers seuils en termes de nombres de fichiers à traiter pour diviser le traitement.

Il est possible, en général, sur une même machine, de se rapprocher de l'optimalité en divisant de prime abord le traitement en fonction du nombre d'unités de traitement matérielles disponibles (en C++ 11, on parle de se baser sur std::thread::hardware_concurrency, qui retourne un entier non-signé représentant le nombre d'unités de traitement matérielles disponibles), surtout pour des tâches qui sont CPU-Bound. Dans une approche Map/ Reduce, il est d'usage de diviser le traitement en deux et de répartir les calculs sur deux ordinateurs distincts, et c'est cette démarche qui est adoptée dans l'exemple ci-dessous, bien qu'elle ne soit pas optimale pour une implémentation multiprogramée sur un seul ordinateur.

Implémentation séquentielle

Le traitement séquentiel s'exprimera comme suit :

Les éléments en retrait s'expriment algorithmiquement comme une composition de fonctions. En notation UML :

En C++, une implémentation possible de lire_fichier() serait celle proposée à droite. Pour la comprendre, ce qu'il faut savoir :

  • Il est possible d'instancier un conteneur avec une séquence standard source déterminée par une paire d'itérateurs
  • Il est possible de définir des itérateurs sur un flux ouvert en lecture
  • Les istreambuf_iterator<char> sont de tels itérateurs, et sont très rapides à l'exécution parce qu'ils ne font presque aucun autre traitement que celui de consommer un byte à la fois
  • Un itérateur sur un flux par défaut est équivalent à une erreur de lecture
  • Ainsi, à droite, la fonction lire_fichier(nom) ouvre en lecture le fichier nommé nom, et le fait en mode binaire pour éviter de donner un traitement particulier à certains caractères, puis aspire tous les char dans une string jusqu'à concurrence d'une erreur de lecture – concrètement, jusqu'à la rencontre d'une fin de flux

Cette technique est très simple et très rapide.

// ... inclusions using ...
string lire_fichier(const string &nom) {
   return string(
      istreambuf_iterator<char>{ifstream{ nom, ios::binary }},
      istreambuf_iterator<char>{}
   );
}

En C++, une implémentation possible de compter_mots() serait celle proposée à droite. Pour la comprendre, ce qu'il faut savoir :

  • L'algorithme count_if(debut, fin, pred) retourne le nombre d'éléments dans la séquence standard pour lesquels pred s'avère
  • Un istream_iterator<string> est excellent pour consommer des string d'un flux texte (par exemple l'entrée standard cin) à l'aide de l'opérateur >>
  • L'opérateur >> sur un flux texte et une string consomme un mot à la fois
  • Un stringstream est un flux en mémoire
  • La λ utilisée comme prédicat ici est une tautologie : elle répond toujours true, peu importe la string lui étant passée en paramètre

Conséquemment, compter_mots() compte, tout simplement, tous les mots du flux sstr qui, lui, contient le texte de la string nommée s. Son type de retour est le type interne et public string::size_type, utilisé pour représenter le nombre d'éléments dans une string.

#include <sstream>
#include <iterator>
#include <string>
#include <algorithm>
// ... using ...
string::size_type compter_mots(string &&s) {
   stringstream sstr;
   sstr << s;
   return count_if(
      istream_iterator<string>{sstr}, 
      istream_iterator<string>{},
      [](const string &) { return true; }
   );
}

Dans le programme de test, plus bas, vous verrez l'expression λ suivante, qui réalise concrètement la composition de ces deux fonctions :

// ...
auto fct = [](const string &nomfich) {
   return compter_mots(lire_fichier(nomfich));
};
// ...

Pour en savoir plus sur la composition de fonctions, voir ceci. C'est cette opération qui sera exécutée par la version séquentielle comme par la version parallèle de notre algorithme, et ce sur chacun des fichiers à traiter.

Le traitement séquentiel en tant que tel s'exprimera comme suit :

// ...
template <class It, class F, class T>
   T traiter_sequentiel(It debut, It fin, F fct, T init) {
      using value_type = typename
         iterator_traits<It>::value_type;
      return accumulate(debut, fin, init, [&](T cur, value_type val) {
         return cur + fct(val);
      });
   }

La séquence standard (à demi-ouverte) de noms de fichiers à traiter est déterminée par debut et fin. La fonction à appliquer sur chaque fichier est fct, et la valeur initiale du cumul est init. Il suffit d'examiner l'expression λ un peu plus haut pour constater que, puisque fct(fich) retourne le nombre mots dans un fichier nommé fich donné, alors traiter_sequentiel(debut, fin, fct, 0) retournera la somme du nombre de mots dans tous les fichiers dont le nom apparaît dans l'intervalle .

Programme de test

Le programme de test que nous utiliserons ici sera :

// ...
#include <vector>
#include <string>
#include <chrono>
#include <iostream>
// ... using ...
int main(int argc, char *argv[]) {
   auto fichiers = (argc == 1)? vector<string>(8000, "in.txt") : vector<string>(argv + 1, argv + argc);
   auto fct = [](const string &nomfich) {
      return compter_mots(lire_fichier(nomfich));
   };
   cout << fichiers.size() << " fichiers a traiter" << endl;
   cout << endl;
   cout << "Nombre de mots au total (calcul sequentiel) : ";
   {
      auto avant = system_clock::now();
      auto n = traiter_sequentiel(begin(fichiers), end(fichiers), fct, 0);
      auto apres = system_clock::now();
      cout << n << "\n\tevalue en " << duration_cast<milliseconds>(apres-avant).count() << " ms.";
   }
   cout << endl;
   for (int seuil = 256; seuil <= 4096; seuil *= 2) {
      cout << "Nombre de mots au total (calcul parallele), seuil " << seuil << " : ";
      {
         auto avant = system_clock::now();
         auto n = traiter_parallele(begin(fichiers), end(fichiers), fct, 0, seuil);
         auto apres = system_clock::now();
         cout << n << "\n\tevalue en " << duration_cast<milliseconds>(apres-avant).count() << " ms.";
      }
      cout << endl;
   }
}

Évidemment, la valeur 8000 et le contenu du fichier "in.txt" sont arbitraires : les deux ont été choisis (ou générés) dans le but de faire travailler le programme. Vous remarquerez qu'outre le seuil à partir duquel le calcul parallèle doit cesser de se subdiviser, les deux calculs se présentent de la même manière : capturer le moment présent avant et après le calcul, puis afficher le temps écoulé exprimé en millisecondes. Pour le calcul parallèle, divers seuils de subdivision sont utilisés. L'idée va comme suit :

Nous montrerons le temps d'exécution total, incluant celui de l'exécution séquentielle, pour trois implémentations distinctes de traitement_parallele() ci-dessous.

Implémentation parallèle – plusieurs threads, gestion manuelle

Une implémentation de traitement_parallele() basée sur des instances de std::thread prises en charge manuellement serait :

struct traiter_parallele {
   template <class It, class F, class T>
      T operator()(It debut, It fin, F fct, T init, int seuil) {
         auto n = distance(debut, fin);
         if (n <= seuil)
            return traiter_sequentiel(debut, fin, fct, init);
         auto p = next(debut, n / 2);
         T r0; // resultat de th
         thread th {
            [&r0,fct,init,seuil](It debut, It fin) {
               r0 = traiter_parallele{}(debut, fin, fct, init, seuil);
            }, debut, p
         };
         auto r1 = traiter_parallele{}(p, fin, fct, init, seuil);
         th.join();
         return r0 + r1;
      }
};

Pas à pas, cette implémentation procède comme suit :

À l'exécution du programme de test avec cette implémentation, j'obtiens :

8000 fichiers a traiter

Nombre de mots au total (calcul sequentiel) : 97992000
        evalue en 28735 ms.
Nombre de mots au total (calcul parallele), seuil 256 : 97992000
        evalue en 44784 ms.
Nombre de mots au total (calcul parallele), seuil 512 : 97992000
        evalue en 45048 ms.
Nombre de mots au total (calcul parallele), seuil 1024 : 97992000
        evalue en 44485 ms.
Nombre de mots au total (calcul parallele), seuil 2048 : 97992000
        evalue en 31018 ms.
Nombre de mots au total (calcul parallele), seuil 4096 : 97992000
        evalue en 22731 ms.
Appuyez sur une touche pour continuer...

Lorsque le seuil est petit, trop de threads sont lancés pour mon ordinateur, et ces threads se battent pour avoir accès au processeur. Conséquemment, la gestion des threads nuit à leur saine exécution. Ceci se voit du fait que les trois premiers seuils testés mènent à des résultats parallèles nettement moins bons que ceux d'une exécution purement séquentielle. Sur mon ordinateur à huit coeurs, cette implémentation amène rapidement tous les coeurs à travailler à .

La situation s'améliore un peu quand le seuil atteint 2048, puis le programme parallèle commence à donner de meilleurs résultats (de manière significative) que le code séquentiel une fois le seuil de 4096 atteint.

Cette implémentation fonctionne bien, mais a deux gros défaut : elle est un peu trop « bas niveau », nous forçant à utiliser une variable temporaire r0 passée par référence pour entreposer la valeur du calcul fait dans th, et elle ne gère pas les exceptions correctement si celles-ci sont levées dans le thread.

Implémentation parallèle – async(), naïf

Une implémentation de traitement_parallele() basée sur un recours un peu naïf à std::async serait :

struct traiter_parallele {
   template <class It, class F, class T>
      T operator()(It debut, It fin, F fct, T init, int seuil) {
         auto n = distance(debut, fin);
         if (n <= seuil)
            return traiter_sequentiel(debut, fin, fct, init);
         auto p = next(debut, n / 2);
         auto f0 = async(launch::async, traiter_parallele{}, debut, p, fct, init, seuil);
         return f0.get() + traiter_parallele{}(p, fin, fct, init, seuil);
      }
};

Pas à pas, cette implémentation procède comme suit :

À l'exécution du programme de test avec cette implémentation, j'obtiens :

8000 fichiers a traiter

Nombre de mots au total (calcul sequentiel) : 97992000
        evalue en 27784 ms.
Nombre de mots au total (calcul parallele), seuil 256 : 97992000
        evalue en 28206 ms.
Nombre de mots au total (calcul parallele), seuil 512 : 97992000
        evalue en 28154 ms.
Nombre de mots au total (calcul parallele), seuil 1024 : 97992000
        evalue en 28273 ms.
Nombre de mots au total (calcul parallele), seuil 2048 : 97992000
        evalue en 28115 ms.
Nombre de mots au total (calcul parallele), seuil 4096 : 97992000
        evalue en 28131 ms.
Appuyez sur une touche pour continuer...

Manifestement, cette implémentation a des vertus (elle est plus simple que la précédente), mais elle ne fonctionne pas, les résultats des exécutions « parallèles » s'apparentant aux résultats des exécutions « séquentielles » (avec un peu plus de temps consacré à la gestion de la mécanique). On le constate d'ailleurs en examinant le taux d'occupation des différents coeurs de notre ordinateur : un seul coeur est sollicité ici en pratique.

La raison est la suivante :

Cette situation un peu déplaisante s'avère, que nous ayons utilisé la politique de lancement par défaut pour notre async(), qui est de laisser le moteur choisir pour nous de réaliser un traitmenent parallèle ou séquentiel, ou que nous ayons, comme c'est le cas ici, explicitement demandé un lancement asynchrone.

Implémentation parallèle – async(), correct

Une implémentation de traitement_parallele() basée sur un recours correctement implémenté à std::async serait :

struct traiter_parallele {
   template <class It, class F, class T>
      T operator()(It debut, It fin, F fct, T init, int seuil) {
         auto n = distance(debut, fin);
         if (n <= seuil)
            return traiter_sequentiel(debut, fin, fct, init);
         auto p = debut;
         advance(p, n / 2);
         auto f0 = async(launch::async, traiter_parallele{}, debut, p, fct, init, seuil);
         auto f1 = async(launch::async, traiter_parallele{}, p, fin, fct, init, seuil);
         return f0.get() + f1.get();
      }
};

La grosse différence avec la version trop naïve proposée plus haut est qu'ici, traiter_parallele() lance deux async() distinctes et se suspend (appel bloquant au get() des deux futures) en attendant que ceux-ci aient conclu leur tâche. Cette version occupe rapidement à pleine capacité tous les coeurs d'un ordinateur.

À l'exécution du programme de test avec cette implémentation, j'obtiens :

8000 fichiers a traiter

Nombre de mots au total (calcul sequentiel) : 97992000
        evalue en 28476 ms.
Nombre de mots au total (calcul parallele), seuil 256 : 97992000
        evalue en 45734 ms.
Nombre de mots au total (calcul parallele), seuil 512 : 97992000
        evalue en 43602 ms.
Nombre de mots au total (calcul parallele), seuil 1024 : 97992000
        evalue en 44239 ms.
Nombre de mots au total (calcul parallele), seuil 2048 : 97992000
        evalue en 30915 ms.
Nombre de mots au total (calcul parallele), seuil 4096 : 97992000
        evalue en 23122 ms.
Appuyez sur une touche pour continuer...

L'exécution de la version parallèle avec un seuil de 4096 s'exécute en du temps de la version séquentielle, un gain de de temps d'exécution. Visiblement, l'effort rapporte.


Valid XHTML 1.0 Transitional

CSS Valide !