Skip to content

Commit

Permalink
[finagle#1078] Streaming with Monix's Observable
Browse files Browse the repository at this point in the history
  • Loading branch information
Avasil committed May 15, 2019
1 parent 32c6ac5 commit eb7f264
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 3 deletions.
18 changes: 15 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy val iterateeVersion = "0.18.0"
lazy val refinedVersion = "0.9.5"
lazy val catsEffectVersion = "1.3.0"
lazy val fs2Version = "1.0.4"
lazy val monixVersion = "3.0.0-RC2"

lazy val compilerOptions = Seq(
"-deprecation",
Expand Down Expand Up @@ -236,7 +237,7 @@ lazy val finch = project.in(file("."))
"io.circe" %% "circe-generic" % circeVersion
))
.aggregate(
core, fs2, iteratee, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined
core, fs2, iteratee, monix, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined
)
.dependsOn(core, iteratee, generic, circe)

Expand Down Expand Up @@ -264,6 +265,16 @@ lazy val fs2 = project
)
.dependsOn(core % "compile->compile;test->test")

lazy val monix = project
.settings(moduleName := "finchx-monix")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"io.monix" %% "monix-reactive" % monixVersion
)
)
.dependsOn(core % "compile->compile;test->test")

lazy val generic = project
.settings(moduleName := "finchx-generic")
.settings(allSettings)
Expand Down Expand Up @@ -306,10 +317,11 @@ lazy val circe = project
"io.circe" %% "circe-iteratee" % circeIterateeVersion,
"io.circe" %% "circe-fs2" % circeFs2Version,
"io.circe" %% "circe-jawn" % circeVersion,
"io.monix" %% "monix-circe" % "0.0.1",
"io.circe" %% "circe-generic" % circeVersion % "test"
)
)
.dependsOn(core, jsonTest % "test")
.dependsOn(core, monix, jsonTest % "test")

lazy val refined = project
.settings(moduleName := "finchx-refined")
Expand Down Expand Up @@ -357,7 +369,7 @@ lazy val examples = project
"com.twitter" %% "twitter-server" % twitterVersion
)
)
.dependsOn(core, circe, iteratee)
.dependsOn(core, circe, iteratee, monix)

lazy val benchmarks = project
.settings(moduleName := "finchx-benchmarks")
Expand Down
16 changes: 16 additions & 0 deletions circe/src/main/scala/io/finch/circe/Decoders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.circe.iteratee
import io.circe.jawn._
import io.finch.{Application, Decode, DecodeStream}
import io.finch.internal.HttpContent
import io.finch.monix.ObservableF
import io.iteratee.Enumerator
import java.nio.charset.StandardCharsets

Expand Down Expand Up @@ -53,4 +54,19 @@ trait Decoders {
}
parsed.through(fs2.decoder[F, A])
})

implicit def monixCirce[F[_], A: Decoder]: DecodeStream.Json[ObservableF, F, A] =
DecodeStream.instance[ObservableF, F, A, Application.Json]((stream, cs) => {
val parsed = cs match {
case StandardCharsets.UTF_8 =>
stream
.map(_.asByteArray)
.liftByOperator(monix.circe.byteStreamParser)
case _ =>
stream
.map(_.asString(cs))
.liftByOperator(monix.circe.stringStreamParser)
}
monix.circe.decoder(parsed)
})
}
84 changes: 84 additions & 0 deletions examples/src/main/scala/io/finch/monix/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.finch.monix

import _root_.monix.eval.{Task, TaskApp}
import _root_.monix.execution.Scheduler
import _root_.monix.reactive.Observable
import cats.effect.{ExitCode, Resource}
import cats.implicits._
import com.twitter.finagle.{Http, ListeningServer}
import com.twitter.util.Future
import io.circe.generic.auto._
import io.finch._
import io.finch.circe._
import scala.util.Random

