Class: PRuby::Stream

Inherits:
Object show all
Includes:
Enumerable
Defined in:
lib/pruby/stream.rb

Constant Summary

DEFAULT_BUFFER_SIZE =

Taille par defaut du buffer alloue pour le Channel d'un Stream.

Preferable d'utiliser 1 dans les tests pour assurer que les methodes fonctionnent correctement meme lorsque le buffer est petit. Evidemment, il est preferable d'utiliser une taille superieure en pratique, pour permettre aux threads de generer plusieurs elements avant de, possiblement, bloquer parce que le tampon de communication est plein.

10

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.generate(initial_value, options = {}) {|current_value| ... } ⇒ Stream

Genere une serie de valeurs *potentiellement infinie*.

Termine la generation seulement quand/si le bloc retourne nil comme resultat.

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :buffer_size (Fixnum)

    La taille de buffer a utiliser pour le Channel de sortie

Yield Parameters:

  • current_value (Object)

    L’etat courant, qui peut etre utilise pour generer le prochain element

Yield Returns:

  • (Object, nil)

    L’element a emettre sur le stream de sortie, si non nil. Si nil, alors le processus de generation est termine et le canal de sortie est ferme.

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/pruby/stream.rb', line 123

def self.generate( initial_value, options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** La generation d'une source avec generate doit se faire sequentiellement" )

  @@dummy_stream.send :mk_stream_with_threads, options do |channel|
    current_value = initial_value
    channel << initial_value
    until (current_value = yield(current_value)).nil?
      channel << current_value
    end
  end
end

.source(source, options = {}) ⇒ Stream

Note:

Si un objet repondant au message :each est est fourni en argument, alors ce sont les elements enumeres par :each qui seront emis sur le stream. Si une chaine est fournie et qu’aucun argument n’est specifie pour source_kind, alors ce sont les caracteres de la chaine qui sont emis. Par contre, si une chaine est fournie et que l’argument :filename est specifie pour source_kind, alors ce sont les *lignes du fichier* qui sont emises. De plus, dans le cas d’un fichier: i) une exception sera lancee si aucun fichier avec le nom indique n’existe; ii) les lignes emises contiendront le saut de ligne final.

Cree un stream a partir des elements d'une source externe. Cette source peut etre n'importe quel objet pouvant repondre au message #each, sinon au message #each_char, sinon etre un nom de fichier. Dans ce dernier cas, ce sont les lignes du fichier qui seront emises, l'une apres l'autre, sur le stream.

