Basic asynchronous constructs #
The need for a new Future construct #
The current implementation of the Future monadic construct suffers from the following main drawbacks:
- Lack of referential transparency;
- Lack of cancellation mechanisms and structured concurrency;
- Accidental Sequentiality.
To show these weaknesses in practice, a simple example of the core of a web service implementation is presented.
Example: a blog posts service #
Idea: develop a very simple (mocked) service that allows storing blog posts in a repository and retrieving them, checking the content and the author before the actual storage.
The example has been implemented using:
- the continuation style, through the current Scala Future monadic constructs;
- the direct style, through:
  - the abstractions offered by Gears;
  - Kotlin coroutines.
The example is organized into Gradle submodules:
- blog-ws-commons contains code that has been reused for both the monadic and direct versions;
- blog-ws-monadic contains the monadic Scala style;
- blog-ws-direct contains the direct version using Scala Gears;
- blog-ws-direct-kt contains the direct version using Kotlin Coroutines.
For this example just the tests are provided. You can explore them in the test folders and run via Gradle using the name of the submodule:
./gradlew :blog-ws-<monadic | direct | direct-kt>:test
Structure #
The domain is modeled using abstract data types in a common PostsModel trait:
/** The model of a simple blog posts service. */
trait PostsModel:
  /** The post author's identifier. */
  type AuthorId
  /** The post's title. */
  type Title
  /** The post's body. */
  type Body
  /** The content of the post. */
  type PostContent = (Title, Body)
  /** A post author and their info. */
  case class Author(authorId: AuthorId, name: String, surname: String)
  /** A blog post, comprising an author, title, body and the last modification. */
  case class Post(author: Author, title: Title, body: Body, lastModification: Date)
  /** A function that verifies the content of the post, returning a [[scala.util.Success]] with
    * the content of the post if the verification succeeds or a [[scala.util.Failure]] otherwise.
    */
  type ContentVerifier = (Title, Body) => Try[PostContent]
  /** A function that verifies the author has appropriate permissions, returning a
    * [[scala.util.Success]] with their information or a [[scala.util.Failure]] otherwise.
    */
  type AuthorsVerifier = AuthorId => Try[Author]
To implement the service two components have been conceived, following the Cake Pattern:
- PostsRepositoryComponent
  - exposes the Repository trait allowing to store and retrieve blog posts;
  - mocks a DB technology with an in-memory collection.
- PostsServiceComponent
  - is the component exposing the Service interface;
  - it could be called by the controller of the ReSTful web service.
Both must be designed in an async way.
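To make the wiring concrete, here is a minimal sketch of the Cake Pattern in Scala 3. It is deliberately synchronous and much smaller than the real components: the names `Model`, `RepositoryComponent`, `ServiceComponent`, `InMemoryRepository`, `ServiceImpl` and `BlogApp` are hypothetical and only illustrate how self-types express the dependencies between components.

```scala
// Simplified, hypothetical model: the real PostsModel has more members.
trait Model:
  type Title
  case class Post(title: Title, body: String)

// The self-type states that this component must be mixed in with a Model.
trait RepositoryComponent:
  context: Model =>
  val repository: Repository
  trait Repository:
    def save(post: Post): Post
    def exists(title: Title): Boolean
  // In-memory implementation mocking a DB technology.
  class InMemoryRepository extends Repository:
    private var posts = Set.empty[Post]
    def save(post: Post): Post =
      posts += post
      post
    def exists(title: Title): Boolean = posts.exists(_.title == title)

// The service depends on both the model and the repository component.
trait ServiceComponent:
  context: RepositoryComponent & Model =>
  val service: Service
  trait Service:
    def create(title: Title, body: String): Option[Post]
  class ServiceImpl extends Service:
    def create(title: Title, body: String): Option[Post] =
      if context.repository.exists(title) then None
      else Some(context.repository.save(Post(title, body)))

// The "cake": all layers are assembled, and abstract types fixed, in one place.
object BlogApp extends Model, RepositoryComponent, ServiceComponent:
  type Title = String
  val repository: Repository = InMemoryRepository()
  val service: Service = ServiceImpl()
```

With this layout, swapping the in-memory repository for another implementation only touches the final object, not the service code.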
Current monadic Future
#
The interfaces of the repository and service components of the monadic version are presented hereafter; their complete implementation is available here.
/** The component exposing blog posts repositories. */
trait PostsRepositoryComponent:
  context: PostsModel =>
  /** The repository instance. */
  val repository: PostsRepository
  /** The repository in charge of storing and retrieving blog posts. */
  trait PostsRepository:
    /** Save the given [[post]]. */
    def save(post: Post)(using ExecutionContext): Future[Post]
    /** @return a [[Future]] completed with true if a post exists with
      * the given title, false otherwise. */
    def exists(postTitle: Title)(using ExecutionContext): Future[Boolean]
    /** @return a [[Future]] completed either with a defined optional
      * post with given [[postTitle]] or an empty one. */
    def load(postTitle: Title)(using ExecutionContext): Future[Option[Post]]
    /** Load all the saved posts. */
    def loadAll()(using ExecutionContext): Future[LazyList[Post]]

/** The blog posts service component. */
trait PostsServiceComponent:
  context: PostsRepositoryComponent & PostsModel =>
  /** The blog post service instance. */
  val service: PostsService
  /** The service exposing a set of functionalities to interact with blog posts. */
  trait PostsService:
    /** Creates a new blog post with the given [[title]] and [[body]], authored by [[authorId]]. */
    def create(authorId: AuthorId, title: Title, body: Body)(using ExecutionContext): Future[Post]
    /** Get a post from its [[title]]. */
    def get(title: Title)(using ExecutionContext): Future[Option[Post]]
    /** Gets all the stored blog posts in a lazy manner. */
    def all()(using ExecutionContext): Future[LazyList[Post]]
All the exposed functions, since they are asynchronous, return an instance of Future[T] and must be called in a scope where a given instance of ExecutionContext is available.
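As a small illustration of that requirement, the sketch below calls a function with a `using ExecutionContext` clause from a scope that declares a given instance. `titleExists` and `check` are hypothetical stand-ins, not part of the project; only the standard library is used.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

// Hypothetical stand-in for a repository method such as `exists`.
def titleExists(title: String)(using ExecutionContext): Future[Boolean] =
  Future(Set("Hello", "Scala").contains(title))

def check(title: String): Boolean =
  // Without a given ExecutionContext in scope the call below does not compile.
  given ExecutionContext = ExecutionContext.global
  // Blocking with Await is acceptable in a demo or a test, not in service code.
  Await.result(titleExists(title), 2.seconds)
```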
What’s important to delve into is the implementation of the service, and, more precisely, of the create method. As already mentioned, before saving the post two checks need to be performed:
- the post author must have permission to publish a post and their information needs to be retrieved (supposing they are managed by another service);
- the content of the post is analyzed in order to prevent the storage and publication of inappropriate content.
Since these operations are independent from each other they can be spawned and run in parallel.
override def create(authorId: AuthorId, title: Title, body: Body)(using ExecutionContext): Future[Post] =
  for
    exists <- context.repository.exists(title)
    if !exists
    post <- save(authorId, title, body)
  yield post

private def save(authorId: AuthorId, title: Title, body: Body)(using ExecutionContext): Future[Post] =
  val authorAsync = authorBy(authorId)
  val contentAsync = verifyContent(title, body)
  for
    content <- contentAsync
    author <- authorAsync
    post = Post(author, content._1, content._2, Date())
    _ <- context.repository.save(post)
  yield post

/* Pretending to make a call to the Authorship Service that keeps track of authorized authors. */
private def authorBy(id: AuthorId)(using ExecutionContext): Future[Author] = ???

/* Some local computation that verifies the content of the post is appropriate. */
private def verifyContent(title: Title, body: Body)(using ExecutionContext): Future[PostContent] = ???
This implementation shows the limits of the current monadic Future mechanism:
- if we want futures to execute sequentially, we need to compose them using flatMap, like in the create function: first, the check on the post existence is performed, and only if it succeeds and no other post with the same title exists is the save function started;
  - as a consequence, if we want two futures to run in parallel we have to spawn them before the for-yield, as in the save function. This is error-prone and could lead to unexpected sequentiality, like this:

        // THIS IS WRONG: the two futures are started sequentially!
        for
          content <- verifyContent(title, body)
          author <- authorBy(authorId)
          post = Post(author, content._1, content._2, Date())
          _ <- context.repository.save(post)
        yield post

- since the publication of a post can be performed only if both of these checks succeed, it is desirable that, whenever one of the two fails, the other gets canceled. Unfortunately, Scala Futures are currently not cancellable and provide no structured concurrency mechanism;
- moreover, they lack referential transparency, i.e. futures start running as soon as they are defined. This means that passing a reference to a future is not the same as passing the referenced expression.
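The last point can be observed with the standard library alone. The following sketch (function names are hypothetical) counts how many times a future body runs when a reference to a future is reused, compared with when the reference is replaced by the expression it names. In a referentially transparent setting the two programs would behave identically.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*

/** Reuses one already-started future twice: its body runs a single time. */
def sharedReference(): Int =
  val runs = new AtomicInteger(0)
  val f = Future(runs.incrementAndGet()) // starts running here, once
  val both =
    for
      a <- f
      b <- f
    yield (a, b)
  Await.result(both, 2.seconds)
  runs.get()

/** Replaces each reference with the expression it names: the body runs twice. */
def inlinedExpression(): Int =
  val runs = new AtomicInteger(0)
  val both =
    for
      a <- Future(runs.incrementAndGet())
      b <- Future(runs.incrementAndGet())
    yield (a, b)
  Await.result(both, 2.seconds)
  runs.get()
```

Here `sharedReference()` observes one execution of the body while `inlinedExpression()` observes two, so substituting a future reference with its definition changes the program.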
Direct style: Scala version with gears #
The API of the gears library is built on top of four main abstractions, three of which are presented here (the fourth in the next example):
- Async context is “a capability that allows a computation to suspend while waiting for the result of an async source”. Code that has access to an instance of the Async trait is said to be in an async context and can suspend its execution. Usually, it is provided via a given instance:

      def suspendingFunction(using Async): Int

  - Async.blocking creates an Async context blocking the current thread for suspension and it is good practice to use it only in the main function of an application (or in test suites).

        @main def launcher(): Unit =
          Async.blocking:
            // inside this scope the `Async` capability is provided
            // (hence we can suspend and call suspendable functions!)

- Async.Source models an asynchronous source of data that can be polled or awaited by suspending the computation, as well as composed using combinator functions.
- Futures are the primary (in fact, the only) active elements that encapsulate a control flow that, eventually, will deliver a result (either a computed value or a failure value that contains an exception). To be spawned, an Async.Spawn capability is required, which is provided by the Async.group method.

      def suspendingFunction(using Async): Int =
        Async.group: // needs the `Async` capability!
          // here the `Async.Spawn` capability is available, hence we can spawn Futures...
          Future:
            // ...

  - Since Futures are Async.Sources, they can be awaited and combined with other Futures, suspending their execution.
  - Tasks are the abstraction provided to create delayed Futures, responding to the lack of referential transparency problem. They take the body of a Future as an argument; its run method converts that body to a Future, starting its execution.
  - Promises allow us to define a Future’s result value externally, instead of executing a specific body of code.
classDiagram
  class Async {
    << trait >>
    +group: CompletionGroup
    +withGroup(group: CompletionGroup) Async
    +await[T](src: Async.Source[T]) T
    +current() Async$
    +blocking[T](body: Async ?=> T) T$
    +group[T](body: Async ?=> T) T$
  }
  class `Async.Spawn` {
    << type >>
  }
  Async <|-- `Async.Spawn`
  class `Async.Source[+T]` {
    << trait >>
    +poll(k: Listener[T]) Boolean
    +poll() Option[T]
    +onComplete(k: Listener[T])
    +dropListener(k: Listener[T])
    +awaitResult()(using Async) T
  }
  Async *--> `Async.Source[+T]`
  class OriginalSource {
    << abstract class >>
  }
  `Async.Source[+T]` <|-- OriginalSource
  class `Listener[-T]` {
    << trait >>
    +lock: Listener.ListenerLock | Null
    +complete(data: T, source: Async.Source[T])
    +completeNow(data: T, source: Async.Source[T]) Boolean
    +apply[T](consumer: (T, Source[T]) => Unit) Listener[T]$
  }
  `Async.Source[+T]` *--> `Listener[-T]`
  class `Future[+T]` {
    << trait >>
    +apply[T](body: Async.Spawn ?=> T)(using Async, Async.Spawn) Future[T]$
    +now[T](result: Try[T]) Future[T]
    +zip[U](f2: Future[U]) Future[T, U]
    +or(f2: Future[T]) Future[T]
    +orWithCancel(f2: Future[T]) Future[T]
  }
  class `Promise[+T]` {
    << trait >>
    +apply() Promise[T]$
    +asFuture Future[T]
    +complete(result: Try[T])
  }
  OriginalSource <|-- `Future[+T]`
  `Future[+T]` <|-- `Promise[+T]`
  class `Task[+T]` {
    +apply(body: (Async, AsyncOperations) ?=> T) Task[T]$
    +start(using Async, Async.Spawn, AsyncOperations) Future[+T]
  }
  `Future[+T]` <--* `Task[+T]`
  class Cancellable {
    << trait >>
    +group: CompletionGroup
    +cancel()
    +link(group: CompletionGroup)
    +unlink()
  }
  Cancellable <|-- `Future[+T]`
  class Tracking {
    << trait >>
    +isCancelled Boolean
  }
  Cancellable <|-- Tracking
  class CompletionGroup {
    +add(member: Cancellable)
    +drop(member: Cancellable)
  }
  Tracking <|-- CompletionGroup
  Async *--> CompletionGroup
Going back to our example, the interface of both the repository and service components becomes (here you can find the complete sources):
/** The component exposing blog posts repositories. */
trait PostsRepositoryComponent:
  context: PostsModel =>
  /** The repository instance. */
  val repository: PostsRepository
  /** The repository in charge of storing and retrieving blog posts. */
  trait PostsRepository:
    /** Save the given [[post]]. */
    def save(post: Post)(using Async, CanFail): Post
    /** Return true if a post exists with the given title, false otherwise. */
    def exists(postTitle: Title)(using Async, CanFail): Boolean
    /** Load the post with the given [[postTitle]]. */
    def load(postTitle: Title)(using Async, CanFail): Option[Post]
    /** Load all the saved posts. */
    def loadAll()(using Async, CanFail): LazyList[Post]

/** The blog posts service component. */
trait PostsServiceComponent:
  context: PostsRepositoryComponent & PostsModel =>
  /** The blog post service instance. */
  val service: PostsService
  /** The service exposing a set of functionalities to interact with blog posts. */
  trait PostsService:
    /** Creates a new blog post with the given [[title]] and [[body]], authored by [[authorId]],
      * or a string explaining the reason of the failure.
      */
    def create(authorId: AuthorId, title: Title, body: Body)(using Async, CanFail): Post
    /** Get a post from its [[title]] or a string explaining the reason of the failure. */
    def get(title: Title)(using Async, CanFail): Option[Post]
    /** Gets all the stored blog posts in a lazy manner or a string explaining the reason of the failure. */
    def all()(using Async, CanFail): LazyList[Post]
As you can see, Futures are gone and the return type is just the result the function is meant to produce.
The fact they are suspendable is expressed using the Async context, which is required to invoke those functions.
Since all these functions could fail (for example, because of a problem with the DB connection), the CanFail capability is used to model the effect of failure (as described in the previous chapter).
Key inspiring principle (actually, taken from Kotlin)
❝Concurrency is hard! Concurrency has to be explicit!❞
By default the code is serial. If you want to opt in to concurrency you have to explicitly use a Future or Task, spawning a new control flow that executes asynchronously and allows the caller to continue its execution.
The other important key feature of the library is the support for structured concurrency and cancellation mechanisms:
- Futures are Cancellable instances;
  - When you cancel a future using the cancel() method, it promptly sets its value to Failure(CancellationException). Additionally, if it’s a runnable future, the thread associated with it is interrupted using Thread.interrupt().
  - To avoid immediate cancellation, deferring it until after some block, it is possible to use the uninterruptible function:

        val f = Future:
          // this can be interrupted
          uninterruptible:
            // this cannot be interrupted *immediately*
          // this can be interrupted

- Futures are nestable; the lifetime of nested computations is contained within the lifetime of enclosing ones. This is achieved using CompletionGroups, which are cancellable objects themselves and serve as containers for other cancellable objects; once they are canceled, all of their members are canceled as well. Every Async context has a completion group tracking all computations in a tree structure. When a group terminates all its dangling children are canceled!
  - The group is accessible through Async.current.group;
  - Async.blocking, Async.group and Future create a new completion group;
  - A cancellable object can be included inside the cancellation group of the async context using the link method; this is what the implementation of the Future does, under the hood;
  - to make sure children’s computations are not canceled we need to await them.
The implementation of the create function with direct style in Gears looks like this:
override def create(authorId: AuthorId, title: Title, body: Body)(using Async, CanFail): Post =
  if context.repository.exists(title) then fail(s"A post entitled $title already exists")
  val (post, author) = Async.group:
    val content = Future(verifyContent(title, body))
    val author = Future(authorBy(authorId))
    content.zip(author).awaitResult.?
  context.repository.save(Post(author, post._1, post._2, Date()))

/* Pretending to make a call to the Authorship Service that keeps track of authorized authors. */
private def authorBy(id: AuthorId)(using Async): Author = ...

/* Some local computation that verifies the content of the post is appropriate. */
private def verifyContent(title: Title, body: Body)(using Async): PostContent = ...
Some remarks:
- the CanFail capability is used to quickly break the computation with a meaningful message in case of failures;
- authorBy and verifyContent are suspendable functions, encapsulating the logic of the two checks;
- thanks to structured concurrency and the zip combinator, if one of the two nested futures fails the other check is cancelled:
  - zip: combinator function returning a pair with the results if both Futures succeed, otherwise failing with the failure that was returned first;
  - in case of failure of one of the two futures, zip returns control immediately: awaitResult returns a Failure(...) and with .? we break the computation prematurely, leaving the Async.group and thus canceling all dangling Futures!
  - authorBy and verifyContent need to be programmed to throw an exception in case of failure. This is needed to make the Future fail: without failing with an exception, the zip operator is not able to return control immediately and the other future is not canceled!
- Be aware that, to achieve cancellation, it is necessary to enclose both the content verification and the authorization task inside a completion group (either using Async.group or Future), since zip doesn’t provide a cancellation mechanism per se. The following code wouldn’t work as expected!

      // WRONG: doesn't provide cancellation!
      val contentVerifier = verifyContent(title, body).run
      val authorizer = authorBy(authorId).run
      val (post, author) = contentVerifier.zip(authorizer).awaitResult.?
👉🏻 To showcase the structured concurrency and cancellation mechanisms of Scala Gears, some tests have been prepared.
Other combinator methods available on Future instances:
| Combinator | Goal |
|---|---|
| Future[T].zip(Future[U]) | Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise, fail with the failure that was returned first. |
| Future[T].or(Future[T]) / Seq[Future[T]].awaitFirst | Alternative parallel composition. If either task succeeds, succeed with the success that was returned first. Otherwise, fail with the failure that was returned last (race all futures). |
| Future[T].orWithCancel(Future[T]) / Seq[Future[T]].awaitFirstWithCancel | Like or/awaitFirst but the slower futures are cancelled. |
| Seq[Future[T]].awaitAll | .await for all futures in the sequence, returns the results in a sequence, or throws if any futures fail. |
| Seq[Future[T]].awaitAllOrCancel | Like awaitAll, but cancels all futures as soon as one of them fails. |
Direct style vs. monadic style comparison w.r.t. composition #
Direct style cleanly supports composability:
def transform[E, T](xs: Seq[Future[Either[E, T]]])(using Async.Spawn): Future[Either[E, Seq[T]]] =
  Future:
    either:
      xs.map(_.await.?)
Achieving the same goal with monads is more complex:
def transform[E, T](
    xs: Seq[Future[Either[E, T]]],
)(using ExecutionContext): Future[Either[E, Seq[T]]] =
  val initial: Future[Either[E, List[T]]] = Future.successful(Right(List.empty[T]))
  xs.foldRight(initial): (future, acc) =>
    for
      f <- future
      a <- acc
    yield a.flatMap(lst => f.map(_ :: lst))
Again, using Cats simplifies the code; still, it is more complex than the direct style:
def transform[E, T](
    xs: Seq[Future[Either[E, T]]],
)(using ExecutionContext): Future[Either[E, Seq[T]]] =
  import cats.implicits._
  Future.sequence(xs) // Future[Seq[Either[E, T]]]
    .map(_.sequence) // equivalent to: _.traverse(identity)
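For readers who want to see what the two `sequence` steps amount to, the following is a rough standard-library-only sketch of the same idea: first collect all the futures, then turn the sequence of `Either`s into an `Either` of a sequence, keeping the first `Left` in sequence order. The names `sequenceEithers`, `transformStd` and `demo` are hypothetical and the code is an illustration, not part of the project.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

// Keeps the first Left (in sequence order) if any, otherwise all the Right values.
def sequenceEithers[E, T](results: Seq[Either[E, T]]): Either[E, Seq[T]] =
  results.collectFirst { case Left(e) => e } match
    case Some(e) => Left(e)
    case None    => Right(results.collect { case Right(t) => t })

def transformStd[E, T](
    xs: Seq[Future[Either[E, T]]],
)(using ExecutionContext): Future[Either[E, Seq[T]]] =
  Future.sequence(xs).map(rs => sequenceEithers(rs))

// Small usage example on already-completed futures.
def demo(): (Either[String, Seq[Int]], Either[String, Seq[Int]]) =
  given ExecutionContext = ExecutionContext.global
  val ok = Seq[Future[Either[String, Int]]](
    Future.successful(Right(1)),
    Future.successful(Right(2)),
  )
  val ko = Seq[Future[Either[String, Int]]](
    Future.successful(Right(1)),
    Future.successful(Left("boom")),
  )
  (Await.result(transformStd(ok), 2.seconds), Await.result(transformStd(ko), 2.seconds))
```

Even spelled out this way, the monadic version needs an explicit helper for the `Either` layer, whereas the direct style expresses both effects inline.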
Kotlin Coroutines #
- A coroutine in Kotlin is an instance of a suspendable computation.
- Their API is quite similar to that of Scala Gears, which took inspiration from Kotlin coroutines. To try to make a comparison, the following table shows the correspondence between the two libraries:

| Scala Gears | Kotlin Coroutines |
|---|---|
| Async | CoroutineScope |
| Future[Unit] | Job |
| Future[T] | Deferred<T> |
| def all()(using Async) | suspend fun all() |

- In the Kotlin Coroutine library, CoroutineScope is an interface that defines a single property, coroutineContext, which returns the CoroutineContext that defines the scope in which the coroutine runs.

      public interface CoroutineScope {
          /** Returns the context of this scope. */
          public val coroutineContext: CoroutineContext
      }

  Every coroutine must be executed in a coroutine context, which is a collection of key-value pairs that provide contextual information for the coroutine, including a dispatcher, that determines what thread or threads the coroutine uses for its execution, and the Job of the coroutine, which represents a cancellable background piece of work with a life cycle that culminates in its completion.
  - Different ways to create a scope:
    - GlobalScope.launch, launching a new coroutine in the global scope; discouraged because it can lead to memory leaks;
    - CoroutineScope(Dispatchers.Default), using the constructor with a dispatcher;
    - runBlocking, equivalent to the Gears Async.blocking: it runs a new coroutine and blocks the current thread until its completion.
  - Useful dispatchers:
    - Default dispatcher: to run CPU-intensive functions. If we forget to choose our dispatcher, this dispatcher will be selected by default;
    - IO dispatcher: to run I/O-bound computations, where we block waiting for input-output operations to complete, like network-related operations, file operations, etc.;
    - Unconfined dispatcher: it isn’t restricted to a particular thread, i.e. it starts the coroutine on the thread where it was initiated, but only until the first suspension point;
    - Main dispatcher: used when we want to interact with the UI. It is restricted to the main thread.
  - Several coroutine builders exist, like launch, async and withContext, which accept an optional CoroutineContext parameter that can be used to specify the dispatcher and other context elements.

        fun main(): Unit = runBlocking { // this: CoroutineScope
            val job: Job = this.launch(Dispatchers.IO) { // launch a new coroutine and continue
                delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
                print("Kotlin Coroutine!") // print after delay
            }
            val job2: Deferred<String> = async {
                delay(100L)
                "Hello"
            }
            print(job2.await() + " ") // wait until child coroutine completes
            job.join() // wait until the job is done and "Kotlin Coroutine!" is printed
        }

- Suspending functions are marked with the suspend keyword; they can use other suspending functions to suspend the execution of a coroutine.
- Coroutines follow the principle of structured concurrency: coroutines can be arranged into parent-child hierarchies where the cancellation of a parent leads to the immediate cancellation of all its children recursively. Failure of a child with an exception immediately cancels its parent and, consequently, all its other children.
Going back to our example, the interface of the service with Kotlin coroutines looks like this (here you can find the complete sources):
/** The service exposing a set of functionalities to interact with blog posts. */
interface PostsService {
    /** Creates a new post. */
    suspend fun create(authorId: String, title: String, body: String): Result<Post>

    /** Retrieves a post by its title. */
    suspend fun get(title: String): Result<Post>

    /** Retrieves all the posts. */
    suspend fun getAll(): Result<Sequence<Post>>
}
The implementation of the create function:
override suspend fun create(authorId: String, title: String, body: String): Result<Post> = runCatching {
    coroutineScope {
        require(!repository.exists(title)) { "Post with title $title already exists" }
        val content = async { verifyContent(title, body) }
        val author = async { authorBy(authorId) }
        val post = Post(author.await(), content.await(), Date())
        repository.save(post)
    }
}

/* Pretending to make a call to the Authorship Service that keeps track of authorized authors. */
private suspend fun authorBy(id: String): Author { ... }

/* Some local computation that verifies the content of the post is appropriate. */
private suspend fun verifyContent(title: String, body: String): PostContent { ... }
- coroutineScope is a suspending function used to create a new coroutine scope: it suspends the execution of the current coroutine, releasing the underlying thread for other usages;
- As we said previously, the failure of a child with an exception immediately cancels its parent and, consequently, all its other children: this means that, for handling the cancellation of nested coroutines, we don’t need to do anything special:
  - with coroutineScope, no matter the order in which coroutines are awaited, if one of them fails with an exception it is propagated upwards, cancelling all the other ones;
    - this is not the case for supervisorScope, a coroutine builder ensuring that child coroutines can fail independently without affecting the parent coroutine;
    - have a look at this test;
  - this is an advantage over Scala Gears, where operators like zip and orWithCancel are necessary!
Takeaways #
- Scala Gears offers, despite the syntactical differences, very similar concepts to Kotlin Coroutines, with structured concurrency and cancellation mechanisms;
- Kotlin Coroutines handles the cancellation of nested coroutines more easily than Scala Gears, where special attention is required;
- As stated by M. Odersky, the Async capability is better than suspend because it lets us define functions that work for synchronous as well as asynchronous function arguments without changing anything, while in Kotlin suspendable functions passed as arguments to higher-order functions must be tagged with the suspend keyword.
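The last takeaway can be illustrated without Gears by using a stand-in capability. In the sketch below, `Capability` is a hypothetical placeholder for something like `Async` (it is not the real API), `fetchLength` plays the role of a suspendable function, and `applyAll` is an ordinary higher-order function that knows nothing about capabilities.

```scala
// Hypothetical stand-in for a capability such as Gears' Async; not the real API.
final class Capability(val name: String)

// A function in the capability style: it can only be called where a Capability is given.
def fetchLength(s: String)(using Capability): Int = s.length

// An ordinary higher-order function, written with no knowledge of capabilities.
def applyAll[A, B](xs: List[A])(f: A => B): List[B] = xs.map(f)

def demo(): (List[Int], List[Int]) =
  given Capability = Capability("demo")
  // Plain synchronous argument.
  val plain = applyAll(List("a", "bb"))(s => s.length)
  // Capability-requiring argument: the lambda picks up the given from the enclosing scope,
  // so applyAll needs no special marker on its parameter.
  val capable = applyAll(List("a", "bb"))(s => fetchLength(s))
  (plain, capable)
```

The same `applyAll` accepts both arguments because the capability is resolved where the lambda is written, not in the signature of the higher-order function.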