/**
* A Finch application featuring Monix Observable-based streaming support.
* This approach is more advanced and performant then basic [[com.twitter.concurrent.AsyncStream]]
*
* There are three endpoints in this example:
*
* 1. `sumJson` - streaming request
* 2. `streamJson` - streaming response
* 3. `isPrime` - end-to-end (request - response) streaming
*
* Use the following sbt command to run the application.
*
* {{{
* $ sbt 'examples/runMain io.finch.monix.Main'
* }}}
*
* Use the following HTTPie/curl commands to test endpoints.
*
* {{{
* $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 2}' localhost:8081/sumJson
*
* $ http --stream GET :8081/streamJson
*
* $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 42}' localhost:8081/streamPrime
* }}}
*/
object Main extends TaskApp with EndpointModule[Task] {

override implicit def scheduler: Scheduler = super.scheduler

final case class Result(result: Int) {
def add(n: Number): Result = copy(result = result + n.i)
}

final case class Number(i: Int) {
def isPrime: IsPrime = IsPrime(!(2 :: (3 to Math.sqrt(i.toDouble).toInt by 2).toList exists (i % _ == 0)))
}

final case class IsPrime(isPrime: Boolean)

private def stream: Stream[Int] = Stream.continually(Random.nextInt())

val sumJson: Endpoint[Task, Result] = post("sumJson" :: jsonBodyStream[ObservableF, Number]) {
o: Observable[Number] =>
o.foldLeftL(Result(0))(_ add _).map(Ok)
}

val streamJson: Endpoint[Task, ObservableF[Task, Number]] = get("streamJson") {
Ok(Observable.fromIterable(stream).map(Number.apply))
}

val isPrime: Endpoint[Task, ObservableF[Task, IsPrime]] =
post("streamPrime" :: jsonBodyStream[ObservableF, Number]) { o: Observable[Number] =>
Ok(o.map(_.isPrime))
}

def serve: Task[ListeningServer] = Task(
Http.server
.withStreaming(enabled = true)
.serve(":8081", (sumJson :+: streamJson :+: isPrime).toServiceAs[Application.Json])
)

def run(args: List[String]): Task[ExitCode] = {
val server = Resource.make(serve)(s =>
Task.suspend(implicitly[ToAsync[Future, Task]].apply(s.close()))
)

server.use(_ => Task.never).as(ExitCode.Success)
}
}
154 changes: 154 additions & 0 deletions monix/src/main/scala/io/finch/monix/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package io.finch

import java.nio.charset.Charset

import _root_.monix.eval.TaskLift
import _root_.monix.reactive.Observable
import cats.effect._
import com.twitter.io.{Buf, Pipe, Reader}
import com.twitter.util.Future
import io.finch.internal.newLine
import io.finch.monix.ObservableF

package object monix extends ObservableConcurrentEffectInstances {

type ObservableF[F[_], A] = Observable[A]

implicit def aliasResponseToRealResponse[F[_], A, CT <: Application.Json](implicit
tr: ToResponse.Aux[F, ObservableF[F, A], CT]
): ToResponse.Aux[F, Observable[A], CT] = tr

implicit def observableLiftReader[F[_]](implicit
F: Effect[F],
TA: ToAsync[Future, F]
): LiftReader[ObservableF, F] =
new LiftReader[ObservableF, F] {
final def apply[A](reader: Reader[Buf], process: Buf => A): ObservableF[F, A] = {
Observable
.repeatEvalF(F.suspend(TA(reader.read())))
.takeWhile(_.isDefined)
.collect { case Some(buf) => process(buf) }
.guaranteeF(F.delay(reader.discard()))
}
}

implicit def encodeBufConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, CT <: String]: EncodeStream.Aux[F, ObservableF, Buf, CT] =
new EncodeConcurrentEffectObservable[F, Buf, CT] {
protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
}
}

trait ObservableConcurrentEffectInstances extends ObservableEffectInstances {

implicit def encodeJsonConcurrentObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit
A: Encode.Json[A]
): EncodeStream.Json[F, ObservableF, A] =
new EncodeNewLineDelimitedConcurrentEffectObservable[F, A, Application.Json]

implicit def encodeSseConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit
A: Encode.Aux[A, Text.EventStream]
): EncodeStream.Aux[F, ObservableF, A, Text.EventStream] =
new EncodeNewLineDelimitedConcurrentEffectObservable[F, A, Text.EventStream]

implicit def encodeTextConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit
A: Encode.Text[A]
): EncodeStream.Text[F, ObservableF, A] =
new EncodeConcurrentEffectObservable[F, A, Text.Plain] {
override protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs)
}

implicit def encodeBufEffectObservable[F[_] : Effect : TaskLift, CT <: String]: EncodeStream.Aux[F, ObservableF, Buf, CT] =
new EncodeEffectObservable[F, Buf, CT] {
protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
}
}

