Serveur d'entrées/ sorties non-bloquantes

Dans un système temps réel (STR), il arrive fréquemment qu'un thread assujetti à des contraintes TR doive réaliser des entrées/ sorties. Il existe des mécanismes pour y arriver, surtout quand un programme s'exécute sur un système d'exploitation TR, mais dans bien des cas, il est nécessaire pour le thread TR de déléguer cette responsabilité à une entité tierce, s'exécutant en parallèle avec lui, pour éviter que le caractère a priori imprévisible des accès aux médias de masse ne vienne empêcher le thread TR de bien faire son travail.

Ce qui suit est un petit exemple simple d'un serveur d'entrées/ sorties non-bloquantes. J'ai implémenté deux modalités :

Ces exemples sont très simples, et ne sont pas vouées à un usage industriel (en fait, je n'ai implémenté que les entrées, alors c'est clairement une ébauche, sans plus!). J'espère simplement vous donner, par ceci, des pistes qui vous permettront, si le besoin s'en fait sentir, d'implémenter une solution à la hauteur de vos propres attentes.

Outils d'ordre général

J'utilise à quelques endroits l'idiome de classe Incopiable. Suivez le lien si vous avez besoin de plus d'information à son sujet.

J'utilise dans ces exemples un schème de numérotation séquentielle des tâches. Pour ce faire, j'ai écrit un petit singleton du nom de GenerateurSequentiel, dont le détail suit :

Fichier GenerateurSequentiel.h
#ifndef GENERATEUR_SEQUENTIEL_H
#define GENERATEUR_SEQUENTIEL_H
#include <atomic>
template <class S>
   class GenerateurSequentiel
   {
   public:
      using value_type = S;
   private:
      std::atomic<value_type> cur;
      GenerateurSequentiel()
         : cur{ value_type{} }
      {
      }
   public:
      static GenerateurSequentiel& get()
      {
         static GenerateurSequentiel singleton;
         return singleton;
      }
      value_type prochain()
         { return cur++; }
   };
#endif

J'ai utilisé un compteur atomique pour la génération d'identifiants parce que cette classe est susceptible d'être utilisée dans des programmes multiprogrammés. Pour le reste, on parle d'un singleton tout ce qu'il y a de plus banal. Notez que cette classe est incopiable en vertu de son attribut atomique.

Les tâches d'entrées/ sorties numérotées sont dites « identifiables ». J'ai utilisé une petite classe à cet effet.

Fichier Identifiable.h
#ifndef IDENTIFIABLE_H
#define IDENTIFIABLE_H
#include "GenerateurSequentiel.h"
#include "Incopiable.h"
template <class ID>
   class Identifiable
      : Incopiable
   {
   public:
      using id_type = ID;
   private:
      id_type id_;
   public:
      Identifiable()
         : id_{GenerateurSequentiel<id_type>::get().prochain()}
      {
      }
      id_type id() const noexcept
         { return id_; }
   };
#endif

J'ai fait d'Identifiable<T> une classe Incopiable du fait que j'exploite des objets identifiés de manière unique, mais c'est peut-être abusif.

Classe entree_nb

Une tâche d'entrée sur disque telle que je les ai implémentées ici peut signaler sa progression en retournant des états intermédiaires ou en levant des exceptions indiquant l'état de la progression de la lecture. Je sais que plusieurs préfèrent limiter le recours aux exceptions aux cas d'erreur, mais je suis plus flexible à ce sujet, préférant les utiliser pour des cas de dérogation au comportement jugé normal d'une fonction (après tout, on les nommes « exceptions », pas « erreurs », non?).

J'ai implémenté les deux comportements, pour que vous puissiez comparer. Les deux sont dans un même fichier (entree_nb.h) mais ont été placées dans des espaces nommés distincts (traditionnel::entree_nb et avec_exceptions::entree_nb, respectivement), pour permettre au code client de choisir plus aisément entre l'une et l'autre. Je les ai placées côte à côte dans le but de simplifier la lecture, mais en conservant dans chaque cas une esquisse de l'autre pour faciliter le repérage de votre part.

Remarquez que la version utilisant un code de retour pour la progression ne peut retourner un tampon de lecture pleinement rempli, ce qui entraîne un impact sur l'interface de la méthode lire(). C'est un avantage de la version avec exceptions, dont l'interface est un peu plus naturelle. De même, dans la version traditionnelle, les états internes de la tâche de lecture sont exposés publiquement, étant retournés au code client pour analyse, alors qu'ils peuvent demeurer privés dans le cas de la version avec exceptions; cette dernière doit par contre exposer les types des exceptions possibles, le cas « traitement complété » n'étant pas couvert puisque dans ce cas, il suffit de retourner le résultat de la lecture.

