<style>
.reveal section img {
  margin: 0px 0px;
}
.hljs .line {
  font-size: 0.60em;
  line-height: 1.2em;
}
</style>

## Functional Stream for Factorians

![](assets/crash-site.jpg)

---

### About Me

* Senior Scala developer
* Toying with Scala since 2016
* Joining Zendesk

---

### Introduction

--

### Factorio

![](assets/factorio.jpg)

--

### Backpressure

![](assets/backpressure1.png)

--

### Backpressure

![](assets/backpressure2.png)

--

### Backpressure

![](assets/backpressure3.png)

--

### FS2

![](assets/fs2-logo.png)

---

### Let's get started

---

### Mining Drill

![](assets/mining-drill.gif) <!-- .element: id="left" -->

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

// Mining one ore takes 2 seconds
val mineIron: IO[IronOre] = for {
  _ <- IO.sleep(2.second)
  _ <- log("Produced 1 Iron ore")
} yield IronOre()

// An endless stream of ore
def ironMiningDrill: Stream[IO, IronOre] =
  Stream.eval(mineIron).repeat

// Run any stream until it has emitted 10 elements
def run(stream: Stream[IO, _]): Unit =
  stream.take(10).compile.drain.unsafeRunSync()

run(ironMiningDrill)
```
<!-- .element: id="right" class="fragment" data-fragment-index="1" -->

<asciinema-player src="assets/mining-drill.cast" cols="150" rows="10" autoplay="1" loop="1" class="fragment" data-fragment-index="2"></asciinema-player>

---

### Furnace

![](assets/furnace.gif) <!-- .element: id="left" -->

![](assets/ironPlateRecipe.png) <!-- .element: id="right" style="margin-bottom: 0px;"-->

```scala
type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]
```
<!-- .element: id="right" class="fragment" -->

--

### Furnace

![](assets/furnace.gif) <!-- .element: id="left" -->

![](assets/ironPlateRecipe.png) <!-- .element: id="right" style="margin-bottom: 0px;"-->

```scala
import fs2.Pipe

def smeltIron(ore: IronOre): IO[Iron] = for {
  _ <- IO.sleep(3.2.second)
  _ <- log("Smelted 1 Iron ore into 1 Iron plate")
} yield Iron()

// Smelts one ore at a time
val ironFurnace: Pipe[IO, IronOre, Iron] =
  (input: Stream[IO, IronOre]) => input.evalMap(smeltIron)

run(ironMiningDrill.through(ironFurnace))
```
<!-- .element: id="right" style="margin-top: 0px;"-->

<asciinema-player src="assets/furnace.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player> <!-- .element: class="fragment" -->

--

### Improved Furnace

![](assets/furnace.gif) <!-- .element: id="left" -->

![](assets/ironPlateRecipe.png) <!-- .element: id="right" style="margin-bottom: 0px;"-->

```scala
import fs2.Pipe

def smeltIron(ore: IronOre): IO[Iron] = for {
  _ <- IO.sleep(3.2.second)
  _ <- log("Smelted 1 Iron ore into 1 Iron plate")
} yield Iron()

// prefetchN(50) buffers up to 50 ores while the furnace is busy
val ironFurnaceWithBuffer: Pipe[IO, IronOre, Iron] =
  (input: Stream[IO, IronOre]) =>
    input.prefetchN(50).evalMap(smeltIron)

run(ironMiningDrill.through(ironFurnaceWithBuffer))
```
<!-- .element: id="right" style="margin-top: 0px;"-->

<asciinema-player src="assets/improved-furnace.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player> <!-- .element: class="fragment" -->

---

### Transport Belt

![](assets/belt.gif) <!-- .element: id="left" -->

--

### Transport Belt

```scala
trait Queue[F[_], A] {
  def take: F[A]
  def tryTake: F[Option[A]]
  def offer(a: A): F[Unit]
  def tryOffer(a: A): F[Boolean]
}
```

--

### Transport Belt

![](assets/belt.gif) <!-- .element: id="left" -->

```scala
import cats.effect.std.Queue

val ironBelt: IO[Queue[IO, IronOre]] =
  Queue.bounded[IO, IronOre](50)

val ironPipeline: Stream[IO, Iron] = for {
  belt <- Stream.eval(ironBelt)
  // offer waits while the belt is full, slowing the drill down
  drill = ironMiningDrill.evalMap(belt.offer)
  // Two furnaces take ore from the same belt
  furnace1 = Stream
    .repeatEval(belt.take)
    .through(ironFurnaceWithBuffer)
  furnace2 = Stream
    .repeatEval(belt.take)
    .through(ironFurnaceWithBuffer)
  iron <- (furnace1 merge furnace2) concurrently drill
} yield iron