trait ObservableEffectInstances extends ObservableInstances {

implicit def encodeJsonEffectObservable[F[_] : Effect : TaskLift, A](implicit
A: Encode.Json[A]
): EncodeStream.Json[F, ObservableF, A] =
new EncodeNewLineDelimitedEffectObservable[F, A, Application.Json]

implicit def encodeSseEffectObservable[F[_] : Effect : TaskLift, A](implicit
A: Encode.Aux[A, Text.EventStream]
): EncodeStream.Aux[F, ObservableF, A, Text.EventStream] =
new EncodeNewLineDelimitedEffectObservable[F, A, Text.EventStream]

implicit def encodeTextEffectObservable[F[_] : Effect : TaskLift, A](implicit
A: Encode.Text[A]
): EncodeStream.Text[F, ObservableF, A] =
new EncodeEffectObservable[F, A, Text.Plain] {
override protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs)
}
}

trait ObservableInstances {

protected final class EncodeNewLineDelimitedConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A, CT <: String](implicit
A: Encode.Aux[A, CT]
) extends EncodeConcurrentEffectObservable[F, A, CT] {
protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs).concat(newLine(cs))
}

protected final class EncodeNewLineDelimitedEffectObservable[F[_] : Effect : TaskLift, A, CT <: String](implicit
A: Encode.Aux[A, CT]
) extends EncodeEffectObservable[F, A, CT] {
protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs).concat(newLine(cs))
}

protected abstract class EncodeConcurrentEffectObservable[F[_] : TaskLift, A, CT <: String](implicit
F: ConcurrentEffect[F],
TA: ToAsync[Future, F]
) extends EncodeObservable[F, A, CT] {
protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] =
F.bracketCase(F.start(run))(_ => F.pure(reader)) {
case (f, ExitCase.Canceled) => f.cancel
case _ => F.unit
}
}

protected abstract class EncodeEffectObservable[F[_]: TaskLift, A, CT <: String](implicit
F: Effect[F],
TA: ToAsync[Future, F]
) extends EncodeObservable[F, A, CT] with (Either[Throwable, Unit] => IO[Unit]) {

def apply(cb: Either[Throwable, Unit]): IO[Unit] = IO.unit

protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] =
F.productR(F.runAsync(run)(this).to[F])(F.pure(reader))
}

protected abstract class EncodeObservable[F[_]: TaskLift, A, CT <: String](implicit
F: Effect[F],
TA: ToAsync[Future, F]
) extends EncodeStream[F, ObservableF, A] {

type ContentType = CT

protected def encodeChunk(chunk: A, cs: Charset): Buf

protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]]

override def apply(s: ObservableF[F, A], cs: Charset): F[Reader[Buf]] =
F.suspend {
val p = new Pipe[Buf]

val run = s
.map(chunk => encodeChunk(chunk, cs))
.mapEvalF(chunk => TA(p.write(chunk)))
.guaranteeF(F.suspend(TA(p.close())))
.completedL
.to[F]

dispatch(p, run)

}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.finch.monix

import cats.effect.{ConcurrentEffect, Effect, IO}
import com.twitter.io.Buf
import io.finch.{FinchSpec, StreamingLaws}
import monix.eval.TaskLift
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

class MonixObservableStreamingSpec extends FinchSpec with ScalaCheckDrivenPropertyChecks {

implicit val s = Scheduler.global

checkEffect[IO]
checkConcurrentEffect[IO]

def checkEffect[F[_]: TaskLift](implicit F: Effect[F]): Unit =
checkAll("monixObservable.streamBody[F[_]: Effect]", StreamingLaws[ObservableF, F](
list => Observable(list:_*),
stream => F.toIO(stream.map(array => Buf.ByteArray.Owned(array)).toListL.to[F]).unsafeRunSync()
).all)

def checkConcurrentEffect[F[_]: TaskLift](implicit F: ConcurrentEffect[F]): Unit =
checkAll("monixObservable.streamBody[F[_]: ConcurrentEffect]", StreamingLaws[ObservableF, F](
list => Observable(list:_*),
stream => F.toIO(stream.map(array => Buf.ByteArray.Owned(array)).toListL.to[F]).unsafeRunSync()
).all)
}

0 comments on commit eb7f264

Please sign in to comment.