Effekt Library
- stream (source: libraries/common/stream.effekt)
- `emit[A]`
- `emit(value: A): Unit / {}`
- `read[A]`
- `read: A / {stop}`
- `stop`
- `stop: Nothing / {}`
- `for[A] { stream: => Unit / {emit[A]} } { action: (A) => Unit }: Unit / {}`
- `map[A, B] { function: (A) => B } { stream: => Unit / {emit[A]} }: Unit / {emit[B]}`
- `tap[A] { action: => A / {stop} } { reader: => Unit / {read[A]} }: Unit / {}`
- `fix[T] { one: (T) => Unit / {emit[T]} } { stream: => Unit / {emit[T]} }: Unit / {}`
- `each[A](list: List[A]): Unit / {emit[A]}`
- `each[T](array: Array[T]): Unit / {emit[T]}`
- `each(bytes: ByteArray): Unit / {emit[Byte]}`
- `each[K, V](map: Map[K, V]): Unit / {emit[Tuple2[K, V]]}`
- `eachKey[K, V](map: Map[K, V]): Unit / {emit[K]}`
- `eachValue[K, V](map: Map[K, V]): Unit / {emit[V]}`
- `each[A](set: Set[A]): Unit / {emit[A]}`
- `boundary { program: => Unit / {stop} }: Unit / {}`
- `exhaustively { program: => Unit / {stop} }: Unit / {}`
- `many[A] { action: => A / {stop} }: Unit / {emit[A]}`
- `some[A] { action: => A / {stop} }: Unit / {emit[A], stop}`
- `optionally { program: => Unit / {stop} }: Bool / {}`
- `attempt[R] { program: => R / {stop} } { fallback: => R }: R / {}`
- `Indexed[A](index: Int, value: A)`
- `range(lower: Int, upper: Int): Unit / {emit[Int]}`
- `rangeTo(upper: Int): Unit / {emit[Int]}`
- `rangeFrom(lower: Int): Unit / {emit[Int]}`
- `index[A] { stream: => Unit / {emit[A]} }: Unit / {emit[Indexed[A]]}`
- `limit[A](number: Int) { stream: => Unit / {emit[A]} }: Unit / {emit[A]}`
- `replicate[A](number: Int) { action: => A }: Unit / {emit[A]}`
- `iterate[A](initial: A) { step: (A) => A }: Unit / {emit[A]}`
- `sum { stream: => Unit / {emit[Int]} }: Int / {}`
- `product { stream: => Unit / {emit[Int]} }: Int / {}`
- `collectList[A] { stream: => Unit / {emit[A]} }: List[A] / {}`
- `collectArray[A] { stream: => Unit / {emit[A]} }: Array[A] / {}`
- `collectBytes { stream: => Unit / {emit[Byte]} }: ByteArray / {}`
- `feed[T, R](list: List[T]) { reader: => R / {read[T]} }: R / {}`
- `collectMap[K, V, R](compare: (K, K) => Ordering at {}) { stream: => R / {emit[Tuple2[K, V]]} }: Tuple2[R, Map[K, V]] / {}`
- `collectMap[K, V](compare: (K, K) => Ordering at {}) { stream: => Any / {emit[Tuple2[K, V]]} }: Map[K, V] / {}`
- `collectSet[A, R](compare: (A, A) => Ordering at {}) { stream: => R / {emit[A]} }: Tuple2[R, Set[A]] / {}`
- `collectSet[A](compare: (A, A) => Ordering at {}) { stream: => Any / {emit[A]} }: Set[A] / {}`
- `feed[T, R](array: Array[T]) { reader: => R / {read[T]} }: R / {}`
- `feed[R](bytes: ByteArray) { reader: => R / {read[Byte]} }: R / {}`
- `source[A] { stream: => Unit / {emit[A]} } { reader: => Unit / {read[A]} }: Unit / {}`
- `zip[A, B] { stream1: => Unit / {emit[A]} } { stream2: => Unit / {emit[B]} } { action: (A, B) => Unit }: Unit / {}`
- `writeFile[R](path: String) { stream: => R / {emit[Byte]} }: R / {Exception[IOError]}`
- `readFile[R](path: String) { reader: => R / {read[Byte]} }: R / {Exception[IOError]}`
- `decodeChar: Char / {read[Byte], stop}`
- `encodeChar(char: Char): Unit / {emit[Byte]}`
- `decodeUTF8[R] { reader: => R / {read[Char]} }: R / {read[Byte]}`
- `encodeUTF8[R] { stream: => R / {emit[Char]} }: R / {emit[Byte]}`
- `writeFileUTF8[R](path: String) { stream: => R / {emit[Char]} }: R / {Exception[IOError]}`
- `readFileUTF8[R](path: String) { reader: => R / {read[Char]} }: R / {Exception[IOError]}`
- `feed[R](string: String) { reader: => R / {read[Char]} }`
- `each(string: String): Unit / {emit[Char]}`
- `collectString { stream: => Unit / {emit[Char]} }: String / {}`
- `writeLine { body: => Unit / {emit[Char]} }: Unit / {emit[Char]}`
- `readLine { body: => Unit / {read[Char]} }: Unit / {read[Char], stop}`
- `snapshot`
- `snapshot: Unit / {}`
- `teeing[A] { cons: { => Unit / {emit[A]}} => Unit } { prod: => Unit / {emit[A]} }: Unit / {emit[A]}`
- `manyTee[A] { body: {{{ => Unit / {emit[A]}} => Unit, => Unit / {emit[A]}} => Unit / {emit[A]}} => Unit / {emit[A]} }: Unit / {}`
- `tee[A] { cons1: { => Unit / {emit[A]}} => Unit } { cons2: { => Unit / {emit[A]}} => Unit } { prod: => Unit / {emit[A]} }: Unit / {}`
- `for[A, R] { stream: => R / {emit[A]} } { action: (A) => Unit }: R / {}`
- `map[A, B, R] { function: (A) => B } { stream: => R / {emit[A]} }: R / {emit[B]}`
- `tap[A, R] { action: => A / {stop} } { reader: => R / {read[A]} }: R / {}`
- `boundary[R] { program: => R / {stop} }: Option[R] / {}`
- `optionally[R] { program: => R / {stop} }: Option[R] / {}`
- `index[A, R] { stream: => R / {emit[A]} }: R / {emit[Indexed[A]]}`
- `limit[A, R](number: Int) { stream: => R / {emit[A]} }: R / {emit[A], stop}`
- `sum[R] { stream: => R / {emit[Int]} }: Tuple2[R, Int] / {}`
- `product[R] { stream: => R / {emit[Int]} }: Tuple2[R, Int] / {}`
- `collectList[A, R] { stream: => R / {emit[A]} }: Tuple2[R, List[A]] / {}`
- `collectArray[A, R] { stream: => R / {emit[A]} }: Tuple2[R, Array[A]] / {}`
- `collectBytes[R] { stream: => R / {emit[Byte]} }: Tuple2[R, ByteArray] / {}`
- `collectString[R] { stream: => R / {emit[Char]} }: Tuple2[R, String] / {}`
- `writeLine[R] { body: => R / {emit[Char]} }: R / {emit[Char]}`
- `readLine[R] { body: => R / {read[Char]} }: R / {read[Char], stop}`
- `source[A, R] { stream: => Unit / {emit[A]} } { reader: => R / {read[A]} }: R / {}`
Example usage: examples/stdlib/stream
Describes push streams by emitting values of type `A`.
Describes pull streams by reading values of type `A`. The producer can decide to `stop` emitting values.
Signal from a producer to a consumer that there are no further values.
Like `for[A, R]`, but ignores the result of the stream, and consequently works for any type. Use this to annotate the type of stream elements `A` without having to also annotate `R`, e.g. `for[Int] { prog() } { el => println(el) }`.
Applies the given function to each emitted element and re-emits the result, e.g. `map[Int, String] { x => x.show } { range(0, 5) }`.
Runs the given action whenever the given reader reads an element, e.g. `var i = -1; with tap { i = i + 1; i }; reader()`.
Runs the stream as a message queue: each element is handled in turn, and handling an element may itself produce new elements ("messages"). This continues until a fixed point is reached, i.e. no unhandled messages remain.
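As a rough illustration of this fixed-point behaviour, the sketch below seeds the queue with a single message and lets each handled message enqueue a smaller one. It assumes the module is imported as `stream` and that `fix` is called with the signature listed above; `main` and the countdown logic are purely illustrative.

```effekt
import stream

def main() = {
  // First block: handles one message and may emit follow-up messages.
  // Second block: the initial stream of messages.
  fix[Int] { n =>
    println(n)
    if (n > 0) { do emit(n - 1) } else { () }
  } { do emit(3) }
}
```

Once the message `0` has been handled without emitting anything, no messages remain and `fix` should return.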
Turns a `list` into a producer of a push stream by emitting each contained value left-to-right.
Turns an `array` into a producer of a push stream by emitting each contained value from 0 to length - 1.
Turns `bytes` into a producer of a push stream by emitting each contained value from 0 to length - 1.
Turns a `map` into a producer of a push stream of `(key, value)` pairs by emitting each contained pair *in order*.
Turns a `map` into a producer of a push stream of its keys by emitting each contained key *in order*.
Turns a `map` into a producer of a push stream of its values by emitting each contained value.
Turns a `set` into a producer of a push stream by emitting each contained element *in order*.
Run `program` forever until `stop` is thrown.
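A minimal sketch of this looping behaviour, assuming the description above documents `exhaustively` and that the module is imported as `stream`. The counter `i` and the bound `3` are illustrative only.

```effekt
import stream

def main() = {
  var i = 0
  // The block is re-run until it performs `stop`.
  exhaustively {
    if (i >= 3) { do stop() } else {
      println(i)
      i = i + 1
    }
  }
  println("done")
}
```

Because `exhaustively` handles `stop` itself (its signature has an empty effect set), execution continues with the statement after the loop.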
In Effekt, lower bounds are inclusive and upper bounds are exclusive.
If `number` is zero or negative, `limit` does nothing.
If `number` is zero or negative, `replicate` does nothing.
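The bounds convention can be seen with `range`, here consumed by `for` as in the examples elsewhere on this page. This is a small sketch that assumes the module is imported as `stream`.

```effekt
import stream

def main() = {
  // Emits 0, 1 and 2: the lower bound is included, the upper bound is not.
  for[Int] { range(0, 3) } { x => println(x) }
}
```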
Creates an infinite iterated stream given by an `initial` seed and a `step` function: `iterate(a){f}` emits `a`, `f(a)`, `f(f(a))`, `f(f(f(a)))`, ...
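Since the iterated stream never ends on its own, it is typically combined with something that cuts it off. The sketch below assumes `limit[A]` from this module passes through at most `number` elements; the doubling step is illustrative.

```effekt
import stream

def main() = {
  // `iterate(1) { x => x * 2 }` describes 1, 2, 4, 8, ... without end;
  // `limit[Int](4)` is assumed to stop it after four elements.
  for[Int] { limit[Int](4) { iterate(1) { x => x * 2 } } } { x => println(x) }
}
```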
Combines two streams together producing a stream of pairs in lockstep. Terminates when either of them terminates.
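A short sketch of lockstep consumption, assuming the module is imported as `stream`. The two producers have different lengths to show the termination rule; the list contents are illustrative.

```effekt
import stream

def main() = {
  // The first stream has three elements, the second has four.
  zip[Int, String] { range(0, 3) } { each(["a", "b", "c", "d"]) } { (i, s) =>
    println(show(i) ++ ": " ++ s)
  }
}
```

According to the description above, `action` should run for three pairs only, because the shorter stream terminates first.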
Use `cons` to handle `prod`, but also emit to the outside. The outside controls iteration, i.e., if `cons` aborts, the rest of `prod` will still be emitted. Starts by executing `cons`. Will stop `cons` during execution if the outside stops consuming.

Example, printing all values consumed by the outside:

    with teeing { {s} => for {s} { e => println(e) } }
    prod() // some producer

Laws-ish (hopefully):
- `hnd{ prd() } === hnd{ teeing{t}{ prd() } }` for all `hnd` and `t` that calls its argument at most once (and has no other captures)
Binds a `teeing`-like function in the body, stopping the inner producer once all consumers in tees are done consuming.

Example of use, equivalent to `tee[A]{cns1}{cns2}{prd}`:

    manyTee[A] { {tee} =>
      with tee{cns1}
      with tee{cns2}
      prd()
    }
Streams `prod` to both `cons1` and `cons2`. Stops once both `cons1` and `cons2` have stopped. Only runs `prod` once, resuming at most once at each `emit`.

    var sumRes = 0
    var productRes = 0
    tee { {s} => sumRes = sum{s} } { {s} => productRes = product{s} } { range(0, 10) }
    assertEquals(sum{ range(0, 10) }, sumRes)
    assertEquals(product{ range(0, 10) }, productRes)
Canonical handler of push streams that performs `action` for every value emitted by `stream`.
If `number` is zero or negative, `limit[A, R]` does nothing.