run(ironPipeline)
```
<!-- .element: id="right" -->

<asciinema-player src="assets/belt.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player>

---

### Assembly Machines

![](assets/electronic.gif) <!-- .element: id="left" -->

![](assets/coppercablerecipe.png) <!-- .element: id="right" -->

![](assets/electroniccircuitrecipe.png) <!-- .element: id="right" -->

--

### Assembly Machines

![](assets/electronic.gif) <!-- .element: id="left" -->

![](assets/coppercablerecipe.png) <!-- .element: id="right" -->

```scala
def assembleCable(
  copper: Copper
): IO[List[Cable]] = for {
  _ <- IO.sleep(0.5.second)
  _ <- log("Built 2 Cable from 1 Copper plate")
} yield List(Cable(), Cable())

val cableAssembler: Pipe[IO, Copper, Cable] =
  (input: Stream[IO, Copper]) => for {
    copperCables <- input
      .prefetchN(50)
      .evalMap(assembleCable)
    // Emit each cable of the batch as its own element
    copperCable <- Stream.emits(copperCables)
  } yield copperCable

val cablePipeline: Stream[IO, Cable] =
  copperPipeline.through(cableAssembler)
```
<!-- .element: id="right" -->

--

### Assembly Machines

![](assets/electronic.gif) <!-- .element: id="left" -->

![](assets/electroniccircuitrecipe.png) <!-- .element: id="right" -->

```scala
import fs2.Chunk

def assembleCircuit(
  cables: Chunk[Cable],
  plate: Iron
): IO[Circuit] = for {
  _ <- IO(assert(cables.size == 3))
  _ <- IO.sleep(0.5.second)
  _ <- log("Built 1 Circuit w/ 3 cables + 1 plate")
} yield Circuit()

val circuitAssembler: Pipe[IO, (Chunk[Cable], Iron), Circuit] =
  (input: Stream[IO, (Chunk[Cable], Iron)]) =>
    input
      .prefetchN(50)
      .evalMap { case (cables, plate) =>
        assembleCircuit(cables, plate)
      }

// Group cables by exactly 3, then pair each group with 1 plate
val circuitPipeline: Stream[IO, Circuit] =
  (cablePipeline.chunkN(3, false) parZip ironPipeline)
    .through(circuitAssembler)
```
<!-- .element: id="right" -->

--

### Assembly Machines

<asciinema-player src="assets/electronic.cast" cols="150" rows="20" autoplay="1" loop="1"></asciinema-player>

---

### Splitter (Fan In)

![](assets/splitter-fanin.gif) <!-- .element: id="left" -->

```scala
def splitterFanIn(
  left: Stream[IO, Iron],
  right: Stream[IO, Copper]
): Stream[IO, Metal] =
  (left merge right)

run(splitterFanIn(ironPipeline, copperPipeline))
```
<!-- .element: id="right" class="fragment" data-fragment-index="1" -->

<asciinema-player src="assets/splitter-fanin.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player> <!-- .element: class="fragment" data-fragment-index="2" -->

--

### Splitter (Filter)

![](assets/splitter-filter.gif) <!-- .element: id="left" -->

```scala
def splitterFilter(
  input: Stream[IO, Metal]
): Stream[IO, (Stream[IO, Iron], Stream[IO, Copper])] =
  for {
    // Capacity 0: offer waits until a consumer takes the element
    left <- Stream.eval(Queue.bounded[IO, Iron](0))
    leftOutput = Stream.repeatEval(left.take)
    right <- Stream.eval(Queue.bounded[IO, Copper](0))
    rightOutput = Stream.repeatEval(right.take)
    // Route each metal to the queue matching its type
    filter = input
      .evalTap(ore => log(s"Processing element $ore"))
      .evalMap {
        case copper: Copper => right.offer(copper)
        case iron: Iron => left.offer(iron)
      }
    result <- Stream.emit(
      (leftOutput, rightOutput)
    ) concurrently filter
  } yield result
```
<!-- .element: id="right" class="fragment" data-fragment-index="1" -->

--

### Splitter (Filter)

![](assets/splitter-filter.gif) <!-- .element: id="left" -->

```scala
run(
  splitterFilter(copperPipeline merge ironPipeline)
    .flatMap { case (ironStream, copperStream) =>
      copperStream.take(2) merge ironStream
    }
)
```
<!-- .element: id="right" -->

<asciinema-player src="assets/splitter-filter.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player> <!-- .element: class="fragment" data-fragment-index="2" -->

---

### Let's go!

![](assets/mall.jpeg)

--

### Just kidding

---

## Conclusion

![](assets/rocket.gif)

---

## The End

#### Links:

- Sources: [cutt.ly/8RD5Xhh](https://cutt.ly/8RD5Xhh)
- Slides: [cutt.ly/XRD6qOG](https://cutt.ly/XRD6qOG)