Tâches morcelables

Dans un STR où une tâche doit démarrer à intervalles réguliers ou constants (une tâche périodique), il est typique qu'il reste un temps rédisuel après la complétion de l'exécution de cette tâche et avant le démarrage de son prochain cycle. Par exemple, si une tâche doit s'exécuter à un rythme de 10 Hz, il importe que son exécution se complète dans un temps tel que . La part résiduelle du cycle est donc .

Il est commun qu'un thread se suspende pour le temps résiduel , or ce temps résiduel peut souvent être utilisé à des fins plus pertinentes, par exemple pour exécuter des tâches auxiliaires susceptibles d'améliorer la fluidité ultérieure du programme.

C'est dans cet esprit que se présente la posibilité de morceler une tâche.

Exemple concret, pas à pas

Supposons un algorithme standard tel que std::transform(). Une manière de l'exprimer serait :

template <class IIt, class OIt, class F> // IIt : input iterator; OIt : output iterator; F : fonction
   void transform(IIt ds, IIt fs, OIt dd, F f) {
      for(; ds != fs; ++ds, (void) ++dd)
         *dd = f(*ds);
   }

Cet algorithme a quelques préconditions : la paire forme un intervalle légal en entrée; f est applicable à *ds et f(*ds) peut être affecté à *dd; il y a au moins std::distance(ds,fs) éléments dans lesquels il est possible d'écrire à partir de dd; etc. Présumant que s'exécute dans un temps dont le pire cas est connu a priori, la complexité algorithmique de std::transform(ds,fs,dd,f) est , donc , donc . Clairement, sur cette base, le temps d'exécution de la fonction dépend linérairement du nombre d'éléments dans la séquence en entrée. Pour cette raison, appeler cette fonction dans la part résiduelle d'un cycle peut faire en sorte que l'algorithme appelant ne rencontre pas ses contraintes de temps (pour une fonction f donnée) si la séquence à traiter est trop longue.

Une version morcelable du même algorithme aura les propriétés suivantes :

Cela signifie qu'une version morcelable d'un algorithme comme std::transform() pourrait s'exprimer comme suit :

template <class IIt, class OIt, class F /* ... */>
   std::pair<IIt, OIt> transform_morcelable(IIt ds, IIt fs, OIt dd, F f, /* ... */) {
      for(; ds != fs && /* ... */; ++ds, (void) ++dd)
         *dd = f(*ds);
      return { ds, dd };
   }

... alors qu'un code client naïf opérant sur un vector<int> et produisant un vector<X> pour un certain type X pourrait s'exprimer comme suit (ici, nous simplifions le problème en présumant que suite au traitement, v.size()==res.size() s'avèrera) :

// ...
vector<int> v = // ...
// ...
vector<X> res;
res.reserve(v.size());
for(auto p = begin(v); !fini;) {
   // ... traitement TR essentiel
   // ... évaluer le temps résiduel disponible
   auto rt = transform_morcelable(p, end(v), back_inserter(res); /* ... */);
   p = re.first;
}

Ce code est naïf du fait qu'il ne permet pas de passer à un autre bloc de données à traiter une fois la première incarnation de v pleinement traitée, mais donne un aperçu de ce qu'il faudrait faire en pratique.

Choisir une modalité d'interruption

Plusieurs modalités d'interruption sont possibles. L'une d'elles se base sur le nombre d'itérations à exécuter, estimé dans ce cas par le code client :

template <class IIt, class OIt, class F>
   std::pair<IIt, OIt> transform_morcelable(IIt ds, IIt fs, OIt dd, F f, int nit) {
      for(int n = 0; ds != fs && n < nit; ++ds, (void) ++dd, ++n)
         *dd = f(*ds);
      return { ds, dd };
   }

Une autre se base sur le dépassement d'une échéance fixée par le code client (qui devrait tenir compte du coût estimé d'une itération de l'algorithme pour éviter de déborder du temps alloué pour son propre cycle d'exécution). Dans l'exemple ci-dessous, cette échéance est représentée par un délai dt suivant l'appel de la fonction :

template <class IIt, class OIt, class F, class T>
   std::pair<IIt, OIt> transform_morcelable(IIt ds, IIt fs, OIt dd, F f, T dt) {
      auto t0 = system_clock::now();
      auto tf = t0 + dt;
      for(int n = 0; ds != fs && t0 < tf; ++ds, (void) ++dd, ++n)
         *dd = f(*ds);
      return { ds, dd };
   }

L'approche la plus simple est cependant de suppléer à la fonction appelée un prédicat de poursuite choisi par le code client. Ceci offre une souplesse maximale dans l'implémentation et découple la fonction morcelée des besoins réels du code client :

template <class IIt, class OIt, class F, class Pred>
   std::pair<IIt, OIt> transform_morcelable(IIt ds, IIt fs, OIt dd, F f, Pred pred) {
      for(int n = 0; ds != fs && pred(); ++ds, (void) ++dd, ++n)
         *dd = f(*ds);
      return { ds, dd };
   }

À titre d'exemple, dans cette dernière approche, il est possible de modéliser les deux approches précédentes, plus spécifiques, par des foncteurs :

Approche comptabilisant les itérations Approche validant le non-dépassement d'une échéance
class CompterIterations {
   mutable int n = 0;
   int seuil;
public:
   CompterIterations(int seuil) : seuil{ seuil } {
   }
   bool operator()() const {
      return ++n < seuil;
   }
};
class RepecterEcheance {
   system_clock::time_point base = system_clock::now();
   system_clock::time_point echeance;
public:
   RespecterEcheance(system_clock::duration dt) {
      echeance = base + dt;
   }
   bool operator()() const {
      return system_clock::now() < echeance;
   }
};
Code client Code client
// ...
vector<int> v = // ...
// ...
vector<X> res;
res.reserve(v.size());
for(auto p = begin(v); !fini;) {
   // ... traitement TR essentiel
   auto rt = transform_morcelable(p, end(v), back_inserter(res); CompterIterations{ 10 }); // 10 itérations max
   p = re.first;
}
// ...
vector<int> v = // ...
// ...
vector<X> res;
res.reserve(v.size());
for(auto p = begin(v); !fini;) {
   // ... traitement TR essentiel
   auto rt = transform_morcelable(p, end(v), back_inserter(res); RespecterEcheancs{ 10ms }); // 10ms max
   p = re.first;
}

Voilà!


Valid XHTML 1.0 Transitional

CSS Valide !