Apache Pekko Series, Part 4: Streams & Reactive Processing
Source, Flow, and Sink — the building blocks of Pekko Streams. Backpressure by design, composable pipelines, and how to process data without dropping messages or crashing.
Push-based data pipelines break under load. A producer that generates data faster than the consumer can process it fills up queues, blows up buffers, or starts dropping messages. The fix is backpressure: a mechanism for consumers to signal upstream how fast they can accept data.
Pekko Streams implements the Reactive Streams specification — a standard for asynchronous stream processing with non-blocking backpressure. The consumer drives the rate. The producer only sends what was requested.
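The demand protocol at the heart of Reactive Streams can be sketched without any library: the consumer requests n elements, and the producer emits at most that many. A minimal stdlib-only sketch (the names are illustrative, not the real org.reactivestreams API):

```scala
import scala.collection.mutable

// Toy producer: emits elements only when the subscriber has signaled demand.
// This mirrors the Reactive Streams idea (request(n) drives emission), it is
// NOT the real Publisher/Subscriber interface -- just the core mechanism.
class ToyPublisher(data: Iterator[Int]) {
  private var demand = 0L
  val received = mutable.Buffer[Int]() // what the "subscriber" has seen

  def request(n: Long): Unit = {
    demand += n
    while (demand > 0 && data.hasNext) {
      received += data.next() // "onNext" to the subscriber
      demand -= 1
    }
  }
}

val pub = new ToyPublisher(Iterator.from(1)) // conceptually unbounded source
pub.request(3)                               // consumer asks for exactly 3
println(pub.received)                        // only 3 elements were produced
```

The source is infinite, yet nothing overflows: production is gated by demand, which is exactly what "the consumer drives the rate" means.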
The Three Building Blocks
Source[Out] ──► Flow[In, Out] ──► Sink[In]
     │          (transform, filter,    │
     │           map, async, etc.)     │
     └─────────────────────────────────┘
                RunnableGraph
- Source[Out, Mat] — produces elements; has no input
- Flow[In, Out, Mat] — transforms elements; has both input and output
- Sink[In, Mat] — consumes elements; has no output
- Mat — the materialized value (the result produced when the stream runs, e.g., a Future[Done] or a count)
Your First Stream
libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.stream.scaladsl._
import scala.concurrent.{ExecutionContext, Future}
implicit val system: ActorSystem[Nothing] =
ActorSystem(Behaviors.empty, "streams-example")
implicit val ec: ExecutionContext = system.executionContext
val result: Future[Int] =
Source(1 to 100) // Source[Int]
.filter(_ % 2 == 0) // Flow[Int, Int]: keep even numbers
.map(_ * 10) // Flow[Int, Int]: multiply
.runWith(Sink.fold(0)(_ + _)) // Sink[Int, Future[Int]]: sum
result.foreach(sum => println(s"Sum of even*10: $sum"))
// Output: Sum of even*10: 25500
The stream does not run until .runWith(sink) or .run() is called — this is the materialization step. Before that, it is just a description (a Graph).
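A useful mental model: the graph is a value, like a function that has not been called yet. A stdlib-only analogy (no Pekko involved):

```scala
// A stream description is inert data, like an uncalled function value.
// The pipeline below mirrors the example above: filter evens, *10, sum.
val blueprint: () => Int =
  () => (1 to 100).filter(_ % 2 == 0).map(_ * 10).sum

// Nothing has run yet. "Materializing" = invoking the description;
// each run is independent and can be repeated.
val first  = blueprint()
val second = blueprint()
println(first)           // 25500
println(first == second) // true: same description, two separate runs
```

In the same way, a RunnableGraph can be run multiple times, and each run materializes a fresh, independent stream.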
Backpressure in Action
Pekko Streams propagates demand upstream automatically. A slow sink applies backpressure to all upstream stages:
import scala.concurrent.Future
import scala.concurrent.duration._

Source
.tick(initialDelay = 0.seconds, interval = 1.millis, tick = "tick") // 1000/sec
.mapAsync(parallelism = 4) { _ =>
Future {
Thread.sleep(100) // simulate slow processing
"processed"
}
}
.runWith(Sink.ignore)
// The source will slow down automatically — backpressure prevents overflow
Compare this to a push-based Observable or a hand-rolled producer thread: if processing is slow, you buffer, drop, or crash. With Pekko Streams you wait — the source is paused until the downstream is ready.
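The "wait instead of drop" behavior is the same idea as a bounded blocking queue: put blocks the producer while the buffer is full. A rough JVM analogy (Pekko achieves this without blocking threads, via asynchronous demand signaling):

```scala
import java.util.concurrent.ArrayBlockingQueue

val queue = new ArrayBlockingQueue[Int](2) // capacity 2 = bounded buffer

// Fast producer: put() blocks as soon as the buffer is full.
val producer = new Thread(() => (1 to 5).foreach(n => queue.put(n)))
producer.start()

Thread.sleep(50)
println(queue.size) // at most 2: the producer is parked, not overflowing

// The consumer drains; the producer resumes automatically.
val drained = (1 to 5).map(_ => queue.take())
producer.join()
println(drained) // Vector(1, 2, 3, 4, 5): nothing was dropped
```

The queue never grows past its capacity, and no element is lost — the producer simply runs at the consumer's pace.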
Common Operators
val numbers = Source(1 to 1000)
// Transform
numbers.map(_ * 2)
numbers.mapConcat(n => List(n, n + 1)) // flatMap for elements
// Filter
numbers.filter(_ > 500)
numbers.take(10)
numbers.drop(10)
// Async processing
numbers.mapAsync(parallelism = 8)(n => Future(expensiveCompute(n)))
numbers.mapAsyncUnordered(8)(n => Future(expensiveCompute(n))) // faster, order not preserved
// Grouping
numbers.grouped(10) // Source[Seq[Int]]
numbers.sliding(3) // Source[Seq[Int]] with a sliding window
numbers.groupedWithin(10, 1.second) // batch by count OR time
// Throttle
numbers.throttle(elements = 100, per = 1.second)
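The grouping operators mirror the standard collection methods of the same names, so their semantics are easy to check with plain lists (groupedWithin has no collection analog, since the time bound only exists in a running stream):

```scala
val xs = (1 to 7).toList

println(xs.grouped(3).toList)
// List(List(1, 2, 3), List(4, 5, 6), List(7)) -- last group may be short

println(xs.sliding(3).toList)
// List(List(1, 2, 3), List(2, 3, 4), List(3, 4, 5),
//      List(4, 5, 6), List(5, 6, 7)) -- overlapping windows, step 1
```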
Fan-out and Fan-in
Streams can branch and merge using GraphDSL:
import org.apache.pekko.stream.ClosedShape

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Int](2)) // one input, two outputs
val merge = b.add(Merge[String](2)) // two inputs, one output
val source = Source(1 to 10)
val sink = Sink.foreach[String](println)
source ~> broadcast
broadcast.out(0) ~> Flow[Int].map(n => s"path-A: $n") ~> merge
broadcast.out(1) ~> Flow[Int].map(n => s"path-B: ${n * 10}") ~> merge
merge ~> sink
ClosedShape
})
graph.run()
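What this graph computes can be checked with plain collections: Broadcast copies every element to both paths, and the merged output contains both results. Merge interleaves nondeterministically, so only the multiset of outputs is guaranteed, not their order:

```scala
val input = (1 to 10).toList

// Broadcast: each element goes to every output port.
val pathA = input.map(n => s"path-A: $n")
val pathB = input.map(n => s"path-B: ${n * 10}")

// Merge: union of both paths (real interleaving order is nondeterministic).
val merged = pathA ++ pathB
println(merged.size) // 20: two outputs per input element
```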
Integration with Actors
You can connect streams to actors using ActorSource and ActorSink (from the pekko-stream-typed module), or by asking an actor from inside mapAsync:
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.actor.typed.ActorRef
import org.apache.pekko.actor.typed.scaladsl.AskPattern._ // typed ask, not the classic org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout
import scala.concurrent.duration._
implicit val timeout: Timeout = 3.seconds // required by ask (an implicit Scheduler from the ActorSystem must also be in scope)
val workerRef: ActorRef[WorkItem] = ...
val processingFlow: Flow[WorkItem, WorkResult, NotUsed] =
Flow[WorkItem].mapAsync(4) { item =>
workerRef.ask[WorkResult](ref => item.copy(replyTo = ref))
}
Streams provide the buffering, backpressure, and error handling. Actors provide stateful processing. Together they handle both stateless transformations and stateful business logic.
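The bounded parallelism that mapAsync provides can be approximated with plain Futures by processing in batches. This is a simplification — the real operator keeps a rolling window of in-flight futures rather than lock-step batches — and expensiveCompute here is a stand-in for real work:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for a slow, CPU- or IO-bound computation.
def expensiveCompute(n: Int): Int = { Thread.sleep(10); n * 2 }

// Batch-wise bounded concurrency: at most 4 futures in flight at a time.
val results = (1 to 8).grouped(4).flatMap { batch =>
  Await.result(Future.traverse(batch)(n => Future(expensiveCompute(n))), 5.seconds)
}.toList

println(results) // List(2, 4, 6, 8, 10, 12, 14, 16) -- order preserved
```

Like mapAsync (and unlike firing all futures at once), this never exceeds the parallelism bound, and like mapAsync — as opposed to mapAsyncUnordered — it preserves element order.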
Key Takeaways
- Source → Flow → Sink is a declarative description; .run() materializes it
- Backpressure is automatic — fast producers slow down when downstream is busy
- mapAsync(parallelism)(f) — run f concurrently with bounded parallelism
- groupedWithin — batch by count or time window, whichever comes first
- GraphDSL enables fan-out (Broadcast) and fan-in (Merge, Zip)
- Streams and actors are composable — use each for what it does best
Next: HTTP with Pekko