Fichier entree_nb.h
Version « traditionnelle  Version avec exceptions
#ifndef ENTREES_SORTIES_NON_BLOQUANTES
#define ENTREES_SORTIES_NON_BLOQUANTES
#include <iosfwd>
#include <iomanip>
namespace traditionnel
{
   template <class C>
      class entree_nb
      {
      public:
         enum Etat { Inactif, EnTraitement, Complete };
         using size_type = typename C::size_type;
         using value_type = typename C::value_type;
      private:
         std::istream &is;
         Etat etat;
         C buf;
      public:
         entree_nb(std::istream &is)
            : is{is}, etat{Inactif}
         {
         }
         Etat lire(C &dest, size_type n)
         {
            switch (etat)
            {
            case Inactif:
            case Complete:
               // dꣵter le travail
               buf.clear();
               etat= EnTraitement;
               break;
            case EnTraitement:
               is >> noskipws;
               {
                  value_type temp;
                  for (size_type i = 0; i < n && is_ >> temp; ++i)
                     buf.push_back(temp);
               }
               dest = buf;
               etat = Complete;
               break;
            }
            return etat;
         }
      };
}
namespace avec_exceptions
{
   template <class C>
      class entree_nb
      {
         // ...
      };
}
#endif
#ifndef ENTREES_SORTIES_NON_BLOQUANTES
#define ENTREES_SORTIES_NON_BLOQUANTES
#include <iosfwd>
#include <iomanip>
namespace traditionnel
{
   template <class C>
      class entree_nb
      {
         // ...
      };
}
namespace avec_exceptions
{
   template <class C>
      class entree_nb
      {
      public:
         class Inactif {};
         class EnTraitement {};
         using size_type = typename C::size_type;
         using value_type = typename C::value_type;
      private:
         enum Etat { INACTIF, EN_TRAITEMENT };
         std::istream &is;
         Etat etat;
         C buf;
      public:
         entree_nb(std::istream &is)
            : is{is}, etat{INACTIF}
         {
         }
         C lire(size_type n)
         {
            if (etat == INACTIF)
            {
               // débuter le travail
               buf.clear();
               etat= EN_TRAITEMENT;
               throw EnTraitement{};
            }
            is >> noskipws;
            value_type temp;
            for (size_type i = 0; i < n && is >> temp; ++i)
               buf.push_back(temp);
            etat = INACTIF;
            return buf;
         }
      };
}
#endif

Les constructeurs de part et d'autre jouent des rôles analogues. Les tâches de lecture de part et d'autre progressent par étapes (lectures de n éléments par appel). Pour le reste, les différences sont plus une question de préférences individuelles du code client.

Classe ServeurEntreesSorties

La classe ServeurEntreesSorties est un pImpl d'abord pour des raisons historiques (j'avais implémenté les threads manuellement pour une version pré-C++ 11), puis pour réduire le couplage et diminuer les temps de compilation.

 Fichier ServeurEntreesSorties.h

La classe se nomme ServeurEntreesSorties bien que, dans cette implémentation simpliste, elle ne fasse que des entrées.

Dans son incarnation idéale, ce serait un singleton, ce qui permettrait de contrôler l'équilibre systémique d'un STR : avec un seul serveur pour les entrées / sorties, le contrôle des conséquences de son action sur le système est typiquement plus facile à évaluer.

Quelques types internes et publics sont définis d'office :

  • Le type id_type sera utilisé pour identifier les tâches d'entrées/ sorties prises en charge
  • Le type conteneur_type est utilisé pour retourner les données lues. Ici, j'abuse un peu, ne retournant que des vector<char> car les lectures, bien qu'a priori génériques, sont ainsi contraintes à ne lire que des séquences de caractères (ou de bytes bruts, si nous raffinons un peu les paramètres des tâches). Je vous laisse faire les changements qui s'imposent pour en arriver à une implémentation moins contraignante, et
  • Le type size_type est utilisé pour représenter la quantité de données à lire du fichier. Par défaut, la lecture consommera le fichier en entier. Dans cette implémentation, la lecture commence toujours au début du fichier

L'implémentation du pimpl est classique, et passe par un pointeur intelligent pour simplifier le tout.

Le ServeurEntreesSorties encapsule un thread pour s'exécuter concurremment avec son code client, ce qui lui permet de ne pas bloquer ce dernier pendant que les entrées/ sorties se font.

