Channels as a communication primitive #
Introduction #
Channels are the fourth abstraction, not mentioned so far, shared by Kotlin Coroutines and Scala Gears.
They are the primitive communication and coordination means for exchanging the results of Futures (or of coroutines, in the case of Kotlin). Conceptually, they are very similar to a queue on which data can be sent and received, following the producer-consumer pattern.
classDiagram
  class `SendableChannel[-T]` {
    << trait >>
    +sendSource(x: T) Async.Source[Either[Closed, Unit]]
    +send(x: T)(using Async) Unit
  }
  class `ReadableChannel[+T]` {
    << trait >>
    +readSource Async.Source[Either[Closed, T]]
    +read()(using Async) Either[Closed, T]
  }
  class `Channel[T]` {
    << trait >>
    +asSendable: SendableChannel[T]
    +asReadable: ReadableChannel[T]
    +asCloseable: java.io.Closeable
  }
  namespace java io {
    class Closeable {
      << interface >>
      +close()
    }
  }
  `SendableChannel[-T]` <|-- `Channel[T]`
  Closeable <|-- `Channel[T]`
  `ReadableChannel[+T]` <|-- `Channel[T]`
The channel is defined through three distinct interfaces: SendableChannel[-T], ReadableChannel[+T] and Channel[T], where the latter extends both SendableChannel and ReadableChannel. Typically, a Channel is created and its SendableChannel and ReadableChannel views are provided to the producer and the consumer respectively, restricting their access to it. An almost identical design is present in Kotlin Coroutines, where SendChannel and ReceiveChannel play the roles of the Gears SendableChannel and ReadableChannel.
Channel also inherits from java.io.Closeable, making channels closeable objects: once closed, they raise a ChannelClosedException when attempting to write to them and immediately return a Left(Closed) when attempting to read from them, which prevents the consumer from reading the values that were sent on the channel before it was closed. This is not the case in Kotlin Coroutines, where closing a channel indicates that no more values are coming, but doesn’t prevent consuming values that were already sent. Moreover, in Kotlin it is possible to use a regular for loop to receive elements from a channel (blocking the coroutine):
val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
for (y in channel) println(y) // blocks until channel is closed
println("Done!")
Similar behavior can also be achieved in Gears by extending the framework with the concept of a Terminable channel. After all, closing a channel in coroutines is a matter of sending a special token to it, which allows the iteration to stop as soon as this token is received.
[The full implementation can be found in the commons submodule, pimping package.]
/** A token to be sent to a channel to signal that it has been terminated. */
case object Terminated

type Terminated = Terminated.type

/** A union type of [[T]] and [[Terminated]]. */
type Terminable[T] = T | Terminated

/** Exception being raised by [[TerminableChannel.send()]] on terminated [[TerminableChannel]]. */
class ChannelTerminatedException extends Exception

/** A [[Channel]] that can be terminated, signalling no more items will be sent,
  * still allowing the consumer to read pending values.
  * Trying to `send` values after its termination raises a [[ChannelTerminatedException]].
  * When one consumer reads the [[Terminated]] token, the channel is closed. Any subsequent
  * read will return `Left(Channel.Closed)`.
  */
trait TerminableChannel[T] extends Channel[Terminable[T]]:
  def terminate()(using Async): Unit

object TerminableChannel:

  /** Creates a [[TerminableChannel]] backed by a [[BufferedChannel]]. */
  def ofBuffered[T](size: Int): TerminableChannel[T] = TerminableChannelImpl(BufferedChannel(size))

  /** Creates a [[TerminableChannel]] backed by an [[UnboundedChannel]]. */
  def ofUnbounded[T]: TerminableChannel[T] = TerminableChannelImpl(UnboundedChannel())

  private class TerminableChannelImpl[T](c: Channel[Terminable[T]]) extends TerminableChannel[T]:
    opaque type Res[R] = Either[Channel.Closed, R]

    private var _terminated: Boolean = false

    override val readSource: Async.Source[Res[Terminable[T]]] =
      c.readSource.transformValuesWith:
        case Right(Terminated) => c.close(); Left(Channel.Closed)
        case v @ _ => v

    override def sendSource(x: Terminable[T]): Async.Source[Res[Unit]] =
      synchronized:
        if _terminated then throw ChannelTerminatedException()
        else if x == Terminated then _terminated = true
      c.sendSource(x)

    override def close(): Unit = c.close()

    override def terminate()(using Async): Unit =
      try send(Terminated)
      // It happens only at the close of the channel due to the call (inside Gears library) of
      // a CellBuf.dequeue(channels.scala:239) which is empty!
      catch case _: NoSuchElementException => () // e.printStackTrace()
Thanks to this extension, it is now possible to write the following in Scala Gears as well:
val channel = TerminableChannel.ofUnbounded[Int]
Future:
  (0 until 10).foreach(channel.send(_))
  channel.terminate() // we're done sending
channel.foreach(println(_)) // blocks until channel is closed
println("Done!")
[Other tests can be found in TerminableChannelTest.]
On top of this new abstraction it is possible to implement, for example, the foreach and toSeq methods, which are useful to wait for all the items sent over the channel.
object TerminableChannelOps:

  extension [T: ClassTag](c: TerminableChannel[T])

    /** Consume channel items, executing the given function [[f]] for each element.
      * This is a blocking operation. */
    @tailrec
    def foreach[U](f: T => U)(using Async): Unit = c.read() match
      case Left(Channel.Closed) => ()
      case Right(value) =>
        value match
          case Terminated => ()
          case v: T => f(v); foreach(f)

    /** @return a [[Seq]] containing channel items, after having them read.
      * This is a blocking operation. */
    def toSeq(using Async): Seq[T] =
      var results = Seq[T]()
      c.foreach(t => results = results :+ t)
      results
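The sentinel-token idea behind the Terminable channel does not depend on Gears. The following is a minimal sketch of it using only the standard library: a `LinkedBlockingQueue` stands in for the channel, and `Done`, `produceSquares` and `drain` are hypothetical names introduced for illustration. Unlike a Gears channel, the queue blocks the calling thread instead of suspending.

```scala
import java.util.concurrent.LinkedBlockingQueue

/** Hypothetical sentinel marking the end of the stream (plays the role of `Terminated`). */
case object Done

/** Starts a producer thread that sends the first `n` squares followed by the sentinel. */
def produceSquares(n: Int): LinkedBlockingQueue[Int | Done.type] =
  val queue = new LinkedBlockingQueue[Int | Done.type]()
  val producer = new Thread(() => {
    (1 to n).foreach(x => queue.put(x * x))
    queue.put(Done) // we're done sending
  })
  producer.start()
  queue

/** Reads items until the sentinel shows up; values queued before it are never lost. */
def drain[T](queue: LinkedBlockingQueue[T | Done.type]): Seq[T] =
  Iterator
    .continually(queue.take()) // blocks the calling thread until an item is available
    .takeWhile(_ != Done)
    .map(_.asInstanceOf[T]) // safe here: the only non-T value has just been filtered out
    .toSeq
```

Calling `drain[Int](produceSquares(5))` returns the five squares in the order they were sent, regardless of how far ahead of the consumer the producer ran.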
Three types of channels exist:
- Synchronous Channels: link a read request with a send within a rendezvous;
  - send (read) suspends the process until a consumer reads (a producer sends) the value;
  - in Kotlin they are called Rendezvous Channels.
- Buffered Channels: a version of a channel with an internal buffer of fixed size;
  - send suspends the producer process if the buffer is full; otherwise, it appends the value to the buffer, returning immediately;
  - read suspends if the channel is empty, waiting for a new value.
- Unbounded Channels: a version of a channel with an unbounded buffer;
  - if the program runs out of memory you can get an out-of-memory exception!
  - in Kotlin they are called Unlimited Channels.
Kotlin also offers a fourth type: the Conflated Channel, where every new element sent to it overwrites the previously sent one, never blocking, so that the receiver always gets the latest element.
Concerning channel behavior, it is important to note that:
- Multiple producers can send data to the channel, and multiple consumers can read them, but each element is handled only once, by one of them, i.e. consumers compete with each other for sent values;
- Once the element is handled, it is immediately removed from the channel;
- Channels are fair: send and read operations to channels are fair w.r.t. the order of their invocations from multiple threads (they are served in first-in first-out order).
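The three buffering policies have close analogues among the JVM blocking queues, which makes them easy to experiment with. The sketch below is only an analogy, not the Gears or Kotlin API: the queues block threads rather than suspend, and `tryOffers` is a hypothetical helper that uses the non-blocking `offer` to reveal where a `send` would have had to wait.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, SynchronousQueue}

/** Tries to enqueue 1, 2 and 3 without blocking and reports which attempts succeeded.
  * A `false` marks the point where a blocking `put` (the analogue of `send`) would wait.
  */
def tryOffers(queue: BlockingQueue[Int]): Seq[Boolean] =
  Seq(1, 2, 3).map(x => queue.offer(x))

// Rendezvous: no buffer at all, so with no reader waiting every attempt fails.
def rendezvous: Seq[Boolean] = tryOffers(new SynchronousQueue[Int]())

// Buffered with capacity 2: the third attempt finds the buffer full.
def buffered: Seq[Boolean] = tryOffers(new ArrayBlockingQueue[Int](2))

// Unbounded: every attempt succeeds (until memory runs out).
def unbounded: Seq[Boolean] = tryOffers(new LinkedBlockingQueue[Int]())
```

With no consumer running, `rendezvous` yields three failures, `buffered` yields two successes followed by a failure, and `unbounded` yields three successes.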
Analyzer example #
To show channels in action, an example has been prepared.
Idea: we want to build a small asynchronous library that allows clients to collect common statistics about the repositories (issues, stars, last release) and the contributors of a given GitHub organization.
The final result is a GUI application that, given an organization name, starts the analysis of all its repositories, listing their information along with all their contributors as soon as they are computed. Moreover, the application allows the user to cancel the current computation at any point in time.
[This example has been inspired by this tutorial.]
To start the application:
./gradlew analyzer-<direct | direct-kt | monadic>:run
In order to run the application you need to place inside the analyzer-commons directory a .env file containing your personal GitHub access token, like:
GH_TOKEN=....
or set an environment variable named GH_TOKEN.
The example is structured in two different packages: lib and client. The former contains the logic of the library, while the latter contains the application (client code).
As usual, it has been implemented using monadic Futures, as well as using Scala Gears and Kotlin Coroutines.
Future monadic version #
[The sources are available inside the analyzer-monadic submodule.]
The entry point of the library is the Analyzer interface, which takes as input the organization name and a function through which it is possible to react to results while they are computed.
Since we want to achieve cancellation, the monadic version leverages Monix Task, which is returned by the analyze method wrapped in an EitherT monad transformer to allow handling errors functionally.
/** A generic analyzer of organization/group/workspace repositories. */
trait Analyzer:

  /** @return a [[EitherT]] encapsulating a [[Task]] that performs the analysis of the
    * [[organizationName]]'s repositories, providing the results incrementally to the
    * [[updateResult]] function.
    */
  def analyze(organizationName: String)(
      updateResult: RepositoryReport => Unit,
  ): EitherT[Task, String, Seq[RepositoryReport]]
To retrieve data from GitHub, a RepositoryService interface has been created, following the same pattern:
/** A service exposing functions to retrieve data from a central hosting repository service. */
trait RepositoryService:

  /** @return a [[EitherT]] encapsulating a [[Task]] which gets all the repositories
    * owned by [[organizationName]].
    */
  def repositoriesOf(organizationName: String): EitherT[Task, String, Seq[Repository]]

  /** @return a [[EitherT]] encapsulating a [[Task]] which gets all the contributors
    * of [[repositoryName]] owned by [[organizationName]].
    */
  def contributorsOf(organizationName: String, repositoryName: String): EitherT[Task, String, Seq[Contribution]]

  /** @return a [[EitherT]] encapsulating a [[Task]] which gets the last release
    * of [[repositoryName]] owned by [[organizationName]].
    */
  def lastReleaseOf(organizationName: String, repositoryName: String): EitherT[Task, String, Release]
The implementation of the Analyzer is shown in the following code snippet and performs the following steps:
- first, the list of repositories is retrieved;
- if no error occurs, the analysis of each repository is performed concurrently, thanks to the parTraverse operator;
- the analysis of each repository consists of retrieving its contributors and its last release and then updating the result through the updateResult function. Since the retrieval of the contributors and that of the last release are independent of each other, they are performed concurrently, thanks to Task.parZip2.
override def analyze(organizationName: String)(
    updateResult: RepositoryReport => Unit,
): EitherT[Task, String, Seq[RepositoryReport]] =
  for
    repositories <- repositoryService.repositoriesOf(organizationName)
    reports <- repositories.parTraverse(r => EitherT.right(r.performAnalysis(updateResult)))
  yield reports

extension (r: Repository)
  private def performAnalysis(updateResult: RepositoryReport => Unit): Task[RepositoryReport] =
    val contributorsTask = repositoryService.contributorsOf(r.organization, r.name).value
    val releaseTask = repositoryService.lastReleaseOf(r.organization, r.name).value
    for
      result <- Task.parZip2(contributorsTask, releaseTask)
      report = RepositoryReport(r.name, r.issues, r.stars, result._1.getOrElse(Seq.empty), result._2.toOption)
      _ <- Task(updateResult(report))
    yield report
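The same two-level concurrency (all repositories at once and, within each repository, two independent lookups at once) can be sketched with standard-library Futures alone. This is not the author's implementation: `contributorsOf`, `lastReleaseOf` and `Report` below are hypothetical stand-ins that return canned data, and, unlike Monix Task, `scala.concurrent.Future` is eager and offers no cancellation, which is the reason the monadic version relies on Monix.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Hypothetical stand-ins for the remote calls of the repository service.
def contributorsOf(repo: String): Future[Seq[String]] = Future(Seq(s"$repo-dev"))
def lastReleaseOf(repo: String): Future[String] = Future(s"$repo-v1.0")

final case class Report(repo: String, contributors: Seq[String], release: String)

/** Both lookups are started before either is awaited, so they overlap (the role of `parZip2`). */
def analyze(repo: String): Future[Report] =
  contributorsOf(repo)
    .zip(lastReleaseOf(repo))
    .map((contributors, release) => Report(repo, contributors, release))

/** Starts one analysis per repository and gathers the reports in input order (the role of `parTraverse`). */
def analyzeAll(repos: Seq[String]): Future[Seq[Report]] =
  Future.traverse(repos)(analyze)
```

A client would typically block only at the very edge of the program, for instance with `Await.result(analyzeAll(Seq("a", "b")), 5.seconds)`.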
Client-side, when a new session is requested, the Analyzer is used to start the computation, during which the single reports are aggregated and the UI is updated.
Whenever desired, the current computation can be stopped by canceling the Monix CancelableFuture returned by the runToFuture method, through which the Task returned by the Analyzer is started.
class MonadicAppController extends AppController:

  import monix.execution.Scheduler.Implicits.global
  private val view = AnalyzerView.gui(this)
  private val analyzer = Analyzer(RepositoryService.ofGitHub)
  private var currentComputation: Option[CancelableFuture[Unit]] = None

  view.run()

  override def runSession(organizationName: String): Unit =
    var organizationReport: OrganizationReport = (Map(), Set())
    val f = analyzer.analyze(organizationName) { report =>
      organizationReport = organizationReport.mergeWith(report)
      view.update(organizationReport)
    }.value.runToFuture.map { case Left(value) => view.error(value); case Right(_) => view.endComputation() }
    currentComputation = Some(f)

  override def stopSession(): Unit = currentComputation foreach (_.cancel())
Scala Gears version #
[The sources are available inside the analyzer-direct submodule.]
The interfaces of the direct-style version with Gears differ from the monadic ones in their return type, which is a simpler Either data type, and in the fact that they are suspendable functions, hence they require an Async context to be executed.
This is the first important difference: the analyze method, unlike in the monadic version, doesn’t return control immediately; instead, it suspends the execution of the client until the result is available (though offering the opportunity to react to each update).
This obeys the principle of explicit asynchrony: if the client wants to perform this operation asynchronously, it has to opt in explicitly, using a Future.
/** A generic analyzer of organization/group/workspace repositories. */
trait Analyzer:

  /** Performs a **suspending** analysis of the [[organizationName]]'s repositories,
    * providing the results incrementally to the [[updateResults]] function.
    * It may fail along the way!
    * @return the overall results of the analysis.
    */
  def analyze(organizationName: String)(
      updateResults: RepositoryReport => Unit,
  )(using Async, AsyncOperations, CanFail): Seq[RepositoryReport]

/** A service exposing functions to retrieve data from a central hosting repository service. */
trait RepositoryService:

  /** Suspend the execution to get all the [[Repository]] owned by the given [[organizationName]].
    * It may fail along the way!
    * @return the [[Seq]]uence of [[Repository]].
    */
  def repositoriesOf(organizationName: String)(using Async, CanFail): Seq[Repository]

  /** Suspend the execution to get all the [[Contribution]] made by users to the given
    * [[repositoryName]] owned by [[organizationName]]. It may fail along the way!
    * @return the [[Seq]]uence of [[Contribution]].
    */
  def contributorsOf(organizationName: String, repositoryName: String)(using Async, CanFail): Seq[Contribution]

  /** Suspend the execution to get the last [[Release]] of the given [[repositoryName]]
    * owned by [[organizationName]]. It may fail along the way!
    * @return the last [[Release]] if it exists.
    */
  def lastReleaseOf(organizationName: String, repositoryName: String)(using Async, CanFail): Release
The implementation of the Analyzer leverages Channels to perform the concurrent analysis of the repositories:
override def analyze(organizationName: String)(
    updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations, CanFail): Seq[RepositoryReport] = Async.group:
  val reposInfo = repositoryService.repositoriesOf(organizationName) // 1
    .map(_.performAnalysis.start()) // 2
  val collector = Collector(reposInfo.toList*) // 3
  reposInfo.foreach: _ =>
    updateResults(collector.results.read().asTry.?.awaitResult.?) // 4
  reposInfo.awaitAll // 5

extension (r: Repository)
  protected def performAnalysis(using Async): Task[RepositoryReport] = Task:
    Async.group:
      val contributions = Future: // concurrent
        either(repositoryService.contributorsOf(r.organization, r.name))
      val release = Future: // concurrent (not strictly necessary, just to reveal the intent)
        either(repositoryService.lastReleaseOf(r.organization, r.name))
      RepositoryReport(r.name, r.issues, r.stars, contributions.await.getOrElse(Seq.empty), release.await.toOption)
1. first, we get all the repositories of the requested organization;
2. for each of them, the contributors and the last release are retrieved concurrently, starting a Future;
   - the retrieval of the contributors and that of the last release are performed in parallel;
3. Future results are gathered inside a Collector, which allows collecting a list of futures into a channel of futures, arriving as they finish;
4. results are read from the channel as they come, calling the updateResults reaction function;
5. overall results are returned to the client.
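The "arriving as they finish" behavior of step 3 is the heart of this solution. As a rough, standard-library-only illustration of the idea (not the Gears Collector itself), each future can push its outcome onto a shared queue at the moment it completes; `collectAsCompleted` and `readAll` are hypothetical names introduced for this sketch.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

given ExecutionContext = ExecutionContext.global

/** Forwards the outcome of each future to a queue at the moment it completes. */
def collectAsCompleted[T](futures: Seq[Future[T]]): LinkedBlockingQueue[Try[T]] =
  val results = new LinkedBlockingQueue[Try[T]]()
  futures.foreach(_.onComplete(outcome => results.put(outcome)))
  results

/** Reads exactly `n` outcomes, in completion order, reacting to each of them. */
def readAll[T](results: LinkedBlockingQueue[Try[T]], n: Int)(react: Try[T] => Unit): Unit =
  (1 to n).foreach(_ => react(results.take())) // `take` blocks until the next outcome arrives
```

The consumer never waits for a slow future while a fast one is already done: whatever completes first is delivered first, which is what lets the UI update incrementally.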
Although it works, the proposed solution suffers from a performance issue when the organization we want to analyze has a large number of repositories.
Indeed, the GitHub API, like many RESTful APIs, implements pagination: if the response includes many results, they are paginated, returning a subset of them; it is the responsibility of the client to request more data (pages).
Until now, the RepositoryService has been implemented to return all the results in one shot, leading to suspension until all pages are retrieved.
It would be desirable, instead, to start performing the analysis as soon as one page is obtained from the API.
To do so, the interface of the RepositoryService has been extended with new methods, incremental***, returning a TerminableChannel of results:
trait RepositoryService:

  /** @return a [[Terminable]] [[ReadableChannel]] with the [[Repository]] owned by the given
    * [[organizationName]], wrapped inside a [[Either]] for errors management.
    */
  def incrementalRepositoriesOf(
      organizationName: String,
  )(using Async): TerminableChannel[Either[String, Repository]]

  /** @return a [[Terminable]] [[ReadableChannel]] with the [[Contribution]] made by users to the
    * given [[repositoryName]] owned by [[organizationName]], wrapped inside a [[Either]].
    */
  def incrementalContributorsOf(
      organizationName: String,
      repositoryName: String,
  )(using Async): TerminableChannel[Either[String, Contribution]]

  // ...
Then, the implementation of the analyze method becomes:
override def analyze(organizationName: String)(
    updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations, CanFail): Seq[RepositoryReport] = Async.group:
  val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName) // 1
  var futureResults = Seq[Future[RepositoryReport]]()
  reposInfo.foreach: repository => // 2
    futureResults = futureResults :+ Future: // 3
      val report = repository.?.performAnalysis.start().awaitResult.?
      synchronized(updateResults(report))
      report
  futureResults.awaitAllOrCancel // 4
1. we get the channel of repositories from the repository service;
2. the foreach method of TerminableChannel is used to iterate over all the repositories sent over the channel as soon as they are retrieved by the service. This is a blocking operation, i.e. it suspends until all the repositories are retrieved;
3. we start the analysis in a separate Future (i.e. thread): this allows the analysis to start as soon as a repository is fetched from the channel, instead of starting the analysis of the next repository only when the previous one is finished;
   - since the analysis is started in a separate thread, we need to prevent the updateResults function from being called concurrently, using a synchronized block;
4. once all the repositories are retrieved, i.e. the foreach terminates, we wait for the completion of all the started Futures. Indeed, when the foreach terminates, we have the guarantee that all the futures have been started, but not that they have completed!
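The four steps above can be mimicked with standard-library tools to make the control flow explicit. In this sketch, which only approximates the Gears version, a `LinkedBlockingQueue[Option[String]]` plays the terminable channel (with `None` as the end marker), and `fetchPages`, `analyze` and `analyzeIncrementally` are hypothetical names; the "analysis" is a placeholder string.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

/** Hypothetical paginated source: pushes one page at a time, then `None` as end marker. */
def fetchPages(pages: Seq[Seq[String]]): LinkedBlockingQueue[Option[String]] =
  val queue = new LinkedBlockingQueue[Option[String]]()
  Future {
    pages.foreach { page =>
      Thread.sleep(10) // pretend each page costs a network round trip
      page.foreach(repo => queue.put(Some(repo)))
    }
    queue.put(None) // no more pages
  }
  queue

/** Placeholder analysis: calls to the user callback are serialized through `lock`. */
def analyze(repo: String, update: String => Unit, lock: AnyRef): Future[String] = Future {
  val report = s"report($repo)"
  lock.synchronized(update(report))
  report
}

/** Starts one analysis per repository as soon as it is read, then awaits them all. */
def analyzeIncrementally(queue: LinkedBlockingQueue[Option[String]])(update: String => Unit): Seq[String] =
  val lock = new Object
  val started = Iterator
    .continually(queue.take()) // blocks until the next repository (or the end marker) arrives
    .takeWhile(_.isDefined)
    .map(repo => analyze(repo.get, update, lock))
    .toList // forces the iteration: by now every future is started, not necessarily completed
  Await.result(Future.sequence(started), 5.seconds)
```

The final `Await` corresponds to step 4: leaving the loop only proves that every analysis was launched, so the results still have to be awaited before returning.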
Kotlin Coroutines version #
[The sources are available inside the analyzer-direct-kt submodule.]
The analyzer interface mirrors the Scala Gears one: a Result is used in place of Either, and the suspendable function updateResults is marked with the suspend keyword in place of the using Async context.
interface Analyzer {
    suspend fun analyze(
        organizationName: String,
        updateResults: suspend (RepositoryReport) -> Unit = { _ -> },
    ): Result<Set<RepositoryReport>>
}
Its channel-based implementation, despite syntactic differences, is also very similar to that of Scala Gears, at least conceptually:
1. we get all the repositories;
2. for each of them, an analysis is started to retrieve the contributors and the last release;
   - each analysis is started in a separate coroutine whose results are sent to a channel;
   - as usual, the contributors and the last release are retrieved concurrently, using the async coroutine builder;
3. results are aggregated as they come from the channel.
override suspend fun analyze(
    organizationName: String,
    updateResults: suspend (RepositoryReport) -> Unit,
): Result<Set<RepositoryReport>> = coroutineScope {
    runCatching {
        val repositories = provider.repositoriesOf(organizationName).getOrThrow() // 1
        val resultsChannel = analyzeAll(organizationName, repositories) // 2
        collectResults(resultsChannel, repositories.size, updateResults) // 3
    }
}

private fun CoroutineScope.analyzeAll(organizationName: String, repositories: List<Repository>) =
    Channel<RepositoryReport>().also {
        repositories.map { r ->
            launch { // a new coroutine for each repository is started
                val contributors = async { provider.contributorsOf(organizationName, r.name).getOrThrow() }
                val release = provider.lastReleaseOf(organizationName, r.name).getOrThrow()
                it.send(RepositoryReport(r.name, r.issues, r.stars, contributors.await(), release))
            }
        }
    }

private suspend fun collectResults(
    resultsChannel: Channel<RepositoryReport>,
    expectedResults: Int,
    updateResults: suspend (RepositoryReport) -> Unit,
) = mutableSetOf<RepositoryReport>().apply {
    repeat(expectedResults) {
        val report = resultsChannel.receive()
        add(report)
        updateResults(report)
    }
    resultsChannel.close()
}
Where Kotlin Coroutines shine, instead, is in the implementation of the RepositoryService supporting incremental retrieval of repositories and contributors.
Indeed, Kotlin has built-in support for cold streams, called Flow. Flows are very similar to (and were actually inspired by) cold observables in reactive programming, and they are the perfect fit for functions that need to return a stream of asynchronously computed values.
The RepositoryService has been extended here with new methods, flowing***, returning a Flow of results:
class GitHubRepositoryProvider {
    fun flowingRepositoriesOf(organizationName: String): Flow<List<Repository>>
    fun flowingContributorsOf(organizationName: String, repositoryName: String): Flow<List<Contribution>>
}
As already mentioned, the Flow is a cold stream, meaning that it is not started until it is collected. Once the collect method is called, a new stream is created and data starts to “flow”.
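What "cold" means can be shown in a few lines, independently of Kotlin. The following Scala sketch is a deliberately tiny, synchronous model (the `ColdFlow` class, `oneToThree` and the `started` counter are hypothetical names, not part of any library): building the flow only stores the producer code, and every `collect` runs it again from scratch.

```scala
/** A minimal synchronous cold stream: `body` is stored at construction, never run. */
final class ColdFlow[T](body: (T => Unit) => Unit):
  /** Runs `body` from scratch, handing every emitted value to `collector`. */
  def collect(collector: T => Unit): Unit = body(collector)

var started = 0 // counts how many times the producer code has actually run

/** Producer code: records that it ran, then emits 1, 2, 3. */
def oneToThree(emit: Int => Unit): Unit =
  started += 1
  (1 to 3).foreach(emit)

val numbers = ColdFlow[Int](oneToThree) // nothing is emitted yet: `started` is still 0
```

Calling `numbers.collect(println)` twice prints the three values twice and leaves `started` at 2: each collector gets its own, freshly started stream.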
Flows offer several useful operators for transforming and combining them functionally (not a complete list):
Intermediate flow operators:
- filter/filterNot to filter out unwanted values;
- map to transform the values;
- transform to implement more complex transformations (possibly involving suspending operations);
- take and its variants (e.g. takeWhile) to limit the number of values emitted;
- onEach to perform side-effects for each value emitted.
Terminal flow operators:
- conversions to various collection types, like toList, toSet;
- first, last, single to retrieve the first, last or single value emitted;
- reduce to perform some kind of operation over all items, reducing them to a single one;
- fold to perform some kind of operation over all items, starting from an initial value, accumulating a result.
Flows combining operators:
- merge to combine multiple flows into a single one, emitting values from all of them;
- zip to combine the corresponding values of two flows;
- flatMapConcat/flatMapMerge to transform each value into a flow and then concatenate/merge them.
Moreover, like in Rx:
- it is possible to control the context in which the flow is executed using the flowOn operator, which changes the context for all the steps above it (so it is typically used as the last step in a function);
- some backpressure strategies are supported, which are used to handle the situation when the producer is emitting values faster than the consumer can process them. In Kotlin Coroutines, backpressure is managed by the buffer operator (and its variants, like conflate, sample, debounce), which allows buffering a certain number of values while the consumer is still processing earlier ones.
override suspend fun analyze(
    organizationName: String,
    updateResults: suspend (RepositoryReport) -> Unit,
): Result<Set<RepositoryReport>> = coroutineScope {
    runCatching {
        val reports = provider.flowingRepositoriesOf(organizationName)
            .flatMapConcat { analyzeAll(it) }
            .flowOn(Dispatchers.Default)
        var allReports = emptySet<RepositoryReport>()
        // until here just "configuration"
        reports.collect {
            updateResults(it)
            allReports = allReports + it
        }
        allReports
    }
}
Concerning flows, an important thing to note is that they are just asynchronous generators that run some suspending code when you collect them. Thus, per se, they don’t introduce new coroutines or concurrency mechanisms.
To achieve concurrency, for example to emit values concurrently from multiple coroutines, it is necessary to use a channelFlow, a pre-cooked way to inject a coroutineContext and a Channel through which the values emitted by background coroutines can be passed back to the main control flow:
fun analyzeAll(repositories: List<Repository>): Flow<RepositoryReport> = channelFlow {
    repositories.forEach { repository ->
        launch {
            val release = async {
                provider.lastReleaseOf(repository.organization, repository.name).getOrThrow()
            }
            provider.flowingContributorsOf(repository.organization, repository.name).toList().forEach {
                // emit this value
                send(RepositoryReport(repository.name, repository.issues, repository.stars, it, release.await()))
            }
        }
    }
}
Introducing Flows in Gears #
An abstraction similar to Kotlin Flows can be implemented in Scala Gears leveraging Tasks and TerminableChannels. The following section describes the attempt made to implement it and what has been achieved.
- When building the Flow, the client provides a block of code through which values are emitted; this block is wrapped inside a Task that is started only when the collect method is called;
- The values are sent (emitted) on a TerminableChannel, which is created when the collect method is called;
  - the behavior of the emit method is defined inside the apply method of Flow and injected into the caller code via the context parameter (it: FlowCollector[T]) ?=>.
- Once the task has finished, the channel is terminated.
[Source code can be found in the commons submodule, pimping package.]
/** An asynchronous cold data stream that emits values, inspired by Kotlin Flows. */
trait Flow[+T]:

  /** Start the flowing of data which can be collected reacting through the given [[collector]] function. */
  def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit

/** An interface modeling an entity capable of [[emit]]ting [[Flow]]s values. */
trait FlowCollector[-T]:

  /** Emits a value to the flow. */
  def emit(value: T)(using Async): Unit

object Flow:

  /** Creates a new asynchronous cold [[Flow]] from the given [[body]].
    * Since it is cold, it starts emitting values only when the [[Flow.collect]] method is called.
    * To emit a value use the [[FlowCollector]] given instance.
    */
  def apply[T](body: (it: FlowCollector[T]) ?=> Unit): Flow[T] =
    val flow = FlowImpl[T]()
    flow.task = Task:
      val channel = flow.channel
      flow.sync.release()
      val collector: FlowCollector[T] = new FlowCollector[T]:
        override def emit(value: T)(using Async): Unit = channel.send(Success(value))
      try body(using collector)
      catch case e: Exception => channel.send(Failure(e))
    flow

  private class FlowImpl[T] extends Flow[T]:
    private[Flow] var task: Task[Unit] = uninitialized
    private[Flow] var channel: TerminableChannel[Try[T]] = uninitialized
    private[Flow] val sync = Semaphore(0)

    override def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit =
      val myChannel = TerminableChannel.ofUnbounded[Try[T]]
      synchronized:
        channel = myChannel
        task.run.onComplete(() => myChannel.terminate())
        // Ensure to leave the synchronized block after the task has been initialized
        // with the correct channel instance.
        sync.acquire()
      myChannel.foreach(t => collector(t))
map and flatMap combinators have been implemented on top of Flow:
object FlowOps:

  extension [T](flow: Flow[T])

    /** @return a new [[Flow]] whose values have been transformed according to [[f]]. */
    def map[R](f: T => R): Flow[R] = new Flow[R]:
      override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
        catchFailure(collector):
          flow.collect(item => collector(Success(f(item.get))))

    /** @return a new [[Flow]] whose values are created by flattening the flows generated
      * by the given function [[f]] applied to each emitted value of this.
      */
    def flatMap[R](f: T => Flow[R]): Flow[R] = new Flow[R]:
      override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
        catchFailure(collector):
          flow.collect(item => f(item.get).collect(x => collector(Success(x.get))))
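The structure of these two combinators is easier to see once the Gears-specific parts (the `Try` wrapping, the `Async` contexts and the failure handling) are stripped away. The sketch below uses a simplified, purely synchronous `ColdFlow` class, which is a hypothetical model and not the `Flow` defined above; `fromSeq` and `toList` are likewise helper names introduced only for this illustration.

```scala
import scala.collection.mutable.ListBuffer

/** A simplified synchronous cold flow (no `Try`, no `Async`): just a stored producer. */
final class ColdFlow[T](body: (T => Unit) => Unit):
  def collect(collector: T => Unit): Unit = body(collector)

  /** Collecting the mapped flow collects this one, transforming each value on the way. */
  def map[R](f: T => R): ColdFlow[R] =
    ColdFlow[R](emit => collect(t => emit(f(t))))

  /** Each value spawns an inner flow whose values are forwarded in order (concatenation). */
  def flatMap[R](f: T => ColdFlow[R]): ColdFlow[R] =
    ColdFlow[R](emit => collect(t => f(t).collect(emit)))

/** Builds a cold flow emitting the given items. */
def fromSeq[T](items: Seq[T]): ColdFlow[T] = ColdFlow[T](emit => items.foreach(emit))

/** Collects every emitted value into a list. */
def toList[T](flow: ColdFlow[T]): List[T] =
  val buffer = ListBuffer[T]()
  flow.collect(x => buffer += x)
  buffer.toList
```

In both cases nothing runs until the resulting flow is collected, so the combinators preserve coldness; and since `map` and `flatMap` are defined, such flows can also be composed with for-comprehensions.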
Showcasing Flows #
Library use case:
type Name = String
type WriterId = Int
type Writer = (Name, WriterId)
type Book = String

object LibraryService:
  private val users: Set[Writer] = Set(("Alice", 987), ("Bob", 123), ("Charlie", 342))
  private val books: Map[WriterId, Set[Book]] = Map(
    987 -> Set("Alice's Adventures in Wonderland", "Through the Looking-Glass"),
    123 -> Set("The Shining"),
    342 -> Set("The Raven", "The Tell-Tale Heart"),
  )

  def allWriters(using Async): Flow[Writer] = Flow:
    users.foreach { u =>
      sleep(2_000)
      it.emit(u)
    }

  def booksByWriter(writer: WriterId)(using Async): Flow[Book] = Flow:
    books(writer).foreach(it.emit)
Flows are cold
@main def useSimple(): Unit = Async.blocking:
  val service = LibraryService
  val writers = service.allWriters
  log(s"Not collecting yet!")
  sleep(1_000) // something meaningful
  log("Starting collecting users...")
  writers.collect(u => log(s"User: $u"))
  println("Done")
What we get is something like:
[1709559932492] Not collecting yet!
[1709559933500] Starting collecting users...
[1709559935532] User: Success((Alice,987))
[1709559937536] User: Success((Bob,123))
[1709559939541] User: Success((Charlie,342))
Done
If something goes wrong during the task execution, a Failure is emitted and the task terminates (no more values are emitted).
For example:
def failingWriters(using Async): Flow[Writer] = Flow:
  throw IllegalStateException("The library is closed")
  it.emit(users.head)

@main def useFailingFlow(): Unit = Async.blocking:
  val service = LibraryService
  val writers = service.failingWriters
  writers.collect(println)
Results in:
Failure(java.lang.IllegalStateException: The library is closed)
Flows can be transformed
mapping Writers to their identifier:
@main def useWithMapping(): Unit = Async.blocking:
  val service = LibraryService
  val writersId = service.allWriters.map(_._2)
  writersId.collect(a => println(s"Id: $a"))
Result:
Id: Success(987)
Id: Success(123)
Id: Success(342)
flatMapping to get all the books:
@main def useWithFlatMap(): Unit = Async.blocking:
  val service = LibraryService
  val allBooks = service.allWriters.flatMap(w =>
    service.booksByWriter(w._2)
  )
  allBooks.collect(println)
Result:
Success(Alice's Adventures in Wonderland)
Success(Through the Looking-Glass)
Success(The Shining)
Success(The Raven)
Success(The Tell-Tale Heart)
👉🏻 More tests on Flows can be found in the commons submodule, pimping package.
Takeaways #
- Channels are the basic communication and synchronization primitive for exchanging data between Futures/Coroutines.
  - Scala Gears support for Terminable channels or a review of the closing mechanism should be considered.
- The Flow abstraction in Kotlin Coroutines is a powerful tool for handling cold streams of data, and it is a perfect fit for functions that need to return a stream of asynchronously computed values upon request.
  - A similar abstraction can be implemented in Scala Gears leveraging Tasks and TerminableChannels, enabling improved support for an asynchronous flow of data also in Gears, which is currently lacking.