• Effekt Library
    • stream
    • Jump to source: libraries/common/stream.effekt
      Example usage: examples/stdlib/stream
      • emit [A]
      • Describes push streams by emitting values of type `A`.
        • emit (value: A): Unit / {}
      • read [A]
      • Describes pull streams by reading values of type `A`.
        
        The producer can decide to `stop` emitting values.
        • read : A / {stop}
      • stop
      • Signal from a producer to a consumer that there are no further values.
        • stop : Nothing / {}
      • for [A] { stream: => Unit / {emit[A]} } { action: (A) => Unit }: Unit / {}
      • Like `for[A, R]`, but ignores the result of the stream, and consequently
        works for any type. Use this to annotate the type of stream elements
        `A` without having to also annotate `R`.
        
        e.g. for[Int] { prog() } { el => println(el) }
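      • A minimal sketch of a hand-written push stream consumed with `for`. The producer name `squares` is hypothetical and introduced only for illustration; the sketch assumes this module is imported as `stream`.

        ```effekt
        import stream

        // Hypothetical producer: emits three values, one after the other.
        def squares(): Unit / emit[Int] = {
          do emit(1)
          do emit(4)
          do emit(9)
        }

        def main() = {
          // `for` handles every `emit` by running the action on the emitted value.
          for[Int] { squares() } { el => println(el) }
        }
        ```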
      • map [A, B] { function: (A) => B } { stream: => Unit / {emit[A]} }: Unit / {emit[B]}
      • Applies the given function to each emitted element and re-emits the result.
        
        e.g. map[Int,String]{ x => x.show } { range(0, 5) }
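      • Since `map` itself still emits, it has to be wrapped in a consumer. A small sketch, assuming the module is imported as `stream`, that consumes the mapped stream with `for`:

        ```effekt
        import stream

        def main() = {
          // Convert each number to a string, then print every re-emitted string.
          for[String] {
            map[Int, String] { x => x.show } { range(0, 5) }
          } { s => println(s) }
        }
        ```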
      • tap [A] { action: => A / {stop} } { reader: => Unit / {read[A]} }: Unit / {}
      • Runs the given action whenever the given reader reads an element.
        
        e.g. var i = -1; with tap { i = i + 1; i }; reader()
      • fix [T] { one: (T) => Unit / {emit[T]} } { stream: => Unit / {emit[T]} }: Unit / {}
      • Runs the stream as a message queue: each element is handled by `one`,
        which may itself emit new elements ("messages"),
        until a fixed point is reached.
      • each [A] (list: List[A]): Unit / {emit[A]}
      • Turns a `list` into a producer of a push stream
        by emitting each contained value left-to-right.
      • each [T] (array: Array[T]): Unit / {emit[T]}
      • Turns an `array` into a producer of a push stream
        by emitting each contained value from 0 to length - 1.
      • each (bytes: ByteArray): Unit / {emit[Byte]}
      • Turns `bytes` into a producer of a push stream
        by emitting each contained value from 0 to length - 1.
      • each [K, V] (map: Map[K, V]): Unit / {emit[Tuple2[K, V]]}
      • Turns a `map` into a producer of a push stream
        of `(key, value)` pairs by emitting each contained pair *in order*.
      • eachKey [K, V] (map: Map[K, V]): Unit / {emit[K]}
      • Turns a `map` into a producer of a push stream
        of its keys by emitting each contained key *in order*.
      • eachValue [K, V] (map: Map[K, V]): Unit / {emit[V]}
      • Turns a `map` into a producer of a push stream
        of its values by emitting each contained value.
      • each [A] (set: Set[A]): Unit / {emit[A]}
      • Turns a `set` into a producer of a push stream
        by emitting each contained element *in order*.
      • boundary { program: => Unit / {stop} }: Unit / {}
      • exhaustively { program: => Unit / {stop} }: Unit / {}
      • Runs `program` repeatedly until it signals `stop`.
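      • A sketch of a pull stream, assuming the module is imported as `stream`: `feed` supplies the elements of a list, and `exhaustively` repeats the reading body until `read` signals `stop`.

        ```effekt
        import stream

        def main() = {
          // `feed` answers every `read` with the next list element.
          feed([1, 2, 3]) {
            // The body is re-run until the list is used up and `read` signals `stop`.
            exhaustively {
              val x = do read[Int]()
              println(x)
            }
          }
        }
        ```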
      • many [A] { action: => A / {stop} }: Unit / {emit[A]}
      • some [A] { action: => A / {stop} }: Unit / {emit[A], stop}
      • optionally { program: => Unit / {stop} }: Bool / {}
      • attempt [R] { program: => R / {stop} } { fallback: => R }: R / {}
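      • `attempt` carries no description here. Judging only from its signature, it presumably runs `program` and evaluates `fallback` when `stop` is signalled; the sketch below relies on that assumption and on the module being imported as `stream`.

        ```effekt
        import stream

        def main() = {
          // The list holds a single element, so the second `read` has nothing left.
          feed([42]) {
            val first = attempt[Int] { do read[Int]() } { -1 }
            // Assumed behaviour: `stop` is signalled here and the fallback is used.
            val second = attempt[Int] { do read[Int]() } { -1 }
            println(first)
            println(second)
          }
        }
        ```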
      • Indexed [A] (index: Int, value: A)
      • In Effekt, lower bounds are inclusive and upper bounds are exclusive.
      • range (lower: Int, upper: Int): Unit / {emit[Int]}
      • rangeTo (upper: Int): Unit / {emit[Int]}
      • rangeFrom (lower: Int): Unit / {emit[Int]}
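      • A short sketch of the bound convention (inclusive lower bound, exclusive upper bound), assuming the module is imported as `stream`:

        ```effekt
        import stream

        def main() = {
          // Lower bound 2 is included, upper bound 5 is excluded: emits 2, 3, 4.
          for[Int] { range(2, 5) } { x => println(x) }
        }
        ```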
      • index [A] { stream: => Unit / {emit[A]} }: Unit / {emit[Indexed[A]]}
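      • `index` carries no description here. From its signature it presumably pairs each emitted element with its position as an `Indexed` record; the sketch below only relies on the record shape `Indexed(index, value)` and assumes the module is imported as `stream`.

        ```effekt
        import stream

        def main() = {
          for[Indexed[String]] {
            index[String] { each(["a", "b", "c"]) }
          } {
            // Destructure the record into its position and its value.
            case Indexed(i, v) => println(i.show ++ ": " ++ v)
          }
        }
        ```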
      • limit [A] (number: Int) { stream: => Unit / {emit[A]} }: Unit / {emit[A]}
      • If `number` is zero or negative, it does nothing.
      • replicate [A] (number: Int) { action: => A }: Unit / {emit[A]}
      • If `number` is zero or negative, it does nothing.
      • iterate [A] (initial: A) { step: (A) => A }: Unit / {emit[A]}
      • Creates an infinite iterated stream given by an `initial` seed and a `step` function:
        iterate(a){f} ~> a, f(a), f(f(a)), f(f(f(a))), ...
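      • Because `iterate` never ends on its own, a consumer has to bound it. A minimal sketch, assuming the module is imported as `stream`, that cuts the stream off with `limit`:

        ```effekt
        import stream

        def main() = {
          // Seed 1, doubling step; `limit` keeps only the first four elements.
          for[Int] {
            limit[Int](4) { iterate(1) { x => x * 2 } }
          } { x => println(x) } // elements 1, 2, 4, 8
        }
        ```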
      • sum { stream: => Unit / {emit[Int]} }: Int / {}
      • product { stream: => Unit / {emit[Int]} }: Int / {}
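      • `sum` and `product` carry no description here; going by their names and signatures they fold a stream of `Int` into a single number. A sketch under that assumption, with the module imported as `stream`:

        ```effekt
        import stream

        def main() = {
          // range(1, 5) emits 1, 2, 3, 4.
          println(sum { range(1, 5) })     // 1 + 2 + 3 + 4 = 10
          println(product { range(1, 5) }) // 1 * 2 * 3 * 4 = 24
        }
        ```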
      • collectList [A] { stream: => Unit / {emit[A]} }: List[A] / {}
      • collectArray [A] { stream: => Unit / {emit[A]} }: Array[A] / {}
      • collectBytes { stream: => Unit / {emit[Byte]} }: ByteArray / {}
      • feed [T, R] (list: List[T]) { reader: => R / {read[T]} }: R / {}
      • collectMap [K, V, R] (compare: (K, K) => Ordering at {}) { stream: => R / {emit[Tuple2[K, V]]} }: Tuple2[R, Map[K, V]] / {}
      • collectMap [K, V] (compare: (K, K) => Ordering at {}) { stream: => Any / {emit[Tuple2[K, V]]} }: Map[K, V] / {}
      • collectSet [A, R] (compare: (A, A) => Ordering at {}) { stream: => R / {emit[A]} }: Tuple2[R, Set[A]] / {}
      • collectSet [A] (compare: (A, A) => Ordering at {}) { stream: => Any / {emit[A]} }: Set[A] / {}
      • feed [T, R] (array: Array[T]) { reader: => R / {read[T]} }: R / {}
      • feed [R] (bytes: ByteArray) { reader: => R / {read[Byte]} }: R / {}
      • source [A] { stream: => Unit / {emit[A]} } { reader: => Unit / {read[A]} }: Unit / {}
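      • `source` carries no description here. Its signature suggests that it connects a push stream to a pull-based reader, so that every `read` is answered by the next `emit`. A sketch under that assumption, with the module imported as `stream`:

        ```effekt
        import stream

        def main() = {
          source[Int] { range(0, 3) } {
            // Read until the producer has no further values.
            exhaustively {
              val x = do read[Int]()
              println(x)
            }
          }
        }
        ```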
      • zip [A, B] { stream1: => Unit / {emit[A]} } { stream2: => Unit / {emit[B]} } { action: (A, B) => Unit }: Unit / {}
      • Runs two streams in lockstep, passing each pair of elements to `action`.
        Terminates when either of them terminates.
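      • A small sketch of `zip` with streams of different lengths, assuming the module is imported as `stream`. The second stream has three elements, so the action should run three times:

        ```effekt
        import stream

        def main() = {
          zip[Int, String] { range(0, 10) } { each(["a", "b", "c"]) } { (i, s) =>
            // Called once per pair until the shorter stream terminates.
            println(i.show ++ ": " ++ s)
          }
        }
        ```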
      • writeFile [R] (path: String) { stream: => R / {emit[Byte]} }: R / {Exception[IOError]}
      • readFile [R] (path: String) { reader: => R / {read[Byte]} }: R / {Exception[IOError]}
      • decodeChar : Char / {read[Byte], stop}
      • encodeChar (char: Char): Unit / {emit[Byte]}
      • decodeUTF8 [R] { reader: => R / {read[Char]} }: R / {read[Byte]}
      • encodeUTF8 [R] { stream: => R / {emit[Char]} }: R / {emit[Byte]}
      • writeFileUTF8 [R] (path: String) { stream: => R / {emit[Char]} }: R / {Exception[IOError]}
      • readFileUTF8 [R] (path: String) { reader: => R / {read[Char]} }: R / {Exception[IOError]}
      • feed [R] (string: String) { reader: => R / {read[Char]} }
      • each (string: String): Unit / {emit[Char]}
      • collectString { stream: => Unit / {emit[Char]} }: String / {}
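      • A sketch combining the two string helpers above, assuming the module is imported as `stream`: `each` turns strings into character streams and `collectString` gathers the emitted characters again.

        ```effekt
        import stream

        def main() = {
          // Emit the characters of both strings in sequence and collect them.
          val s = collectString {
            each("abc")
            each("def")
          }
          println(s)
        }
        ```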
      • writeLine { body: => Unit / {emit[Char]} }: Unit / {emit[Char]}
      • readLine { body: => Unit / {read[Char]} }: Unit / {read[Char], stop}
      • snapshot
        • snapshot : Unit / {}
      • teeing [A] { cons: { => Unit / {emit[A]}} => Unit } { prod: => Unit / {emit[A]} }: Unit / {emit[A]}
      • Uses `cons` to handle `prod`, while also emitting every element to the outside.
        The outside controls iteration, i.e., if `cons` aborts, the rest of `prod` is still emitted.
        
        Starts by executing `cons`, and stops `cons` during execution if the outside stops consuming.
        
        Example, printing all values consumed by the outside:
        
            with teeing{ {s} => for{s}{ e => println(e) } }
            prod() // some producer
        
        Laws-ish (hopefully):
        - `hnd{ prd() } === hnd{ teeing{t}{ prd() } }` for every `hnd` and every `t` that calls its argument at most once (and has no other captures)
      • manyTee [A] { body: {{{ => Unit / {emit[A]}} => Unit, => Unit / {emit[A]}} => Unit / {emit[A]}} => Unit / {emit[A]} }: Unit / {}
      • Binds a `teeing`-like function in the body, stopping the inner producer once all consumers in tees are done consuming.
        
        Example of use, equivalent to `tee[A]{cns1}{cns2}{prd}`:
        
            manyTee[A] { {tee} =>
              with tee{cns1}
              with tee{cns2}
              prd()
            }
        
      • tee [A] { cons1: { => Unit / {emit[A]}} => Unit } { cons2: { => Unit / {emit[A]}} => Unit } { prod: => Unit / {emit[A]} }: Unit / {}
      • Streams `prod` to both `cons1` and `cons2`. Stops once both `cons1` and `cons2` have stopped.
        Only runs `prod` once, resuming at most once at each emit.
        
            var sumRes = 0
            var productRes = 0
            tee{ {s} => sumRes = sum{s} }{ {s} => productRes = product{s} }{ range(0, 10) }
            assertEquals(sum{ range(0, 10) }, sumRes)
            assertEquals(product{ range(0, 10) }, productRes)
        
      • for [A, R] { stream: => R / {emit[A]} } { action: (A) => Unit }: R / {}
      • Canonical handler of push streams that performs `action` for every
        value emitted by `stream`.
      • map [A, B, R] { function: (A) => B } { stream: => R / {emit[A]} }: R / {emit[B]}
      • tap [A, R] { action: => A / {stop} } { reader: => R / {read[A]} }: R / {}
      • boundary [R] { program: => R / {stop} }: Option[R] / {}
      • optionally [R] { program: => R / {stop} }: Option[R] / {}
      • index [A, R] { stream: => R / {emit[A]} }: R / {emit[Indexed[A]]}
      • limit [A, R] (number: Int) { stream: => R / {emit[A]} }: R / {emit[A], stop}
      • If `number` is zero or negative, it does nothing.
      • sum [R] { stream: => R / {emit[Int]} }: Tuple2[R, Int] / {}
      • product [R] { stream: => R / {emit[Int]} }: Tuple2[R, Int] / {}
      • collectList [A, R] { stream: => R / {emit[A]} }: Tuple2[R, List[A]] / {}
      • collectArray [A, R] { stream: => R / {emit[A]} }: Tuple2[R, Array[A]] / {}
      • collectBytes [R] { stream: => R / {emit[Byte]} }: Tuple2[R, ByteArray] / {}
      • collectString [R] { stream: => R / {emit[Char]} }: Tuple2[R, String] / {}
      • writeLine [R] { body: => R / {emit[Char]} }: R / {emit[Char]}
      • readLine [R] { body: => R / {read[Char]} }: R / {read[Char], stop}
      • source [A, R] { stream: => Unit / {emit[A]} } { reader: => R / {read[A]} }: R / {}