#ifndef SERVEUR_ENTREES_SORTIES_H
#define SERVEUR_ENTREES_SORTIES_H
#include <string>
#include <vector>
#include <memory>
#include <thread>
//
// PRUDENCE: il y a un bogue dans VS qui provoque un deadlock si un join() est fait
// sur un thread après la fin de main()... C'est pourquoi j'ai retiré (temporairement)
// le singleton du portrait ici
//
class ServeurEntreesSorties
{
   class Impl;
   std::unique_ptr<Impl> impl;
   std::thread th;

Le code pour arriver à transformer ServeurEntreesSorties en singleton a été laissé en commentaire ici (constructeur par défaut privé; service de classe get() pour accéder indirectement au singleton), car il existe un vilain bogue dans l'implémentation de std::thread avec Visual Studio 2013 qui fait qu'un join() réalisé après la fin de main(), par exemple dans le destructeur du ServeurEntreesSorties, provoque un interblocage (un vilain Deadlock).

J'ai donc dû passer par un serveur dont la portée est délimitée par celle de main() pour cet exemple; cette rustine est apparente dans le programme de test, plus bas.

public: // rendre privé s'il s'agit d'un singleton
   ServeurEntreesSorties();
public:
   using id_type = int;
   using conteneur_type = std::vector<char>;
   using size_type = conteneur_type::size_type;
   //static ServeurEntreesSorties& get()
   //{
   //   static ServeurEntreesSorties singleton;
   //   return singleton;
   //}

Les services clés de ServeurEntreesSorties sont :

  • Ajouter une tâche à traiter, et
  • Obtenir le résultat d'une tâche une fois celle-ci complétée

Le temps entre l'ajout d'une tâche et l'obtention du résultat est inconnu au préalable, mais se fait (par définition ici) en arrière-plan et de manière asynchrone.

Les types fermant la marche sont des cas possibles d'exceptions que le ServeurEntreesSorties est susceptible de lever si une erreur survient.

   id_type ajouterTache(const std::string&, size_type);
   id_type ajouterTache(const std::string&);
   conteneur_type obtenirResultat(id_type);
   ~ServeurEntreesSorties();
};
class TacheInexistante {};
class FichierInvalide {};
#endif

L'interface du ServeurEntreesSorties est donc somme toute assez simple, et surtout très perfectible; c'est dans l'implémentation que certaines subtilités apparaissent.

 Fichier ServeurEntreesSorties.cpp

Tout d'abord, j'ai choisi pour cet exemple les entrées/ sorties non-bloquantes rapportant leur progression avec des exceptions. Pour changer la modalité, il suffit de changer le using qui fait ce choix, et de modifier une méthode de TacheES<C,ID> plus bas du fait que la signature de la fonction pour réaliser une étape de la lecture ne concorde pas dans ces deux cas.

#include "ServeurEntreesSorties.h"
#include "es_nb.h"
#include "Identifiable.h"
#include <list>
#include <fstream>
#include <algorithm>
#include <limits>
#include <string>
#include <chrono>
#include <atomic>
#include <mutex>
#include <cassert>
using namespace std;
using namespace std::chrono;
using avec_exceptions::entree_nb;
//using traditionnel::entree_nb;

La représentation interne d'une tâche d'entrée/ sortie est une instance de TacheES<C,ID>. Chaque instance de cette classe porte un identifiant unique, en vertu de son parent Identifiable<ID>.

Notez ici le recours à l'héritage privé, puisque nous ne parlons pas de polymorphisme mais bien de composition. Les using pour id_type et id permettent à l'enfant, TacheES<C,ID>, d'exposer sélectivement des membres et des types du parent ainsi caché.

