Class: PRuby::TaskBag

Inherits:
Object show all
Defined in:
lib/pruby/task_bag.rb

Overview

Note:

Il s’agit (pour l’instant?) d’une mise en oeuvre naive, car elle utilise un Channel pour le sac (la queue/file) des taches… Il y a donc deux niveaux de verrous :( Toutefois, le verrou de TaskBag est crucial pour que cela fonctionne correctement: l’utilisation de Channel est uniquement une facon simple de signaler qu’il n’y aura plus rien comme taches en utilisant EOS.

Classe pour un sac dynamique de taches -- donc dans lequel on peut retirer mais aussi ajouter des taches de facon dynamique, i.e., en cours d'execution.

Constant Summary

DEBUG =
true && false

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nb_threads) ⇒ TaskBag

Note:

Plus precisement, nb_threads indique combien de threads vont faire des get sur le sac. C’est ce nombre qui permet de detecter la terminaison: quand nb_threads-1 threads sont bloques sur le get et que le dernier thread fait un get, alors tous les threads sont reactives avec la valeur nil comme resultat du get. Si le nombre de threads qui obtiennent des valeurs du sac est different de nb_threads, alors le comportement quant a la terminaison pourrait etre incorrect.

Cree un nouveau sac de taches.

Parameters:

  • nb_threads

    Le nombre de threads qui pourraient bloquer (via un get) sur ce sac de taches.



34
35
36
37
38
39
40
41
42
43
# File 'lib/pruby/task_bag.rb', line 34

def initialize( nb_threads )
  @nb_threads = nb_threads
  @nb_threads_en_attente = 0
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @attente_fin = ConditionVariable.new
  @queue = PRuby::Channel.new
  @termine = false
  @nb_termines = 0
end

Class Method Details

.create_and_run(nb_threads, *tasks) {|tb, k| ... } ⇒ Array<Object> Also known as: run

Lance l'execution d'un groupe de threads qui vont se partager un sac de taches, sac qu'ils utiliseront par l'intermediaire d'appels each et, s'ils generent de nouvelles taches, par des appels a put.

Exemple typiques d'utilisation -- execute avec 4 threads et avec comme taches initiales t1, t2, ..., tk:

resultats = TaskBag.run( 4, t1, t2, ..., tk ) do |tb|

...
tb.each do |task|
   traiter task
end
resultat_a_retourner

end

Parameters:

  • nb_threads (Fixnum)

    nombre de threads a lancer

  • tasks (Array<Object>)

    les taches a mettre dans le sac avant de lancer les threads

Yield Parameters:

  • tb (TaskBag)

    le sac de taches partage par les threads

  • k (Fixnum)

    le numero du thread

Yield Returns:

  • (Object)

    le resultat produit a la fin par le thread

Returns:

  • (Array<Object>)

    la liste des resultats retournes par chacun des threads

Ensures:

  • return.size == nb_threads

Requires:

  • le bloc utilise each pour obtenir les taches qu’il devra traiter et n’utilise pas get – mais peut utiliser put pour ajouter des nouvelles taches, identifiees en cours de traitement



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/pruby/task_bag.rb', line 75

def self.create_and_run( nb_threads, *tasks )
  DBC.require !tasks.empty?, "*** Dans run/create_and_run: Il faut specifier au moins une tache"

  # On cree le sac de taches avec ses taches initiales.
  tb = new( nb_threads )
  tasks.each do |task|
    tb.put task
  end

  # On lance les threads, on attend qu'ils terminent et on
  # retourne leurs resultats.
  (0...nb_threads)
    .map { |k|  PRuby.future { yield tb, k } }
    .map(&:value)
end

.getObject



185
186
187
# File 'lib/pruby/task_bag.rb', line 185

def self.get
  fail "*** Lorsque each est utilise, il ne faut pas faire d'appel a get"
end

Instance Method Details

#done?Bool

Indique si le sac est devenu inactif parce que vide et que tous

