Class: PRuby::Stream
Constant Summary
- DEFAULT_BUFFER_SIZE =
Taille par defaut du buffer alloue pour le Channel d'un Stream.
Preferable d'utiliser 1 dans les tests pour assurer que les methodes fonctionnent correctement meme lorsque le buffer est petit. Evidemment, il est preferable d'utiliser une taille superieure en pratique, pour permettre aux threads de generer plusieurs elements avant de, possiblement, bloquer parce que le tampon de communication est plein.
10
Class Method Summary collapse
-
.generate(initial_value, options = {}) {|current_value| ... } ⇒ Stream
Genere une serie de valeurs *potentiellement infinie*.
-
.source(source, options = {}) ⇒ Stream
Cree un stream a partir des elements d'une source externe.
Instance Method Summary collapse
-
#>>(proc) ⇒ Stream
Cree un stream a partir d'un proc (avec apply) et le lie (le concatene) au stream courant.
-
#apply(proc = nil) {|self| ... } ⇒ Stream
Applique une fonction sur le stream.
-
#drop(n, options = {}) ⇒ Stream
Supprime (laisse tomber) les n premiers elements du stream.
-
#drop_while(options = {}) {|x| ... } ⇒ Stream
Laisse tomber les elements du stream tant qu'ils satisfont la condition (bloc).
-
#each {|x| ... } ⇒ void
Enumere, sequentiellement, les elements du stream, et applique le bloc sur ces elements.
-
#filter(options = {}) {|x| ... } ⇒ Stream
(also: #select)
Selectionne les elements du stream qui satisfont une fonction (un bloc).
-
#flat_map(options = {}) {|x| ... } ⇒ Stream
Applique une fonction (un bloc) sur chacun des elements du stream d'entree.
-
#go(options = {}) {|cin, channel| ... } ⇒ Stream
Applique un traitement a la go sur le stream.
-
#group_by(options = {}) {|x| ... } ⇒ Stream<Object,Object>
Regroupe les elements du stream selon la valeur generee par l'application du bloc.
-
#group_by_key(options = {}) ⇒ Stream<Object,Array<Object>>
A partir d'un stream contenant des paires <cle, valeur>, retourne un stream de <cle,Array<valeur>>, donc ou toutes les valeurs associees a une cle ont ete regroupees.
-
#iterate(nb_iterations) {|s| ... } ⇒ Stream
Itere une transformation de stream un certain nombre fixe de fois.
-
#map(options = {}) {|x| ... } ⇒ Stream
(also: #collect)
Applique une fonction (un bloc) sur chacun des elements du stream.
-
#peek(options = {}) {|x| ... } ⇒ Stream
Applique un traitement arbitraire sur chacun des elements du stream, et retourne le stream d'entree sans aucune modification.
-
#reject(options = {}) {|x| ... } ⇒ Stream
Selectionne les elements du stream qui ne satisfont pas une fonction (un bloc).
-
#sink(destination) ⇒ Array?
Collecte les elements d'un stream dans un objet final non-stream.
-
#sort(options = {}) {|x, y| ... } ⇒ Stream
Tri les elements d'un stream.
-
#stateful(options = {}) {|state, x| ... } ⇒ Stream
Transforme un stream avec un changement etat.
-
#take(n, options = {}) ⇒ Stream
Prend les n premiers elements du stream.
-
#take_while(options = {}) {|x| ... } ⇒ Stream
Prend les elements du stream tant qu'ils satisfont la condition (bloc).
-
#to_a ⇒ Array
Collecte les elements du Stream dans un tableau.
-
#uniq(options = {}) ⇒ Stream
Filtre les elements du stream d'entree pour assurer qu'il n'y ait qu'une seule occurrence de chaque element dans le stream de sortie.
Class Method Details
.generate(initial_value, options = {}) {|current_value| ... } ⇒ Stream
Genere une serie de valeurs *potentiellement infinie*.
Termine la generation seulement quand/si le bloc retourne nil comme resultat.
123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/pruby/stream.rb', line 123 def self.generate( initial_value, = {} ) DBC.require( Stream.un_seul_thread?(), "*** La generation d'une source avec generate doit se faire sequentiellement" ) @@dummy_stream.send :mk_stream_with_threads, do |channel| current_value = initial_value channel << initial_value until (current_value = yield(current_value)).nil? channel << current_value end end end |
.source(source, options = {}) ⇒ Stream
Si un objet repondant au message :each est est fourni en argument, alors ce sont les elements enumeres par :each qui seront emis sur le stream. Si une chaine est fournie et qu’aucun argument n’est specifie pour source_kind, alors ce sont les caracteres de la chaine qui sont emis. Par contre, si une chaine est fournie et que l’argument :filename est specifie pour source_kind, alors ce sont les *lignes du fichier* qui sont emises. De plus, dans le cas d’un fichier: i) une exception sera lancee si aucun fichier avec le nom indique n’existe; ii) les lignes emises contiendront le saut de ligne final.
Cree un stream a partir des elements d'une source externe. Cette source peut etre n'importe quel objet pouvant repondre au message #each, sinon au message #each_char, sinon etre un nom de fichier. Dans ce dernier cas, ce sont les lignes du fichier qui seront emises, l'une apres l'autre, sur le stream.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/pruby/stream.rb', line 85 def self.source( source, = {} ) DBC.require( source.respond_to?(:each) || source.respond_to?(:each_char), "*** La source doit repondre a :each, :each_char\ ou etre un nom de fichier" ) DBC.check_value( [:source_kind], [nil, :string, :file_name, :filename], "*** Les sortes acceptees sont :filename et :string" ) DBC.require( Stream.un_seul_thread?(), "*** La generation d'une source doit se faire sequentiellement" ) @@dummy_stream.send :mk_stream_with_threads, do |channel| if source.respond_to?(:each_char) && [:file_name, :filename].include?([:source_kind]) File.open(source, 'r') do |f| f.each_line { |line| channel << line } end elsif source.respond_to?(:each) source.each { |x| channel << x } else source.each_char { |c| channel << c } end end end |
Instance Method Details
#>>(proc) ⇒ Stream
Voir .apply
Cree un stream a partir d'un proc (avec apply) et le lie (le concatene) au stream courant.
Essentiellement, >> est un alias d'apply, mais qui s'appique uniquement sur une lambda-expression.
879 880 881 882 883 |
# File 'lib/pruby/stream.rb', line 879 def >>( proc ) DBC.require proc.is_a?(Proc), "*** Dans >>: argument proc doit etre specifie" apply( proc ) end |
#apply(proc = nil) {|self| ... } ⇒ Stream
Applique une fonction sur le stream.
La fonction est specifiee par une lambda-expression ou par un bloc. Cette fonction recoit un stream et retourne un stream. Typiquement, elle utilisera les diverses operations sur les streams.
857 858 859 860 861 862 863 864 |
# File 'lib/pruby/stream.rb', line 857 def apply( proc = nil ) if block_given? yield self else DBC.require proc && proc.is_a?(Proc), "*** Dans apply: bloc ou proc argument doit etre specifie" proc.call self end end |
#drop(n, options = {}) ⇒ Stream
Supprime (laisse tomber) les n premiers elements du stream.
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 |
# File 'lib/pruby/stream.rb', line 373 def drop( n, = {} ) DBC.require( n >= 0, "*** Dans drop: n doit etre >= 0 (n = #{n})" ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec drop doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| while n > 0 && !cin.eos? cin.get n -= 1 end each do |x| channel << x end end end |
#drop_while(options = {}) {|x| ... } ⇒ Stream
Laisse tomber les elements du stream tant qu'ils satisfont la condition (bloc). Ajoute ensuite tous les elements subsequents dans le stream de sortie.
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 |
# File 'lib/pruby/stream.rb', line 431 def drop_while( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec drop_while doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| on_conserve = false cin.each do |x| if on_conserve channel << x else unless yield(x) on_conserve = true channel << x end end end end end |
#each {|x| ... } ⇒ void
This method returns an undefined value.
Enumere, sequentiellement, les elements du stream, et applique le bloc sur ces elements.
Ne genere pas un nouveau stream.
178 179 180 181 182 183 184 |
# File 'lib/pruby/stream.rb', line 178 def each @channel.each do |x| yield x end nil end |
#filter(options = {}) {|x| ... } ⇒ Stream Also known as: select
Selectionne les elements du stream qui satisfont une fonction (un bloc).
218 219 220 221 222 223 224 |
# File 'lib/pruby/stream.rb', line 218 def filter( = {} ) mk_stream_with_threads( ) do |channel| each do |x| channel << x if yield(x) end end end |
#flat_map(options = {}) {|x| ... } ⇒ Stream
Applique une fonction (un bloc) sur chacun des elements du stream d'entree. La fonction produit en sortie un Array d'elements (contenant 0, 1 ou plusieurs elements), lesquels arrays sont ensuite concatenes dans le stream de sortie.
332 333 334 335 336 337 338 339 340 |
# File 'lib/pruby/stream.rb', line 332 def flat_map( = {} ) mk_stream_with_threads( ) do |channel| each do |x| yield( x ).each do |y| channel << y end end end end |
#go(options = {}) {|cin, channel| ... } ⇒ Stream
Applique un traitement a la go sur le stream.
Le bloc recu doit recevoir deux arguments: un canal d'entree et un canal de sortie. Le code dans le bloc manipule ensuite de facon explicite ces deux canaux, le premier avec avec get/each, le deuxieme avec put/close, etc.
571 572 573 574 575 576 577 578 |
# File 'lib/pruby/stream.rb', line 571 def go( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec go doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| yield( cin, channel ) end end |
#group_by(options = {}) {|x| ... } ⇒ Stream<Object,Object>
Regroupe les elements du stream selon la valeur generee par l'application du bloc.
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 |
# File 'lib/pruby/stream.rb', line 462 def group_by( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec group_by doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| h = cin.group_by { |x| yield x } h.each_pair do |k, v| if value = [:merge_value] || [:map_value] channel << [k, v.reduce([]) { |a, x| a << value.call(x) }] else channel << [k, v] end end end end |
#group_by_key(options = {}) ⇒ Stream<Object,Array<Object>>
Le stream d’entree complet doit etre recu avant que des elements puissent etre emis sur le stream de sortie
A partir d'un stream contenant des paires <cle, valeur>, retourne un stream de <cle,Array<valeur>>, donc ou toutes les valeurs associees a une cle ont ete regroupees.
502 503 504 505 506 507 508 509 510 511 512 513 |
# File 'lib/pruby/stream.rb', line 502 def group_by_key( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec group_by doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| h = cin.group_by { |x| x.first } h.each_pair do |k, v| channel << [k, v.reduce([]) { |a, x| a << x.last }] end end end |
#iterate(nb_iterations) {|s| ... } ⇒ Stream
Itere une transformation de stream un certain nombre fixe de fois.
834 835 836 837 838 839 840 |
# File 'lib/pruby/stream.rb', line 834 def iterate( nb_iterations ) s = self nb_iterations.times do s = yield s end s end |
#map(options = {}) {|x| ... } ⇒ Stream Also known as: collect
Applique une fonction (un bloc) sur chacun des elements du stream.
201 202 203 204 205 206 207 |
# File 'lib/pruby/stream.rb', line 201 def map( = {} ) mk_stream_with_threads( ) do |channel| each do |x| channel << yield( x ) end end end |
#peek(options = {}) {|x| ... } ⇒ Stream
Utile pour debogger.
Applique un traitement arbitraire sur chacun des elements du stream, et retourne le stream d'entree sans aucune modification.
287 288 289 290 291 292 293 294 |
# File 'lib/pruby/stream.rb', line 287 def peek( = {} ) mk_stream_with_threads( ) do |channel| each do |x| yield x channel << x end end end |
#reject(options = {}) {|x| ... } ⇒ Stream
Selectionne les elements du stream qui ne satisfont pas une fonction (un bloc).
235 236 237 238 239 240 241 |
# File 'lib/pruby/stream.rb', line 235 def reject( = {} ) mk_stream_with_threads( ) do |channel| each do |x| channel << x unless yield(x) end end end |
#sink(destination) ⇒ Array?
Collecte les elements d'un stream dans un objet final non-stream.
Plus specifiquement, les objets recus pourront etre mis dans un tableau -- nouveau ou ajoute a un tableau existant -- ou dans un fichier.
899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 |
# File 'lib/pruby/stream.rb', line 899 def sink( destination ) # Si c'est simplement le nom de classe Array, on genere un # nouveau tableau pour recevoir les elements du Stream. destination = [] if destination == Array if destination.class == String # On ajoute les elements au fichier ayant le nom indique. File.open( destination, 'a+' ) do |f| each { |v| f.puts v } end result = nil elsif destination.respond_to?( :puts ) # On ajoute les elements du Stream avec :puts each { |v| destination.puts v } result = nil elsif destination.respond_to?( :<< ) # On ajoute les elements du Stream avec :<< each { |v| destination << v } result = destination end # On termine les threads -- requis pour ceux qui generent de # facon potentiellement infinie. threads.each do |t| Thread.kill t if t.alive? end result end |
#sort(options = {}) {|x, y| ... } ⇒ Stream
Le stream d’entree complet doit etre recu avant que des elements puissent etre emis sur le stream de sortie.
Tri les elements d'un stream.
Le bloc a utiliser pour comparer les elements est optionnel.
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/pruby/stream.rb', line 259 def sort( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un tri avec sort doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| if block_given? sorted = cin.sort { |x, y| yield( x, y ) }.each else sorted = cin.sort.each end sorted.each do |x| channel << x end end end |
#stateful(options = {}) {|state, x| ... } ⇒ Stream
Transforme un stream avec un changement etat.
On specifie l'etat initial lors de l'appel a la methode. Le bloc execute doit recevoir deux arguments: l'etat courant et l'element a traiter. Il doit aussi retourner deux resultats (Array): le nouvel etat et la valeur a emettre sur le stream de sortie suite au traitement de l'element recu. Un tel traitement implique donc une execution strictement sequentielle.
604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 |
# File 'lib/pruby/stream.rb', line 604 def stateful( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec stateful doit se faire sequentiellement" ) state = [:initial_state] cin = @channel mk_stream_with_threads do |channel| cin.each do |x| state, result = yield( state, x ) channel << result end at_eos = [:at_eos] if at_eos.is_a?(Proc) channel << at_eos.call(state) elsif at_eos == :STATE || at_eos == :EMIT_STATE channel << state elsif at_eos channel << at_eos end end end |
#take(n, options = {}) ⇒ Stream
Prend les n premiers elements du stream.
350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/pruby/stream.rb', line 350 def take( n, = {} ) DBC.require( n >= 0, "*** Dans take: n doit etre >= 0 (n = #{n})" ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec take doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| while n > 0 && !cin.eos? channel << cin.get n -= 1 end end end |
#take_while(options = {}) {|x| ... } ⇒ Stream
Prend les elements du stream tant qu'ils satisfont la condition (bloc). Termine le stream de sortie des qu'un element qui ne satisfait pas la condition est rencontre.
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 |
# File 'lib/pruby/stream.rb', line 403 def take_while( = {} ) DBC.require( Stream.un_seul_thread?(), "*** Un traitement avec take_while doit se faire sequentiellement" ) cin = @channel mk_stream_with_threads do |channel| cin.each do |x| if yield( x ) channel << x else break end end end end |
#to_a ⇒ Array
Collecte les elements du Stream dans un tableau.
934 935 936 |
# File 'lib/pruby/stream.rb', line 934 def to_a sink [] end |
#uniq(options = {}) ⇒ Stream
Mise en oeuvre couteuse (utilisation d’un Mutex et d’un ensemble contenant les elements vus)
Filtre les elements du stream d'entree pour assurer qu'il n'y ait qu'une seule occurrence de chaque element dans le stream de sortie.
306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
# File 'lib/pruby/stream.rb', line 306 def uniq( = {} ) vus = [] mutex = Mutex.new mk_stream_with_threads( ) do |channel| each do |x| mutex.synchronize do unless vus.include? x channel << x vus << x end end unless vus.include? x end end end |