template <class C, class ID>
   class TacheES
      : Identifiable<ID>
   {
   public:
      using id_type = typename Identifiable<ID>::id_type;
      using Identifiable<ID>::id;
      using conteneur_type = C;
      using size_type = typename conteneur_type::size_type;

Les états importants d'un TacheES sont :

  • le conteneur de destination, pour entreposer les données lues;
  • le nombre de bytes à lire au total;
  • des marqueurs indiquant comment s'est complété le traitement;
  • le flux duquel lire; et
  • l'objet réalisant la lecture à proprement dit.
   private:
      conteneur_type dest_;
      size_type nbytes_;
      bool consommee_, erreur_lecture_;
      ifstream flux_; // ICI: ne pas changer l'ordre de déclaration
      entree_nb<conteneur_type> es_nb; // de ces deux attributs

À la construction, une TacheES apprend le nom du fichier dont il faut consommer les données et le nombre de bytes à en consommer. L'ouverture du fichier est faite immédiatement, et une exception à saveur diagnostique est levée si un problème est constaté.

   public:
      TacheES(const string &src, size_type n)
         : nbytes_{n}, consommee_{}, erreur_lecture_{},
           flux_{src}, es_nb{flux_}
      {
         if (!flux_)
            throw FichierInvalide{};
      }
      conteneur_type& destination() noexcept
         { return dest_; }
      const conteneur_type& destination() const noexcept
         { return dest_; }
      bool operator==(id_type id) const noexcept
         { return id() == id; }
      size_type nb_a_lire() const noexcept
         { return nbytes_; }
      bool est_consommee () const noexcept
         { return consommee_; }
      void consommer() noexcept
         { consommee_ = true; }
      size_type reste_a_lire() const noexcept
         { return nb_a_lire() - destination().size(); }
      bool lecture_completee() const noexcept
         { return !reste_a_lire() || erreur_lecture(); }
      bool erreur_lecture() const noexcept
         { return erreur_lecture_; }

Enfin, la méthode qui réalise une étape de la lecture à faire, lire(), est implémentée pour la version notant les changements d'étape par des exceptions (sans commentaires à droite) et pour la version signalant ces changements d'étapes par des codes de succès ou d'échec (en commentaires).

Vous remarquerez que les deux se ressemblent beaucoup, mais que la version impliquant des exceptions est un peu plus simple, se limitant à traiter le cas « normal » d'un succès.

En pratique, lire() sera appelé plusieurs fois, jusqu'à ce que les données à lire aient été pleinement consommées ou encore jusqu'à ce qu'un problème grave soit survenu.

      void lire()
      {
         if (auto n = reste_a_lire())
         {
            auto ct = es_nb.lire(n);
            if (ct.size () < n)
               erreur_lecture_ = true;
            dest_.insert(end(dest_), begin(ct), end(ct));
         }
      }
   };

Évidemment, la classe TacheES<C,ID> ne fait rien en soi; il s'agit d'un outil destiné à être pris en charge par un thread tiers. Poursuivons donc avec l'implémentation du pImpl nommé ServeurEntreesSorties::Impl.

 Fichier ServeurEntreesSorties.cpp (suite)

La classe expose quelques types internes et publics simples, pour alléger l'écriture.


class ServeurEntreesSorties::Impl
{
public:
   using conteneur_type = ServeurEntreesSorties::conteneur_type;
   using id_type = ServeurEntreesSorties::id_type;
   using type_tache = TacheES<conteneur_type, id_type>;

Les ajouts de tâches à réaliser et les suppressions de tâches traitées se font concurremment, ce qui explique le recours à une liste protégée par mutex.

La variable terminer_ servira de signal pour l'arrêt du thread. L'écriture dans cette variable se faisant typiquement à partir du destructeur d'un ServeurEntreesSorties alors que la lecture de cette variable se fait dans le thread réalisant les entrées/ sorties de manière asynchrone, nous avons recours ici à une atomique.

private:
   mutex m;
   atomic<bool> terminer_;
   list<unique_ptr<type_tache>> taches_;

Le contrôle de la variable terminer_ est plutôt simple : initialement false, elle ne peut que devenir true en cours de route. J'ai volontairement choisi de ne pas offrir la possibilité de redémarrer le serveur, pour réduire au minimum la complexité de l'implémentation.

public:
   Impl()
      : terminer_{}
   {
   }
   void terminer() noexcept
      { terminer_ = true; }
   bool doit_terminer() const noexcept
      { return terminer_; }

L'ajout d'une tâche à traiter est un exemple classique d'insertion dans une liste synchronisée. Notez que l'identifiant de la tâche est retourné pour faciliter la tâche de suivre le déroulement de l'opération dans le code client.

   id_type ajouter(unique_ptr<type_tache> &&p)
   {
      assert(p);
      lock_guard<mutex> _ { m };
      auto id = p->id();
      taches_.push_back(move(p));
      return id;
   }

La méthode obtenirResultat() est un peu plus complexe. Notez en particulier que dans le cas où le code client demanderait le résultat d'une tâche qui ne s'est pas complétée, cette implémentation lèvera une exception, ce qui n'est probablement pas approprié; il serait plus près des usages dans de tels cas de bloquer le thread courant jusqu'à ce que l'exécution de la tâche ait été complétée.

