Class: PRuby::TaskBag
Overview
Il s’agit (pour l’instant?) d’une mise en oeuvre naive, car elle utilise un Channel pour le sac (la queue/file) des taches… Il y a donc deux niveaux de verrous :( Toutefois, le verrou de TaskBag est crucial pour que cela fonctionne correctement: l’utilisation de Channel est uniquement une facon simple de signaler qu’il n’y aura plus rien comme taches en utilisant EOS.
Classe pour un sac dynamique de taches -- donc dans lequel on peut retirer mais aussi ajouter des taches de facon dynamique, i.e., en cours d'execution.
Constant Summary
- DEBUG =
true && false
Class Method Summary collapse
-
.create_and_run(nb_threads, *tasks) {|tb, k| ... } ⇒ Array<Object>
(also: run)
Lance l'execution d'un groupe de threads qui vont se partager un sac de taches, sac qu'ils utiliseront par l'intermediaire d'appels each et, s'ils generent de nouvelles taches, par des appels a put.
- .get ⇒ Object
Instance Method Summary collapse
-
#done? ⇒ Bool
Indique si le sac est devenu inactif parce que vide et que tous les threads qui l'utilisaient sont devenus inactifs -- i.e., bloques en attente d'une nouvelle tache.
-
#each {|une| ... } ⇒ void
Itere sur des elements du sac de taches.
-
#get ⇒ nil, Object
Retire une tache du sac de taches.
-
#initialize(nb_threads) ⇒ TaskBag
constructor
Cree un nouveau sac de taches.
-
#put(task) ⇒ self
Ajoute une tache dans le sac de taches.
-
#wait_done ⇒ void
Attente jusqu'a ce que le sac soit devenu inactif parce que vide et que tous les threads qui l'utilisaient sont devenus inactifs (i.e., bloques en attente d'une nouvelle tache).
Constructor Details
#initialize(nb_threads) ⇒ TaskBag
Plus precisement, nb_threads indique combien de threads vont faire des get sur le sac. C’est ce nombre qui permet de detecter la terminaison: quand nb_threads-1 threads sont bloques sur le get et que le dernier thread fait un get, alors tous les threads sont reactives avec la valeur nil comme resultat du get. Si le nombre de threads qui obtiennent des valeurs du sac est different de nb_threads, alors le comportement quant a la terminaison pourrait etre incorrect.
Cree un nouveau sac de taches.
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/pruby/task_bag.rb', line 34 def initialize( nb_threads ) @nb_threads = nb_threads @nb_threads_en_attente = 0 @mutex = Mutex.new @cond = ConditionVariable.new @attente_fin = ConditionVariable.new @queue = PRuby::Channel.new @termine = false @nb_termines = 0 end |
Class Method Details
.create_and_run(nb_threads, *tasks) {|tb, k| ... } ⇒ Array<Object> Also known as: run
Lance l'execution d'un groupe de threads qui vont se partager un sac de taches, sac qu'ils utiliseront par l'intermediaire d'appels each et, s'ils generent de nouvelles taches, par des appels a put.
Exemple typiques d'utilisation -- execute avec 4 threads et avec comme taches initiales t1, t2, ..., tk:
resultats = TaskBag.run( 4, t1, t2, ..., tk ) do |tb|
...
tb.each do |task|
traiter task
end
resultat_a_retourner
end
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/pruby/task_bag.rb', line 75 def self.create_and_run( nb_threads, *tasks ) DBC.require !tasks.empty?, "*** Dans run/create_and_run: Il faut specifier au moins une tache" # On cree le sac de taches avec ses taches initiales. tb = new( nb_threads ) tasks.each do |task| tb.put task end # On lance les threads, on attend qu'ils terminent et on # retourne leurs resultats. (0...nb_threads) .map { |k| PRuby.future { yield tb, k } } .map(&:value) end |
.get ⇒ Object
185 186 187 |
# File 'lib/pruby/task_bag.rb', line 185 def self.get fail "*** Lorsque each est utilise, il ne faut pas faire d'appel a get" end |
Instance Method Details
#done? ⇒ Bool
Indique si le sac est devenu inactif parce que vide et que tous
les threads qui l'utilisaient sont devenus inactifs -- i.e.,
bloques en attente d'une nouvelle tache.
213 214 215 |
# File 'lib/pruby/task_bag.rb', line 213 def done? @termine end |
#each {|une| ... } ⇒ void
This method returns an undefined value.
Itere sur des elements du sac de taches.
Plus precisement, les elements du sac sont repartis entre les divers threads qui utilisent le sac, et donc un thread donne n'obtient qu'un sous-ensemble des elements mis dans le sac.
Si un thread utilise each, *il ne doit pas utilise get*. Par contre, il peut utiliser put pour ajouter un tache dans le sac.
Le each se termine lorsque tous les threads qui utilisent le sac de taches sont bloques en attente d'une tache.
183 184 185 186 187 188 189 190 191 192 |
# File 'lib/pruby/task_bag.rb', line 183 def each alias :old_get :get def self.get fail "*** Lorsque each est utilise, il ne faut pas faire d'appel a get" end while task = old_get yield( task ) end end |
#get ⇒ nil, Object
Retire une tache du sac de taches. Bloque si le sac est
actuellement vide mais que d'autres taches pourraient etre
ajoutees parce que des threads sont encore actifs.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/pruby/task_bag.rb', line 121 def get puts "TaskBag#get" if DEBUG task = nil @mutex.synchronize do loop do unless @queue.empty? # Il y a une tache disponible. task = @queue.get puts "Il y a une tache disponible: task = #{task.inspect}" if DEBUG if task == EOS # EOS => privee a mise en oeuvre. task = nil @nb_termines += 1 end break end puts "Pas de tache" if DEBUG if @nb_threads_en_attente == @nb_threads - 1 puts "... et dernier thread actif!!" if DEBUG # Dernier thread actif: on signale aux autres threads # qu'il n'y aura plus rien. @queue.put EOS @termine = true @cond.broadcast else puts "... mais d'autres threads encore actifs!!" if DEBUG # D'autres threads sont encore actifs et pourraient # ajouter des taches: on bloque. @nb_threads_en_attente += 1 @cond.wait( @mutex ) @nb_threads_en_attente -= 1 end end end puts "TaskBag#get => #{task}" if DEBUG @attente_fin.signal if @nb_termines == @nb_threads task end |
#put(task) ⇒ self
Ajoute une tache dans le sac de taches. Ne bloque jamais (sac de
taille non bornee.)
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/pruby/task_bag.rb', line 102 def put( task ) @mutex.synchronize do puts "TaskBag#put( #{task} )" if DEBUG @queue.put( task ) @cond.broadcast end self end |
#wait_done ⇒ void
This method returns an undefined value.
Attente jusqu'a ce que le sac soit devenu inactif parce que vide
et que tous les threads qui l'utilisaient sont devenus inactifs
(i.e., bloques en attente d'une nouvelle tache).
201 202 203 204 205 |
# File 'lib/pruby/task_bag.rb', line 201 def wait_done @mutex.synchronize do @attente_fin.wait( @mutex ) while !@termine end end |