Akka Streams is part of the Typesafe platform and can be used to build high-performance, fault-tolerant and scalable streaming data services. In this post I will describe how to use some of the features included in Akka Streams, working only with simple streams of integers and strings.
Dependencies in SBT
First, we need to specify some library dependencies in build.sbt:
name := "akkaStreamExample"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= {
  val akkaV = "2.3.12"
  val akkaStreamV = "2.0-M1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaV,
    "com.typesafe.akka" %% "akka-stream-experimental" % akkaStreamV)
}
Source
A streaming source can come from an API such as Twitter or Reddit, using Akka HTTP for queries and transforming the JSON responses into Scala objects for further manipulation. We will cover this in a later post. For now, we will define a ticking stream of integers and investigate how we can transform and output this stream using Flows and Sinks respectively. Here is a source which emits the integer 1 once every second, after an initial delay of one second:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._ // provides the 1.second syntax
object streaming {
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()
  // Source.tick(initialDelay, interval, element)
  val in = Source.tick(1.second, 1.second, 1)
}
Flow
A Flow is a data processing stage that can transform the stream, for example by filtering or mapping its elements. We will define a flow which takes in an integer and doubles it:
val doubleFlow = Flow[Int].map(_*2)
Sink
A Sink is the endpoint of a stream. We can use it to print to the console, or to write to a database or another external service. Our sink prints each element it receives to the console:
val out = Sink.foreach(println)
Putting it all together
Now we want to get our stream of twos printing to the console. To do this we define a main method in the streaming object which connects the Source to the Flow and finally to the Sink.
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
// ClosedShape, FlowGraph, RunnableGraph, Merge, Zip and Broadcast are used by the graph examples later in this post
import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Merge, RunnableGraph, Sink, Source, Zip}
import scala.concurrent.duration._
object streaming {
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()
  val in = Source.tick(1.second, 1.second, 1)
  val doubleFlow = Flow[Int].map(_ * 2)
  val out = Sink.foreach(println)
  def main(args: Array[String]): Unit = {
    // Connect source -> flow -> sink and start the stream
    in.via(doubleFlow).runWith(out)
  }
}
You can now run this code using sbt run from the terminal. We should see a stream of twos, one emitted every second.
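Because the ticking source never completes, it is awkward to check what a Flow actually does to its input. As a minimal sketch, assuming the same dependencies as the build.sbt above, a finite source and a folding sink can be used to collect the results instead. The names FiniteExample and evenDoubled are hypothetical and exist only for this illustration; the flow here both filters and maps, the two operations mentioned in the Flow section.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// FiniteExample and evenDoubled are hypothetical names used only for this sketch.
object FiniteExample {
  def evenDoubled(n: Int): Vector[Int] = {
    implicit val system = ActorSystem("finite-example")
    implicit val materializer = ActorMaterializer()
    try {
      // A Flow that first filters (keeps even numbers) and then maps (doubles them).
      val evenThenDouble = Flow[Int].filter(_ % 2 == 0).map(_ * 2)
      // Sink.fold materializes a Future that completes with the accumulated
      // value once the finite source has been fully consumed.
      val collect = Sink.fold[Vector[Int], Int](Vector.empty[Int])(_ :+ _)
      val result = Source(1 to n).via(evenThenDouble).runWith(collect)
      Await.result(result, 5.seconds)
    } finally {
      // Akka 2.3.x API, matching the akkaV in build.sbt; newer versions use terminate().
      system.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(evenDoubled(10)) // Vector(4, 8, 12, 16, 20)
}
```

Blocking with Await is only reasonable in a small demonstration like this one; in a real service the Future would normally be composed rather than awaited.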
Graph DSL
Akka Streams provides a domain-specific language (DSL) for expressing stream processing pipelines as a graph. Here is another way to define the main method, using a RunnableGraph:
import FlowGraph.Implicits._ // brings the ~> operator into scope
def main(args: Array[String]): Unit = {
  val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
    in ~> doubleFlow ~> out
    ClosedShape
  })
  g.run(materializer)
}
The graph DSL requires a bit more work, namely defining a runnable graph and specifying that the graph is closed. This is because we can also define partial graphs (graphs which aren't connected to a Source, a Sink, or both) which compose with other graphs. This is handy if you want to reuse a block of processing. We specify a partial graph by returning FlowShape(inlet, outlet) instead of ClosedShape.
Merging a stream
Suppose we have a stream of ones and our current stream of twos. We can merge these two streams into one because they contain the same element type (Int). Merge does not guarantee the order in which elements from the two inputs are emitted. If one input should take priority whenever both have an element available, MergePreferred can be used instead.
Let's reuse our stream of ones and merge it into the stream of twos we already have:
def main(args: Array[String]): Unit = {
  RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
    val merge = builder.add(Merge[Int](2))
    in ~> doubleFlow ~> merge ~> out
    in ~> merge
    ClosedShape
  }).run(materializer)
}
We add a Merge to the graph builder. Merge requires us to specify the type of the stream elements and the number of inputs we are merging. We then alter the stream processing flow to include a stream of ones and perform the merge.
In this case, if the stream of ones were publishing at a higher frequency than the stream of twos, the merged stream would contain more ones than twos, for example:
1 1 1 2 1 1 1 2 …
Zipping a stream
We can zip a stream just as we can zip collections in Scala. This results in a stream of tuples, whose two components can have different types. Zip requires that both streams have an element available before it emits, so if one stream publishes faster than the other, the faster stream is held back by backpressure until the slower one has an element ready.
To illustrate this, we will build a continuous stream of natural numbers:
val naturalNumbers = Source(Stream.from(1))
Now if we zip it together with a continuous source of ones which publishes every ten seconds, Zip will wait for both streams to have an element before publishing the tuple, which preserves the order of elements from each stream.
val in = Source.tick(1.second, 10.seconds, 1)
val zipStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  val zip = builder.add(Zip[Int, Int]())
  naturalNumbers ~> zip.in0
  in ~> zip.in1
  zip.out ~> out
  ClosedShape
})
def main(args: Array[String]): Unit = {
  zipStream.run(materializer)
}
The output from this stream is: (1,1) (2,1) (3,1) …
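The pairing rule can be compared with zipping ordinary Scala collections. The following is a small sketch using only the standard library, not Akka; ZipCollections is a hypothetical name. It pairs a lazily evaluated, unbounded sequence of naturals with a short list of a different element type, and the result is only as long as the shorter side.

```scala
// ZipCollections is a hypothetical name; only the Scala standard library is used.
object ZipCollections {
  // An unbounded, lazily evaluated sequence, like the naturalNumbers source.
  val naturals: Stream[Int] = Stream.from(1)
  // A short, finite collection of a different element type.
  val labels: List[String] = List("a", "b", "c")

  // zip pairs elements by position and stops when the shorter side runs out.
  val zipped: List[(Int, String)] = naturals.zip(labels).toList

  def main(args: Array[String]): Unit =
    println(zipped) // List((1,a), (2,b), (3,c))
}
```

The difference with a stream is timing: a collection already holds all of its elements, whereas Zip on a stream has to wait for the slower input before it can emit each pair.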
Broadcasting a stream
The opposite of merging streams is splitting a stream at a point, for example because you want to perform different processing steps on the same stream elements.
We shall work with the stream of natural numbers and broadcast it into two streams. One side is doubled and printed to the console, and the other is simply ignored.
val broadcastStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  val bcast = builder.add(Broadcast[Int](2))
  naturalNumbers ~> bcast
  bcast ~> doubleFlow ~> out
  bcast ~> Sink.ignore
  ClosedShape
})
def main(args: Array[String]): Unit = {
  broadcastStream.run(materializer)
}
Throttling a stream
When we run the previous example, the stream prints to the screen extremely fast, and we can barely see the even numbers rushing past. We might want a flow which throttles the rate at which the stream publishes. Recall that Zip preserves the order of elements and waits until every input has an element available. If we Zip the stream with a ticking stream of unit elements and keep only the integer from each tuple, we can reduce the rate at which the natural number source publishes:
val throttledStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Unit]())
  // One unit value per second; Zip can emit no faster than this source ticks
  Source.tick(1.second, 1.second, ()) ~> zip.in1
  naturalNumbers ~> bcast
  bcast ~> doubleFlow ~> zip.in0
  bcast ~> Sink.ignore
  // Keep only the integer from each (Int, Unit) tuple
  zip.out ~> Flow[(Int, Unit)].map(_._1) ~> out
  ClosedShape
})
def main(args: Array[String]): Unit = {
  throttledStream.run(materializer)
}
There are many more built-in processing stages detailed in the Akka Streams documentation.