An Early look at Kotlin Coroutine’s Flow

Mayowa Adegeye
ProAndroidDev
Published in
7 min readApr 16, 2019

--

Starting from version 1.2.0-alpha-2 of coroutines, the library will be adding a new type called Flow which is an abstraction for a cold stream. To understand what this means and why it is needed, it is pertinent to get the difference between a hot stream vs a cold stream.

What is a cold stream

Currently, a stream of items can be transferred using a Channel<T>. For example using the produce coroutine builder, we can create a stream of items that can be consumed by a receiver, but in this case, the body of the producer represents a hot stream. The code within the producer is invoked and items are emitted with or without the presence of a consumer.

Hot stream of ints that runs and emits two items before suspending — no consumer present

The above snippet shows a producer thats send out a stream of Ints and returns a ReceiveChannel which a consumer can use to consume each emitted item. However, even without the presence of any consumer, the body of the producer executes. Before the code suspends, it outputs the following:

producer body
sent 0
sent 1
//code suspends at this point since buffer size is 2

Also, when the items sent through the channel have been consumed, no other consumer can consume those same items from the channel. The channel closes automatically when it is done emitting items with or without the items being consumed.

The snippet below shows a producer that sends items through its channel and completes successfully without the items being consumed by any consumer. Since this is a hot stream and we have a buffer size larger than number of emitted items, this code runs to completion without suspending.

The property that the code within the producer gets executed and items are emitted without the presence of a consumer is what basically defines a hot stream. A side effect of this is that items are consumed only once.

A cold stream on the other hand is just like a blue print defining a computation that emits a stream of items. Nothing gets executed or emitted until it is being requested for by a consumer. Observable and Flowable types in RxJava are an example of a structure that represents a cold stream of items. Its body does not get executed until it is subscribed to by a subscriber. Upon subscription, each subscriber gets its own copy of the items emitted by the source.

The new Flow type is going to give us exactly this same behaviour. A call to any of its builders returns a flow instance which represents a computation that emits a stream of items. The execution of the code within the builder happens when a terminal operator is invoked.

Flow type itself is an interface with a single function, collect that takes a FlowCollector

Flow type with a suspending function to collect produced items

Items produced from the flow are sent out to the consumer through the FlowCollector which in itself is also a single function interface. The emit function in the FlowCollector will be called to emit produced items to the downstream.

FlowCollector interface with the emit suspend function

Both the collect function from Flow and emit from FlowCollector are suspend functions, this means that back pressure is baked into this by default. If the consumer consumes items slower that producer produces, the call to emit simply suspends until the consumer is able to receive the next item.

Flow Builders

To create a Flow, you need a Flow builder and the most important one of the builders which is fundamental to the others is the flow function shown below:

creates a Flow from the given suspension function

This function takes as a parameter a suspending extension function on a FlowCollector<T>, and returns an implementation of a Flow which invokes our suspending function block when the collect function is called. The SafeCollector wrapper is to ensure that the coroutineContext where the Flow was collected is the same context used to emit the items. An exception gets thrown if you try to change context while emitting items. For example the following code will throw an IllegalStateException:

IllegalStateException arising from attempting to switch context before emit

Since the suspending lambda we are passing to the flow function is an extension on FlowCollector , we are able to call emit within this lambda to send items to the collector. For example:

Send out items calling emit from the FlowCollector

flowViaChannel is an alternative builder that allows you create a flow from items sent to a channel. This takes an optional buffer size and a suspending lambda that takes a SendChannel as a parameter. Items sent to this channel are going to be emitted as a Flow when a terminal operator is invoked.

flowViaChannel builder
flowViaChannel Example

flowOn and flowWith are operators that can be used to switch the flow’s context similar what we do with subscribeOn and observeOn in RxJava . While flowOn is used to change the context of the upstream, i.e everything before it:

flowOn, switching the context where the upstream is executed, think of Rx subscribe

