From 39c28304803d2c8dcd5ef4a3531dd440794fcac4 Mon Sep 17 00:00:00 2001 From: Dima Date: Mon, 3 Sep 2018 11:58:58 +0300 Subject: [PATCH] Connection refactoring (#136) * add range call to js client, add random nodes processing for get and range requests * WIP experiments with StreamHandler * add StreamHandler implementation to websocket * grpc logic refactoring * delete unused class * fix default headers bug * refactoring, grpc-monix util methods * reshuffle libs * missed class * renaming and some docs * add TODOs, revert some unnecessary changes * client put refactoring * ClientPut refactoring * ClientGet refactoring * ClientRange refactoring * small fixes * delete println * add backpressure * small fixes --- build.sbt | 96 +++++- .../client/grpc/ClientWebsocketServices.scala | 10 +- .../client/grpc/ClientGrpcServices.scala | 26 +- .../grpc/client/ContractAllocatorClient.scala | 100 ------- .../grpc/client/ContractsCacheClient.scala | 99 ------- .../grpc/client/ContractAllocatorClient.scala | 36 ++- .../grpc/client/ContractsCacheClient.scala | 37 ++- .../fluence/dataset/client/ClientGet.scala | 218 ++++++++------ .../fluence/dataset/client/ClientPut.scala | 276 ++++++++++-------- .../fluence/dataset/client/ClientRange.scala | 190 +++++++----- .../dataset/client/DatasetClientUtils.scala | 50 ---- .../scala/fluence/dataset/client/Flow.scala | 8 + .../grpc/client/DatasetStorageClient.scala | 133 --------- .../grpc/client/DatasetStorageClient.scala | 74 ++--- .../main/scala/fluence/grpc/GrpcMonix.scala | 64 ++-- .../kad/grpc/client/KademliaClient.scala | 103 ------- .../kad/grpc/client/KademliaClient.scala} | 47 ++- .../fluence/kad/NetworkSimulationSpec.scala | 13 +- .../node/ClientNodeIntegrationSpec.scala | 5 +- project/plugins.sbt | 2 + .../scala/fluence/grpc/GrpcConnection.scala | 108 +++++++ .../scala/fluence/grpc/ServiceManager.scala | 71 +++++ .../scala/fluence/stream/Connection.scala | 53 ++++ .../grpc/proxy/GrpcWebsocketProxy.scala | 3 +- .../fluence/grpc/proxy/InProcessGrpc.scala | 7 +- ...oxyGrpc.scala => ProxyWebsocketGrpc.scala} | 28 +- .../fluence/grpc/proxy/ProxyCallSpec.scala | 14 +- .../fluence/transport}/ProtobufCodec.scala | 2 +- .../transport/websocket/GrpcProxyClient.scala | 100 ------- .../websocket/WebsocketConnection.scala | 87 ++++++ 30 files changed, 1012 insertions(+), 1048 deletions(-) delete mode 100644 contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala delete mode 100644 contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala rename contract/grpc/{js => }/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala (73%) rename contract/grpc/{js => }/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala (72%) delete mode 100644 dataset/client/src/main/scala/fluence/dataset/client/DatasetClientUtils.scala create mode 100644 dataset/client/src/main/scala/fluence/dataset/client/Flow.scala delete mode 100644 dataset/grpc/jvm/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala rename dataset/grpc/{js => }/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala (62%) rename {transport/grpc-monix => grpc-monix-converter}/src/main/scala/fluence/grpc/GrpcMonix.scala (53%) delete mode 100644 kademlia/grpc/jvm/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala rename kademlia/grpc/{js/src/main/scala/fluence/kad/grpc/client/KademliaWebsocketClient.scala => src/main/scala/fluence/kad/grpc/client/KademliaClient.scala} (65%) create mode 
100644 scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/GrpcConnection.scala create mode 100644 scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/ServiceManager.scala create mode 100644 scala-multistream/src/main/scala/fluence/stream/Connection.scala rename transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/{ProxyGrpc.scala => ProxyWebsocketGrpc.scala} (83%) rename transport/{websocket-js/src/main/scala/fluence/transport/websocket => grpc/src/main/scala/fluence/transport}/ProtobufCodec.scala (97%) delete mode 100644 transport/websocket-js/src/main/scala/fluence/transport/websocket/GrpcProxyClient.scala create mode 100644 transport/websocket-js/src/main/scala/fluence/transport/websocket/WebsocketConnection.scala diff --git a/build.sbt b/build.sbt index 54725edb63..10f2a01605 100644 --- a/build.sbt +++ b/build.sbt @@ -138,6 +138,7 @@ lazy val `kademlia-grpc-js` = `kademlia-grpc`.js .dependsOn(`transport-websocket-js`) lazy val `kademlia-grpc-jvm` = `kademlia-grpc`.jvm + .dependsOn(`grpc-monix-converter`) lazy val `kademlia-monix` = crossProject(JVMPlatform, JSPlatform) @@ -176,13 +177,15 @@ lazy val `kademlia` = crossProject(JVMPlatform, JSPlatform) scalaJSModuleKind in Test := ModuleKind.CommonJSModule ) .enablePlugins(AutomateHeaderPlugin) - .dependsOn(`kademlia-monix`, `kademlia-grpc`, `kademlia-testkit` % Test) + .dependsOn(`kademlia-monix`, `kademlia-grpc`, `scala-multistream-grpc`, `kademlia-testkit` % Test) lazy val `kademlia-js` = `kademlia`.js lazy val `kademlia-jvm` = `kademlia`.jvm -lazy val `transport-grpc-monix` = project - .in(file("transport/grpc-monix")) +//TODO utility project, could be replaced to another project +//TODO add unit tests for this project +lazy val `grpc-monix-converter` = project + .in(file("grpc-monix-converter")) .settings( commons, grpc, @@ -191,6 +194,7 @@ lazy val `transport-grpc-monix` = project scalatestKit ) ) + .dependsOn(`scala-multistream-jvm`) .enablePlugins(AutomateHeaderPlugin) lazy val `transport-grpc` = crossProject(JVMPlatform, JSPlatform) @@ -218,7 +222,7 @@ lazy val `transport-grpc` = crossProject(JVMPlatform, JSPlatform) scalaJSModuleKind in Test := ModuleKind.CommonJSModule ) .enablePlugins(AutomateHeaderPlugin) - .dependsOn(`transport-core`, `kademlia-protocol`) + .dependsOn(`transport-core`, `kademlia-protocol`, `scala-multistream`) lazy val `transport-grpc-js` = `transport-grpc`.js lazy val `transport-grpc-jvm` = `transport-grpc`.jvm @@ -258,7 +262,7 @@ lazy val `transport-grpc-proxy` = project scalatest ) ) - .dependsOn(`transport-core-jvm`, `websocket-protobuf-jvm`, `transport-grpc-monix`) + .dependsOn(`transport-core-jvm`, `websocket-protobuf-jvm`, `grpc-monix-converter`, `scala-multistream-grpc-jvm`) .enablePlugins(AutomateHeaderPlugin) lazy val `transport-websocket-js` = project @@ -282,7 +286,7 @@ lazy val `transport-websocket-js` = project ) .enablePlugins(ScalaJSPlugin) .enablePlugins(AutomateHeaderPlugin) - .dependsOn(`websocket-protobuf-js`) + .dependsOn(`websocket-protobuf-js`, `scala-multistream-js`) lazy val `transport-core` = crossProject(JVMPlatform, JSPlatform) .withoutSuffixFor(JVMPlatform) @@ -508,7 +512,7 @@ lazy val `dataset-grpc` = crossProject(JVMPlatform, JSPlatform) .enablePlugins(AutomateHeaderPlugin) .dependsOn(`dataset-client`, `transport-grpc`) -lazy val `dataset-grpc-jvm` = `dataset-grpc`.jvm.dependsOn(`dataset-node`, `transport-grpc-monix`) +lazy val `dataset-grpc-jvm` = `dataset-grpc`.jvm.dependsOn(`dataset-node`, `grpc-monix-converter`) lazy val `dataset-grpc-js` = `dataset-grpc`.js 
.enablePlugins(ScalaJSBundlerPlugin) .dependsOn(`transport-websocket-js`) @@ -669,15 +673,16 @@ lazy val `client-grpc` = crossProject(JVMPlatform, JSPlatform) .in(file("client/grpc")) .settings(commons) .jsSettings( - scalaJSModuleKind in Test := ModuleKind.CommonJSModule, - fork in Test := false + scalaJSModuleKind in Test := ModuleKind.CommonJSModule, + fork in Test := false ) .enablePlugins(AutomateHeaderPlugin) - .dependsOn(`client-core`, `transport-grpc`, `kademlia-grpc`, `dataset-grpc`, `contract-grpc`) + .dependsOn(`client-core`, `kademlia-grpc`, `dataset-grpc`, `contract-grpc`) lazy val `client-grpc-js` = `client-grpc`.js .enablePlugins(ScalaJSBundlerPlugin) lazy val `client-grpc-jvm` = `client-grpc`.jvm + .dependsOn(`scala-multistream-grpc-jvm`) lazy val `client-cli` = crossProject(JVMPlatform, JSPlatform) .withoutSuffixFor(JVMPlatform) @@ -715,14 +720,14 @@ lazy val `client-cli-app` = crossProject(JVMPlatform, JSPlatform) ) ) .jsSettings( - fork in Test := false, + fork in Test := false, scalaJSUseMainModuleInitializer := false, jsEnv in Compile := new org.scalajs.jsenv.jsdomnodejs.JSDOMNodeJSEnv(), workbenchStartMode := WorkbenchStartModes.Manual, skip in packageJSDependencies := false, scalaJSModuleKind in Test := ModuleKind.CommonJSModule, - webpackBundlingMode := BundlingMode.LibraryAndApplication(), - emitSourceMaps := false + webpackBundlingMode := BundlingMode.LibraryAndApplication(), + emitSourceMaps := false ) .enablePlugins(AutomateHeaderPlugin) .dependsOn(`client-cli`, `client-grpc`) @@ -765,3 +770,68 @@ lazy val `node` = project ) .enablePlugins(AutomateHeaderPlugin) .dependsOn(`node-grpc`) + +//TODO replace to another project +lazy val `scala-multistream` = crossProject(JVMPlatform, JSPlatform) + .withoutSuffixFor(JVMPlatform) + .crossType(FluenceCrossType) + .in(file("scala-multistream")) + .settings( + commons, + libraryDependencies ++= Seq( + "io.monix" %%% "monix" % MonixV + ) + ) + .jsSettings( + fork in Test := false + ) + .enablePlugins(AutomateHeaderPlugin) + +lazy val `scala-multistream-js` = `scala-multistream`.js +lazy val `scala-multistream-jvm` = `scala-multistream`.jvm + +lazy val `scala-multistream-grpc` = crossProject(JVMPlatform, JSPlatform) + .withoutSuffixFor(JVMPlatform) + .crossType(FluenceCrossType) + .in(file("scala-multistream/grpc")) + .settings( + commons, + protobuf, + grpc, + libraryDependencies ++= Seq( + "io.monix" %%% "monix" % MonixV, + "one.fluence" %%% "codec-protobuf" % CodecV, + scalatest + ) + ) + .jsSettings( + fork in Test := false + ) + .dependsOn(`scala-multistream`) + .enablePlugins(AutomateHeaderPlugin) + +lazy val `scala-multistream-grpc-js` = `scala-multistream-grpc`.js +lazy val `scala-multistream-grpc-jvm` = `scala-multistream-grpc`.jvm + .dependsOn(`grpc-monix-converter`) + +lazy val `scala-multistream-websocket` = crossProject(JVMPlatform, JSPlatform) + .withoutSuffixFor(JVMPlatform) + .crossType(FluenceCrossType) + .in(file("scala-multistream/websocket")) + .settings( + commons, + protobuf, + libraryDependencies ++= Seq( + "io.monix" %%% "monix" % MonixV, + "one.fluence" %%% "codec-protobuf" % CodecV, + scalatest + ) + ) + .jsSettings( + fork in Test := false + ) + .dependsOn(`scala-multistream`) + .enablePlugins(AutomateHeaderPlugin) + +lazy val `scala-multistream-websocket-js` = `scala-multistream-websocket`.js +lazy val `scala-multistream-websocket-jvm` = `scala-multistream-websocket`.jvm \ No newline at end of file diff --git a/client/grpc/js/src/main/scala/fluence/client/grpc/ClientWebsocketServices.scala 
b/client/grpc/js/src/main/scala/fluence/client/grpc/ClientWebsocketServices.scala index 70d947964c..560b579675 100644 --- a/client/grpc/js/src/main/scala/fluence/client/grpc/ClientWebsocketServices.scala +++ b/client/grpc/js/src/main/scala/fluence/client/grpc/ClientWebsocketServices.scala @@ -25,10 +25,10 @@ import fluence.contract.protocol.{ContractAllocatorRpc, ContractsCacheRpc} import fluence.crypto.signature.SignAlgo.CheckerFn import fluence.dataset.grpc.client.DatasetStorageClient import fluence.dataset.protocol.DatasetStorageRpc -import fluence.kad.grpc.client.KademliaWebsocketClient +import fluence.kad.grpc.client.KademliaClient import fluence.kad.protocol.{Contact, KademliaRpc} import fluence.proxy.grpc.WebsocketMessage -import fluence.transport.websocket.{ConnectionPool, WebsocketPipe, WebsocketT} +import fluence.transport.websocket._ import monix.execution.Scheduler import monix.reactive.Observable @@ -54,12 +54,12 @@ class ClientWebsocketServices(connectionPool: ConnectionPool[WebsocketMessage, W contact.websocketPort.map { wsPort ⇒ val url = "ws://" + contact.addr + ":" + wsPort - def connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]] = - IO(connectionPool.getOrCreateConnection(url)) + def connection: WebsocketConnection = + new WebsocketConnection(IO(connectionPool.getOrCreateConnection(url))) new ClientServices[F, BasicContract, Contact] { override def kademlia: KademliaRpc[Contact] = - new KademliaWebsocketClient(connection) + new KademliaClient(connection) override def contractsCache: ContractsCacheRpc[BasicContract] = new ContractsCacheClient[BasicContract](connection) diff --git a/client/grpc/jvm/src/main/scala/fluence/client/grpc/ClientGrpcServices.scala b/client/grpc/jvm/src/main/scala/fluence/client/grpc/ClientGrpcServices.scala index 7ede53d8b8..542e5913d9 100644 --- a/client/grpc/jvm/src/main/scala/fluence/client/grpc/ClientGrpcServices.scala +++ b/client/grpc/jvm/src/main/scala/fluence/client/grpc/ClientGrpcServices.scala @@ -17,17 +17,22 @@ package fluence.client.grpc -import cats.effect.Effect +import cats.effect.{Effect, IO} import fluence.client.core.ClientServices import fluence.contract.BasicContract import fluence.contract.grpc.client.{ContractAllocatorClient, ContractsCacheClient} +import fluence.contract.protobuf.grpc.{ContractAllocatorGrpc, ContractsCacheGrpc} import fluence.contract.protocol.{ContractAllocatorRpc, ContractsCacheRpc} import fluence.crypto.signature.SignAlgo.CheckerFn import fluence.dataset.grpc.client.DatasetStorageClient +import fluence.dataset.protobuf.grpc.DatasetStorageRpcGrpc import fluence.dataset.protocol.DatasetStorageRpc +import fluence.grpc.{GrpcConnection, ServiceManager} import fluence.kad.grpc.client.KademliaClient +import fluence.kad.protobuf.grpc.KademliaGrpc import fluence.kad.protocol.{Contact, KademliaRpc} import fluence.transport.grpc.client.GrpcClient +import io.grpc.{CallOptions, ManagedChannel} import monix.execution.Scheduler import monix.reactive.Observable import shapeless.HNil @@ -46,11 +51,22 @@ object ClientGrpcServices { import fluence.contract.grpc.BasicContractCodec.{codec ⇒ contractCodec} import fluence.kad.KademliaNodeCodec.{pureCodec ⇒ nodeCodec} + //TODO is it possible to avoid this? 
+ val services = List( + KademliaGrpc.SERVICE, + ContractsCacheGrpc.SERVICE, + ContractAllocatorGrpc.SERVICE, + DatasetStorageRpcGrpc.SERVICE + ) + + val serviceManager = ServiceManager(services) + val handlerBuilder: IO[(ManagedChannel, CallOptions)] => GrpcConnection = GrpcConnection.builder(serviceManager) + val client = builder - .add(KademliaClient.register()) - .add(ContractsCacheClient.register[BasicContract]()) - .add(ContractAllocatorClient.register[BasicContract]()) - .add(DatasetStorageClient.register[F]()) + .add(handlerBuilder andThen KademliaClient.apply) + .add(handlerBuilder andThen ContractsCacheClient.apply[BasicContract]) + .add(handlerBuilder andThen ContractAllocatorClient.apply[BasicContract]) + .add(handlerBuilder andThen DatasetStorageClient.apply[F]) .build contact ⇒ diff --git a/contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala b/contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala deleted file mode 100644 index 74441a1f88..0000000000 --- a/contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (C) 2017 Fluence Labs Limited - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package fluence.contract.grpc.client - -import cats.effect.IO -import fluence.codec.Codec -import fluence.contract.ops.ContractValidate -import fluence.contract.protobuf.BasicContract -import fluence.contract.protobuf.grpc.ContractAllocatorGrpc -import fluence.contract.protobuf.grpc.ContractAllocatorGrpc.ContractAllocatorStub -import fluence.contract.protocol.ContractAllocatorRpc -import fluence.crypto.signature.SignAlgo.CheckerFn -import io.grpc.{CallOptions, ManagedChannel} - -import scala.concurrent.{ExecutionContext, Future} - -// todo unit test -class ContractAllocatorClient[C: ContractValidate]( - stub: IO[ContractAllocatorStub] -)( - implicit - codec: Codec[IO, C, BasicContract], - checkerFn: CheckerFn, - ec: ExecutionContext -) extends ContractAllocatorRpc[C] { - import ContractValidate.ContractValidatorOps - - private def run[A](fa: ContractAllocatorStub ⇒ Future[A]): IO[A] = IO.fromFuture(stub.map(fa)) - - /** - * Offer a contract. Node should check and preallocate required resources, save offer, and sign it. - * - * @param contract A blank contract - * @return Signed contract, or F is an error - */ - override def offer(contract: C): IO[C] = - for { - // we should validate contract before send outside for 'offering' - _ ← contract.validateME[IO] - offer ← codec.encode(contract) - resp ← run(_.offer(offer)) - respContract ← codec.decode(resp) - // contract from the outside required validation - _ ← respContract.validateME[IO] - } yield respContract - - /** - * Allocate dataset: store the contract, create storage structures, form cluster. 
- * - * @param contract A sealed contract with all nodes and client signatures - * @return Allocated contract - */ - override def allocate(contract: C): IO[C] = - for { - // we should validate contract before send outside for 'allocating' - _ ← contract.validateME[IO] - offer ← codec.encode(contract) - resp ← run(_.allocate(offer)) - respContract ← codec.decode(resp) - // contract from the outside required validation - _ ← respContract.validateME[IO] - } yield respContract - -} - -object ContractAllocatorClient { - - /** - * Shorthand to register inside NetworkClient. - * - * @param channelOptions Channel to remote node and Call options - */ - def register[C: ContractValidate]()( - channelOptions: IO[(ManagedChannel, CallOptions)] - )( - implicit - codec: Codec[IO, C, BasicContract], - checkerFn: CheckerFn, - ec: ExecutionContext - ): ContractAllocatorRpc[C] = - new ContractAllocatorClient[C](channelOptions.map { - case (channel, callOptions) ⇒ new ContractAllocatorGrpc.ContractAllocatorStub(channel, callOptions) - }) - -} diff --git a/contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala b/contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala deleted file mode 100644 index 7b10f9dec4..0000000000 --- a/contract/grpc/jvm/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (C) 2017 Fluence Labs Limited - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package fluence.contract.grpc.client - -import cats.effect.IO -import cats.syntax.applicativeError._ -import com.google.protobuf.ByteString -import fluence.codec.{Codec, PureCodec} -import fluence.contract.ops.ContractValidate -import fluence.contract.protobuf.grpc.ContractsCacheGrpc -import fluence.contract.protobuf.grpc.ContractsCacheGrpc.ContractsCacheStub -import fluence.contract.protobuf.{BasicContract, FindRequest} -import fluence.contract.protocol.ContractsCacheRpc -import fluence.crypto.signature.SignAlgo.CheckerFn -import fluence.kad.KeyProtobufCodecs._ -import fluence.kad.protocol.Key -import io.grpc.{CallOptions, ManagedChannel} - -import scala.concurrent.ExecutionContext - -class ContractsCacheClient[C: ContractValidate](stub: IO[ContractsCacheStub])( - implicit - codec: Codec[IO, C, BasicContract], - checkerFn: CheckerFn, - ec: ExecutionContext -) extends ContractsCacheRpc[C] with slogging.LazyLogging { - import ContractValidate.ContractValidatorOps - - private val keyC = PureCodec.codec[Key, ByteString] - - /** - * Tries to find a contract in local cache. 
- * - * @param id Dataset ID - * @return Optional locally found contract - */ - override def find(id: Key): IO[Option[C]] = - (for { - idBs ← keyC.direct.runF[IO](id) - req = FindRequest(idBs) - binContract ← IO.fromFuture(stub.map(_.find(req))) - contract ← codec.decode(binContract) - // contract from the outside required validation - _ ← contract.validateME[IO] - } yield Option(contract)).recover { - case err ⇒ - logger.warn(s"Finding contract failed, cause=$err", err) - None - } - - /** - * Ask node to cache the contract. - * - * @param contract Contract to cache - * @return If the contract is cached or not - */ - override def cache(contract: C): IO[Boolean] = - for { - // we should validate contract before send outside to caching - _ ← contract.validateME[IO] - binContract ← codec.encode(contract) - resp ← IO.fromFuture(stub.map(_.cache(binContract))) - } yield resp.cached -} - -object ContractsCacheClient { - - /** - * Shorthand to register inside NetworkClient. - * - * @param channelOptions Channel to remote node and Call options - */ - def register[C: ContractValidate]()( - channelOptions: IO[(ManagedChannel, CallOptions)] - )( - implicit - codec: Codec[IO, C, BasicContract], - checkerFn: CheckerFn, - ec: ExecutionContext - ): ContractsCacheRpc[C] = - new ContractsCacheClient[C](channelOptions.map { - case (channel, callOptions) ⇒ new ContractsCacheGrpc.ContractsCacheStub(channel, callOptions) - }) -} diff --git a/contract/grpc/js/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala b/contract/grpc/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala similarity index 73% rename from contract/grpc/js/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala rename to contract/grpc/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala index fd5997b093..231aa3c024 100644 --- a/contract/grpc/js/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala +++ b/contract/grpc/src/main/scala/fluence/contract/grpc/client/ContractAllocatorClient.scala @@ -23,24 +23,21 @@ import fluence.contract.ops.ContractValidate import fluence.contract.protobuf.BasicContract import fluence.contract.protocol.ContractAllocatorRpc import fluence.crypto.signature.SignAlgo.CheckerFn -import fluence.proxy.grpc.WebsocketMessage -import fluence.transport.websocket.{ConnectionPool, GrpcProxyClient, WebsocketPipe} +import fluence.stream.Connection import monix.execution.Scheduler /** - * Contract allocator client for websocket. + * Contract allocator client. * - * @param connection Websocket pipe. 
*/ -class ContractAllocatorClient[C: ContractValidate](connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]])( +class ContractAllocatorClient[C: ContractValidate](connection: Connection)( implicit codec: Codec[IO, C, BasicContract], checkerFn: CheckerFn, ec: Scheduler ) extends ContractAllocatorRpc[C] { import ContractValidate.ContractValidatorOps - - import fluence.transport.websocket.ProtobufCodec._ + import fluence.transport.ProtobufCodec._ private val service = "fluence.contract.protobuf.grpc.ContractAllocator" @@ -55,10 +52,10 @@ class ContractAllocatorClient[C: ContractValidate](connection: IO[WebsocketPipe[ // we should validate contract before send outside for 'offering' _ ← contract.validateME[IO] offer ← codec.encode(contract) - websocket ← connection - proxy = GrpcProxyClient - .proxy(service, "offer", websocket, generatedMessageCodec, protobufDynamicCodec(BasicContract)) - resp ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(offer))) + request ← generatedMessageCodec.runF[IO](offer) + responseBytes ← connection + .handleUnary(service, "offer", request) + resp ← protobufDynamicCodec(BasicContract).runF[IO](responseBytes) respContract ← codec.decode(resp) // contract from the outside required validation _ ← respContract.validateME[IO] @@ -75,13 +72,22 @@ class ContractAllocatorClient[C: ContractValidate](connection: IO[WebsocketPipe[ // we should validate contract before send outside for 'allocating' _ ← contract.validateME[IO] offer ← codec.encode(contract) - websocket ← connection - proxy = GrpcProxyClient - .proxy(service, "allocate", websocket, generatedMessageCodec, protobufDynamicCodec(BasicContract)) - resp ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(offer))) + request ← generatedMessageCodec.runF[IO](offer) + responseBytes ← connection + .handleUnary(service, "allocate", request) + resp ← protobufDynamicCodec(BasicContract).runF[IO](responseBytes) respContract ← codec.decode(resp) // contract from the outside required validation _ ← respContract.validateME[IO] } yield respContract } + +object ContractAllocatorClient { + def apply[C : ContractValidate](streamHandler: Connection)( + implicit + codec: Codec[IO, C, BasicContract], + checkerFn: CheckerFn, + ec: Scheduler + ): ContractAllocatorRpc[C] = new ContractAllocatorClient(streamHandler) +} diff --git a/contract/grpc/js/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala b/contract/grpc/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala similarity index 72% rename from contract/grpc/js/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala rename to contract/grpc/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala index 61152adb8a..0c684dbd9e 100644 --- a/contract/grpc/js/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala +++ b/contract/grpc/src/main/scala/fluence/contract/grpc/client/ContractsCacheClient.scala @@ -27,16 +27,14 @@ import fluence.contract.protocol.ContractsCacheRpc import fluence.crypto.signature.SignAlgo.CheckerFn import fluence.kad.KeyProtobufCodecs._ import fluence.kad.protocol.Key -import fluence.proxy.grpc.WebsocketMessage -import fluence.transport.websocket.{ConnectionPool, GrpcProxyClient, WebsocketPipe} +import fluence.stream.Connection import monix.execution.Scheduler /** - * Contract client for websocket. + * Contract client. * - * @param connection Websocket pipe. 
*/ -class ContractsCacheClient[C: ContractValidate](connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]])( +class ContractsCacheClient[C: ContractValidate](connection: Connection)( implicit codec: Codec[IO, C, BasicContract], checkerFn: CheckerFn, @@ -46,7 +44,7 @@ class ContractsCacheClient[C: ContractValidate](connection: IO[WebsocketPipe[Web private val keyC = PureCodec.codec[Key, ByteString] - import fluence.transport.websocket.ProtobufCodec._ + import fluence.transport.ProtobufCodec._ private val service = "fluence.contract.protobuf.grpc.ContractsCache" @@ -59,12 +57,10 @@ class ContractsCacheClient[C: ContractValidate](connection: IO[WebsocketPipe[Web override def find(id: Key): IO[Option[C]] = (for { idBs ← keyC.direct.runF[IO](id) - websocket ← connection - req = FindRequest(idBs) - proxy = GrpcProxyClient - .proxy(service, "find", websocket, generatedMessageCodec, protobufDynamicCodec(BasicContract)) - binContract ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(req))) - contract ← codec.decode(binContract) + req ← generatedMessageCodec.runF[IO](FindRequest(idBs)) + responseBytes ← connection.handleUnary(service, "find", req) + response ← protobufDynamicCodec(BasicContract).runF[IO](responseBytes) + contract ← codec.decode(response) // contract from the outside required validation _ ← contract.validateME[IO] } yield Option(contract)).recover { @@ -84,9 +80,18 @@ class ContractsCacheClient[C: ContractValidate](connection: IO[WebsocketPipe[Web // we should validate contract before send outside to caching _ ← contract.validateME[IO] binContract ← codec.encode(contract) - websocket ← connection - proxy = GrpcProxyClient - .proxy(service, "cache", websocket, generatedMessageCodec, protobufDynamicCodec(CacheResponse)) - resp ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(binContract))) + req ← generatedMessageCodec.runF[IO](binContract) + responseBytes ← connection.handleUnary(service, "cache", req) + resp ← protobufDynamicCodec(CacheResponse).runF[IO](responseBytes) } yield resp.cached } + +object ContractsCacheClient { + + def apply[C: ContractValidate](streamHandler: Connection)( + implicit + codec: Codec[IO, C, BasicContract], + checkerFn: CheckerFn, + ec: Scheduler + ): ContractsCacheRpc[C] = new ContractsCacheClient(streamHandler) +} diff --git a/dataset/client/src/main/scala/fluence/dataset/client/ClientGet.scala b/dataset/client/src/main/scala/fluence/dataset/client/ClientGet.scala index b726986ee2..48744ea080 100644 --- a/dataset/client/src/main/scala/fluence/dataset/client/ClientGet.scala +++ b/dataset/client/src/main/scala/fluence/dataset/client/ClientGet.scala @@ -19,84 +19,101 @@ package fluence.dataset.client import cats.effect.{Effect, IO} import cats.syntax.applicativeError._ -import cats.syntax.flatMap._ +import cats.syntax.functor._ import com.google.protobuf.ByteString import fluence.btree.core.{Hash, Key} import fluence.btree.protocol.BTreeRpc import fluence.dataset.protocol.{ClientError, ServerError} import fluence.dataset.protobuf._ -import monix.eval.{MVar, Task} +import monix.eval.Task import monix.execution.Scheduler -import monix.reactive.{Observable, Observer, Pipe} +import monix.reactive.subjects.PublishSubject +import monix.reactive.Observable import scala.collection.Searching +import scala.concurrent.Future import scala.language.higherKinds class ClientGet[F[_]: Effect](datasetId: Array[Byte], version: Long, getCallbacks: BTreeRpc.SearchCallback[F]) extends slogging.LazyLogging { - import DatasetClientUtils._ + private def 
handleClientErr(err: Throwable)(implicit sch: Scheduler): ErrorFromClient[GetCallbackReply] = + ErrorFromClient(GetCallbackReply(GetCallbackReply.Reply.ClientError(Error(err.getMessage)))) - private val clientError = MVar.empty[ClientError].memoize + private def handleContinuation( + implicit scheduler: Scheduler + ): PartialFunction[GetCallback.Callback, F[Flow[GetCallbackReply]]] = { + case ask if ask.isNextChildIndex ⇒ + val Some(nci) = ask.nextChildIndex - /** Puts error to client error(for returning error to user of this client), and return reply with error for server.*/ - private def handleClientErr(err: Throwable)(implicit sch: Scheduler): F[GetCallbackReply] = - ( - for { - ce ← clientError - _ ← ce.put(ClientError(err.getMessage)) - } yield GetCallbackReply(GetCallbackReply.Reply.ClientError(Error(err.getMessage))) - ).to[F] - - private def handleAsks( - source: Observable[GetCallback.Callback] - )(implicit sch: Scheduler): Observable[GetCallbackReply] = - source.collect { case ask if ask.isDefined && !ask.isValue && !ask.isServerError ⇒ ask } // Collect callbacks - .mapEval[F, GetCallbackReply] { - - case ask if ask.isNextChildIndex ⇒ - val Some(nci) = ask.nextChildIndex - - getCallbacks - .nextChildIndex( - nci.keys.map(k ⇒ Key(k.toByteArray)).toArray, - nci.childsChecksums.map(c ⇒ Hash(c.toByteArray)).toArray - ) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(idx) ⇒ - Effect[F].pure(GetCallbackReply(GetCallbackReply.Reply.NextChildIndex(ReplyNextChildIndex(idx)))) - } - - case ask if ask.isSubmitLeaf ⇒ - val Some(sl) = ask.submitLeaf - - getCallbacks - .submitLeaf( - sl.keys.map(k ⇒ Key(k.toByteArray)).toArray, - sl.valuesChecksums.map(c ⇒ Hash(c.toByteArray)).toArray - ) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(searchResult) ⇒ - Effect[F].pure( - GetCallbackReply( - GetCallbackReply.Reply.SubmitLeaf( - ReplySubmitLeaf( - searchResult match { - case Searching.Found(i) ⇒ ReplySubmitLeaf.SearchResult.Found(i) - case Searching.InsertionPoint(i) ⇒ ReplySubmitLeaf.SearchResult.InsertionPoint(i) - } - ) - ) + getCallbacks + .nextChildIndex( + nci.keys.map(k ⇒ Key(k.toByteArray)).toArray, + nci.childsChecksums.map(c ⇒ Hash(c.toByteArray)).toArray + ) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(idx) ⇒ + Continuation(GetCallbackReply(GetCallbackReply.Reply.NextChildIndex(ReplyNextChildIndex(idx)))) + } + + case ask if ask.isSubmitLeaf ⇒ + val Some(sl) = ask.submitLeaf + + getCallbacks + .submitLeaf( + sl.keys.map(k ⇒ Key(k.toByteArray)).toArray, + sl.valuesChecksums.map(c ⇒ Hash(c.toByteArray)).toArray + ) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(searchResult) ⇒ + Continuation( + GetCallbackReply( + GetCallbackReply.Reply.SubmitLeaf( + ReplySubmitLeaf( + searchResult match { + case Searching.Found(i) ⇒ ReplySubmitLeaf.SearchResult.Found(i) + case Searching.InsertionPoint(i) ⇒ ReplySubmitLeaf.SearchResult.InsertionPoint(i) + } ) ) - } + ) + ) + } + } + + private def handleResult( + implicit scheduler: Scheduler + ): PartialFunction[GetCallback.Callback, F[Flow[GetCallbackReply]]] = { + case ask if ask.isServerError ⇒ + val Some(err) = ask.serverError + val serverError = ServerError(err.msg) + // if server send an error we should close stream and lift error up + Effect[F].raiseError(serverError) + case ask if ask.isValue ⇒ + val Some(getValue) = ask._value + logger.trace(s"DatasetStorageClient.get() received server value=$getValue") + // if got success 
response or server error close stream and return value\error to user of this client + Effect[F].pure( + Result( + Option(getValue.value) + .filterNot(_.isEmpty) + .map(_.toByteArray) + ) + ) + } + private def handleAsks(source: Observable[GetCallback.Callback])( + implicit sch: Scheduler + ): Observable[Flow[GetCallbackReply]] = + source + .mapEval[F, Flow[GetCallbackReply]] { + handleContinuation orElse handleResult } /** @@ -106,46 +123,63 @@ class ClientGet[F[_]: Effect](datasetId: Array[Byte], version: Long, getCallback * @return returns found value, None if nothing was found. */ def runStream( - pipe: Pipe[GetCallbackReply, GetCallback] + handler: Observable[GetCallbackReply] ⇒ IO[Observable[GetCallback]] )(implicit sch: Scheduler): IO[Option[Array[Byte]]] = { - // Get observer/observable for request's bidiflow - val (pushClientReply: Observer[GetCallbackReply], pullServerAsk: Observable[GetCallback.Callback]) = pipe - .transform(_.map { - case GetCallback(callback) ⇒ - logger.trace(s"DatasetStorageClient.get() received server ask=$callback") - callback - }) - .multicast + val subj = PublishSubject[GetCallbackReply]() - val cancelable = ( + val requests = ( Observable( GetCallbackReply( GetCallbackReply.Reply.DatasetInfo(DatasetInfo(ByteString.copyFrom(datasetId), version)) ) - ) ++ handleAsks(pullServerAsk) - ).subscribe(pushClientReply) // And clientReply response back to server - - val serverErrOrVal = - pullServerAsk.collect { // Collect terminal task with value/error - case ask if ask.isServerError ⇒ - val Some(err) = ask.serverError - val serverError = ServerError(err.msg) - // if server send an error we should close stream and lift error up - Task(cancelable.cancel()) - .flatMap(_ ⇒ Task.raiseError[Option[Array[Byte]]](serverError)) - case ask if ask.isValue ⇒ - val Some(getValue) = ask._value - logger.trace(s"DatasetStorageClient.get() received server value=$getValue") - // if got success response or server error close stream and return value\error to user of this client - Task(cancelable.cancel()).map { _ ⇒ - Option(getValue.value) - .filterNot(_.isEmpty) - .map(_.toByteArray) - } - }.headOptionL // Take the first option value or server error + ) ++ subj + ).map { el => + logger.trace(s"DatasetStorageClient.get() will send message to server $el") + el + } + + for { + responses ← handler(requests) + cycle = { + + val mapped = responses.map { + case GetCallback(callback) ⇒ + logger.trace(s"DatasetStorageClient.get() received server ask=$callback") + callback + } + + handleAsks(mapped).mapFuture { + case c @ Continuation(reply) ⇒ subj.onNext(reply).map(_ ⇒ c) + case res @ Result(v) ⇒ Future(res) + case er @ ErrorFromClient(err) ⇒ subj.onNext(err).map(_ ⇒ er) + } + } - composeResult[IO](clientError, serverErrOrVal) + result <- { + cycle.concatMap { + case r@Result(_) ⇒ + Observable(r, Stop) + case er@ErrorFromClient(_) ⇒ + Observable(er, Stop) + case c@Continuation(_) ⇒ + Observable(c) + }.takeWhile { + case Stop ⇒ false + case _ ⇒ true + }.lastOptionL + .flatMap { + case Some(ErrorFromClient(err)) ⇒ + Task.raiseError(ClientError(err.reply.clientError.get.msg)) + case Some(Result(v)) ⇒ + Task(v) + case v ⇒ + logger.error("Unexpected message: " + v) + Task.raiseError(new RuntimeException("Unexpected internal error")) + } + .toIO + } + } yield result } } diff --git a/dataset/client/src/main/scala/fluence/dataset/client/ClientPut.scala b/dataset/client/src/main/scala/fluence/dataset/client/ClientPut.scala index 730fef8de8..69e5317bd0 100644 --- 
a/dataset/client/src/main/scala/fluence/dataset/client/ClientPut.scala +++ b/dataset/client/src/main/scala/fluence/dataset/client/ClientPut.scala @@ -19,17 +19,19 @@ package fluence.dataset.client import cats.effect.{Effect, IO} import cats.syntax.applicativeError._ -import cats.syntax.flatMap._ +import cats.syntax.functor._ import com.google.protobuf.ByteString import fluence.btree.core.{Hash, Key} import fluence.btree.protocol.BTreeRpc import fluence.dataset.protocol.{ClientError, ServerError} import fluence.dataset.protobuf._ -import monix.eval.{MVar, Task} +import monix.eval.Task import monix.execution.Scheduler -import monix.reactive.{Observable, Pipe} +import monix.reactive.subjects.PublishSubject +import monix.reactive.Observable import scala.collection.Searching +import scala.concurrent.Future import scala.language.higherKinds class ClientPut[F[_]: Effect]( @@ -39,138 +41,178 @@ class ClientPut[F[_]: Effect]( encryptedValue: Array[Byte] ) extends slogging.LazyLogging { - import DatasetClientUtils._ - - private val clientError = MVar.empty[ClientError].memoize + private def handleClientErr(err: Throwable): ErrorFromClient[PutCallbackReply] = { + ErrorFromClient(PutCallbackReply(PutCallbackReply.Reply.ClientError(Error(err.getMessage)))) + } - /** Puts error to client error(for returning error to user of this client), and return reply with error for server.*/ - private def handleClientErr(err: Throwable)(implicit sch: Scheduler): F[PutCallbackReply] = - run( - for { - ce ← clientError - _ ← ce.put(ClientError(err.getMessage)) - } yield PutCallbackReply(PutCallbackReply.Reply.ClientError(Error(err.getMessage))) - ) + private def handleContinuation(implicit scheduler: Scheduler): PartialFunction[PutCallback.Callback, F[Flow[PutCallbackReply]]] = { + case ask if ask.isNextChildIndex ⇒ + val Some(nci) = ask.nextChildIndex - private def handleAsks(source: Observable[PutCallback.Callback])(implicit sch: Scheduler) = - source.collect { case ask if ask.isDefined && !ask.isValue ⇒ ask } // Collect callbacks - .mapEval[F, PutCallbackReply] { + putCallbacks + .nextChildIndex( + nci.keys.map(k ⇒ Key(k.toByteArray)).toArray, + nci.childsChecksums.map(c ⇒ Hash(c.toByteArray)).toArray + ) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(idx) ⇒ + Continuation(PutCallbackReply(PutCallbackReply.Reply.NextChildIndex(ReplyNextChildIndex(idx)))) + } + + case ask if ask.isPutDetails ⇒ + val Some(pd) = ask.putDetails + + putCallbacks + .putDetails( + pd.keys.map(k ⇒ Key(k.toByteArray)).toArray, + pd.valuesChecksums.map(c ⇒ Hash(c.toByteArray)).toArray + ) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(cpd) ⇒ + val putDetails = ReplyPutDetails( + key = ByteString.copyFrom(cpd.key.bytes), + checksum = ByteString.copyFrom(cpd.valChecksum.bytes), + searchResult = cpd.searchResult match { + case Searching.Found(foundIndex) ⇒ ReplyPutDetails.SearchResult.Found(foundIndex) + case Searching.InsertionPoint(insertionPoint) ⇒ + ReplyPutDetails.SearchResult.InsertionPoint(insertionPoint) + } + ) + Continuation(PutCallbackReply(PutCallbackReply.Reply.PutDetails(putDetails))) + } + + case ask if ask.isVerifyChanges ⇒ + val Some(vc) = ask.verifyChanges + + putCallbacks + .verifyChanges(Hash(vc.serverMerkleRoot.toByteArray), vc.splitted) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(signature) ⇒ + Continuation(PutCallbackReply( + PutCallbackReply.Reply.VerifyChanges(ReplyVerifyChanges(ByteString.copyFrom(signature.toArray))) + )) + + } + + 
case ask if ask.isChangesStored ⇒ + putCallbacks + .changesStored() + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(idx) ⇒ + Continuation(PutCallbackReply(PutCallbackReply.Reply.ChangesStored(ReplyChangesStored()))) + } + } - case ask if ask.isNextChildIndex ⇒ - val Some(nci) = ask.nextChildIndex + private def handleResult( + implicit scheduler: Scheduler + ): PartialFunction[PutCallback.Callback, F[Flow[PutCallbackReply]]] = { + case ask if ask.isServerError ⇒ + val Some(err) = ask.serverError + val serverError = ServerError(err.msg) + // if server send the error we should close stream and lift error up + Effect[F].raiseError[Flow[PutCallbackReply]](serverError) + case ask if ask.isValue ⇒ + val Some(getValue) = ask._value + logger.trace(s"DatasetStorageClient.put() received server value=$getValue") + // if got success response or server error close stream and return value\error to user of this client + Effect[F].pure( + Result( + Option(getValue.value) + .filterNot(_.isEmpty) + .map(_.toByteArray) + ) + ) + } - putCallbacks - .nextChildIndex( - nci.keys.map(k ⇒ Key(k.toByteArray)).toArray, - nci.childsChecksums.map(c ⇒ Hash(c.toByteArray)).toArray - ) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(idx) ⇒ - Effect[F].pure(PutCallbackReply(PutCallbackReply.Reply.NextChildIndex(ReplyNextChildIndex(idx)))) - } - - case ask if ask.isPutDetails ⇒ - val Some(pd) = ask.putDetails - - putCallbacks - .putDetails( - pd.keys.map(k ⇒ Key(k.toByteArray)).toArray, - pd.valuesChecksums.map(c ⇒ Hash(c.toByteArray)).toArray - ) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(cpd) ⇒ - val putDetails = ReplyPutDetails( - key = ByteString.copyFrom(cpd.key.bytes), - checksum = ByteString.copyFrom(cpd.valChecksum.bytes), - searchResult = cpd.searchResult match { - case Searching.Found(foundIndex) ⇒ ReplyPutDetails.SearchResult.Found(foundIndex) - case Searching.InsertionPoint(insertionPoint) ⇒ - ReplyPutDetails.SearchResult.InsertionPoint(insertionPoint) - } - ) - Effect[F].pure(PutCallbackReply(PutCallbackReply.Reply.PutDetails(putDetails))) - } - - case ask if ask.isVerifyChanges ⇒ - val Some(vc) = ask.verifyChanges - - putCallbacks - .verifyChanges(Hash(vc.serverMerkleRoot.toByteArray), vc.splitted) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(signature) ⇒ - Effect[F].pure( - PutCallbackReply( - PutCallbackReply.Reply.VerifyChanges(ReplyVerifyChanges(ByteString.copyFrom(signature.toArray))) - ) - ) - } - - case ask if ask.isChangesStored ⇒ - putCallbacks - .changesStored() - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(idx) ⇒ - Effect[F].pure(PutCallbackReply(PutCallbackReply.Reply.ChangesStored(ReplyChangesStored()))) - } + private def handleAsks(source: Observable[PutCallback.Callback])( + implicit sch: Scheduler + ): Observable[Flow[PutCallbackReply]] = + source + .mapEval[F, Flow[PutCallbackReply]] { + handleContinuation orElse handleResult } /** * - * @param pipe Bidi pipe for transport layer + * @param handler + * @param sch * @return */ - def runStream(pipe: Pipe[PutCallbackReply, PutCallback])(implicit sch: Scheduler): IO[Option[Array[Byte]]] = { - val (pushClientReply, pullServerAsk) = pipe - .transform(_.map { - case PutCallback(callback) ⇒ - logger.trace(s"DatasetStorageClient.put() received server ask=$callback") - callback - }) - .multicast - - val cancelable = ( - Observable( // Pass the datasetId and value as the first, unasked pushes + 
def runStream( + handler: Observable[PutCallbackReply] ⇒ IO[Observable[PutCallback]] + )(implicit sch: Scheduler): IO[Option[Array[Byte]]] = { + + val subj = PublishSubject[PutCallbackReply]() + + val requests = + (Observable( // Pass the datasetId and value as the first, unasked pushes PutCallbackReply( PutCallbackReply.Reply.DatasetInfo(DatasetInfo(ByteString.copyFrom(datasetId), version)) ), PutCallbackReply( PutCallbackReply.Reply.Value(PutValue(ByteString.copyFrom(encryptedValue))) ) - ) ++ handleAsks(pullServerAsk) - ).subscribe(pushClientReply) // And push response back to server - - val serverErrOrVal = - pullServerAsk.collect { // Collect terminal task with value/error - case ask if ask.isServerError ⇒ - val Some(err) = ask.serverError - val serverError = ServerError(err.msg) - // if server send the error we should close stream and lift error up - Task(cancelable.cancel()) - .flatMap(_ ⇒ Task.raiseError[Option[Array[Byte]]](serverError)) - case ask if ask.isValue ⇒ - val Some(getValue) = ask._value - logger.trace(s"DatasetStorageClient.put() received server value=$getValue") - // if got success response or server error close stream and return value\error to user of this client - Task(cancelable.cancel()).map { _ ⇒ - Option(getValue.value) - .filterNot(_.isEmpty) - .map(_.toByteArray) + ) ++ subj).map { el => + logger.trace(s"DatasetStorageClient.put() will send message to server $el") + el + } + + for { + responses ← handler(requests) + cycle = { + + val mapped = responses.map { + case PutCallback(callback) ⇒ + logger.trace(s"DatasetStorageClient.put() received server ask=$callback") + callback + } + + handleAsks(mapped) + .mapFuture { + case c@Continuation(reply) => subj.onNext(reply).map(_ ⇒ c) + case res@Result(v) => Future(res) + case er@ErrorFromClient(err) => subj.onNext(err).map(_ ⇒ er) } - }.headOptionL // Take the first option value or server error + } + + result <- { + cycle.concatMap { + case r@Result(_) ⇒ + Observable(r, Stop) + case er@ErrorFromClient(_) ⇒ + Observable(er, Stop) + case c@Continuation(_) ⇒ + Observable(c) + }.takeWhile { + case Stop ⇒ false + case _ ⇒ true + }.lastOptionL + .flatMap { + case Some(ErrorFromClient(err)) ⇒ + Task.raiseError(ClientError(err.reply.clientError.get.msg)) + case Some(Result(v)) ⇒ + Task(v) + case v ⇒ + logger.error("Unexpected message: " + v) + Task.raiseError(new RuntimeException("Unexpected internal error")) + } + .toIO + } + + } yield result - composeResult[IO](clientError, serverErrOrVal) } } diff --git a/dataset/client/src/main/scala/fluence/dataset/client/ClientRange.scala b/dataset/client/src/main/scala/fluence/dataset/client/ClientRange.scala index ee268ffb3a..ac770eb6ef 100644 --- a/dataset/client/src/main/scala/fluence/dataset/client/ClientRange.scala +++ b/dataset/client/src/main/scala/fluence/dataset/client/ClientRange.scala @@ -17,76 +17,98 @@ package fluence.dataset.client -import cats.effect.Effect +import cats.effect.{Effect, IO} import cats.syntax.applicativeError._ -import cats.syntax.flatMap._ +import cats.syntax.functor._ import com.google.protobuf.ByteString import fluence.btree.core.{Hash, Key} import fluence.btree.protocol.BTreeRpc -import fluence.dataset.protocol.ServerError import fluence.dataset.protobuf._ +import fluence.dataset.protocol.{ClientError, ServerError} import monix.execution.Scheduler -import monix.reactive.{Observable, Pipe} +import monix.reactive.Observable +import monix.reactive.subjects.PublishSubject import scala.collection.Searching +import scala.concurrent.Future import 
scala.language.higherKinds class ClientRange[F[_]: Effect](datasetId: Array[Byte], version: Long, rangeCallbacks: BTreeRpc.SearchCallback[F]) extends slogging.LazyLogging { - /** Puts error to client error(for returning error to user of this client), and return reply with error for server.*/ - private def handleClientErr(err: Throwable): F[RangeCallbackReply] = { - Effect[F].pure(RangeCallbackReply(RangeCallbackReply.Reply.ClientError(Error(err.getMessage)))) + private def handleClientErr(err: Throwable): ErrorFromClient[RangeCallbackReply] = { + ErrorFromClient(RangeCallbackReply(RangeCallbackReply.Reply.ClientError(Error(err.getMessage)))) } - private def handleAsks(source: Observable[RangeCallback.Callback]): Observable[RangeCallbackReply] = - source.collect { case ask if ask.isDefined && !ask.isValue && !ask.isServerError ⇒ ask } // Collect callbacks - .mapEval[F, RangeCallbackReply] { + private def handleAsks(source: Observable[RangeCallback.Callback])( + implicit sch: Scheduler + ): Observable[Flow[RangeCallbackReply]] = + source + .mapEval[F, Flow[RangeCallbackReply]] { + handleContinuation orElse handleResult + } - case ask if ask.isNextChildIndex ⇒ - val Some(nci) = ask.nextChildIndex + private def handleContinuation( + implicit scheduler: Scheduler + ): PartialFunction[RangeCallback.Callback, F[Flow[RangeCallbackReply]]] = { + case ask if ask.isNextChildIndex ⇒ + val Some(nci) = ask.nextChildIndex - rangeCallbacks - .nextChildIndex( - nci.keys.map(k ⇒ Key(k.toByteArray)).toArray, - nci.childsChecksums.map(c ⇒ Hash(c.toByteArray)).toArray - ) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(idx) ⇒ - Effect[F].pure(RangeCallbackReply(RangeCallbackReply.Reply.NextChildIndex(ReplyNextChildIndex(idx)))) - } - - case ask if ask.isSubmitLeaf ⇒ - val Some(sl) = ask.submitLeaf - - rangeCallbacks - .submitLeaf( - sl.keys.map(k ⇒ Key(k.toByteArray)).toArray, - sl.valuesChecksums.map(c ⇒ Hash(c.toByteArray)).toArray - ) - .attempt - .flatMap { - case Left(err) ⇒ - handleClientErr(err) - case Right(searchResult) ⇒ - Effect[F].pure( - RangeCallbackReply( - RangeCallbackReply.Reply.SubmitLeaf( - ReplySubmitLeaf( - searchResult match { - case Searching.Found(i) ⇒ ReplySubmitLeaf.SearchResult.Found(i) - case Searching.InsertionPoint(i) ⇒ ReplySubmitLeaf.SearchResult.InsertionPoint(i) - } - ) - ) + rangeCallbacks + .nextChildIndex( + nci.keys.map(k ⇒ Key(k.toByteArray)).toArray, + nci.childsChecksums.map(c ⇒ Hash(c.toByteArray)).toArray + ) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(idx) ⇒ + Continuation(RangeCallbackReply(RangeCallbackReply.Reply.NextChildIndex(ReplyNextChildIndex(idx)))) + } + + case ask if ask.isSubmitLeaf ⇒ + val Some(sl) = ask.submitLeaf + + rangeCallbacks + .submitLeaf( + sl.keys.map(k ⇒ Key(k.toByteArray)).toArray, + sl.valuesChecksums.map(c ⇒ Hash(c.toByteArray)).toArray + ) + .attempt + .map { + case Left(err) ⇒ + handleClientErr(err) + case Right(searchResult) ⇒ + Continuation( + RangeCallbackReply( + RangeCallbackReply.Reply.SubmitLeaf( + ReplySubmitLeaf( + searchResult match { + case Searching.Found(i) ⇒ ReplySubmitLeaf.SearchResult.Found(i) + case Searching.InsertionPoint(i) ⇒ ReplySubmitLeaf.SearchResult.InsertionPoint(i) + } ) ) - } + ) + ) + } - } + } + + private def handleResult( + implicit scheduler: Scheduler + ): PartialFunction[RangeCallback.Callback, F[Flow[RangeCallbackReply]]] = { + case ask if ask.isServerError ⇒ + val Some(err) = ask.serverError + val serverError = ServerError(err.msg) + // if 
server send an error we should close stream and lift error up + Effect[F].raiseError[Flow[RangeCallbackReply]](serverError) + case ask if ask.isValue ⇒ + val Some(RangeValue(key, value)) = ask._value + logger.trace(s"DatasetStorageClient.range() received server value=$value for key=$key") + Effect[F].pure(RangeResult(key.toByteArray, value.toByteArray)) + } /** * Run client bidi stream. @@ -95,38 +117,52 @@ class ClientRange[F[_]: Effect](datasetId: Array[Byte], version: Long, rangeCall * @return */ def runStream( - pipe: Pipe[RangeCallbackReply, RangeCallback] - )(implicit sch: Scheduler): Observable[(Array[Byte], Array[Byte])] = { - // Get observer/observable for request's bidiflow - val (pushClientReply, pullServerAsk) = pipe - .transform(_.map { - case RangeCallback(callback) ⇒ - logger.trace(s"DatasetStorageClient.range() received server ask=$callback") - callback - }) - .multicast + handler: Observable[RangeCallbackReply] ⇒ IO[Observable[RangeCallback]] + )(implicit sch: Scheduler): IO[Observable[(Array[Byte], Array[Byte])]] = { + + val subj = PublishSubject[RangeCallbackReply]() - val cancelable = ( + val requests = ( Observable( RangeCallbackReply( RangeCallbackReply.Reply.DatasetInfo(DatasetInfo(ByteString.copyFrom(datasetId), version)) ) - ) ++ handleAsks(pullServerAsk) - ).subscribe(pushClientReply) // And clientReply response back to server - - pullServerAsk.collect { - case ask if ask.isServerError ⇒ - val Some(err) = ask.serverError - val serverError = ServerError(err.msg) - // if server send an error we should close stream and lift error up - Observable(pushClientReply.onError(serverError)) - .flatMap(_ ⇒ Observable.raiseError[(Array[Byte], Array[Byte])](serverError)) - case ask if ask.isValue ⇒ - val Some(RangeValue(key, value)) = ask._value - logger.trace(s"DatasetStorageClient.range() received server value=$value for key=$key") - Observable(key.toByteArray → value.toByteArray) - - }.doAfterTerminate(_ ⇒ cancelable.cancel()).flatten + ) ++ subj + ).map { el ⇒ + logger.trace(s"DatasetStorageClient.range() will send message to server $el") + el + } + + for { + responses ← handler(requests) + } yield { + + val mapped = responses.map { + case RangeCallback(callback) ⇒ + logger.trace(s"DatasetStorageClient.range() received server ask=$callback") + callback + } + + val cycle = handleAsks(mapped).mapFuture { + case c @ Continuation(reply) ⇒ + subj.onNext(reply).map(_ ⇒ Observable.empty) + case er @ ErrorFromClient(err) ⇒ + subj.onNext(err).map(_ ⇒ Observable(er)) + case v => Future(Observable(v)) + }.flatten + + val result = + cycle.flatMap { + case ErrorFromClient(er) ⇒ + Observable.raiseError(ClientError(er.reply.clientError.get.msg)) + case RangeResult(k, v) ⇒ + logger.trace(s"DatasetStorageClient.range() received server value=$v for key=$k") + Observable(k → v) + case _ => Observable.empty + } + + result + } } } diff --git a/dataset/client/src/main/scala/fluence/dataset/client/DatasetClientUtils.scala b/dataset/client/src/main/scala/fluence/dataset/client/DatasetClientUtils.scala deleted file mode 100644 index a5693b00c0..0000000000 --- a/dataset/client/src/main/scala/fluence/dataset/client/DatasetClientUtils.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (C) 2017 Fluence Labs Limited - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package fluence.dataset.client - -import cats.effect.Effect -import fluence.dataset.protocol.ClientError -import monix.eval.{MVar, Task} -import monix.execution.Scheduler - -import scala.language.higherKinds - -object DatasetClientUtils { - - def run[F[_]: Effect, A](fa: Task[A])(implicit sch: Scheduler): F[A] = fa.to[F] - - /** Returns either client error if present, or server error, or value from server */ - def composeResult[F[_]: Effect]( - clientError: Task[MVar[ClientError]], - serverErrOrVal: Task[Option[Task[Option[Array[Byte]]]]] - )(implicit sch: Scheduler): F[Option[Array[Byte]]] = - run( - Task.raceMany( - Seq( - clientError.flatMap(_.read).flatMap(err ⇒ Task.raiseError(err)), // trying to return occurred clients error - serverErrOrVal.flatMap { - // return success result or server error - case Some(errOrValue) ⇒ errOrValue - // return occurred clients error - case None ⇒ clientError.flatMap(_.read).flatMap(err ⇒ Task.raiseError(err)) - } - ) - ) - ) - -} diff --git a/dataset/client/src/main/scala/fluence/dataset/client/Flow.scala b/dataset/client/src/main/scala/fluence/dataset/client/Flow.scala new file mode 100644 index 0000000000..3a3beb16ff --- /dev/null +++ b/dataset/client/src/main/scala/fluence/dataset/client/Flow.scala @@ -0,0 +1,8 @@ +package fluence.dataset.client + +trait Flow[T] +case class Continuation[T](reply: T) extends Flow[T] +case class Result[T](result: Option[Array[Byte]]) extends Flow[T] +case class RangeResult[T](key: Array[Byte], value: Array[Byte]) extends Flow[T] +case class ErrorFromClient[T](reply: T) extends Flow[T] +case object Stop extends Flow[Any] diff --git a/dataset/grpc/jvm/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala b/dataset/grpc/jvm/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala deleted file mode 100644 index d5a1856174..0000000000 --- a/dataset/grpc/jvm/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright (C) 2017 Fluence Labs Limited - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- */ - -package fluence.dataset.grpc.client - -import cats.effect.{Effect, IO} -import fluence.btree.protocol.BTreeRpc -import fluence.dataset.client.{ClientGet, ClientPut, ClientRange} -import fluence.dataset.protocol.DatasetStorageRpc -import fluence.dataset.protobuf._ -import fluence.dataset.protobuf.grpc.DatasetStorageRpcGrpc.DatasetStorageRpcStub -import fluence.transport.grpc.client.GrpcClient -import io.grpc.{CallOptions, ManagedChannel} -import monix.execution.Scheduler -import monix.reactive.{Observable, Pipe} - -import scala.language.{higherKinds, implicitConversions} - -/** - * Clients implementation of [[DatasetStorageRpc]], allows talking to server via network. - * All public methods called from the client side. - * DatasetStorageClient initiates first request to server and then answered to server requests. - * - * @param stub Stub for calling server methods of [[DatasetStorageRpc]] - * @tparam F A box for returning value - */ -class DatasetStorageClient[F[_]: Effect]( - stub: IO[DatasetStorageRpcStub] -)(implicit sch: Scheduler) - extends DatasetStorageRpc[F, Observable] with slogging.LazyLogging { - - import fluence.grpc.GrpcMonix._ - - /** - * Initiates ''Get'' operation in remote MerkleBTree. - * - * @param datasetId Dataset ID - * @param version Dataset version expected to the client - * @param getCallbacks Wrapper for all callback needed for ''Get'' operation to the BTree - * @return returns found value, None if nothing was found. - */ - override def get( - datasetId: Array[Byte], - version: Long, - getCallbacks: BTreeRpc.SearchCallback[F] - ): IO[Option[Array[Byte]]] = stub.flatMap { rpcStub ⇒ - // Convert a remote stub call to monix pipe - val pipe: Pipe[GetCallbackReply, GetCallback] = callToPipe(rpcStub.get) - - ClientGet(datasetId, version, getCallbacks).runStream(pipe) - } - - /** - * Initiates ''Range'' operation in remote MerkleBTree. - * - * @param datasetId Dataset ID - * @param version Dataset version expected to the client - * @param rangeCallbacks Wrapper for all callback needed for ''Range'' operation to the BTree - * @return returns stream of found value. - */ - override def range( - datasetId: Array[Byte], - version: Long, - rangeCallbacks: BTreeRpc.SearchCallback[F] - ): Observable[(Array[Byte], Array[Byte])] = Observable.fromIO(stub).flatMap { rpcStub ⇒ - // Convert a remote stub call to monix pipe - val pipe: Pipe[RangeCallbackReply, RangeCallback] = callToPipe(rpcStub.range) - - ClientRange(datasetId, version, rangeCallbacks).runStream(pipe) - } - - /** - * Initiates ''Put'' operation in remote MerkleBTree. - * - * @param datasetId Dataset ID - * @param version Dataset version expected to the client - * @param putCallbacks Wrapper for all callback needed for ''Put'' operation to the BTree. - * @param encryptedValue Encrypted value. - * @return returns old value if old value was overridden, None otherwise. - */ - override def put( - datasetId: Array[Byte], - version: Long, - putCallbacks: BTreeRpc.PutCallbacks[F], - encryptedValue: Array[Byte] - ): IO[Option[Array[Byte]]] = stub.flatMap { rpcStub ⇒ - // Convert a remote stub call to monix pipe - val pipe: Pipe[PutCallbackReply, PutCallback] = callToPipe(rpcStub.put) - - ClientPut(datasetId, version, putCallbacks, encryptedValue).runStream(pipe) - } - - /** - * Initiates ''Remove'' operation in remote MerkleBTree. - * - * @param datasetId Dataset ID - * @param version Dataset version expected to the client - * @param removeCallbacks Wrapper for all callback needed for ''Remove'' operation to the BTree. 
- * @return returns old value that was deleted, None if nothing was deleted. - */ - override def remove( - datasetId: Array[Byte], - version: Long, - removeCallbacks: BTreeRpc.RemoveCallback[F] - ): IO[Option[Array[Byte]]] = IO(???) -} - -object DatasetStorageClient { - - /** - * Shorthand to register [[DatasetStorageClient]] inside [[GrpcClient]]. - * - * @param channelOptions Channel to remote node and call options - */ - def register[F[_]: Effect]()( - channelOptions: IO[(ManagedChannel, CallOptions)] - )(implicit scheduler: Scheduler): DatasetStorageRpc[F, Observable] = - new DatasetStorageClient[F](channelOptions.map { case (ch, opts) ⇒ new DatasetStorageRpcStub(ch, opts) }) -} diff --git a/dataset/grpc/js/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala b/dataset/grpc/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala similarity index 62% rename from dataset/grpc/js/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala rename to dataset/grpc/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala index c36c4e3612..7b437adcb6 100644 --- a/dataset/grpc/js/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala +++ b/dataset/grpc/src/main/scala/fluence/dataset/grpc/client/DatasetStorageClient.scala @@ -22,64 +22,59 @@ import fluence.btree.protocol.BTreeRpc import fluence.dataset.client.{ClientGet, ClientPut, ClientRange} import fluence.dataset.protobuf._ import fluence.dataset.protocol.DatasetStorageRpc -import fluence.proxy.grpc.WebsocketMessage -import fluence.transport.websocket.{GrpcProxyClient, WebsocketPipe} +import fluence.stream.Connection import monix.execution.Scheduler -import monix.reactive.{Observable, Observer, Pipe} +import monix.reactive.{MulticastStrategy, Observable, Observer, Pipe} import scala.language.higherKinds /** * Client for interaction with the database. * - * @param connection Websocket pipe. 
*/ -class DatasetStorageClient[F[_]: Effect](connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]])( +class DatasetStorageClient[F[_]: Effect](connection: Connection)( implicit sch: Scheduler ) extends DatasetStorageRpc[F, Observable] with slogging.LazyLogging { - import fluence.transport.websocket.ProtobufCodec._ + import fluence.transport.ProtobufCodec._ private val service = "fluence.dataset.protobuf.grpc.DatasetStorageRpc" - def getPipe: IO[Pipe[GetCallbackReply, GetCallback]] = { + def handleGet(requests: Observable[GetCallbackReply]): IO[Observable[GetCallback]] = { + + val mapped = requests.mapEval[IO, Array[Byte]](ab ⇒ generatedMessageCodec.runF[IO](ab)) for { - websocket ← connection - proxy = GrpcProxyClient.proxy(service, "get", websocket, generatedMessageCodec, protobufDynamicCodec(GetCallback)) + responseObservable ← connection.handle(service, "get", mapped) + responseDeserialized = responseObservable.mapEval[IO, GetCallback]( + resp ⇒ protobufDynamicCodec(GetCallback).runF[IO](resp) + ) } yield { - new Pipe[GetCallbackReply, GetCallback] { - - override def unicast: (Observer[GetCallbackReply], Observable[GetCallback]) = { - (proxy.input, proxy.output) - } - } + responseDeserialized } } - val rangePipe: IO[Pipe[RangeCallbackReply, RangeCallback]] = + def handleRange(requests: Observable[RangeCallbackReply]): IO[Observable[RangeCallback]] = { + val mapped = requests.mapEval[IO, Array[Byte]](ab ⇒ generatedMessageCodec.runF[IO](ab)) for { - websocket ← connection - proxy = GrpcProxyClient - .proxy(service, "range", websocket, generatedMessageCodec, protobufDynamicCodec(RangeCallback)) + responseObservable ← connection.handle(service, "range", mapped) + responseDeserialized = responseObservable.mapEval[IO, RangeCallback]( + resp ⇒ protobufDynamicCodec(RangeCallback).runF[IO](resp) + ) } yield { - new Pipe[RangeCallbackReply, RangeCallback] { - - override def unicast: (Observer[RangeCallbackReply], Observable[RangeCallback]) = { - (proxy.input, proxy.output) - } - } + responseDeserialized } + } - val putPipe: IO[Pipe[PutCallbackReply, PutCallback]] = { + def handlePut(requests: Observable[PutCallbackReply]): IO[Observable[PutCallback]] = { + + val mapped = requests.mapEval[IO, Array[Byte]](ab ⇒ generatedMessageCodec.runF[IO](ab)) for { - websocket ← connection - proxy = GrpcProxyClient.proxy(service, "put", websocket, generatedMessageCodec, protobufDynamicCodec(PutCallback)) + responseObservable ← connection.handle(service, "put", mapped) + responseDeserialized = responseObservable.mapEval[IO, PutCallback]( + resp ⇒ protobufDynamicCodec(PutCallback).runF[IO](resp) + ) } yield { - new Pipe[PutCallbackReply, PutCallback] { - override def unicast: (Observer[PutCallbackReply], Observable[PutCallback]) = { - (proxy.input, proxy.output) - } - } + responseDeserialized } } @@ -95,7 +90,7 @@ class DatasetStorageClient[F[_]: Effect](connection: IO[WebsocketPipe[WebsocketM datasetId: Array[Byte], version: Long, searchCallbacks: BTreeRpc.SearchCallback[F] - ): IO[Option[Array[Byte]]] = getPipe.flatMap(ClientGet(datasetId, version, searchCallbacks).runStream) + ): IO[Option[Array[Byte]]] = ClientGet(datasetId, version, searchCallbacks).runStream(handleGet) /** * Initiates ''Range'' operation in remote MerkleBTree. @@ -105,13 +100,12 @@ class DatasetStorageClient[F[_]: Effect](connection: IO[WebsocketPipe[WebsocketM * @param searchCallbacks Wrapper for all callback needed for ''Range'' operation to the BTree * @return returns stream of found value. 
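All three handlers above share the shape Observable[Request] ⇒ IO[Observable[Response]], which is exactly what ClientGet, ClientPut and ClientRange now accept in runStream: the client logic no longer knows whether the bytes travel over gRPC or a websocket. As a rough illustration of that contract only, here is a minimal stand-in with the same shape as Connection.handle; the echo behaviour and the names are assumptions for the sketch, not part of this patch.

import cats.effect.IO
import monix.reactive.Observable

object HandlerShapeSketch {
  // Same signature as Connection.handle: a stream of serialized requests in,
  // an IO that yields the stream of serialized responses out.
  // Here every request is simply echoed back; a real transport would route
  // the bytes to a remote service and stream its replies back.
  def echoTransport(requests: Observable[Array[Byte]]): IO[Observable[Array[Byte]]] =
    IO.pure(requests)
}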
*/ - // TODO range request is not working for now for websockets override def range( datasetId: Array[Byte], version: Long, searchCallbacks: BTreeRpc.SearchCallback[F] ): Observable[(Array[Byte], Array[Byte])] = - Observable.fromIO(rangePipe).flatMap(ClientRange(datasetId, version, searchCallbacks).runStream) + Observable.fromIO(ClientRange(datasetId, version, searchCallbacks).runStream(handleRange)).flatten /** * Initiates ''Put'' operation in remote MerkleBTree. @@ -127,7 +121,7 @@ class DatasetStorageClient[F[_]: Effect](connection: IO[WebsocketPipe[WebsocketM version: Long, putCallbacks: BTreeRpc.PutCallbacks[F], encryptedValue: Array[Byte] - ): IO[Option[Array[Byte]]] = putPipe.flatMap(ClientPut(datasetId, version, putCallbacks, encryptedValue).runStream) + ): IO[Option[Array[Byte]]] = ClientPut(datasetId, version, putCallbacks, encryptedValue).runStream(handlePut) /** * Initiates ''Remove'' operation in remote MerkleBTree. @@ -143,3 +137,9 @@ class DatasetStorageClient[F[_]: Effect](connection: IO[WebsocketPipe[WebsocketM removeCallbacks: BTreeRpc.RemoveCallback[F] ): IO[Option[Array[Byte]]] = ??? } + +object DatasetStorageClient { + def apply[F[_] : Effect](streamHandler: Connection)( + implicit sch: Scheduler + ): DatasetStorageRpc[F, Observable] = new DatasetStorageClient(streamHandler) +} diff --git a/transport/grpc-monix/src/main/scala/fluence/grpc/GrpcMonix.scala b/grpc-monix-converter/src/main/scala/fluence/grpc/GrpcMonix.scala similarity index 53% rename from transport/grpc-monix/src/main/scala/fluence/grpc/GrpcMonix.scala rename to grpc-monix-converter/src/main/scala/fluence/grpc/GrpcMonix.scala index 85f2770e82..4129550cfa 100644 --- a/transport/grpc-monix/src/main/scala/fluence/grpc/GrpcMonix.scala +++ b/grpc-monix-converter/src/main/scala/fluence/grpc/GrpcMonix.scala @@ -17,37 +17,62 @@ package fluence.grpc +import cats.effect.IO +import com.google.common.util.concurrent.ListenableFuture import io.grpc.stub.StreamObserver +import monix.execution.Ack.{Continue, Stop} import monix.execution.{Ack, Cancelable, Scheduler} +import monix.reactive.Observable.Operator import monix.reactive.Observer.Sync import monix.reactive._ +import monix.reactive.observers.{BufferedSubscriber, Subscriber} +import scalapb.grpc.Grpc +import scala.concurrent.Future import scala.language.implicitConversions -import scala.util.Try object GrpcMonix { - private val overflow: OverflowStrategy.Synchronous[Nothing] = OverflowStrategy.Unbounded - - def callToPipe[I, O](call: StreamObserver[O] ⇒ StreamObserver[I])(implicit sch: Scheduler): Pipe[I, O] = - new Pipe[I, O] { - override def unicast: (Observer[I], Observable[O]) = { + type GrpcOperator[I, O] = StreamObserver[O] ⇒ StreamObserver[I] - val (in, inOut) = Observable.multicast[I](MulticastStrategy.replay, overflow) + def liftByGrpcOperator[I, O](observable: Observable[I], operator: GrpcOperator[I, O]): Observable[O] = + observable.liftByOperator( + grpcOperatorToMonixOperator(operator) + ) - in -> Observable.create[O](overflow) { sync ⇒ - inOut.subscribe(streamToObserver(call(new StreamObserver[O] { - override def onError(t: Throwable): Unit = - sync.onError(t) + def grpcOperatorToMonixOperator[I, O](grpcOperator: GrpcOperator[I, O]): Operator[I, O] = { + outputSubsriber: Subscriber[O] ⇒ + val outputObserver: StreamObserver[O] = monixSubscriberToGrpcObserver(outputSubsriber) + val inputObserver: StreamObserver[I] = grpcOperator(outputObserver) + grpcObserverToMonixSubscriber(inputObserver, outputSubsriber.scheduler) + } - override def onCompleted(): 
Unit = - sync.onComplete() + def monixSubscriberToGrpcObserver[T](subscriber: Subscriber[T]): StreamObserver[T] = + new StreamObserver[T] { + private val rSubscriber = + BufferedSubscriber[T](subscriber, OverflowStrategy.Unbounded) + override def onError(t: Throwable): Unit = rSubscriber.onError(t) + override def onCompleted(): Unit = rSubscriber.onComplete() + override def onNext(value: T): Unit = { + //The onNext method of the buffered returns an Ack synchronously. + rSubscriber.onNext(value) + } + } - override def onNext(value: O): Unit = - sync.onNext(value) - }))) + def grpcObserverToMonixSubscriber[T](observer: StreamObserver[T], s: Scheduler): Subscriber[T] = + new Subscriber[T] { + override implicit def scheduler: Scheduler = s + override def onError(t: Throwable): Unit = observer.onError(t) + override def onComplete(): Unit = observer.onCompleted() + override def onNext(value: T): Future[Ack] = + try { + observer.onNext(value) + Continue + } catch { + case t: Throwable ⇒ + observer.onError(t) + Stop } - } } implicit def streamToObserver[T](stream: StreamObserver[T])(implicit sch: Scheduler): Observer.Sync[T] = @@ -64,6 +89,9 @@ object GrpcMonix { } } + def guavaFutureToIO[T](future: ListenableFuture[T]): IO[T] = + IO.fromFuture(IO(Grpc.guavaFuture2ScalaFuture(future))) + implicit def observerToStream[T](observer: Sync[T])(implicit sch: Scheduler): StreamObserver[T] = { new StreamObserver[T] { override def onNext(value: T): Unit = observer.onNext(value) @@ -74,6 +102,8 @@ object GrpcMonix { } } + private val overflow: OverflowStrategy.Synchronous[Nothing] = OverflowStrategy.Unbounded + def streamObservable[T](implicit sch: Scheduler): (Observable[T], StreamObserver[T]) = { val (in, out) = Observable.multicast[T](MulticastStrategy.replay, overflow) diff --git a/kademlia/grpc/jvm/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala b/kademlia/grpc/jvm/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala deleted file mode 100644 index 7cdbc6cdfb..0000000000 --- a/kademlia/grpc/jvm/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (C) 2017 Fluence Labs Limited - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package fluence.kad.grpc.client - -import cats.effect.IO -import com.google.protobuf.ByteString -import fluence.codec.PureCodec -import fluence.kad.protocol.{Contact, KademliaRpc, Key, Node} -import fluence.kad.protocol -import fluence.kad.protobuf -import fluence.kad.protobuf.grpc.KademliaGrpc -import io.grpc.{CallOptions, ManagedChannel} -import fluence.kad.KeyProtobufCodecs._ - -import scala.concurrent.ExecutionContext -import scala.language.implicitConversions - -/** - * Implementation of KademliaClient over GRPC, with Task and Contact. 
- * - * @param stub GRPC Kademlia Stub - */ -class KademliaClient(stub: IO[KademliaGrpc.Kademlia])( - implicit - codec: PureCodec[protocol.Node[Contact], protobuf.Node], - ec: ExecutionContext -) extends KademliaRpc[Contact] { - - private val keyBS = PureCodec.codec[Key, ByteString].direct.toKleisli[IO] - - import cats.instances.stream._ - - private val streamCodec = PureCodec.codec[Stream[protocol.Node[Contact]], Stream[protobuf.Node]] - - /** - * Ping the contact, get its actual Node status, or fail - */ - override def ping(): IO[Node[Contact]] = - for { - n ← IO.fromFuture(stub.map(_.ping(protobuf.PingRequest()))) - nc ← codec.inverse.runF[IO](n) - } yield nc - - /** - * Perform a local lookup for a key, return K closest known nodes - * - * @param key Key to lookup - */ - override def lookup(key: Key, numberOfNodes: Int): IO[Seq[Node[Contact]]] = - for { - k ← keyBS(key) - res ← IO.fromFuture(stub.map(_.lookup(protobuf.LookupRequest(k, numberOfNodes)))) - resDec ← streamCodec.inverse.runF[IO](res.nodes.toStream) - } yield resDec - - /** - * Perform a local lookup for a key, return K closest known nodes, going away from the second key - * - * @param key Key to lookup - */ - override def lookupAway(key: Key, moveAwayFrom: Key, numberOfNodes: Int): IO[Seq[Node[Contact]]] = - for { - k ← keyBS(key) - moveAwayK ← keyBS(moveAwayFrom) - res ← IO.fromFuture( - stub.map(_.lookupAway(protobuf.LookupAwayRequest(k, moveAwayK, numberOfNodes))) - ) - resDec ← streamCodec.inverse.runF[IO](res.nodes.toStream) - } yield resDec -} - -object KademliaClient { - - /** - * Shorthand to register KademliaClient inside NetworkClient. - * - * @param channelOptions Channel to remote node and Call options - */ - def register()( - channelOptions: IO[(ManagedChannel, CallOptions)] - )( - implicit - codec: PureCodec[protocol.Node[Contact], protobuf.Node], - ec: ExecutionContext - ): KademliaRpc[Contact] = - new KademliaClient(channelOptions.map { case (ch, opts) ⇒ new KademliaGrpc.KademliaStub(ch, opts) }) - -} diff --git a/kademlia/grpc/js/src/main/scala/fluence/kad/grpc/client/KademliaWebsocketClient.scala b/kademlia/grpc/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala similarity index 65% rename from kademlia/grpc/js/src/main/scala/fluence/kad/grpc/client/KademliaWebsocketClient.scala rename to kademlia/grpc/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala index 05e4c9a175..ebb5a17c1b 100644 --- a/kademlia/grpc/js/src/main/scala/fluence/kad/grpc/client/KademliaWebsocketClient.scala +++ b/kademlia/grpc/src/main/scala/fluence/kad/grpc/client/KademliaClient.scala @@ -17,33 +17,27 @@ package fluence.kad.grpc.client +import cats.effect.IO import cats.syntax.compose._ import cats.syntax.profunctor._ -import cats.effect.IO import com.google.protobuf.ByteString import fluence.codec.PureCodec import fluence.kad.KeyProtobufCodecs._ -import fluence.transport.websocket.ProtobufCodec._ import fluence.kad.protobuf.{NodesResponse, PingRequest} import fluence.kad.protocol.{Contact, KademliaRpc, Key, Node} import fluence.kad.{protobuf, protocol} -import fluence.proxy.grpc.WebsocketMessage -import fluence.transport.websocket.{GrpcProxyClient, WebsocketPipe} -import monix.execution.Scheduler.Implicits.global -import scalapb.GeneratedMessage +import fluence.stream.Connection +import fluence.transport.ProtobufCodec._ -import scala.concurrent.ExecutionContext import scala.language.higherKinds /** - * Kademlia client for websocket. + * Kademlia client. * - * @param connection Websocket pipe. 
*/ -class KademliaWebsocketClient(connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]])( +class KademliaClient(connection: Connection)( implicit - codec: PureCodec[protocol.Node[Contact], protobuf.Node], - ec: ExecutionContext + codec: PureCodec[protocol.Node[Contact], protobuf.Node] ) extends KademliaRpc[Contact] { private val service = "fluence.kad.protobuf.grpc.Kademlia" @@ -67,10 +61,10 @@ class KademliaWebsocketClient(connection: IO[WebsocketPipe[WebsocketMessage, Web */ override def ping(): IO[Node[Contact]] = { for { - websocket ← connection - proxy: WebsocketPipe[GeneratedMessage, Node[Contact]] = GrpcProxyClient - .proxy(service, "ping", websocket, generatedMessageCodec, pingCodec) - response ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(PingRequest()))) + request ← generatedMessageCodec.runF[IO](PingRequest()) + responseBytes ← connection + .handleUnary(service, "ping", request) + response ← pingCodec.runF[IO](responseBytes) } yield response } @@ -82,10 +76,9 @@ class KademliaWebsocketClient(connection: IO[WebsocketPipe[WebsocketMessage, Web override def lookup(key: Key, numberOfNodes: Int): IO[Seq[Node[Contact]]] = { for { k ← keyBS(key) - websocket ← connection - request = protobuf.LookupRequest(k, numberOfNodes) - proxy = GrpcProxyClient.proxy(service, "lookup", websocket, generatedMessageCodec, nodeContactCodec) - res ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(request))) + request ← generatedMessageCodec.runF[IO](protobuf.LookupRequest(k, numberOfNodes)) + responseBytes ← connection.handleUnary(service, "lookup", request) + res ← nodeContactCodec.runF[IO](responseBytes) } yield res } @@ -98,10 +91,16 @@ class KademliaWebsocketClient(connection: IO[WebsocketPipe[WebsocketMessage, Web for { k ← keyBS(key) moveAwayK ← keyBS(moveAwayFrom) - websocket ← connection - req = protobuf.LookupAwayRequest(k, moveAwayK, numberOfNodes) - proxy = GrpcProxyClient.proxy(service, "lookupAway", websocket, generatedMessageCodec, nodeContactCodec) - res ← IO.fromFuture(IO(proxy.requestAndWaitOneResult(req))) + req ← generatedMessageCodec.runF[IO](protobuf.LookupAwayRequest(k, moveAwayK, numberOfNodes)) + responseBytes ← connection.handleUnary(service, "lookupAway", req) + res ← nodeContactCodec.runF[IO](responseBytes) } yield res } } + +object KademliaClient { + def apply(streamHandler: Connection)( + implicit + codec: PureCodec[Node[Contact], protobuf.Node] + ): KademliaRpc[Contact] = new KademliaClient(streamHandler) +} diff --git a/kademlia/jvm/src/test/scala/fluence/kad/NetworkSimulationSpec.scala b/kademlia/jvm/src/test/scala/fluence/kad/NetworkSimulationSpec.scala index 37790dc125..e92aca0111 100644 --- a/kademlia/jvm/src/test/scala/fluence/kad/NetworkSimulationSpec.scala +++ b/kademlia/jvm/src/test/scala/fluence/kad/NetworkSimulationSpec.scala @@ -20,20 +20,21 @@ package fluence.kad import java.net.InetAddress import java.security.SecureRandom -import cats._ import cats.effect.IO import cats.instances.try_._ import cats.syntax.eq._ import com.typesafe.config.ConfigFactory import fluence.crypto.KeyPair -import fluence.kad.grpc.client.KademliaClient +import fluence.grpc.{GrpcConnection, ServiceManager} import fluence.kad.grpc.server.KademliaServer import fluence.kad.grpc.KademliaGrpcUpdate +import fluence.kad.grpc.client.KademliaClient import fluence.kad.protocol.{Contact, ContactSecurity, KademliaRpc, Key} import fluence.kad.protobuf.grpc.KademliaGrpc import fluence.transport.grpc.GrpcConf import fluence.transport.grpc.client.GrpcClient import 
fluence.transport.grpc.server.GrpcServer +import io.grpc.{CallOptions, ManagedChannel} import monix.execution.Scheduler.Implicits.global import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Milliseconds, Seconds, Span} @@ -74,9 +75,15 @@ class NetworkSimulationSpec extends WordSpec with Matchers with ScalaFutures wit ) .unsafe(()) + private val services = List(KademliaGrpc.SERVICE) + private val serviceManager = ServiceManager(services) + + val builder: IO[(ManagedChannel, CallOptions)] => GrpcConnection = + GrpcConnection.builder(serviceManager) + private val client = GrpcClient .builder(key, IO.pure(contact.b64seed), clientConf) - .add(KademliaClient.register()) + .add(builder andThen KademliaClient.apply) .build private val kademliaClientRpc: Contact ⇒ KademliaRpc[Contact] = c ⇒ { diff --git a/node/src/test/scala/fluence/node/ClientNodeIntegrationSpec.scala b/node/src/test/scala/fluence/node/ClientNodeIntegrationSpec.scala index 3d29567d2b..46d8c28a5e 100644 --- a/node/src/test/scala/fluence/node/ClientNodeIntegrationSpec.scala +++ b/node/src/test/scala/fluence/node/ClientNodeIntegrationSpec.scala @@ -28,18 +28,17 @@ import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import fluence.btree.client.MerkleBTreeClient.ClientState import fluence.client.core.{ClientServices, FluenceClient} import fluence.client.grpc.ClientGrpcServices -import fluence.contract.{BasicContract, ContractError} import fluence.contract.client.Contracts import fluence.contract.client.Contracts.NotFound +import fluence.contract.{BasicContract, ContractError} import fluence.crypto.ecdsa.Ecdsa -import fluence.crypto.{Crypto, CryptoError, DumbCrypto, KeyPair} import fluence.crypto.hash.JdkCryptoHasher import fluence.crypto.signature.{SignAlgo, Signer} +import fluence.crypto.{Crypto, DumbCrypto, KeyPair} import fluence.dataset.client.{ClientDatasetStorage, ClientDatasetStorageApi} import fluence.dataset.protocol.{ClientError, DatasetStorageRpc, ServerError} import fluence.kad.protocol.{Contact, ContactSecurity, Key} import fluence.kad.{KademliaConf, KademliaMVar} -import fluence.kvstore.StoreError import fluence.node.core.ContractsCacheConf import fluence.transport.grpc.client.GrpcClient import io.grpc.StatusRuntimeException diff --git a/project/plugins.sbt b/project/plugins.sbt index 9601e8996f..259abd72db 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -16,4 +16,6 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.0") addSbtPlugin("com.lihaoyi" % "workbench" % "0.4.0") +addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.0.0-M11") + libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.2" diff --git a/scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/GrpcConnection.scala b/scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/GrpcConnection.scala new file mode 100644 index 0000000000..bd8ae64378 --- /dev/null +++ b/scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/GrpcConnection.scala @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2017 Fluence Labs Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package fluence.grpc + +import java.io.ByteArrayInputStream + +import cats.effect.IO +import fluence.stream.Connection +import io.grpc.MethodDescriptor.MethodType +import io.grpc.internal.IoUtils +import io.grpc.stub.{ClientCalls, StreamObserver} +import io.grpc.{CallOptions, ManagedChannel, MethodDescriptor} +import monix.execution.Scheduler +import monix.reactive.{Observable, OverflowStrategy} + +class GrpcConnection( + private val serviceManager: ServiceManager, + channelOptionsIO: IO[(ManagedChannel, CallOptions)] +)(implicit sch: Scheduler) + extends Connection { + + private val overflow: OverflowStrategy.Synchronous[Nothing] = OverflowStrategy.Unbounded + + private def checkType(methodDescriptor: MethodDescriptor[Any, Any], methodType: MethodType): IO[Unit] = { + if (methodDescriptor.getType == methodType) + IO.unit + else + IO.raiseError( + new IllegalArgumentException( + s"Cannot handle request, because method: ${methodDescriptor.getFullMethodName} is not of type: $methodType." + ) + ) + } + + override def handle( + service: String, + method: String, + requests: Observable[Array[Byte]] + ): IO[Observable[Array[Byte]]] = { + for { + methodDescriptor ← serviceManager.getMethodDescriptorF[IO](service, method) + _ ← checkType(methodDescriptor, MethodType.BIDI_STREAMING) + channelOptions ← channelOptionsIO + responses ← { + IO { + val (channel, callOptions) = channelOptions + val call = channel.newCall[Any, Any](methodDescriptor, callOptions) + + val operator = { obs: StreamObserver[Any] ⇒ + ClientCalls.asyncBidiStreamingCall(call, obs) + } + + //TODO delete double serialization + val requestsProto = requests.map(r ⇒ methodDescriptor.parseRequest(new ByteArrayInputStream(r))) + + GrpcMonix.liftByGrpcOperator(requestsProto, operator).map { r ⇒ + IoUtils.toByteArray(methodDescriptor.streamResponse(r)) + } + } + } + } yield responses + } + + override def handleUnary(service: String, method: String, request: Array[Byte]): IO[Array[Byte]] = { + for { + methodDescriptor ← serviceManager.getMethodDescriptorF[IO](service, method) + _ ← checkType(methodDescriptor, MethodType.UNARY) + channelOptions ← channelOptionsIO + response ← { + val (channel, callOptions) = channelOptions + val call = channel.newCall[Any, Any](methodDescriptor, callOptions) + + GrpcMonix + .guavaFutureToIO( + ClientCalls.futureUnaryCall( + call, + methodDescriptor.parseRequest(new ByteArrayInputStream(request)) + ) + ) + .map { r ⇒ + IoUtils.toByteArray(methodDescriptor.streamResponse(r)) + } + } + } yield response + } + +} + +object GrpcConnection { + def builder(serviceManager: ServiceManager)(channelOptions: IO[(ManagedChannel, CallOptions)])(implicit scheduler: Scheduler): GrpcConnection = { + new GrpcConnection(serviceManager, channelOptions) + } +} diff --git a/scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/ServiceManager.scala b/scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/ServiceManager.scala new file mode 100644 index 0000000000..606314d9c7 --- /dev/null +++ b/scala-multistream/grpc/jvm/src/main/scala/fluence/grpc/ServiceManager.scala @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2017 Fluence Labs Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of 
the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package fluence.grpc + +import cats.MonadError +import cats.syntax.flatMap._ +import io.grpc._ + +import scala.collection.JavaConverters._ +import scala.language.higherKinds + +class ServiceManager(services: Map[String, MethodDescriptor[Any, Any]]) { + + /** + * Gets grpc method descriptor from registered services. + * + * @param service Name of service. + * @param method Name of method. + * + * @return Method descriptor or None, if there is no descriptor in registered services. + */ + private def getMethodDescriptor(service: String, method: String): Option[MethodDescriptor[Any, Any]] = { + for { + methodDescriptor ← services.get(service + "/" + method) + } yield methodDescriptor + } + + def getMethodDescriptorF[F[_]](service: String, method: String)( + implicit F: MonadError[F, Throwable] + ): F[MethodDescriptor[Any, Any]] = + F.pure(getMethodDescriptor(service, method)).flatMap { + case Some(md) ⇒ F.pure(md) + case None ⇒ F.raiseError(new IllegalArgumentException(s"There is no $service/$method method.")) + } + +} + +object ServiceManager { + + def apply(server: Server): ServiceManager = { + val methodDescriptorMap: Map[String, MethodDescriptor[Any, Any]] = server.getServices.asScala + .flatMap(_.getMethods.asScala.map(_.getMethodDescriptor.asInstanceOf[MethodDescriptor[Any, Any]])) + .map(m ⇒ m.getFullMethodName -> m) + .toMap + new ServiceManager(methodDescriptorMap) + } + + def apply(serverDescriptors: List[ServiceDescriptor]): ServiceManager = { + val methodDescriptorMap = serverDescriptors + .flatMap(sd ⇒ sd.getMethods.asScala.map(_.asInstanceOf[MethodDescriptor[Any, Any]])) + .map(m ⇒ m.getFullMethodName -> m) + .toMap + + new ServiceManager(methodDescriptorMap) + } +} diff --git a/scala-multistream/src/main/scala/fluence/stream/Connection.scala b/scala-multistream/src/main/scala/fluence/stream/Connection.scala new file mode 100644 index 0000000000..b04814a32d --- /dev/null +++ b/scala-multistream/src/main/scala/fluence/stream/Connection.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2017 Fluence Labs Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package fluence.stream + +import cats.effect.IO +import monix.reactive.Observable + +/** + * Interface that generalize connection between nodes. + */ +trait Connection { + + /** + * BIDI streaming request. + * @param service Protocol service name. + * @param method Protocol method name. + * @param requests Request stream. + * @return Response stream. 
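ServiceManager above is what turns a pair of service and method names into the gRPC MethodDescriptor that GrpcConnection and the in-process proxy need. A small sketch of that lookup, mirroring how NetworkSimulationSpec wires it in this patch; the choice of the Kademlia service and the "ping" method is only an example.

import cats.effect.IO
import fluence.grpc.ServiceManager
import fluence.kad.protobuf.grpc.KademliaGrpc
import io.grpc.MethodDescriptor

object ServiceManagerSketch {
  // Register the known service descriptors once...
  val serviceManager: ServiceManager = ServiceManager(List(KademliaGrpc.SERVICE))

  // ...then resolve a concrete method by its service and method name.
  // An unknown method surfaces as an IllegalArgumentException raised inside IO.
  val pingDescriptor: IO[MethodDescriptor[Any, Any]] =
    serviceManager.getMethodDescriptorF[IO]("fluence.kad.protobuf.grpc.Kademlia", "ping")
}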
+ */ + def handle( + service: String, + method: String, + requests: Observable[Array[Byte]] + ): IO[Observable[Array[Byte]]] + + /** + * One request - on response. + * @param service Protocol service name. + * @param method Protocol method name. + * @param request Request for opposite node. + * @return Response from opposite node. + */ + def handleUnary( + service: String, + method: String, + request: Array[Byte] + ): IO[Array[Byte]] +} diff --git a/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/GrpcWebsocketProxy.scala b/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/GrpcWebsocketProxy.scala index 957144ebe5..2dae259eee 100644 --- a/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/GrpcWebsocketProxy.scala +++ b/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/GrpcWebsocketProxy.scala @@ -23,7 +23,6 @@ import fs2.async.mutable.Topic import fs2.{io ⇒ _, _} import monix.eval.Task import monix.execution.{Scheduler ⇒ TaskScheduler} -import monix.reactive.Observable import org.http4s._ import org.http4s.dsl.Http4sDsl import org.http4s.server.Server @@ -47,7 +46,7 @@ object GrpcWebsocketProxy extends Http4sDsl[Task] with slogging.LazyLogging { case GET -> Root ⇒ //Creates a proxy for each connection to separate the cache for all clients. - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) def replyPipe(topic: Topic[Task, WebSocketFrame]): Sink[Task, WebSocketFrame] = _.flatMap { case Binary(data, _) ⇒ diff --git a/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/InProcessGrpc.scala b/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/InProcessGrpc.scala index 4ed6b9a1c4..8a9b9d5ae6 100644 --- a/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/InProcessGrpc.scala +++ b/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/InProcessGrpc.scala @@ -20,17 +20,18 @@ package fluence.grpc.proxy import java.util.concurrent.TimeUnit import cats.effect.IO -import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import fluence.grpc.ServiceManager import io.grpc._ +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} -import scala.collection.JavaConverters._ import scala.language.higherKinds /** * Grpc server and channel, that work in memory without transport. */ final class InProcessGrpc private (private val server: Server, private val channel: ManagedChannel) { - val services: List[ServerServiceDefinition] = server.getServices.asScala.toList + + val serviceManager: ServiceManager = ServiceManager(server) /** * Create new call from channel to server. diff --git a/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/ProxyGrpc.scala b/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/ProxyWebsocketGrpc.scala similarity index 83% rename from transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/ProxyGrpc.scala rename to transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/ProxyWebsocketGrpc.scala index ed536d573c..761f301fc8 100644 --- a/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/ProxyGrpc.scala +++ b/transport/grpc-proxy/src/main/scala/fluence/grpc/proxy/ProxyWebsocketGrpc.scala @@ -44,7 +44,7 @@ case class Result(observable: Observable[Array[Byte]], controlOnComplete: Boolea * * @param inProcessGrpc In-process services and client channel. 
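Connection is the seam this refactoring revolves around: the same KademliaClient and DatasetStorageClient can be backed by GrpcConnection on the JVM or by the websocket proxy below. A sketch of the JVM-side wiring, following the pattern this patch uses in NetworkSimulationSpec; the implicit Node[Contact] codec is assumed to be in scope, as it is in that spec.

import cats.effect.IO
import fluence.grpc.{GrpcConnection, ServiceManager}
import fluence.kad.protobuf.grpc.KademliaGrpc
import io.grpc.{CallOptions, ManagedChannel}
import monix.execution.Scheduler.Implicits.global

object GrpcWiringSketch {
  // Method descriptors are resolved through one ServiceManager...
  private val serviceManager = ServiceManager(List(KademliaGrpc.SERVICE))

  // ...and a (channel, call options) pair is turned into a Connection that serves
  // both bidirectional streams (handle) and unary calls (handleUnary).
  val connectionBuilder: IO[(ManagedChannel, CallOptions)] ⇒ GrpcConnection =
    GrpcConnection.builder(serviceManager)

  // NetworkSimulationSpec then registers the client roughly as
  //   GrpcClient.builder(...).add(connectionBuilder andThen KademliaClient.apply).build
}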
*/ -class ProxyGrpc(inProcessGrpc: InProcessGrpc)( +class ProxyWebsocketGrpc(inProcessGrpc: InProcessGrpc)( implicit ec: ExecutionContext ) extends slogging.LazyLogging { @@ -55,30 +55,6 @@ class ProxyGrpc(inProcessGrpc: InProcessGrpc)( private val overflow: OverflowStrategy.Synchronous[Nothing] = OverflowStrategy.Unbounded - /** - * Gets grpc method descriptor from registered services. - * - * @param service Name of service. - * @param method Name of method. - * - * @return Method descriptor or None, if there is no descriptor in registered services. - */ - private def getMethodDescriptor(service: String, method: String): Option[MethodDescriptor[Any, Any]] = { - for { - serviceDescriptor ← inProcessGrpc.services.find(_.getServiceDescriptor.getName == service) - serverMethodDefinition ← Option( - serviceDescriptor.getMethod(service + "/" + method).asInstanceOf[ServerMethodDefinition[Any, Any]] - ) - methodDescriptor ← Option(serverMethodDefinition.getMethodDescriptor) - } yield methodDescriptor - } - - private def getMethodDescriptorF(service: String, method: String): Task[MethodDescriptor[Any, Any]] = - Task(getMethodDescriptor(service, method)).flatMap { - case Some(md) ⇒ Task.pure(md) - case None ⇒ Task.raiseError(new IllegalArgumentException(s"There is no $service/$method method.")) - } - /** * Creates listener for client call and connects it with observable. * @@ -182,7 +158,7 @@ class ProxyGrpc(inProcessGrpc: InProcessGrpc)( reqE: Either[StatusException, InputStream] ): Task[Option[Result]] = { for { - methodDescriptor ← getMethodDescriptorF(service, method) + methodDescriptor ← inProcessGrpc.serviceManager.getMethodDescriptorF[Task](service, method) _ = logger.debug("Websocket method descriptor: " + methodDescriptor.toString) req ← Task(reqE.map(methodDescriptor.parseRequest)) _ = logger.debug("Websocket request: " + req) diff --git a/transport/grpc-proxy/src/test/scala/fluence/grpc/proxy/ProxyCallSpec.scala b/transport/grpc-proxy/src/test/scala/fluence/grpc/proxy/ProxyCallSpec.scala index 029a033bbd..003fc75853 100644 --- a/transport/grpc-proxy/src/test/scala/fluence/grpc/proxy/ProxyCallSpec.scala +++ b/transport/grpc-proxy/src/test/scala/fluence/grpc/proxy/ProxyCallSpec.scala @@ -95,7 +95,7 @@ class ProxyCallSpec extends WordSpec with Matchers with slogging.LazyLogging { TestServiceGrpc.METHOD_TEST_COUNT ) - def sendMessage(proxyGrpc: ProxyGrpc, message: WebsocketMessage): Observable[Array[Byte]] = { + def sendMessage(proxyGrpc: ProxyWebsocketGrpc, message: WebsocketMessage): Observable[Array[Byte]] = { proxyGrpc .handleMessage( @@ -110,7 +110,7 @@ class ProxyCallSpec extends WordSpec with Matchers with slogging.LazyLogging { "work with unary calls" in { inService() { inProcessGrpc ⇒ - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) val str = "test" val listStr = Seq("test1", "test2") @@ -139,7 +139,7 @@ class ProxyCallSpec extends WordSpec with Matchers with slogging.LazyLogging { "work with bidi streams" in { inService() { inProcessGrpc ⇒ - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) val requestId = Random.nextLong() @@ -166,7 +166,7 @@ class ProxyCallSpec extends WordSpec with Matchers with slogging.LazyLogging { "raise error if the proxy was closed" in { inService() { inProcessGrpc ⇒ - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) inProcessGrpc.close().unsafeRunSync() @@ -186,7 +186,7 @@ class ProxyCallSpec extends 
WordSpec with Matchers with slogging.LazyLogging { "close stream on error from client" in { inService() { inProcessGrpc ⇒ - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) val testMessage = generateMessage(1111L, TestRequest(Some(TestMessage())), TestServiceGrpc.METHOD_TEST) @@ -211,7 +211,7 @@ class ProxyCallSpec extends WordSpec with Matchers with slogging.LazyLogging { "close stream on close from client" in { inService() { inProcessGrpc ⇒ - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) val testMessage = generateMessage(1111L, TestRequest(Some(TestMessage())), TestServiceGrpc.METHOD_TEST_COUNT) @@ -231,7 +231,7 @@ class ProxyCallSpec extends WordSpec with Matchers with slogging.LazyLogging { "raise error if no method or service descriptor in proxy" in { inService() { inProcessGrpc ⇒ - val proxyGrpc = new ProxyGrpc(inProcessGrpc) + val proxyGrpc = new ProxyWebsocketGrpc(inProcessGrpc) val testMessage = generateMessage(555L, TestRequest(Some(TestMessage())), TestServiceGrpc.METHOD_TEST) diff --git a/transport/websocket-js/src/main/scala/fluence/transport/websocket/ProtobufCodec.scala b/transport/grpc/src/main/scala/fluence/transport/ProtobufCodec.scala similarity index 97% rename from transport/websocket-js/src/main/scala/fluence/transport/websocket/ProtobufCodec.scala rename to transport/grpc/src/main/scala/fluence/transport/ProtobufCodec.scala index efebb80be1..730d1aa08e 100644 --- a/transport/websocket-js/src/main/scala/fluence/transport/websocket/ProtobufCodec.scala +++ b/transport/grpc/src/main/scala/fluence/transport/ProtobufCodec.scala @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package fluence.transport.websocket +package fluence.transport import fluence.codec.PureCodec import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message} diff --git a/transport/websocket-js/src/main/scala/fluence/transport/websocket/GrpcProxyClient.scala b/transport/websocket-js/src/main/scala/fluence/transport/websocket/GrpcProxyClient.scala deleted file mode 100644 index 5254fb7a22..0000000000 --- a/transport/websocket-js/src/main/scala/fluence/transport/websocket/GrpcProxyClient.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (C) 2017 Fluence Labs Limited - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- */ - -package fluence.transport.websocket - -import com.google.protobuf.ByteString -import fluence.codec.PureCodec -import fluence.proxy.grpc.Status.Code -import fluence.proxy.grpc.{Status, WebsocketMessage} -import fluence.transport.websocket.WebsocketPipe.WebsocketClient -import monix.execution.Ack -import monix.execution.Ack.Continue -import monix.reactive.{Observable, Observer} - -import scala.concurrent.{ExecutionContext, Future} -import scala.language.higherKinds -import scala.util.Random - -object GrpcProxyClient { - - private def message(service: String, method: String, requestId: Long)(payload: Array[Byte]): WebsocketMessage = { - WebsocketMessage(service, method, requestId, WebsocketMessage.Response.Payload(ByteString.copyFrom(payload))) - } - - /** - * Create proxy WebsocketPipe for working with GRPC interfaces on server. - * @param service Name of GRPC service. - * @param method Name of GRPC method. - * @param websocketClient Websocket transport layer. - * @param requestCodec Codec for converting requests to byte array. - * @param responseCodec Codec for converting responses from byte array. - * @tparam A Request type. - * @tparam B Response type. - * @return Pipe with input observer, output observable and websocket status observable. - */ - def proxy[A, B]( - service: String, - method: String, - websocketClient: WebsocketClient[WebsocketMessage], - requestCodec: PureCodec.Func[A, Array[Byte]], - responseCodec: PureCodec.Func[Array[Byte], B] - )(implicit ec: ExecutionContext): WebsocketPipe[A, B] = { - val requestId = Random.nextLong() - def messageCreation: Array[Byte] ⇒ WebsocketMessage = message(service, method, requestId) - - val wsObserver = websocketClient.input - val wsObservable = websocketClient.output - - val proxyObserver: Observer[A] = new Observer[A] { - override def onNext(elem: A): Future[Ack] = { - val req = requestCodec.unsafe(elem) - val message = messageCreation(req) - wsObserver.onNext(message) - // this will break backpressure, but if we do the chain of futures, - // logic will be broken due to incomprehensible circumstances - // TODO investigate and fix it - Future(Continue) - } - - override def onError(ex: Throwable): Unit = wsObserver.onError(ex) - - override def onComplete(): Unit = wsObserver.onComplete() - } - - //we will collect only messages that have equals method name, service name and request id - val proxyObservable = wsObservable.collect { - case WebsocketMessage(s, m, rId, payload) if s == service && m == method && rId == rId ⇒ - payload - }.takeWhile { - case WebsocketMessage.Response.CompleteStatus(status) if status.code == Code.OK ⇒ - false - case _ ⇒ true - }.flatMap { - case WebsocketMessage.Response.Payload(payload) ⇒ - Observable(responseCodec.unsafe(payload.toByteArray)) - case WebsocketMessage.Response.CompleteStatus(status) if status.code == Code.OK ⇒ - Observable() - case WebsocketMessage.Response.CompleteStatus(status) ⇒ - Observable.raiseError(new StatusException(status)) - case WebsocketMessage.Response.Empty ⇒ - Observable.raiseError(new StatusException(Status(Status.Code.UNKNOWN, "Empty response received."))) - } - - WebsocketPipe(proxyObserver, proxyObservable, websocketClient.statusOutput) - } -} diff --git a/transport/websocket-js/src/main/scala/fluence/transport/websocket/WebsocketConnection.scala b/transport/websocket-js/src/main/scala/fluence/transport/websocket/WebsocketConnection.scala new file mode 100644 index 0000000000..ee0bcead81 --- /dev/null +++ 
b/transport/websocket-js/src/main/scala/fluence/transport/websocket/WebsocketConnection.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2017 Fluence Labs Limited
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package fluence.transport.websocket
+
+import cats.effect.IO
+import com.google.protobuf.ByteString
+import fluence.proxy.grpc.Status.Code
+import fluence.proxy.grpc.{Status, WebsocketMessage}
+import fluence.stream.Connection
+import monix.execution.Scheduler
+import monix.reactive.Observable
+
+import scala.language.higherKinds
+import scala.util.Random
+
+class WebsocketConnection(
+  //TODO change WebsocketPipe to something more generalized in order to use different websocket clients
+  connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]]
+)(
+  implicit s: Scheduler
+) extends Connection {
+
+  private def message(service: String, method: String, requestId: Long)(payload: Array[Byte]): WebsocketMessage = {
+    WebsocketMessage(service, method, requestId, WebsocketMessage.Response.Payload(ByteString.copyFrom(payload)))
+  }
+
+  override def handle(
+    service: String,
+    method: String,
+    requests: Observable[Array[Byte]]
+  ): IO[Observable[Array[Byte]]] = {
+    connection.map { websocketClient ⇒
+      val requestId = Random.nextLong()
+      def messageCreation: Array[Byte] ⇒ WebsocketMessage = message(service, method, requestId)
+
+      val wsObserver = websocketClient.input
+      val wsObservable = websocketClient.output
+
+      requests.map(messageCreation).subscribe(wsObserver)
+
+      // collect only messages that carry the same service name, method name and request id
+      val proxyObservable = wsObservable.collect {
+        case WebsocketMessage(srv, m, rId, payload) if srv == service && m == method && rId == requestId ⇒
+          payload
+      }.takeWhile {
+        case WebsocketMessage.Response.CompleteStatus(status) if status.code == Code.OK ⇒
+          false
+        case _ ⇒ true
+      }.flatMap {
+        case WebsocketMessage.Response.Payload(payload) ⇒
+          Observable(payload.toByteArray)
+        case WebsocketMessage.Response.CompleteStatus(status) if status.code == Code.OK ⇒
+          Observable()
+        case WebsocketMessage.Response.CompleteStatus(status) ⇒
+          Observable.raiseError(new StatusException(status))
+        case WebsocketMessage.Response.Empty ⇒
+          Observable.raiseError(new StatusException(Status(Status.Code.UNKNOWN, "Empty response received.")))
+      }
+
+      proxyObservable
+    }
+  }
+
+  override def handleUnary(service: String, method: String, request: Array[Byte]): IO[Array[Byte]] = {
+    for {
+      observable ← handle(service, method, Observable(request))
+      res ← observable.headL.toIO
+    } yield {
+      res
+    }
+  }
+}
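On the browser side, WebsocketConnection implements the same Connection interface on top of the websocket proxy, so the refactored clients are constructed uniformly. A rough sketch of that wiring under stated assumptions: websocketPipe is a placeholder for however the WebsocketPipe is actually built, and the Kademlia client additionally needs the implicit Node[Contact] codec, so it is only hinted at in a comment.

import cats.effect.IO
import fluence.dataset.grpc.client.DatasetStorageClient
import fluence.proxy.grpc.WebsocketMessage
import fluence.stream.Connection
import fluence.transport.websocket.{WebsocketConnection, WebsocketPipe}
import monix.execution.Scheduler.Implicits.global

object WebsocketWiringSketch {
  // Placeholder: in the real code the pipe comes from the websocket transport layer.
  def websocketPipe: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]] = ???

  // One Connection per websocket, shared by all protocol clients.
  val connection: Connection = new WebsocketConnection(websocketPipe)

  // The dataset client now depends only on Connection, not on websocket details:
  val datasets = DatasetStorageClient[IO](connection)

  // val kademlia = KademliaClient(connection) // requires the implicit Node[Contact] codec
}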