Practical Introduction to Akka Streaming

Akka Streaming is part of the Typesafe platform. It can be used to build high-performance, fault-tolerant and scalable streaming data services. In this post I will describe how you can use some of the features included in Akka Streaming, working only with simple streams of integers and strings.

Dependencies in SBT

First, we need to specify some library dependencies in build.sbt:

name := "akkaStreamExample"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= {
  val akkaV = "2.3.12"
  val akkaStreamV = "2.0-M1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaV,
    "com.typesafe.akka" %% "akka-stream-experimental" % akkaStreamV
  )
}

Source

A streaming source can come from an API such as Twitter or Reddit, using Akka HTTP for queries and a transformation from JSON to Scala objects for further manipulation. We will cover this in a later post. For now, we will define a ticking stream of integers and investigate how we can transform and output this stream using Flows and Sinks respectively. Here is a source which emits the integer 1 once every second:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object streaming {
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  // Emit the integer 1 once per second, after an initial delay of one second
  val in = Source.tick(1.second, 1.second, 1)
}

Flow

A Flow is a data processing stage; it can, for example, filter and map the stream. We will define a flow which takes in an integer and doubles it:

val doubleFlow = Flow[Int].map(_*2)
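A flow can also chain several stages. As a minimal sketch (the names FlowExample and evenDoubler are made up for illustration), the following keeps only the even numbers and then doubles them. It uses a finite source and runFold so that the result can be collected and inspected. It assumes the Akka 2.3 actor API used in this post, where the actor system is stopped with shutdown(); later Akka versions use terminate() instead.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original post
object FlowExample {
  def run(): Vector[Int] = {
    implicit val system = ActorSystem("flow-example")
    implicit val materializer = ActorMaterializer()

    // Two stages in one flow: drop odd numbers, then double what is left
    val evenDoubler = Flow[Int].filter(_ % 2 == 0).map(_ * 2)

    try {
      // runFold collects every element into a Vector once the finite source completes
      val result = Source(1 to 6).via(evenDoubler).runFold(Vector.empty[Int])(_ :+ _)
      Await.result(result, 5.seconds)
    } finally {
      system.shutdown() // Akka 2.3 API; assumed here to match the build above
    }
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(4, 8, 12)
}
```

Blocking with Await is only there to keep the sketch short; in a real service you would normally compose the Future instead.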

Sink

A Sink is an endpoint to a stream. We can use it to print to the console, or to write to a database or another external service. Our sink is defined to take Ints and print each one to the console:

val out = Sink.foreach[Int](println)
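Printing is only one option. A sink can also produce a value once the stream completes. As a small sketch (SinkExample and sumSink are illustrative names), Sink.fold adds up a finite stream and hands back the total as a Future. As before, this assumes the Akka 2.3 shutdown() call.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original post
object SinkExample {
  def run(): Int = {
    implicit val system = ActorSystem("sink-example")
    implicit val materializer = ActorMaterializer()

    // A sink which sums its input; running it materializes a Future[Int]
    val sumSink = Sink.fold[Int, Int](0)(_ + _)

    try {
      val total = Source(1 to 10).runWith(sumSink)
      Await.result(total, 5.seconds)
    } finally {
      system.shutdown() // Akka 2.3 API; assumed here to match the build above
    }
  }

  def main(args: Array[String]): Unit =
    println(run()) // 55
}
```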

Putting it all together

Now we want to get our stream of twos printing to the console. To do this, we define a main method for the streaming object which connects the Source to the Flow and finally to the Sink.

import akka.actor.ActorSystem
import akka.stream.{ClosedShape, ActorMaterializer}
// RunnableGraph comes from scaladsl, not javadsl.
// Broadcast, FlowGraph, Merge, RunnableGraph and Zip are used in the later sections.
import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Merge, RunnableGraph, Sink, Source, Zip}
import scala.concurrent.duration._

object streaming {
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val in = Source.tick(1.second, 1.second, 1)
  val doubleFlow = Flow[Int].map(_ * 2)
  val out = Sink.foreach[Int](println)

  def main(args: Array[String]): Unit = {
    // Source -> Flow -> Sink; runWith materializes and starts the stream
    in.via(doubleFlow).runWith(out)
  }
}

You can now run this code using sbt run from the terminal. You should see a 2 printed once every second.

Graph DSL

Akka Streaming provides a domain specific language (DSL) to express stream processing pipelines using a graph. Here is another way to define the main method using a RunnableGraph:

import FlowGraph.Implicits._

// RunnableGraph here is akka.stream.scaladsl.RunnableGraph
def main(args: Array[String]): Unit = {
  val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
    in ~> doubleFlow ~> out
    ClosedShape
  })

  g.run(materializer)
}

The graph DSL requires a bit more work, namely defining a runnable graph and specifying that the graph is closed. This is because we can also define partial graphs (graphs which aren't connected to a Source, a Sink or both) which compose with other graphs. This is handy if you want to reuse a block of processing. We specify a partial graph with one open input and one open output by returning FlowShape(inlet, outlet) instead of ClosedShape.
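To make the idea of a partial graph concrete, here is a sketch of a reusable flow which pairs each number with its double. It is written against the FlowGraph API used in this post (later releases renamed FlowGraph to GraphDSL), and it assumes that Flow.fromGraph is available in this milestone in the same way as RunnableGraph.fromGraph. The names PartialGraphExample and pairWithDouble are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original post
object PartialGraphExample {
  // A partial graph with one open inlet and one open outlet, wrapped as a Flow
  val pairWithDouble = Flow.fromGraph(FlowGraph.create() { implicit builder =>
    import FlowGraph.Implicits._
    val bcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Int, Int]())

    bcast.out(0) ~> zip.in0                          // the original number
    bcast.out(1) ~> Flow[Int].map(_ * 2) ~> zip.in1  // its double

    // Expose the unconnected ports instead of returning ClosedShape
    FlowShape(bcast.in, zip.out)
  })

  def run(): Vector[(Int, Int)] = {
    implicit val system = ActorSystem("partial-graph-example")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(1 to 3).via(pairWithDouble).runFold(Vector.empty[(Int, Int)])(_ :+ _)
      Await.result(result, 5.seconds)
    } finally {
      system.shutdown() // Akka 2.3 API; assumed here to match the build above
    }
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector((1,2), (2,4), (3,6))
}
```

Because pairWithDouble is an ordinary Flow, it can be plugged in with via or ~> wherever a flow from Int to (Int, Int) is expected.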

Merging a stream

Suppose we have a stream of ones alongside our current stream of twos. We can merge these two streams into one because they contain the same element type (Int). Merge does not guarantee the order in which elements are emitted from the merge stage. If one input should take priority, MergePreferred lets you mark a preferred input which is taken first whenever more than one input has an element available.

Let's reuse our stream of ones and merge it into the stream of twos we already have:

// requires: import akka.stream.scaladsl.Merge
def main(args: Array[String]): Unit = {
  RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
    val merge = builder.add(Merge[Int](2))
    in ~> doubleFlow ~> merge ~> out
                  in ~> merge
    ClosedShape
  }).run(materializer)
}

We add a Merge to the graph builder. Merge requires that we specify the type of the stream elements we are merging and the number of inputs we are merging. We then alter the stream processing flow to include the stream of ones and perform the merge.

If the stream of ones published at a higher frequency than the stream of twos, the merged stream would contain more ones than twos, i.e.:

1 1 1 2 1 1 1 2 …
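Timing aside, a merge only interleaves its inputs; every element from every input still arrives. The sketch below uses two finite sources so that this can be checked. Since the interleaving order is unspecified, it sorts the collected result before returning it. It assumes Source.fromGraph and SourceShape are available alongside the FlowGraph API used in this post; MergeExample is an illustrative name.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{FlowGraph, Merge, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original post
object MergeExample {
  def run(): Vector[Int] = {
    implicit val system = ActorSystem("merge-example")
    implicit val materializer = ActorMaterializer()

    // A partial graph with a single open outlet, wrapped as a Source
    val merged = Source.fromGraph(FlowGraph.create() { implicit builder =>
      import FlowGraph.Implicits._
      val merge = builder.add(Merge[Int](2))
      Source(List(1, 1, 1)) ~> merge
      Source(List(2, 2)) ~> merge
      SourceShape(merge.out)
    })

    try {
      val result = merged.runFold(Vector.empty[Int])(_ :+ _)
      // The arrival order is not guaranteed, so sort before returning
      Await.result(result, 5.seconds).sorted
    } finally {
      system.shutdown() // Akka 2.3 API; assumed here to match the build above
    }
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(1, 1, 1, 2, 2)
}
```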

Zipping a stream

We can zip a stream just as we can zip collections in Scala. This results in a stream of tuples, whose two sides can have different types. Zip requires that both streams have an element available, so if one stream publishes at a quicker rate than the other, it is held back (backpressured) until the slower stream has an element to pair with.

To illustrate this, we will build a continuous stream of natural numbers:

val naturalNumbers = Source(Stream.from(1))

Now we zip this together with a continuous source of ones which publishes every ten seconds. Zip waits for both streams to have an element before publishing the tuple, so the elements of each input keep their order.

// requires: import akka.stream.scaladsl.Zip
val in = Source.tick(1.second, 10.seconds, 1)

val zipStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  val zip = builder.add(Zip[Int, Int]())
  naturalNumbers ~> zip.in0
  in ~> zip.in1
  zip.out ~> out

  ClosedShape
})

def main(args: Array[String]): Unit = {
  zipStream.run(materializer)
}

The output from this stream is: (1,1) (2,1) (3,1) …
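For comparison, the same pairing can be reproduced with plain Scala iterators, with no Akka and no timing involved. This is only an analogy for what Zip does to the elements; ZipCollections and its methods are illustrative names.

```scala
// Hypothetical example object, not part of the original post
object ZipCollections {
  // Two infinite, lazily evaluated inputs, mirroring the two sources above
  def zipped(n: Int): List[(Int, Int)] =
    Iterator.from(1).zip(Iterator.continually(1)).take(n).toList

  // The two sides of the tuple may have different element types
  def labelled(n: Int): List[(Int, String)] =
    Iterator.from(1).zip(Iterator.continually("tick")).take(n).toList

  def main(args: Array[String]): Unit = {
    println(zipped(3))   // List((1,1), (2,1), (3,1))
    println(labelled(2)) // List((1,tick), (2,tick))
  }
}
```

The difference with a stream is that the pairs arrive over time: each tuple is published only once the slower input has produced its element.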

Broadcasting a stream

The opposite of merging streams is splitting a stream at a point. Broadcast does this by emitting each element to every one of its outputs, which is useful when you want to perform different processing steps on the same stream elements.

We shall work with the stream of natural numbers and broadcast it into two streams. We double one side and print it to the console, and simply ignore the other.

// requires: import akka.stream.scaladsl.Broadcast
val broadcastStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  val bcast = builder.add(Broadcast[Int](2))
  naturalNumbers ~> bcast

  bcast ~> doubleFlow ~> out
  bcast ~> Sink.ignore

  ClosedShape
})

def main(args: Array[String]): Unit = {
  broadcastStream.run(materializer)
}

Throttling a stream

When we run the previous example, the stream prints to the screen extremely fast; we can barely see the even numbers rushing past. We might want a flow which throttles the rate at which the stream publishes. Recall that Zip keeps the elements in order and waits until every input has an element available. If we zip the stream with a stream of unit values which ticks once per second, and keep only the integer from each tuple, we reduce the rate at which the natural number source publishes:

val throttledStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  val bcast = builder.add(Broadcast[Int](2))
  // () is the unit value; the ticks carry no data and only set the pace
  val zip = builder.add(Zip[Int, Unit]())
  Source.tick(1.second, 1.second, ()) ~> zip.in1

  naturalNumbers ~> bcast

  bcast ~> doubleFlow ~> zip.in0
  bcast ~> Sink.ignore

  // Drop the unit half of each tuple and print only the integer
  zip.out ~> Flow[(Int, Unit)].map(_._1) ~> out

  ClosedShape
})

def main(args: Array[String]): Unit = {
  throttledStream.run(materializer)
}

There are many more built-in processing stages detailed in the Akka Streaming Documentation.