<style>
.reveal section img {
margin: 0px 0px;
}
.hljs .line {
font-size: 0.60em;
line-height: 1.2em;
}
</style>
## Functional Stream for Factorians
![](assets/crash-site.jpg)
---
### About Me
* Senior scala developer
* Toying with scala since 2016
* Joining Zendesk
---
### Introduction
--
### Factorio
![](assets/factorio.jpg)
--
### Backpressure
![](assets/backpressure1.png)
--
### Backpressure
![](assets/backpressure2.png)
--
### Backpressure
![](assets/backpressure3.png)
--
### FS2
![](assets/fs2-logo.png)
---
### Let's get started
---
### Mining Drill
![](assets/mining-drill.gif)
<!-- .element: id="left" -->
```scala
import fs2.Stream
val mineIron: IO[IronOre] =
for {
_ <- IO.sleep(2.second)
_ <- log("Produced 1 Iron ore")
} yield IronOre()
def ironMiningDrill: Stream[IO, IronOre] =
Stream.eval(mineIron).repeat
def run(stream: Stream[IO, _]): Unit =
stream.take(10).compile.drain.unsafeRunSync()
run(ironMiningDrill)
```
<!-- .element: id="right" class="fragment" data-fragment-index="1" -->
<asciinema-player src="assets/mining-drill.cast" cols="150" rows="10" autoplay="1" loop="1" class="fragment" data-fragment-index="2"></asciinema-player>
---
### Furnace
![](assets/furnace.gif)
<!-- .element: id="left" -->
![](assets/ironPlateRecipe.png)
<!-- .element: id="right" style="margin-bottom: 0px;"-->
```scala
type Pipe[F[_], -I, +O] =
Stream[F, I] => Stream[F, O]
```
<!-- .element: id="right" class="fragment" -->
--
### Furnace
![](assets/furnace.gif)
<!-- .element: id="left" -->
![](assets/ironPlateRecipe.png)
<!-- .element: id="right" style="margin-bottom: 0px;"-->
```scala
import fs2.Pipe
def smeltIron(ore: IronOre): IO[Iron] =
for {
_ <- IO.sleep(3.2.second)
_ <- log("Smelted 1 Iron ore in 1 Iron plate")
} yield Iron()
val ironFurnace: Pipe[IO, IronOre, Iron] =
(input: Stream[IO, IronOre]) =>
input.evalMap(smeltIron)
run(ironMiningDrill.through(ironFurnace))
```
<!-- .element: id="right" style="margin-top: 0px;"-->
<asciinema-player src="assets/furnace.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player>
<!-- .element: class="fragment" -->
--
### Improved Furnace
![](assets/furnace.gif)
<!-- .element: id="left" -->
![](assets/ironPlateRecipe.png)
<!-- .element: id="right" style="margin-bottom: 0px;"-->
```scala
import fs2.Pipe
def smeltIron(ore: IronOre): IO[Iron] =
for {
_ <- IO.sleep(3.2.second)
_ <- log("Smelted 1 Iron ore in 1 Iron plate")
} yield Iron()
val ironFurnaceWithBuffer: Pipe[IO, IronOre, Iron] =
(input: Stream[IO, IronOre]) =>
input.prefetchN(50).evalMap(smeltIron)
run(ironMiningDrill.through(ironFurnaceWithBuffer))
```
<!-- .element: id="right" style="margin-top: 0px;"-->
<asciinema-player src="assets/improved-furnace.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player>
<!-- .element: class="fragment" -->
---
### Transport Belt
![](assets/belt.gif)
<!-- .element: id="left" -->
--
### Transport Belt
```scala
trait Queue[F[_], A] {
def take: F[A]
def tryTake: F[Option[A]]
def offer(a: A): F[Unit]
def tryOffer(a: A): F[Boolean]
}
```
--
### Transport Belt
![](assets/belt.gif)
<!-- .element: id="left" -->
```scala
import cats.effect.std.Queue
val ironBelt: IO[Queue[IO,IronOre]] =
Queue.bounded[IO, IronOre](50)
val ironPipeline: Stream[IO, Iron] = for {
belt <- Stream.eval(ironBelt)
drill = ironMiningDrill.evalMap(belt.offer)
furnace1 = Stream
.repeatEval(belt.take)
.through(ironFurnaceWithBuffer)
furnace2 = Stream
.repeatEval(belt.take)
.through(ironFurnaceWithBuffer)
iron <- (furnace1 merge furnace2) concurrently drill
} yield iron
run(ironPipeline)
```
<!-- .element: id="right" -->
<asciinema-player src="assets/belt.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player>
---
### Assembly Machines
![](assets/electronic.gif)
<!-- .element: id="left" -->
![](assets/coppercablerecipe.png)
<!-- .element: id="right" -->
![](assets/electroniccircuitrecipe.png)
<!-- .element: id="right" -->
--
### Assembly Machines
![](assets/electronic.gif)
<!-- .element: id="left" -->
![](assets/coppercablerecipe.png)
<!-- .element: id="right" -->
```scala
def assembleCable(
copper: Copper
): IO[List[Cable]] = for {
_ <- IO.sleep(0.5.second)
_ <- log("Built 2 Cable from 1 Copper plate")
} yield List(Cable(), Cable())
val cableAssembler: Pipe[IO, Copper, Cable] =
(input: Stream[IO, Copper]) =>
for {
copperCables <- input
.prefetchN(50)
.evalMap(assembleCopperCable)
copperCable <- Stream.emits(copperCables)
} yield copperCable
val cablePipeline: Stream[IO,Cable] =
copperPipeline.through(cableAssembler)
```
<!-- .element: id="right" -->
--
### Assembly Machines
![](assets/electronic.gif)
<!-- .element: id="left" -->
![](assets/electroniccircuitrecipe.png)
<!-- .element: id="right" -->
```scala
import fs2.Chunk
def assembleCircuit(
cables: Chunk[Cable],
plate: Iron
): IO[Circuit] =
for {
_ <- IO(assert(cables.size == 3))
_ <- IO.sleep(0.5.second)
_ <- log("Built 1 Circuit w/ 3 plates + 1 cable")
} yield Circuit()
val circuitAssembler
: Pipe[IO, (Chunk[Cable], Iron), Circuit] =
(input: Stream[IO, (Chunk[Cable], Iron)]) =>
input
.prefetchN(50)
.evalMap { case (cables, plate) =>
assembleCircuit(cables, plate)
}
val circuitPipeline: Stream[IO, Circuit] =
(cablePipeline.chunkN(3,false) parZip ironPipeline)
.through(circuitAssembler)
```
<!-- .element: id="right" -->
--
### Assembly Machines
<asciinema-player src="assets/electronic.cast" cols="150" rows="20" autoplay="1" loop="1"></asciinema-player>
---
### Splitter (Fan In)
![](assets/splitter-fanin.gif)
<!-- .element: id="left" -->
```scala
def splitterFanIn(
left: Stream[IO, Iron],
right: Stream[IO, Copper]
): Stream[IO, Metal] =
(left merge right)
run(splitterFanIn(copperPipeline, ironPipeline))
```
<!-- .element: id="right" class="fragment" data-fragment-index="1" -->
<asciinema-player src="assets/splitter-fanin.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player>
<!-- .element: class="fragment" data-fragment-index="2" -->
--
### Splitter (Filter)
![](assets/splitter-filter.gif)
<!-- .element: id="left" -->
```scala
def splitterFilter(
input: Stream[IO, Metal]
): Stream[IO, (Stream[IO, Iron], Stream[IO, Copper])] =
for {
left <- Stream.eval(Queue.bounded[IO, Iron](0))
leftOutput = Stream.repeatEval(left.take)
right <- Stream.eval(Queue.bounded[IO, Copper](0))
rightOutput = Stream.repeatEval(right.take)
filter = input
.evalTap(ore => log(s"Processing element $ore"))
.evalMap {
case copper: Copper => right.offer(copper)
case iron: Iron => left.offer(iron)
}
result <- Stream.emit(
(leftOutput, rightOutput)
) concurrently filter
} yield result
```
<!-- .element: id="right" class="fragment" data-fragment-index="1" -->
--
### Splitter (Filter)
![](assets/splitter-filter.gif)
<!-- .element: id="left" -->
```
run(
splitterFilter(copperPipeline merge ironPipeline)
.flatMap { case (ironStream, copperStream) =>
copperStream.take(2) merge ironStream
}
)
```
<!-- .element: id="right" -->
<asciinema-player src="assets/splitter-filter.cast" cols="150" rows="10" autoplay="1" loop="1"></asciinema-player>
<!-- .element: class="fragment" data-fragment-index="2" -->
---
### Let's go !
![](assets/mall.jpeg)
--
### Just kidding
---
## Conclusion
![](assets/rocket.gif)
---
## The End
#### Links:
- Sources: [cutt.ly/8RD5Xhh](https://cutt.ly/8RD5Xhh)
- Slides: [cutt.ly/XRD6qOG](https://cutt.ly/XRD6qOG)