les threads qui l'utilisaient sont devenus inactifs -- i.e.,
bloques en attente d'une nouvelle tache.

Returns:

  • (Bool)


213
214
215
# File 'lib/pruby/task_bag.rb', line 213

def done?
  @termine
end

#each {|une| ... } ⇒ void

This method returns an undefined value.

Itere sur des elements du sac de taches.

Plus precisement, les elements du sac sont repartis entre les divers threads qui utilisent le sac, et donc un thread donne n'obtient qu'un sous-ensemble des elements mis dans le sac.

Si un thread utilise each, *il ne doit pas utilise get*. Par contre, il peut utiliser put pour ajouter un tache dans le sac.

Le each se termine lorsque tous les threads qui utilisent le sac de taches sont bloques en attente d'une tache.

Yield Parameters:

  • une (Object)

    tache a traiter

Yield Returns:

  • (void)

Requires:

  • aucun appel a get n’est effectue sur le sac!



183
184
185
186
187
188
189
190
191
192
# File 'lib/pruby/task_bag.rb', line 183

def each
  alias :old_get :get
  def self.get
    fail "*** Lorsque each est utilise, il ne faut pas faire d'appel a get"
  end

  while task = old_get
    yield( task )
  end
end

#getnil, Object

Retire une tache du sac de taches. Bloque si le sac est

actuellement vide mais que d'autres taches pourraient etre
ajoutees parce que des threads sont encore actifs.

Returns:

  • (nil, Object)

    La tache obtenue, ou bien nil lorsqu’il n’y a plus de taches… et qu’il ne pourra plus y en avoir (tous les nb_threads threads sont en attente d’une tache).



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/pruby/task_bag.rb', line 121

def get
  puts "TaskBag#get" if DEBUG

  task = nil
  @mutex.synchronize do
    loop do
      unless @queue.empty?
        # Il y a une tache disponible.
        task = @queue.get
        puts "Il y a une tache disponible: task = #{task.inspect}" if DEBUG
        if task == EOS
          # EOS => privee a mise en oeuvre.
          task = nil
          @nb_termines += 1
        end
        break
      end

      puts "Pas de tache" if DEBUG
      if @nb_threads_en_attente == @nb_threads - 1
        puts "... et dernier thread actif!!" if DEBUG
        # Dernier thread actif: on signale aux autres threads
        # qu'il n'y aura plus rien.
        @queue.put EOS
        @termine = true
        @cond.broadcast
      else
        puts "... mais d'autres threads encore actifs!!" if DEBUG
        # D'autres threads sont encore actifs et pourraient
        # ajouter des taches: on bloque.
        @nb_threads_en_attente += 1
        @cond.wait( @mutex )
        @nb_threads_en_attente -= 1
      end
    end
  end

  puts "TaskBag#get => #{task}" if DEBUG
  @attente_fin.signal if @nb_termines == @nb_threads
  task
end

#put(task) ⇒ self

Ajoute une tache dans le sac de taches. Ne bloque jamais (sac de

taille non bornee.)

Parameters:

  • task

    La tache a ajouter

Returns:

  • (self)

Ensures:

  • La tache a ete ajoutee dans le sac



102
103
104
105
106
107
108
109
110
111
# File 'lib/pruby/task_bag.rb', line 102

def put( task )

  @mutex.synchronize do
    puts "TaskBag#put( #{task} )" if DEBUG
    @queue.put( task )
    @cond.broadcast
  end

  self
end

#wait_donevoid

This method returns an undefined value.

Attente jusqu'a ce que le sac soit devenu inactif parce que vide

et que tous les threads qui l'utilisaient sont devenus inactifs
(i.e., bloques en attente d'une nouvelle tache).

Ensures:

  • done?



201
202
203
204
205
# File 'lib/pruby/task_bag.rb', line 201

def wait_done
  @mutex.synchronize do
    @attente_fin.wait( @mutex ) while !@termine
  end
end