Class: PRuby::Channel

Inherits:
Object show all
Includes:
Debug, Enumerable
Defined in:
lib/pruby/channel.rb

Overview

Canal de communication entre les threads faisant partie d'un pipeline. A un canal est associe une file FIFO d'elements: on ajoute en queue avec put et on retire de la tete avec get.

Pour simplifier, on dit que le canal contient des elements, plutot que de dire que la queue associee au canal contient des elements.

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Debug

__debug__, #__debug__, #debug?

Constructor Details

#initialize(name = nil, max_size = 0, contents = [], ignore_nil = nil) ⇒ Channel

Constructeur de base.

Parameters:

  • name (String) (defaults to: nil)

    Le nom du canal, utilise essentiellement pour le debogage

  • max_size (Fixnum) (defaults to: 0)

    Taille maximum du canal, i.e., nombre max. d’elements pouvant etre en attente dans le canal. Si max_size == 0, alors canal non-borne

  • contents (Array) (defaults to: [])

    Contenu initial du canal



28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pruby/channel.rb', line 28

def initialize( name = nil, max_size = 0, contents = [], ignore_nil = nil )
  @name = name || Channel.next_id
  @max_size = max_size
  @contents = contents.clone
  @ignore_nil = !!ignore_nil

  @nb_writers = 1
  @nb_eos_received = 0
  @eos = false
  @mutex = Mutex.new
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
end

Class Method Details

.next_idObject



282
283
284
285
286
287
# File 'lib/pruby/channel.rb', line 282

def self.next_id
  r = @next_id
  @next_id += 1

  r.to_s
end

Instance Method Details

#closeself

Indique la fermeture d'un canal.

Mis en oeuvre en transmettant la valeur speciale EOS.

Returns:

  • (self)

Ensures:

  • Les appels subsequents a get vont retourner EOS apres que le contenu deja present aura ete lu.



213
214
215
216
217
# File 'lib/pruby/channel.rb', line 213

def close
  put EOS

  self
end

#closed?Bool

Determine si tous les appels requis a close ont ete effectues -- en fonction du nombre de writers indiques et du nombre de close effectues.

Returns:

  • (Bool)


74
75
76
# File 'lib/pruby/channel.rb', line 74

def closed?
  @nb_eos_received == @nb_writers
end

#each(&block) ⇒ Object

Permet d'executer un bloc pour chacun des elements obtenus d'un canal.

L'iteration se termine quand la valeur speciale EOS est rencontree -- parce qu'elle a ete transmise explicitement par un put ou implicitement par un close.

Note: la valeur EOS n'est pas transmise au bloc.

Parameters:

  • block

    Le bloc a executer

Ensures:

  • Le bloc est execute pour chaque element du flux, sauf le EOS final

Requires:

  • Le bloc recoit un argument, qui est un element du flux a traiter



232
233
234
235
236
237
238
239
240
241
242
# File 'lib/pruby/channel.rb', line 232

def each( &block )
  DBC.require( block.arity != 0,
               "*** Le bloc passe a each doit avoir au moins un parametre:\n" <<
               "    block = #{block}\n" <<
               "    block.arity = #{block.arity}\n" <<
               "    block.parameters = #{block.parameters}\n" )

  while (v = get) != EOS
    yield v
  end
end

#empty?Bool

Determine si le canal est presentement vide. Meme si presentement vide, il peut, plus tard, ne plus etre vide si un thread ecrit dans le canal.

Returns:

  • (Bool)


84
85
86
87
# File 'lib/pruby/channel.rb', line 84

def empty?
  would_return_something = eos? || !@contents.empty?
  !would_return_something
end

#eos?Bool

Determine si la fin du flux a ete rencontree. Une fois que true a ete retourne, tous les appels subsequents vont aussi retourner true.

Returns:

  • (Bool)


64
65
66
# File 'lib/pruby/channel.rb', line 64

def eos?
  @eos
end

#full?Bool

Determine si le canal est presentement plein. Meme si presentement plein, il peut, plus tard, ne plus etre plein si un thread fait un get.

Returns:

  • (Bool)


95
96
97
# File 'lib/pruby/channel.rb', line 95

def full?
  @max_size > 0 && @contents.size >= @max_size
end

#getObject

Obtient l'element en tete du canal.

Returns:

  • L’element qui etait en tete du canal. Bloque si empty?

Ensures:

  • L’element retourne est retire du canal, donc le canal a un element de moins



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/pruby/channel.rb', line 159

