Framework Executor du JDK : CountedCompleter

Dans la famille des tâches de fork / join, le dernier arrivé est CountedCompleter. Il a été ajouté dans le JDK 8 alors que les autres datent du JDK 7.

Comme le développement d’une telle tâche est un peu plus complexe, et que les bonnes références sont difficiles à trouver sur le Web, CountedCompleter a droit à sa propre page.

  • CountedCompleter
    ForkJoinTask<V>

    • compute()

    • getRawResult(): V

    • onCompletion(CountedCompleter<?> caller)
       

    • tryComplete()

    • setPendingCount(int count)

    • addToPendingCount(int delta)

Comme pour les actions recursives, il faut implémenter une méthode compute(): void. Cette méthode peut exécuter effectivement l’action ou se scinder tâches plus petites.
Comme pour les tâches récursives, la méthode getRawResult() renvoie le résultat. Par contre, par défaut, elle renvoie null et c’est à nous de la redéfinir pour avoir un vrai résultat.

Compteur

La grande différence avec les tâches et actions récursives, c’est la notion de complétion basée sur un compteur. Chaque objet CountedCompleter a son propre compteur de sous-tâches qu’on doit incrémenter expliciter à chaque appel de fork().

    this.addToPendingCount(2);
    new BigTask(this, leftList).fork();
    new BigTask(this, rightList).fork();

Fin de tâche

Chaque tâche doit appeler tryComplete() en fin de traitement pour que le compteur de son parent puisse être décrémenté. A l’appel de cette méthode, si le compteur est positif, il est décrémenté et s’il est à zéro, l’action est considérée comme terminée et la méthode onCompletion(…​) est appélée.

  @Override
  public void compute() {
    // ...
    tryComplete();
  }

Arbre de tâches

L’autre grande différence, avec les tâches et actions récursive est sous-tendue par ce fonctionnement : les tâches sont organisées explicitement sur des relations parent / enfant. Pour assurer ça, il faut bien appeler le constructeur CountedCompleter(CountedCompleter<?> completer).

class BigTask extends CountedCompleter<Void> {
  public BigTask(List<Integer> data) {
    this.data = data;
  }

  public BigTask(CountedCompleter parent, List<Integer> data) {
    super(parent);
    this.data = data;
  }

  //...
}

Action sans retour

Si on n’attend pas de résultat, c’est assez proche d’un action récursive.

public BigTask extends CountedCompleter<Void> {
  public void compute() {
    if (simple) {
      return doTheJob();
    } else {
      // Compteur
      this.addToPendingCount(2);
      // Fork
      new CustomTask(this, leftData).fork();
      new CustomTask(this, rightData).fork();
    }
    // Fin de la tâche, pas de join
    this.tryComplete();
  }
}

Tâche avec retour

C’est un peu plus compliqué pour calculer et retourner un résultat global. Tout d’abord, il faut redéfinir la méthode getRawResult(). Et pour ça, il faut que notre objet ait calculé un résultat à retourner.

Pour ça, il y a plusieurs possibilité. Par exemple, on peut utiliser un objet partagé sous forme d’un AtomicLong ou d’un AtomicInteger pour des formats simples, ou un AtomicReference<?> pour un objet plus complexe. Cette façon de faire est assez simple mais ne me plait pas parce que cet objet peut devenir un point de contension si on augmente le nombre de threads.

Je préfère une solution basée sur une redéfinition de la méthode onCompletion(…​) où chaque tâche calcul son résultat à partir de celui de ses enfants.

public BigTask extends CountedCompleter<Void> {
  private final List<BigTask> children = new ArrayList<>();

  public void compute() {
    if (simple) {
      return doTheJob();
    } else {
      children.add(new BigTask(this, leftList));
      children.add(new BigTask(this, rightList));
      // Compteur
      this.addToPendingCount(children.size());
      // Fork
      children.forEach(ForkJoinTask::fork);
    }
    // Fin de la tâche
    this.tryComplete();
  }

  @Override
  public void onCompletion(CountedCompleter<?> caller) {
    // compute result based on children and local result
    this.result = ...;
  }

  @Override
  public Integer getRawResult() {
    return result;
  }
}