Reactivity in direct style #
So far, we’ve explored the basics of the asynchronous abstraction mechanisms provided by the direct style of the Scala Gears and Kotlin Coroutines frameworks. The goal of this last example is to investigate, on a simple case, whether these two frameworks offer sufficiently idiomatic abstractions to deal with event-based reactive systems.
Smart Hub example #
Idea: in an IoT context, a multitude of sensors of different types, each replicated to ensure accuracy, transmit their measurements to a central hub, which in turn needs to react in real time, forwarding the data to the appropriate controller, possibly after applying some kind of transformation.
Scala Gears version #
Before delving into the example, two Gears abstractions not yet covered are introduced:
Tasks provide a way not only to run an asynchronous computation, essentially wrapping a () => Future[T], but also to schedule it, possibly repeating it. Different scheduling strategies are available: Every, ExponentialBackoff, FibonacciBackoff, RepeatUntilFailure, RepeatUntilSuccess.
- This makes it easy to implement proactive computations (a scheduling sketch follows the diagram below).
classDiagram
  class `Task[+T]` {
    +apply(body: (Async, AsyncOperations) ?=> T) Task[T]$
    +start(using Async, Async.Spawn, AsyncOperations) Future[+T]
    +schedule(s: TaskSchedule) Task[T]
  }
  `Task[+T]` o--> TaskSchedule

  class TaskSchedule {
    << enum >>
    + Every(millis: Long, maxRepetitions: Long = 0)
    + ExponentialBackoff(millis: Long, exponentialBase: Int = 2, maxRepetitions: Long = 0)
    + FibonacciBackoff(millis: Long, maxRepetitions: Long = 0)
    + RepeatUntilFailure(millis: Long = 0, maxRepetitions: Long = 0)
    + RepeatUntilSuccess(millis: Long = 0, maxRepetitions: Long = 0)
  }
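For example, a Task can be created, scheduled, and started inside an Async context as follows (a minimal sketch, assuming the standard gears.async imports; the tick-printing body and the chosen schedule are purely illustrative):

import gears.async.*
import gears.async.default.given

@main def scheduledTick(): Unit =
  Async.blocking:
    // a proactive computation: print a tick every 500 ms, at most 5 times
    val ticking = Task {
      println("tick")
    }.schedule(TaskSchedule.Every(millis = 500, maxRepetitions = 5))
    ticking.start().await // explicitly await the scheduled task's completion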
Warning: when Tasks are scheduled with RepeatUntil*:
- if the body of a Task does not perform any suspending operations, Async.blocking blocks the current thread until the task is completed (either successfully or not);
- if the body of a Task does perform suspending operations, then Async.blocking does not wait for the task to complete and its context is left as soon as it reaches its end.
  - If we want to wait for the task completion, it’s the client’s responsibility to explicitly Async.await (or awaitResult) it.
- Cons: depending on the content of the block, the behavior is different! This is error-prone!
Some considerations follow (to be re-tested with the new version 0.2.0 and the introduction of the Async.Spawn capability):
Warning: with higher-order functions, when dealing with repeated Tasks, in some cases an Async ?=> label is required to avoid suspending the whole block, even if a suspending operation is performed: the code below behaves differently depending on whether the Async ?=> label is present or not. Note: this may be an unintended effect of the library, yet to be investigated (sometimes on Ubuntu it doesn’t work, suggesting it is a “bug”, see here vs. here).
In this case, even though we suspend to wait for the timer tick, Async.blocking blocks until the Task is completed.
Async.blocking:
  val timer = Timer(2.seconds)
  Future(timer.run())
  produce { _ =>
    timer.src.awaitResult // SUSPENDING OPERATION!
    // ...
  }

def produce[T](
    action: SendableChannel[T] => Try[Unit]
)(using Async): ReadableChannel[T] =
  val channel = UnboundedChannel[T]()
  Task {
    action(channel.asSendable)
  }.schedule(RepeatUntilFailure()).run
  channel.asReadable
With the Async ?=> label, Async.blocking does not wait for the Task to complete!
Async.blocking:
  val timer = Timer(2.seconds)
  Future(timer.run())
  produceWithLabel { _ =>
    timer.src.awaitResult // SUSPENDING OPERATION!
    // ....
  }

def produceWithLabel[T](
    action: Async ?=> SendableChannel[T] => Try[Unit]
)(using Async): ReadableChannel[T] =
  val channel = UnboundedChannel[T]()
  Task {
    action(channel.asSendable)
  }.schedule(RepeatUntilFailure()).run
  channel.asReadable
[See the tests for more details.]
- To avoid the work-stealing behavior of channel consumers, a ChannelMultiplexer can be used. It is essentially a container of Readable and Sendable channels, which can be added and removed at runtime. Internally, it is implemented with a thread that continuously races the set of publishers and, once it reads a value, forwards it to each subscriber channel (a usage sketch follows the class diagram below).
  - Order is guaranteed only per producer;
  - Typically, the consumer creates a channel and adds it to the multiplexer, then starts reading from it, possibly using a scheduled task.
  - If the consumer attaches to the channel after the producer has started, the values sent during this interval are lost, just like with hot observables in Rx.
classDiagram
  namespace javaio {
    class Closeable {
      << interface >>
      +close()
    }
  }

  class `ChannelMultiplexer[T]` {
    << trait >>
    +run()(using Async)
    +addPublisher(c: ReadableChannel[T])
    +removePublisher(c: ReadableChannel[T])
    +addSubscriber(c: SendableChannel[Try[T]])
    +removeSubscriber(c: SendableChannel[Try[T]])
  }

  Closeable <|-- `ChannelMultiplexer[T]`
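A minimal usage sketch (assuming the same gears imports as above, plus scala.util.Try): one producer channel is multiplexed towards two subscriber channels, each read by a different consumer, so that both receive every forwarded value instead of stealing work from each other.

import scala.util.Try

Async.blocking:
  val multiplexer = ChannelMultiplexer[Int]()
  val producerChannel = UnboundedChannel[Int]()
  val subscriber1 = UnboundedChannel[Try[Int]]()
  val subscriber2 = UnboundedChannel[Try[Int]]()
  // subscribers are registered before any value is produced, so nothing is lost
  multiplexer.addPublisher(producerChannel.asReadable)
  multiplexer.addSubscriber(subscriber1.asSendable)
  multiplexer.addSubscriber(subscriber2.asSendable)
  Future(multiplexer.run()) // the multiplexer loop runs on its own future
  val c1 = Future { for _ <- 1 to 3 do println(s"subscriber1 got ${subscriber1.read()}") }
  val c2 = Future { for _ <- 1 to 3 do println(s"subscriber2 got ${subscriber2.read()}") }
  for i <- 1 to 3 do producerChannel.send(i)
  c1.await
  c2.await // the still-running multiplexer future is left to the scope's cleanup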
In the proposed strawman Scala Gears library, there are no other kinds of abstractions, nor a way to manipulate channels with functions inspired by Rx.
The attempt, described in the following, has been to extend this framework by adding first-class support for the Producer and Consumer concepts and by implementing some of the most common Rx operators, completely leaving out performance concerns.
[Sources can be found in the rears submodule.]
- A Producer is a runnable entity, programmed with a Task, producing items on a channel. It exposes the publishingChannel method, which returns a ReadableChannel through which interested consumers can read produced items.
- A Consumer is a runnable entity devoted to consuming data from a channel, exposed by the listeningChannel method, which returns a SendableChannel to send items to.
  - It can be made stateful by mixing it with the State trait, allowing it to keep track of its state, which is updated every time with the result of the reaction (i.e. its return type). A small usage sketch follows the traits' code below.
  - Warning: like in an event loop, the reaction logic should not perform long-lasting blocking operations, otherwise the whole system will not react to new events. The Async capability is nonetheless needed if you want to give the client the ability to invoke Futures within this block; otherwise, another option (alternative to the following) would be to encapsulate the reaction behavior within a Task and run it at every received item. However, race conditions could take place in this last case.
/** A producer, i.e. a runnable entity producing items on a channel. */
trait Producer[E]:
  /** The [[Channel]] where specific [[Producer]]s send items to. */
  protected val channel: Channel[E] = UnboundedChannel()

  /** @return the publisher's behavior encoded as a runnable [[Task]]. */
  def asRunnable: Task[Unit]

  /** @return the [[ReadableChannel]] where produced items are placed. */
  def publishingChannel: ReadableChannel[E] = channel.asReadable
/** A consumer, i.e. a runnable entity devoted to consume data from a channel. */
trait Consumer[E, S]:

  /** The [[SendableChannel]] to send items to, where the consumer listens for new items. */
  val listeningChannel: SendableChannel[Try[E]] = UnboundedChannel()

  /** @return a runnable [[Task]]. */
  def asRunnable(using Async.Spawn): Task[Unit] = Task:
      listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach(react)
    .schedule(RepeatUntilFailure())

  /** The suspendable reaction triggered when a new read of an item succeeds. */
  protected def react(e: Try[E])(using Async.Spawn): S
/** A mixin to turn consumer stateful. Its state is updated with the result of the [[react]]ion.
  * Initially its state is set to [[initialValue]].
  */
trait State[E, S](initialValue: S):
  consumer: Consumer[E, S] =>

  private var _state: S = initialValue

  /** @return the current state of the consumer. */
  def state: S = synchronized(_state)

  override def asRunnable(using Async.Spawn): Task[Unit] = Task:
      listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach: e =>
        synchronized:
          _state = react(e)
    .schedule(RepeatUntilFailure())
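As a usage sketch of these abstractions (the names Numbers and SumConsumer are purely illustrative and assume the gears/rears definitions above are in scope): a producer emitting consecutive integers and a stateful consumer accumulating their sum. Their wiring is shown right after the Controller is introduced below.

import scala.util.Try

/** An illustrative producer emitting consecutive integers every 100 milliseconds. */
object Numbers extends Producer[Int]:
  private var i = 0
  override def asRunnable: Task[Unit] = Task {
    i += 1
    channel.send(i) // produce the next integer on the underlying channel
  }.schedule(TaskSchedule.Every(millis = 100))

/** An illustrative stateful consumer whose state is the sum of the received items. */
class SumConsumer extends Consumer[Int, Int] with State[Int, Int](initialValue = 0):
  override protected def react(e: Try[Int])(using Async.Spawn): Int =
    state + e.getOrElse(0) // failed reads leave the accumulated sum unchanged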
- The Controller object exposes methods wiring Producer and Consumers altogether, possibly performing some kind of transformation on the publisherChannel (a wiring sketch follows the Controller code below):
  - the oneToOne method just wires one single consumer to the publisherChannel given in input, possibly having it transformed with the provided transformation;
  - the oneToMany method allows many consumers to be wired to the publisherChannel, possibly having it transformed:
    - to accomplish this, a ChannelMultiplexer is used, which is in charge of forwarding the items read from the transformed publisherChannel to all consumers' channels.
object Controller:

  /** Creates a runnable [[Task]] forwarding the items read from the [[publisherChannel]]
    * to the given [[consumer]], after having it transformed with the given [[transformation]].
    */
  def oneToOne[T, R](
      publisherChannel: ReadableChannel[T],
      consumer: Consumer[R, ?],
      transformation: PipelineTransformation[T, R] = identity,
  ): Task[Unit] =
    val transformedChannel = transformation(publisherChannel)
    Task:
        consumer.listeningChannel.send(transformedChannel.read())
      .schedule(RepeatUntilFailure())

  /** Creates a runnable [[Task]] forwarding the items read from the [[publisherChannel]] to
    * all consumers' channels, after having it transformed with the given [[transformation]].
    */
  def oneToMany[T, R](
      publisherChannel: ReadableChannel[T],
      consumers: Set[Consumer[R, ?]],
      transformation: PipelineTransformation[T, R] = identity,
  ): Task[Unit] = Task:
    val multiplexer = ChannelMultiplexer[R]()
    consumers.foreach(c => multiplexer.addSubscriber(c.listeningChannel))
    multiplexer.addPublisher(transformation(publisherChannel))
    // blocking call: the virtual thread on top of which this task is executed needs to block
    // to continue publishing publisher's events towards the consumer by means of the multiplexer.
    multiplexer.run()
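A wiring sketch using the illustrative Numbers producer and SumConsumer defined earlier (Async.blocking, start, and await as in the previous snippets):

Async.blocking:
  val consumers: Set[Consumer[Int, ?]] = Set(SumConsumer(), SumConsumer())
  Numbers.asRunnable.start()              // start producing integers
  consumers.foreach(_.asRunnable.start()) // start both consumers
  Controller.oneToMany(
    publisherChannel = Numbers.publishingChannel,
    consumers = consumers,
  ).start().await // keeps the pipeline alive: the controller task never completes on its own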
The following PipelineTransformations have been implemented (inspired by Rx). The tests in the rears submodule provide the necessary examples to understand their behavior.
Filter #
/** @return a new [[ReadableChannel]] whose elements pass the given predicate [[p]]. */
def filter(p: T => Boolean): ReadableChannel[T]
Example:
----1---2-------3----4---5--6----7---8---9---10--->
| | | | | | | | | |
----V---V-------V----V---V--V----V---V---V---V-----
filter(_ % 2 == 0)
--------|--------------|------|-------|---------|--
V V V V V
--------2--------------4------6-------8--------10->
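A possible implementation sketch, written as an extension method and following the same forwarding pattern as the produce snippet shown earlier (the actual implementation in the rears submodule may differ, e.g. in how a closed source channel is handled):

extension [T](source: ReadableChannel[T])
  def filter(p: T => Boolean)(using Async): ReadableChannel[T] =
    val filtered = UnboundedChannel[T]()
    Task {
      val item = source.read().toOption.get // a failed read (closed channel) makes the task fail...
      if p(item) then filtered.send(item)   // ...stopping the RepeatUntilFailure schedule
    }.schedule(RepeatUntilFailure()).run
    filtered.asReadable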
Map #
/** @return a new [[ReadableChannel]] whose values are transformed according to the given function [[f]]. */
def map[R](f: T => R): ReadableChannel[R]
Example:
----1---2-------3----4---5------6--------7-------->
| | | | | | |
----V---V-------V----V---V------V--------V---------
map(x => x * x)
----|---|-------|----|---|------|--------|---------
V V V V V V V
----1---4-------9----16--25-----36-------49------->
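map can follow exactly the same forwarding pattern, applying the function before re-sending (again, just a sketch; the actual implementation may differ):

extension [T](source: ReadableChannel[T])
  def map[R](f: T => R)(using Async): ReadableChannel[R] =
    val mapped = UnboundedChannel[R]()
    Task {
      mapped.send(f(source.read().toOption.get)) // transform each incoming item and forward it
    }.schedule(RepeatUntilFailure()).run
    mapped.asReadable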
Debounce #
/** @return a new [[ReadableChannel]] whose elements are emitted only after
* the given [[timespan]] has elapsed since the last emission. */
def debounce(timespan: Duration): ReadableChannel[T]
Example:
----1---2-------3----4---5--6-----7---8---9---10-->
| | | | | | | | | |
V V V V V V V V V V
T----------T----------T----------T----------T------
debounce(1 second)
---------------------------------------------------
| | | | |
V V V V V
-------1---------3---------5------7------------10->
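A sketch implementing the semantics described above, i.e. forwarding an item only if at least timespan has elapsed since the last forwarded one (timing is tracked here with System.nanoTime; the rears implementation may differ):

extension [T](source: ReadableChannel[T])
  def debounce(timespan: Duration)(using Async): ReadableChannel[T] =
    val debounced = UnboundedChannel[T]()
    var lastEmission: Option[Long] = None
    Task {
      val item = source.read().toOption.get
      val now = System.nanoTime()
      // forward the item only if no emission happened in the last `timespan`
      if lastEmission.forall(last => now - last >= timespan.toNanos) then
        debounced.send(item)
        lastEmission = Some(now)
    }.schedule(RepeatUntilFailure()).run
    debounced.asReadable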
GroupBy #
/** Groups the items emitted by a [[ReadableChannel]] according to the given [[keySelector]].
 * @return key-value pairs, where each key is a result of applying the [[keySelector]],
 * coupled with a new [[ReadableChannel]] on which only the items belonging to
 * that grouping are emitted.
*/
def groupBy[K](keySelector: T => K): ReadableChannel[(K, ReadableChannel[T])]
Example:
----1---2-3--4---5--6--->
| | | | | |
V V V V V V
-------------------------
groupBy(_ % 2)
-------------------------
\ \
----false--true------------>
1 2
\ \
\ 4
3 \
\ \
5 6
Buffer #
/** @return a new [[ReadableChannel]] whose elements are buffered in a [[List]] of size [[n]].
  * If the [[timespan]] duration has elapsed since the last read, the list is emitted
  * with the elements collected up to that moment (default: 5 seconds).
  */
def buffer(n: Int, timespan: Duration = 5 seconds): ReadableChannel[List[T]]
Example:
----1---2-3----4---5--6----7---8-------->
| | | | | | | |
V V V V V V V V
|---------|-----------|------------T-----
buffer(n = 3, timespan = 5 seconds)
|---------|-----------|------------|-----
V V V
------[1, 2, 3]---[4, 5, 6]------[7, 8]->
BufferWithin #
/** @return a new [[ReadableChannel]] whose elements are buffered in a [[List]] containing
  * the items emitted within [[timespan]] duration after the first one (default: 5 seconds).
*/
def bufferWithin(timespan: Duration = 5 seconds): ReadableChannel[List[T]]
Example:
----1---2-3-4---5--6--7----------8----------->
| | | | | | | |
V V V V V V V V
----|--------T--|--------T-------|--------T---
bufferWithin(timespan = 5 seconds)
-------------|-----------|----------------|---
V V V
-------[1, 2, 3, 4]--[5, 6, 7]-----------[8]->
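Since every operator returns a new ReadableChannel, transformations can be chained and passed to the Controller as a single PipelineTransformation. A sketch follows: RawSample and the wire method are hypothetical, while the bufferWithin usage mirrors the ThermostatManager shown later in this section.

import scala.concurrent.duration.*

// hypothetical sample type, used only for illustration
case class RawSample(value: Double, valid: Boolean)

def wire(source: ReadableChannel[RawSample], consumers: Set[Consumer[List[Double], ?]])(
    using Async.Spawn, AsyncOperations
): Unit =
  Controller.oneToMany(
    publisherChannel = source,
    consumers = consumers,
    // keep only valid samples, extract their value, and batch them in 5-seconds windows
    transformation = _.filter(_.valid).map(_.value).bufferWithin(5.seconds),
  ).start()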
Going back to the example, here is a schema summarizing the flows of data and the transformations applied to them. This is just a simple example used to test the proposed abstractions.
[Sources are available in the smart-hub-direct submodule.]
- For simplicity, two types of sensors are considered: TemperatureSensor and LuminositySensor;
- sensors send data to the smart hub SensorSource (e.g., in a real case scenario, via MQTT);
  - SensorSource is a Producer[SensorEvent], publishing received data on its publishingChannel:

        trait SensorSource extends Producer[SensorEvent]

        sealed trait SensorEvent(val name: String)
        case class TemperatureEntry(sensorName: String, temperature: Temperature) extends SensorEvent(sensorName)
        case class LuminosityEntry(sensorName: String, luminosity: Temperature) extends SensorEvent(sensorName)

- three main controllers:
  - SensorHealthChecker is a stateful consumer of generic SensorEvents that checks the health of the sensors, sending alerts in case of malfunctioning. Here the state is necessary to determine the health of the sensors, based on the last detection.

        /** A [[state]]ful consumer of [[SensorEvent]] detecting possible
          * malfunctioning and keeping track of last known active sensing units.
          */
        trait SensorHealthChecker extends Consumer[Seq[E], Seq[E]] with State[Seq[E], Seq[E]]

  - The Thermostat is a stateful consumer of temperature entries, taking care of controlling the heating system. The fact that the thermostat keeps track of the last average detection could be useful to a RESTful API, for example.

        /** A [[state]]ful consumer of [[TemperatureEntry]]s in charge of controlling
          * the heater and keeping track of the last detected average temperature.
          */
        trait Thermostat
            extends Consumer[Seq[TemperatureEntry], Option[Temperature]]
            with State[Seq[TemperatureEntry], Option[Temperature]]:
          val scheduler: T

  - LightingSystem is a basic (non-stateful) consumer of luminosity entries, taking care of controlling the lighting system.

        /** A consumer of [[LuminosityEntry]], in charge of controlling the lighting system. */
        trait LightingSystem extends Consumer[Seq[LuminosityEntry], Unit]
Each of these controllers reacts to the received data according to its own logic and current state in order to accomplish a specific task. For example:
- the sensor checker sends alerts if, compared with the previous detection, it did not receive data from some sensors:

      override protected def react(e: Try[Seq[E]])(using Async.Spawn): Seq[E] = e match
        case Success(current) =>
          val noMoreActive = state.map(_.name).toSet -- current.map(_.name).toSet
          if noMoreActive.nonEmpty then
            sendAlert(s"[$currentTime] ${noMoreActive.mkString(", ")} no more active!")
          current
        case Failure(es) => sendAlert(es.getMessage); Seq()

- the thermostat computes the average temperature and, based on a scheduler, decides whether to turn the heating system on or off:

      override protected def react(e: Try[Seq[TemperatureEntry]])(using Async.Spawn): Option[Temperature] =
        for
          averageTemperature <- e.map { entries => entries.map(_.temperature).sum / entries.size }.toOption
          _ = averageTemperature.evaluate()
        yield averageTemperature
- The HubManager takes care of grouping sensor data by their type and forwarding them to the appropriate manager, either ThermostatManager or LightingManager:

      val channelBySensor = sensorsSource.publishingChannel.groupBy(_.getClass)
      Task:
          channelBySensor.read() match
            case Right((clazz, c)) if clazz == classOf[TemperatureEntry] =>
              thermostatManager.run(c.asInstanceOf[ReadableChannel[TemperatureEntry]])
            case Right((clazz, c)) if clazz == classOf[LuminosityEntry] =>
              lightingManager.run(c.asInstanceOf[ReadableChannel[LuminosityEntry]])
            case _ => ()
        .schedule(RepeatUntilFailure()).start()
      sensorsSource.asRunnable.start().await

- Both ThermostatManager and LightingManager are in charge of creating the appropriate Controller instance, based on the number of Consumers and the pipeline transformation we need to apply:

      // ThermostatManager
      def run(source: ReadableChannel[TemperatureEntry])(using Async.Spawn, AsyncOperations): Unit =
        thermostat.asRunnable.start()
        sensorHealthChecker.asRunnable.start()
        Controller.oneToMany(
          publisherChannel = source,
          consumers = Set(thermostat, sensorHealthChecker),
          transformation = _.bufferWithin(samplingWindow),
        ).start()
To produce a testable version of this example, a simulated source of sensor data has been created, backed by a GUI through which the user can simulate the behavior of the sensors. The example is runnable via:
./gradlew smart-hub-<direct | direct-kt>:run
Three panels should pop up, one for each sensor type, along with a dashboard showing the state of the system. After entering some values in the panels and pressing the “Send” button, within 10 seconds (the configured sampling window) the system should react to the received data, updating the dashboard with the new state.
Kotlin Coroutines version #
Kotlin Coroutines offer two other abstractions to deal with asynchronous data streams, belonging to the flow “family”: SharedFlow and StateFlow.
Despite their names including “flow”, which, as we have seen, denotes cold streams, they are actually hot (the terminology is a bit misleading…):
- SharedFlow is a hot flow that allows multiple collectors to subscribe to it, enabling the broadcasting of values to multiple consumers, i.e. having multiple consumers “attached” to the same stream of data;
  - it can be configured to buffer a certain number of previously emitted values for new collectors, so that they can catch up with the latest values (the so-called replay cache);
- StateFlow is an extension of SharedFlow: it is a hot flow that maintains a single value representing a state, holding one value at a time. It operates as a conflated flow, meaning that when a new value is emitted, it replaces the previous value and is immediately sent to new collectors;
  - this type of flow is beneficial for maintaining a single source of truth for a state and automatically updating all collectors with the latest state (for example in ViewModels in Android applications).
In our example, SharedFlow is used to model the flow of sensor data:
interface SensorSource<out E : SensorEvent> {
    /** The flow of sensor events. */
    val sensorEvents: SharedFlow<E>
}
Like all flows, they support all the kinds of operators presented in the previous example. Despite this, they do not support, at the moment, all the operators Rx offers, like groupBy, buffer (in the Rx conception), etc. (even if some proposals are pending to add them to the framework).
For this reason, the consumer of events has been implemented manually, using a loop that, every samplingWindow, reacts to the received data, updating the state of the system.
Even though this solution may appear less elegant, since Flows are, de facto, the porting of Rx’s Observables into the Coroutines world, libraries exist to convert them to Observables and vice versa. This could offer (in some cases and where necessary) a way to use the full power of Rx operators, if needed.
/** A consumer of sensor events. */
interface SensorSourceConsumer<in E : SensorEvent, out S> {
    /** The current state of the source consumer. */
    val state: S

    /** Reacts to a sensor event. */
    suspend fun react(e: E)
}

/** A scheduled consumer of sensor events. */
interface ScheduledConsumer<in E : SensorEvent, out S> : SensorSourceConsumer<E, S>, CoroutineScope {
    /** The interval period. */
    val samplingWindow: Duration

    /** The update logic of the consumer. */
    suspend fun update()

    /** Runs the consumer scheduler. */
    fun run() = launch {
        while (true) {
            update()
            delay(samplingWindow)
        }
    }
}
The managers just take care of collecting the data and forwarding it to the appropriate consumer. For example, the ThermostatManager:
suspend fun run(sensorSource: Flow<TemperatureEntry>) {
    thermostat.run()
    temperatureSensorsChecker.run()
    sensorSource.collect {
        thermostat.react(it)
        temperatureSensorsChecker.react(it)
    }
}
Takeaways #
- Channels in Scala Gears are well suited to model flows of data that exist independently of the application’s requests for them: incoming network connections, event streams, etc.
- The scheduling mechanism of Task, along with the multiplexer abstraction, despite having some stability issues, allows implementing flows of hot data listened to by multiple consumers.
- Transformation operators inspired by the Reactive world could enhance the expressiveness of the framework, making it more suitable for modeling reactive event-based systems.