def get
  (_debug_ "#{self}#get => EOS"; return EOS) if eos?

  _debug_ "#{self}#get => ..."

  elem = nil
  @mutex.synchronize do
    # INTERESSANT: que se passe-t'il si on remplace while par if?
    # Reponse: condition de course possible liee a discipline "signaler et continuer"!!
    @not_empty.wait(@mutex) while empty?

    elem = eos? ? EOS : @contents.shift

    _debug_ "#{self}#get => ... #{elem}"

    if elem == EOS
      @eos = true
      @not_empty.signal
    end
    @not_full.signal
  end

  elem
end

#get_all(immediate = nil) ⇒ Array

Obtient tous les elements du canal.

Parameters:

  • immediate (Bool, nil) (defaults to: nil)

    Si true alors ne bloque pas et retourne le contenu courant. Si false, alors ne retourne que lorsque la fin de canal est rencontree

Returns:

  • (Array)

    immediate => tous les elements presentement dans le canal, sans bloquer

  • (Array)

    !immediate => tous les elements jusqu’au EOS, en bloquant si necessaire

Ensures:

  • empty?



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/pruby/channel.rb', line 254

def get_all( immediate = nil )
  # Defined mostly for testing purpose.

  # We want to see what is currently there
  if immediate
    elems = nil
    @mutex.synchronize do
      elems = @contents.clone
    end
    @not_full.signal
    return elems
  end

  # We want to see the full contents, waiting till it is complete.
  elems = []
  loop do
    elem = get
    @not_full.signal
    break if elem == EOS
    elems << elem
  end
  @eos_encountered = true

  elems
end

#ignore_nil?Bool

Indique si l'ecriture de nil est permis sur le canal.

Returns:

  • (Bool)


103
104
105
# File 'lib/pruby/channel.rb', line 103

def ignore_nil?
  @ignore_nil
end

#peekObject

Lit l'element en tete du canal, mais sans le retirer du canal.

Returns:

  • L’element en tete du canal. Bloque si empty?

Ensures:

  • Aucun effe sur le contenu du canal



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/pruby/channel.rb', line 189

def peek
  (_debug_ "#{self}#peek => EOS"; return EOS) if eos?

  _debug_ "#{self}#peek => ..."

  elem = nil
  @mutex.synchronize do
    # INTERESSANT: que se passe-t'il si on remplace while par if?
    @not_empty.wait(@mutex) while @contents.size == 0
    elem = @contents[0]
    _debug_ "#{self}#peek => ... #{elem}"
  end

  elem
end

#put(elem) ⇒ self Also known as: <<, emit

Ajoute un element a la queue du canal.

Parameters:

  • elem

    L’element a ajouter

Returns:

  • (self)

Requires:

  • (ignore_nil? || !elem.nil?) && !closed?



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/pruby/channel.rb', line 121

def put( elem )
  @mutex.synchronize do
    DBC.require( ignore_nil? || !elem.nil?,
                 "*** Cannot put nil into a channel unless explicitly allowed" )
    DBC.require( !closed? || elem == EOS,
                 "*** Cannot put '#{elem}' on a closed channel" )

    _debug_ "#{self}#put( #{elem} )"

    if elem == EOS
      @nb_eos_received += 1
      if @nb_eos_received < @nb_writers
        elem = nil  # Pas le dernier EOS, donc on ne l'ecrit pas!
      end
      @not_full.signal
    else
      @not_full.wait(@mutex) while full?
    end

    _debug_ "#{self}#put( #{elem} ) => #{elem.inspect}"

    if elem
      @contents.push elem
      @not_empty.signal
    end
  end

  self
end

#to_sString

Retourne une chaine representant le canal.

Returns:

  • (String)


111
112
113
# File 'lib/pruby/channel.rb', line 111

def to_s
  @name.to_s + ":: (#{@contents.size} /" + (@max_size > 0 ? "#@max_size)" : ")")
end

#with_multiple_writers(nb) ⇒ self

Indique qu'un canal devra recevoir des messages de la part de plusieurs threads. Pour que le canal propage le EOS aux lecteurs, on devra donc avoir recu plusieurs EOS, i.e., autant de EOS que de threads qui ecrivent.

Parameters:

  • nb (Fixnum)

    Nombre de threads qui vont ecrire dans le canal

Returns:

  • (self)

Requires:

  • nb >= 2



51
52
53
54
55
56
# File 'lib/pruby/channel.rb', line 51

def with_multiple_writers( nb )
  DBC.require nb >= 2, "*** Dans accept_multiple_eos: nb = #{nb} doit etre >= 2"

  @nb_writers = nb
  self
end