Class: PRuby::Channel
- Includes:
- Debug, Enumerable
- Defined in:
- lib/pruby/channel.rb
Overview
Canal de communication entre les threads faisant partie d'un pipeline. A un canal est associe une file FIFO d'elements: on ajoute en queue avec put et on retire de la tete avec get.
Pour simplifier, on dit que le canal contient des elements, plutot que de dire que la queue associee au canal contient des elements.
Class Method Summary collapse
Instance Method Summary collapse
-
#close ⇒ self
Indique la fermeture d'un canal.
-
#closed? ⇒ Bool
Determine si tous les appels requis a close ont ete effectues -- en fonction du nombre de writers indiques et du nombre de close effectues.
-
#each(&block) ⇒ Object
Permet d'executer un bloc pour chacun des elements obtenus d'un canal.
-
#empty? ⇒ Bool
Determine si le canal est presentement vide.
-
#eos? ⇒ Bool
Determine si la fin du flux a ete rencontree.
-
#full? ⇒ Bool
Determine si le canal est presentement plein.
-
#get ⇒ Object
Obtient l'element en tete du canal.
-
#get_all(immediate = nil) ⇒ Array
Obtient tous les elements du canal.
-
#ignore_nil? ⇒ Bool
Indique si l'ecriture de nil est permis sur le canal.
-
#initialize(name = nil, max_size = 0, contents = [], ignore_nil = nil) ⇒ Channel
constructor
Constructeur de base.
-
#peek ⇒ Object
Lit l'element en tete du canal, mais sans le retirer du canal.
-
#put(elem) ⇒ self
(also: #<<, #emit)
Ajoute un element a la queue du canal.
-
#to_s ⇒ String
Retourne une chaine representant le canal.
-
#with_multiple_writers(nb) ⇒ self
Indique qu'un canal devra recevoir des messages de la part de plusieurs threads.
Methods included from Debug
__debug__, #__debug__, #debug?
Constructor Details
#initialize(name = nil, max_size = 0, contents = [], ignore_nil = nil) ⇒ Channel
Constructeur de base.
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/pruby/channel.rb', line 28 def initialize( name = nil, max_size = 0, contents = [], ignore_nil = nil ) @name = name || Channel.next_id @max_size = max_size @contents = contents.clone @ignore_nil = !!ignore_nil @nb_writers = 1 @nb_eos_received = 0 @eos = false @mutex = Mutex.new @not_empty = ConditionVariable.new @not_full = ConditionVariable.new end |
Class Method Details
.next_id ⇒ Object
282 283 284 285 286 287 |
# File 'lib/pruby/channel.rb', line 282 def self.next_id r = @next_id @next_id += 1 r.to_s end |
Instance Method Details
#close ⇒ self
Indique la fermeture d'un canal.
Mis en oeuvre en transmettant la valeur speciale EOS.
213 214 215 216 217 |
# File 'lib/pruby/channel.rb', line 213 def close put EOS self end |
#closed? ⇒ Bool
Determine si tous les appels requis a close ont ete effectues -- en fonction du nombre de writers indiques et du nombre de close effectues.
74 75 76 |
# File 'lib/pruby/channel.rb', line 74 def closed? @nb_eos_received == @nb_writers end |
#each(&block) ⇒ Object
Permet d'executer un bloc pour chacun des elements obtenus d'un canal.
L'iteration se termine quand la valeur speciale EOS est rencontree -- parce qu'elle a ete transmise explicitement par un put ou implicitement par un close.
Note: la valeur EOS n'est pas transmise au bloc.
232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/pruby/channel.rb', line 232 def each( &block ) DBC.require( block.arity != 0, "*** Le bloc passe a each doit avoir au moins un parametre:\n" << " block = #{block}\n" << " block.arity = #{block.arity}\n" << " block.parameters = #{block.parameters}\n" ) while (v = get) != EOS yield v end end |
#empty? ⇒ Bool
Determine si le canal est presentement vide. Meme si presentement vide, il peut, plus tard, ne plus etre vide si un thread ecrit dans le canal.
84 85 86 87 |
# File 'lib/pruby/channel.rb', line 84 def empty? would_return_something = eos? || !@contents.empty? !would_return_something end |
#eos? ⇒ Bool
Determine si la fin du flux a ete rencontree. Une fois que true a ete retourne, tous les appels subsequents vont aussi retourner true.
64 65 66 |
# File 'lib/pruby/channel.rb', line 64 def eos? @eos end |
#full? ⇒ Bool
Determine si le canal est presentement plein. Meme si presentement plein, il peut, plus tard, ne plus etre plein si un thread fait un get.
95 96 97 |
# File 'lib/pruby/channel.rb', line 95 def full? @max_size > 0 && @contents.size >= @max_size end |
#get ⇒ Object
Obtient l'element en tete du canal.
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/pruby/channel.rb', line 159 def get (_debug_ "#{self}#get => EOS"; return EOS) if eos? _debug_ "#{self}#get => ..." elem = nil @mutex.synchronize do # INTERESSANT: que se passe-t'il si on remplace while par if? # Reponse: condition de course possible liee a discipline "signaler et continuer"!! @not_empty.wait(@mutex) while empty? elem = eos? ? EOS : @contents.shift _debug_ "#{self}#get => ... #{elem}" if elem == EOS @eos = true @not_empty.signal end @not_full.signal end elem end |
#get_all(immediate = nil) ⇒ Array
Obtient tous les elements du canal.
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/pruby/channel.rb', line 254 def get_all( immediate = nil ) # Defined mostly for testing purpose. # We want to see what is currently there if immediate elems = nil @mutex.synchronize do elems = @contents.clone end @not_full.signal return elems end # We want to see the full contents, waiting till it is complete. elems = [] loop do elem = get @not_full.signal break if elem == EOS elems << elem end @eos_encountered = true elems end |
#ignore_nil? ⇒ Bool
Indique si l'ecriture de nil est permis sur le canal.
103 104 105 |
# File 'lib/pruby/channel.rb', line 103 def ignore_nil? @ignore_nil end |
#peek ⇒ Object
Lit l'element en tete du canal, mais sans le retirer du canal.
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/pruby/channel.rb', line 189 def peek (_debug_ "#{self}#peek => EOS"; return EOS) if eos? _debug_ "#{self}#peek => ..." elem = nil @mutex.synchronize do # INTERESSANT: que se passe-t'il si on remplace while par if? @not_empty.wait(@mutex) while @contents.size == 0 elem = @contents[0] _debug_ "#{self}#peek => ... #{elem}" end elem end |
#put(elem) ⇒ self Also known as: <<, emit
Ajoute un element a la queue du canal.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/pruby/channel.rb', line 121 def put( elem ) @mutex.synchronize do DBC.require( ignore_nil? || !elem.nil?, "*** Cannot put nil into a channel unless explicitly allowed" ) DBC.require( !closed? || elem == EOS, "*** Cannot put '#{elem}' on a closed channel" ) _debug_ "#{self}#put( #{elem} )" if elem == EOS @nb_eos_received += 1 if @nb_eos_received < @nb_writers elem = nil # Pas le dernier EOS, donc on ne l'ecrit pas! end @not_full.signal else @not_full.wait(@mutex) while full? end _debug_ "#{self}#put( #{elem} ) => #{elem.inspect}" if elem @contents.push elem @not_empty.signal end end self end |
#to_s ⇒ String
Retourne une chaine representant le canal.
111 112 113 |
# File 'lib/pruby/channel.rb', line 111 def to_s @name.to_s + ":: (#{@contents.size} /" + (@max_size > 0 ? "#@max_size)" : ")") end |
#with_multiple_writers(nb) ⇒ self
Indique qu'un canal devra recevoir des messages de la part de plusieurs threads. Pour que le canal propage le EOS aux lecteurs, on devra donc avoir recu plusieurs EOS, i.e., autant de EOS que de threads qui ecrivent.
51 52 53 54 55 56 |
# File 'lib/pruby/channel.rb', line 51 def with_multiple_writers( nb ) DBC.require nb >= 2, "*** Dans accept_multiple_eos: nb = #{nb} doit etre >= 2" @nb_writers = nb self end |