   conteneur_type obtenirResultat(id_type id)
   {
      lock_guard<mutex> _ { m };
      auto itt = find_if(begin(taches_), end(taches_), [id](const unique_ptr<type_tache> &p) { return id == p->id(); });
      if (itt == end(taches_))
         throw TacheInexistante{};
      auto &p = *itt;
      if (!p->lecture_completee())
         throw avec_exceptions::entree_nb<conteneur_type>::EnTraitement{};
      p->consommer();
      return p->destination();
   }

La méthode agir() est celle qui sera exécutée dans le thread réalisant les entrées/ sorties de manière asynchrone. Elle est simple : elle tente de lire un peu de toutes les tâches demandées (je ne prétends pas du tout que ce soit la pratique optimale; c'est un exemple, sans plus), puis elle fait un peu de nettoyage pour éliminer de la liste des tâches celles qui ont été complétées.

Le tout se fait en possession du mutex, les opérations sur la liste étant nombreuses. C'est une granularité de verrouillage perfectible, car un peu grossière : un demander de tâche d'entrées/ sorties pourrait se trouver bloquée sur ajout d'une tâche du fait que le mutex protégeant la liste est tenu trop longtemps par cette méthode.

   void agir()
   {
      lock_guard<mutex> _ { m };
      lire();
      nettoyer();
   }

La méthode lire() est simple, tentant d'appeler lire() sur chacune des tâches listées.

Notez la présence d'accolades redondantes pour délimiter la portée de la répétitive for : elles sont là parce que Visual Studio 2013 a un bogue qui fait en sorte qu'il ne voit pas le bloc try dans ce cas si les accolades ne délimitent pas le for.

private:
   void lire()
   {
      for (auto & p : taches_) // les accolades superflues tiennent a un bogue de VS
      {
         try {
            p->lire();
         } catch (...) {
         }
      }
   }

La dernière méthode de cette classe a pour rôle d'éliminer les tâches dont l'exécution a été complétée. Elle est plutôt banale.

   void nettoyer()
   {
      for (auto it = begin(taches_); it != end(taches_); )
         if ((*it)->est_consommee())
            it = taches_.erase(it);
         else
            ++it;
   }
};

Ne reste plus qu'à examiner les quelques services restants de la classe ServeurEntreesSorties.

 Fichier ServeurEntreesSorties.cpp (suite)

Les deux méthodes ajouterTache() jouent le même rôle, à ceci près que la plus simple demande de lire la totalité du fichier, ce qui permet d'exprimer la plus simple en termes de la plus sophistiquée, une pratique qui simplifie l'entretien.

auto ServeurEntreesSorties::ajouterTache(const string &nomFich, size_type n) -> id_type
{
   return impl->ajouter(make_unique<Impl::type_tache>(nomFich, n));
}
auto ServeurEntreesSorties::ajouterTache(const string &nomFich) -> id_type
{
   return ajouterTache(nomFich, numeric_limits<size_type>::max());
}

La méthode obtenirResultat() est une simple délégation.

Le constructeur assure l'instanciation de l'implémentation et démarre un thread pour prendre en charge son exécution asynchrone.

Enfin, le destructeur signale la fin à ce thread et en attend la complétion.


auto ServeurEntreesSorties::obtenirResultat(id_type id) -> conteneur_type
   { return impl->obtenirResultat(id); }
ServeurEntreesSorties::ServeurEntreesSorties()
   : impl{ make_unique<Impl>() }
{
   th = thread([](unique_ptr<Impl> &p) {
      while (!p->doit_terminer())
         p->agir();
   }, ref(impl));
}
ServeurEntreesSorties::~ServeurEntreesSorties()
{
   impl->terminer();
   th.join();
}

Programme de test

Le programme de test est trop simple pour mettre en valeur notre serveur, mais donne tout de même un aperçu de son utilisation. À noter :

Le code suit.

Fichier Principal.cpp
#include "ServeurEntreesSorties.h"
#include <iostream>
#include <string>
using namespace std;
int main()
{
   // auto & srv_es = ServeurEntreesSorties::get();
   ServeurEntreesSorties srv_es;
   auto id = srv_es.ajouterTache("fichier.txt"); // il pourrait y en avoir plusieurs autres, dans
                                                 // la mesure où l'on conserve les id
   try
   {
      auto id2 = srv_es.ajouterTache("patate.txt", 20);
   }
   catch (...)
   {
   }
   string dest;
   bool ok = {};
   do
   {
      // faire des trucs...
      try
      {
         auto cont = srv_es.obtenirResultat(id);
         dest = string{begin(cont), end(cont)};
         ok = true;
      }
      catch (...)
      {
      }
   }
   while (!ok);
   cout << dest << endl;
}

En espérant que le tout vous soit utile...


Valid XHTML 1.0 Transitional

CSS Valide !