flowWith switches the context where the code within its body is executed. It takes a Dispatcher , an optional buffer size and a function which is extension function on a Flow .

Within this extension function, you define the set of operations you want to execute in the new context. For example:

There is also a builder function called flowOf that allows you to convert a vararg to a flow that emits each vararg item.

flowOf syntax

An asFlow extension function is also defined on the following types, that converts from these types to a Flow :

(() -> T).asFlow() (suspend () -> T).asFlow()Iterable<T>.asFlow()Iterator<T>.asFlow() Sequence<T>.asFlow() Array<T>.asFlow() IntArray<T>.asFlow() LongArray<T>.asFlow()IntRange<T>.asFlow()LongRange<T>.asFlow() 

Given these extension functions above, we can create a Flow in the following ways:

Operators

Asides the context switching operators flowOn and flowWith which I already introduced above, Flow comes with a host of other operators most of which are the familiar and expected operators on a stream type. Some of these operators are listed below:

  • map
  • flatMapConcat
  • flatMapMerge
  • filter
  • filterNot
  • combineLatest
  • zip
  • distinctUntilChanged
  • drop
  • take
  • takeWhile
  • dropWhile

Flow is not currently as operator rich as RxJava but as at when this post was written, there are 27 operators on the Flow type. Work is currently still on going on the project and some more operators might be added. On the other hand, the simplistic nature of the library makes it quite easy to write your own operators if the need for that arises. For example, I think an implementation of a mergeWith operator might look like this:

Merge a flow with another with possible interleaving

Terminals

Terminal operators are the way we trigger the execution of a Flow and consumption of the emitted items. As said earlier, a Flow represents a cold stream which means it is always in a sort of dormant state until a terminal operator is invoked, which causes a trigger on the flow’s collect function, which in turn triggers the execution of the flow.

Central to all other Flow terminal operators is another collect function declared as an extension function on the Flow type. This collect extension function takes a suspending lambda that gets passed each item emitted from the flow. We can do whatever processing we want on the items within this lambda.

collect extension function

As shown in the image above, calling the collect terminal operator with a suspending lambda callback, initiates a call to the Flow ‘s collect function passing an implementation of a FlowCollector that forwards the emitted items to our lambda.

A sample usage is shown below

collects each item and send to the console

If you are expecting a single item from the Flow , use the Flow<T>.single() terminal operator. Flow<T>.singleOrNull() returns either a single item or null. These operators are somewhat synonymous to the Single and Maybe types in RxJava .

There is a count terminal operator and a second variant that takes a predicate, that returns the number of emitted items and the number of items that satisfies the predicate respectively.

returns the count of items satisfying the given predicate

To accumulate emitted items, you have the reduce and fold operators. These two terminal operators are similar in the way they work. They both take an accumulator function but fold also take an initial value that the accumulator begins with. At the end, they both return a single value which is the result of accumulating all emitted items using the accumulator suspending function.

Finally, to get your emitted items as a list or as a set, you have the toList and toSet terminal operators which returns a List<T> and Set<T> respectively.

Error Handling

Use onErrorReturn and onErrorCollect to recover from an exception thrown by an upstream flow. onErrorReturn returns a single item when an error occurs while onErrorCollect can be used to switch to another flow entirely. For example:

onErrorCollect switching to another flow

Both operators take an optional predicate parameter if you only want to recover from the error if the error satisfies a particular condition. The default value for this predicate returns true for all errors.

With the retry operator, you can retry collecting a flow N number of times or retry the collection when an optional predicate is satisfied. It should be noted that retry only kicks in when the error occurs during collection and not while the downstream is processing emitted items.

In conclusion, a lot of work has already been done on this library by the awesome folks at JetBrains. More changes are expected before the final release but what has already been done is pretty impressive. I look forward to how this project develops in the future.

For more information and to follow the progress of this awesome library, check out the project on Github, also check out this article on cold streams by Roman Elizarov.

--

--