Parameters:

  • source (#each, String)

    La source qui fournira les elements a emettre

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :source_kind (nil, :string, :filename)

    La sorte de source

  • :nb_threads (Fixnum)

    Le nombre de threads avec lesquels on desire que le traitement soit fait

  • :buffer_size (Fixnum)

    La taille de buffer a utiliser pour le Channel de sortie

Returns:



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/pruby/stream.rb', line 85

def self.source( source, options = {} )
  DBC.require( source.respond_to?(:each) || source.respond_to?(:each_char),
               "*** La source doit repondre a :each, :each_char\
                ou etre un nom de fichier" )
  DBC.check_value( options[:source_kind],
                   [nil, :string, :file_name, :filename],
                   "*** Les sortes acceptees sont :filename et :string" )

  DBC.require( Stream.un_seul_thread?(options),
               "*** La generation d'une source doit se faire sequentiellement" )

  @@dummy_stream.send :mk_stream_with_threads, options do |channel|
    if source.respond_to?(:each_char) && [:file_name, :filename].include?(options[:source_kind])
      File.open(source, 'r') do |f|
        f.each_line { |line| channel << line }
      end
    elsif source.respond_to?(:each)
      source.each { |x| channel << x }
    else
      source.each_char { |c| channel << c }
    end
  end
end

Instance Method Details

#>>(proc) ⇒ Stream

Note:

Voir .apply

Cree un stream a partir d'un proc (avec apply) et le lie (le concatene) au stream courant.

Essentiellement, >> est un alias d'apply, mais qui s'appique uniquement sur une lambda-expression.

Parameters:

  • proc (Proc)

    Un lambda qui represente le traitement a faire sur le stream, via apply

Returns:

  • (Stream)

    Un nouveau stream



879
880
881
882
883
# File 'lib/pruby/stream.rb', line 879

def >>( proc )
  DBC.require proc.is_a?(Proc), "*** Dans >>: argument proc doit etre specifie"

  apply( proc )
end

#apply(proc = nil) {|self| ... } ⇒ Stream

Applique une fonction sur le stream.

La fonction est specifiee par une lambda-expression ou par un bloc. Cette fonction recoit un stream et retourne un stream. Typiquement, elle utilisera les diverses operations sur les streams.

Parameters:

  • proc (Proc, nil) (defaults to: nil)

    Une lambda-expression a appliquer sur self ou sinon un bloc

Yield Parameters:

  • self (Stream)

    Le stream courant

Yield Returns:

Returns:



857
858
859
860
861
862
863
864
# File 'lib/pruby/stream.rb', line 857

def apply( proc = nil )
  if block_given?
    yield self
  else
    DBC.require proc && proc.is_a?(Proc), "*** Dans apply: bloc ou proc argument doit etre specifie"
    proc.call self
  end
end

#drop(n, options = {}) ⇒ Stream

Supprime (laisse tomber) les n premiers elements du stream.

Parameters:

  • n (Fixnum)

    Nombre d’elements a laisser tomber au debut du stream

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/pruby/stream.rb', line 373

def drop( n, options = {} )
  DBC.require( n >= 0,
               "*** Dans drop: n doit etre >= 0 (n = #{n})" )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec drop doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    while n > 0 && !cin.eos?
      cin.get
      n -= 1
    end
    each do |x|
      channel << x
    end
  end
end

#drop_while(options = {}) {|x| ... } ⇒ Stream

Laisse tomber les elements du stream tant qu'ils satisfont la condition (bloc). Ajoute ensuite tous les elements subsequents dans le stream de sortie.

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (Bool)

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/pruby/stream.rb', line 431

def drop_while( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec drop_while doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    on_conserve = false
    cin.each do |x|
      if on_conserve
        channel << x
      else
        unless yield(x)
          on_conserve = true
          channel << x
        end
      end
    end
  end
end

#each {|x| ... } ⇒ void

This method returns an undefined value.

Enumere, sequentiellement, les elements du stream, et applique le bloc sur ces elements.

Ne genere pas un nouveau stream.

Yield Parameters:

  • x (Object)

    Le prochain element obtenu du canal associe au stream



178
179
180
181
182
183
184
# File 'lib/pruby/stream.rb', line 178

def each
  @channel.each do |x|
    yield x
  end

  nil
end

#filter(options = {}) {|x| ... } ⇒ Stream Also known as: select

Selectionne les elements du stream qui satisfont une fonction (un bloc).

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (Bool)

    True si l’argument satisfait la condition, false sinon

Returns:



218
219
220
221
222
223
224
# File 'lib/pruby/stream.rb', line 218

def filter( options = {} )
  mk_stream_with_threads( options ) do |channel|
    each do |x|
      channel << x if yield(x)
    end
  end
end

#flat_map(options = {}) {|x| ... } ⇒ Stream

Applique une fonction (un bloc) sur chacun des elements du stream d'entree. La fonction produit en sortie un Array d'elements (contenant 0, 1 ou plusieurs elements), lesquels arrays sont ensuite concatenes dans le stream de sortie.

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (#each)

    Un ou plusieurs elements accessibles par l’intermediaire d’appels a #each

Returns:



332
333
334
335
336
337
338
339
340
# File 'lib/pruby/stream.rb', line 332

def flat_map( options = {} )
  mk_stream_with_threads( options ) do |channel|
    each do |x|
      yield( x ).each do |y|
        channel << y
      end
    end
  end
end

#go(options = {}) {|cin, channel| ... } ⇒ Stream

Applique un traitement a la go sur le stream.

Le bloc recu doit recevoir deux arguments: un canal d'entree et un canal de sortie. Le code dans le bloc manipule ensuite de facon explicite ces deux canaux, le premier avec avec get/each, le deuxieme avec put/close, etc.

Yield Parameters:

  • cin (Channel)

    Le canal du stream d’entree

  • channel (Channel)

    Le canal du stream de sortie

Yield Returns:

  • (void)

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



571
572
573
574
575
576
577
578
# File 'lib/pruby/stream.rb', line 571

def go( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec go doit se faire sequentiellement" )
  cin = @channel
  mk_stream_with_threads do |channel|
    yield( cin, channel )
  end
end

#group_by(options = {}) {|x| ... } ⇒ Stream<Object,Object>

Regroupe les elements du stream selon la valeur generee par l'application du bloc.

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (Object)

    La valeur a utiliser pour le regroupement

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/pruby/stream.rb', line 462

def group_by( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec group_by doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    h = cin.group_by { |x| yield x }
    h.each_pair do |k, v|
      if value = options[:merge_value] || options[:map_value]
        channel << [k, v.reduce([]) { |a, x| a << value.call(x) }]
      else
        channel << [k, v]
      end
    end
  end
end

#group_by_key(options = {}) ⇒ Stream<Object,Array<Object>>

Note:

Le stream d’entree complet doit etre recu avant que des elements puissent etre emis sur le stream de sortie

A partir d'un stream contenant des paires <cle, valeur>, retourne un stream de <cle,Array<valeur>>, donc ou toutes les valeurs associees a une cle ont ete regroupees.

Returns:

Ensures:

  • Si <k1, v1> et <k2, v2> font partie du stream de sortie, alors k1 != k2

Requires:

  • Le stream d’entree est une suite de paires <cle,valeur>, i.e., Stream<Object,Object>

  • Ne peut s’executer qu’avec un seul thread



502
503
504
505
506
507
508
509
510
511
512
513
# File 'lib/pruby/stream.rb', line 502

def group_by_key( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec group_by doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    h = cin.group_by { |x| x.first }
    h.each_pair do |k, v|
      channel << [k, v.reduce([]) { |a, x| a << x.last }]
    end
  end
end

#iterate(nb_iterations) {|s| ... } ⇒ Stream

Itere une transformation de stream un certain nombre fixe de fois.

Parameters:

  • nb_iterations (Fixnum)

    Nombre total de fois ou il faut proceder au traitement du stream de bout en bout

Yield Parameters:

  • s (Stream)

    Le stream a utiliser pour la nouvelle iteration

Yield Returns:

  • (Stream)

    Le stream produit par l’iteration

Returns:



834
835
836
837
838
839
840
# File 'lib/pruby/stream.rb', line 834

def iterate( nb_iterations )
  s = self
  nb_iterations.times do
    s = yield s
  end
  s
end

#map(options = {}) {|x| ... } ⇒ Stream Also known as: collect

Applique une fonction (un bloc) sur chacun des elements du stream.

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (Object)

    L’element a emettre sur le stream de sortie

Returns:



201
202
203
204
205
206
207
# File 'lib/pruby/stream.rb', line 201

def map( options = {} )
  mk_stream_with_threads( options ) do |channel|
    each do |x|
      channel << yield( x )
    end
  end
end

#peek(options = {}) {|x| ... } ⇒ Stream

Note:

Utile pour debogger.

Applique un traitement arbitraire sur chacun des elements du stream, et retourne le stream d'entree sans aucune modification.

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (void)

Returns:



287
288
289
290
291
292
293
294
# File 'lib/pruby/stream.rb', line 287

def peek( options = {} )
  mk_stream_with_threads( options ) do |channel|
    each do |x|
      yield x
      channel << x
    end
  end
end

#reject(options = {}) {|x| ... } ⇒ Stream

Selectionne les elements du stream qui ne satisfont pas une fonction (un bloc).

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (Bool)

    True si l’argument ne satisfait pas la condition, false sinon

Returns:



235
236
237
238
239
240
241
# File 'lib/pruby/stream.rb', line 235

def reject( options = {} )
  mk_stream_with_threads( options ) do |channel|
    each do |x|
      channel << x unless yield(x)
    end
  end
end

#sink(destination) ⇒ Array?

Collecte les elements d'un stream dans un objet final non-stream.

Plus specifiquement, les objets recus pourront etre mis dans un tableau -- nouveau ou ajoute a un tableau existant -- ou dans un fichier.

Parameters:

  • destination (Class, #<<, #puts, String)

    La destination des elements

Returns:

  • (Array, nil)

    Un Array si la destination en est un, sinon nil

Ensures:

  • si destination == Array alors les elements du stream sont mis dans un nouveau tableau

  • si destination.respond_to?(:puts) ou destination.respond_to?(:<<), alors les elements recus du stream sont ajoutes avec la methode

  • si destination.class == String alors les elements recus sont ajoutes dans le fichier avec le nom indique (qui peut ne pas exister)



899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
# File 'lib/pruby/stream.rb', line 899

def sink( destination )
  # Si c'est simplement le nom de classe Array, on genere un
  # nouveau tableau pour recevoir les elements du Stream.
  destination = [] if destination == Array

  if destination.class == String
    # On ajoute les elements au fichier ayant le nom indique.
    File.open( destination, 'a+' ) do |f|
      each { |v| f.puts v }
    end
    result = nil
  elsif destination.respond_to?( :puts )
    # On ajoute les elements du Stream avec :puts
    each { |v| destination.puts v }
    result = nil
  elsif destination.respond_to?( :<< )
    # On ajoute les elements du Stream avec :<<
    each { |v| destination << v }
    result = destination
  end

  # On termine les threads -- requis pour ceux qui generent de
  # facon potentiellement infinie.
  threads.each do |t|
    Thread.kill t if t.alive?
  end

  result
end

#sort(options = {}) {|x, y| ... } ⇒ Stream

Note:

Le stream d’entree complet doit etre recu avant que des elements puissent etre emis sur le stream de sortie.

Tri les elements d'un stream.

Le bloc a utiliser pour comparer les elements est optionnel.

Yield Parameters:

  • x (Object)

    Le premier element a comparer

  • y (Object)

    Le deuxieme element a comparer

Yield Returns:

  • (<-1, 0, 1>)

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/pruby/stream.rb', line 259

def sort( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un tri avec sort doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    if block_given?
      sorted = cin.sort { |x, y| yield( x, y ) }.each
    else
      sorted = cin.sort.each
    end
    sorted.each do |x|
      channel << x
    end
  end
end

#stateful(options = {}) {|state, x| ... } ⇒ Stream

Transforme un stream avec un changement etat.

On specifie l'etat initial lors de l'appel a la methode. Le bloc execute doit recevoir deux arguments: l'etat courant et l'element a traiter. Il doit aussi retourner deux resultats (Array): le nouvel etat et la valeur a emettre sur le stream de sortie suite au traitement de l'element recu. Un tel traitement implique donc une execution strictement sequentielle.

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • initial_state (Object)

    Valeur initiale de l’etat

  • at_eos (Proc, :STATE, :EMIT_STATE)

    Traitement a effectuer lorsque la fin du stream est rencontree. Si Proc, alors l’etat est transmis au Proc. Si :STATE ou :EMIT_STATE, l’etat est simplement emis sur le stream de sortie

Yield Parameters:

  • state (Object)

    L’etat courant

  • x (Object)

    La valeur a traiter du stream d’entree

Yield Returns:

  • (Array<Object>)

    Le nouvel etat courant et la valeur a emettre sur le canal du stream de sortie (si non nil)

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
# File 'lib/pruby/stream.rb', line 604

def stateful( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec stateful doit se faire sequentiellement" )

  state = options[:initial_state]
  cin = @channel
  mk_stream_with_threads do |channel|
    cin.each do |x|
      state, result = yield( state, x )
      channel << result
    end
    at_eos = options[:at_eos]
    if at_eos.is_a?(Proc)
      channel << at_eos.call(state)
    elsif at_eos == :STATE || at_eos == :EMIT_STATE
      channel << state
    elsif at_eos
      channel << at_eos
    end
  end
end

#take(n, options = {}) ⇒ Stream

Prend les n premiers elements du stream.

Parameters:

  • n (Fixnum)

    Nombre d’elements a prendre au debut du stream

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/pruby/stream.rb', line 350

def take( n, options = {} )
  DBC.require( n >= 0,
               "*** Dans take: n doit etre >= 0 (n = #{n})" )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec take doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    while n > 0 && !cin.eos?
      channel << cin.get
      n -= 1
    end
  end
end

#take_while(options = {}) {|x| ... } ⇒ Stream

Prend les elements du stream tant qu'ils satisfont la condition (bloc). Termine le stream de sortie des qu'un element qui ne satisfait pas la condition est rencontre.

Yield Parameters:

  • x (Object)

    Un element du stream d’entree

Yield Returns:

  • (Bool)

Returns:

Requires:

  • Ne peut s’executer qu’avec un seul thread



403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/pruby/stream.rb', line 403

def take_while( options = {} )
  DBC.require( Stream.un_seul_thread?(options),
               "*** Un traitement avec take_while doit se faire sequentiellement" )

  cin = @channel
  mk_stream_with_threads do |channel|
    cin.each do |x|
      if yield( x )
        channel << x
      else
        break
      end
    end
  end
end

#to_aArray

Collecte les elements du Stream dans un tableau.

Returns:



934
935
936
# File 'lib/pruby/stream.rb', line 934

def to_a
  sink []
end

#uniq(options = {}) ⇒ Stream

Note:

Mise en oeuvre couteuse (utilisation d’un Mutex et d’un ensemble contenant les elements vus)

Filtre les elements du stream d'entree pour assurer qu'il n'y ait qu'une seule occurrence de chaque element dans le stream de sortie.

Returns:



306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/pruby/stream.rb', line 306

def uniq( options = {} )
  vus = []
  mutex = Mutex.new
  mk_stream_with_threads( options ) do |channel|
    each do |x|
      mutex.synchronize do
        unless vus.include? x
          channel << x
          vus << x
        end
      end unless vus.include? x
    end
  end
end