Skip to content

Commit

Permalink
Connection refactoring (#136)
Browse files Browse the repository at this point in the history
* add range call to js client, add random nodes processing for get and range requests

* WIP experiments with StreamHandler

* add StreamHandler implementation to websocket

* grpc logic refactoring

* delete unused class

* fix default headers bug

* refactoring, grpc-monix util methods

* reshuffle libs

* missed class

* renaming and some docs

* add TODOs, revert some unnecessary changes

* client put refactoring

* ClientPut refactoring

* ClientGet refactoring

* ClientRange refactoring

* small fixes

* delete println

* add backpressure

* small fixes
  • Loading branch information
DieMyst authored and alari committed Sep 3, 2018
1 parent 7c4bc83 commit 39c2830
Show file tree
Hide file tree
Showing 30 changed files with 1,012 additions and 1,048 deletions.
96 changes: 83 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ lazy val `kademlia-grpc-js` = `kademlia-grpc`.js
.dependsOn(`transport-websocket-js`)

lazy val `kademlia-grpc-jvm` = `kademlia-grpc`.jvm
.dependsOn(`grpc-monix-converter`)

lazy val `kademlia-monix` =
crossProject(JVMPlatform, JSPlatform)
Expand Down Expand Up @@ -176,13 +177,15 @@ lazy val `kademlia` = crossProject(JVMPlatform, JSPlatform)
scalaJSModuleKind in Test := ModuleKind.CommonJSModule
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`kademlia-monix`, `kademlia-grpc`, `kademlia-testkit` % Test)
.dependsOn(`kademlia-monix`, `kademlia-grpc`, `scala-multistream-grpc`, `kademlia-testkit` % Test)

lazy val `kademlia-js` = `kademlia`.js
lazy val `kademlia-jvm` = `kademlia`.jvm

lazy val `transport-grpc-monix` = project
.in(file("transport/grpc-monix"))
//TODO utility project, could be replaced to another project
//TODO add unit tests for this project
lazy val `grpc-monix-converter` = project
.in(file("grpc-monix-converter"))
.settings(
commons,
grpc,
Expand All @@ -191,6 +194,7 @@ lazy val `transport-grpc-monix` = project
scalatestKit
)
)
.dependsOn(`scala-multistream-jvm`)
.enablePlugins(AutomateHeaderPlugin)

lazy val `transport-grpc` = crossProject(JVMPlatform, JSPlatform)
Expand Down Expand Up @@ -218,7 +222,7 @@ lazy val `transport-grpc` = crossProject(JVMPlatform, JSPlatform)
scalaJSModuleKind in Test := ModuleKind.CommonJSModule
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`transport-core`, `kademlia-protocol`)
.dependsOn(`transport-core`, `kademlia-protocol`, `scala-multistream`)

lazy val `transport-grpc-js` = `transport-grpc`.js
lazy val `transport-grpc-jvm` = `transport-grpc`.jvm
Expand Down Expand Up @@ -258,7 +262,7 @@ lazy val `transport-grpc-proxy` = project
scalatest
)
)
.dependsOn(`transport-core-jvm`, `websocket-protobuf-jvm`, `transport-grpc-monix`)
.dependsOn(`transport-core-jvm`, `websocket-protobuf-jvm`, `grpc-monix-converter`, `scala-multistream-grpc-jvm`)
.enablePlugins(AutomateHeaderPlugin)

lazy val `transport-websocket-js` = project
Expand All @@ -282,7 +286,7 @@ lazy val `transport-websocket-js` = project
)
.enablePlugins(ScalaJSPlugin)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`websocket-protobuf-js`)
.dependsOn(`websocket-protobuf-js`, `scala-multistream-js`)

lazy val `transport-core` = crossProject(JVMPlatform, JSPlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down Expand Up @@ -508,7 +512,7 @@ lazy val `dataset-grpc` = crossProject(JVMPlatform, JSPlatform)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`dataset-client`, `transport-grpc`)

lazy val `dataset-grpc-jvm` = `dataset-grpc`.jvm.dependsOn(`dataset-node`, `transport-grpc-monix`)
lazy val `dataset-grpc-jvm` = `dataset-grpc`.jvm.dependsOn(`dataset-node`, `grpc-monix-converter`)
lazy val `dataset-grpc-js` = `dataset-grpc`.js
.enablePlugins(ScalaJSBundlerPlugin)
.dependsOn(`transport-websocket-js`)
Expand Down Expand Up @@ -669,15 +673,16 @@ lazy val `client-grpc` = crossProject(JVMPlatform, JSPlatform)
.in(file("client/grpc"))
.settings(commons)
.jsSettings(
scalaJSModuleKind in Test := ModuleKind.CommonJSModule,
fork in Test := false
scalaJSModuleKind in Test := ModuleKind.CommonJSModule,
fork in Test := false
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`client-core`, `transport-grpc`, `kademlia-grpc`, `dataset-grpc`, `contract-grpc`)
.dependsOn(`client-core`, `kademlia-grpc`, `dataset-grpc`, `contract-grpc`)

lazy val `client-grpc-js` = `client-grpc`.js
.enablePlugins(ScalaJSBundlerPlugin)
lazy val `client-grpc-jvm` = `client-grpc`.jvm
.dependsOn(`scala-multistream-grpc-jvm`)

lazy val `client-cli` = crossProject(JVMPlatform, JSPlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down Expand Up @@ -715,14 +720,14 @@ lazy val `client-cli-app` = crossProject(JVMPlatform, JSPlatform)
)
)
.jsSettings(
fork in Test := false,
fork in Test := false,
scalaJSUseMainModuleInitializer := false,
jsEnv in Compile := new org.scalajs.jsenv.jsdomnodejs.JSDOMNodeJSEnv(),
workbenchStartMode := WorkbenchStartModes.Manual,
skip in packageJSDependencies := false,
scalaJSModuleKind in Test := ModuleKind.CommonJSModule,
webpackBundlingMode := BundlingMode.LibraryAndApplication(),
emitSourceMaps := false
webpackBundlingMode := BundlingMode.LibraryAndApplication(),
emitSourceMaps := false
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`client-cli`, `client-grpc`)
Expand Down Expand Up @@ -765,3 +770,68 @@ lazy val `node` = project
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`node-grpc`)

//TODO replace to another project
lazy val `scala-multistream` = crossProject(JVMPlatform, JSPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(FluenceCrossType)
.in(file("scala-multistream"))
.settings(
commons,
libraryDependencies ++= Seq(
"io.monix" %%% "monix" % MonixV
)
)
.jsSettings(
fork in Test := false
)
.enablePlugins(AutomateHeaderPlugin)

lazy val `scala-multistream-js` = `scala-multistream`.js
lazy val `scala-multistream-jvm` = `scala-multistream`.jvm

lazy val `scala-multistream-grpc` = crossProject(JVMPlatform, JSPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(FluenceCrossType)
.in(file("scala-multistream/grpc"))
.settings(
commons,
protobuf,
grpc,
libraryDependencies ++= Seq(
"io.monix" %%% "monix" % MonixV,
"one.fluence" %%% "codec-protobuf" % CodecV,
scalatest
)
)
.jsSettings(
fork in Test := false
)
.dependsOn(`scala-multistream`)
.enablePlugins(AutomateHeaderPlugin)

lazy val `scala-multistream-grpc-js` = `scala-multistream-grpc`.js
lazy val `scala-multistream-grpc-jvm` = `scala-multistream-grpc`.jvm
.dependsOn(`grpc-monix-converter`)

lazy val `scala-multistream-websocket` = crossProject(JVMPlatform, JSPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(FluenceCrossType)
.in(file("scala-multistream/websocket"))
.settings(
commons,
protobuf,
libraryDependencies ++= Seq(
"io.monix" %%% "monix" % MonixV,
"one.fluence" %%% "codec-protobuf" % CodecV,
scalatest
)
)
.jsSettings(
fork in Test := false
)
.dependsOn(`scala-multistream`)
.enablePlugins(AutomateHeaderPlugin)

lazy val `scala-multistream-websocket-js` = `scala-multistream-websocket`.js
lazy val `scala-multistream-websocket-jvm` = `scala-multistream-websocket`.jvm
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import fluence.contract.protocol.{ContractAllocatorRpc, ContractsCacheRpc}
import fluence.crypto.signature.SignAlgo.CheckerFn
import fluence.dataset.grpc.client.DatasetStorageClient
import fluence.dataset.protocol.DatasetStorageRpc
import fluence.kad.grpc.client.KademliaWebsocketClient
import fluence.kad.grpc.client.KademliaClient
import fluence.kad.protocol.{Contact, KademliaRpc}
import fluence.proxy.grpc.WebsocketMessage
import fluence.transport.websocket.{ConnectionPool, WebsocketPipe, WebsocketT}
import fluence.transport.websocket._
import monix.execution.Scheduler
import monix.reactive.Observable

Expand All @@ -54,12 +54,12 @@ class ClientWebsocketServices(connectionPool: ConnectionPool[WebsocketMessage, W
contact.websocketPort.map { wsPort
val url = "ws://" + contact.addr + ":" + wsPort

def connection: IO[WebsocketPipe[WebsocketMessage, WebsocketMessage]] =
IO(connectionPool.getOrCreateConnection(url))
def connection: WebsocketConnection =
new WebsocketConnection(IO(connectionPool.getOrCreateConnection(url)))

new ClientServices[F, BasicContract, Contact] {
override def kademlia: KademliaRpc[Contact] =
new KademliaWebsocketClient(connection)
new KademliaClient(connection)

override def contractsCache: ContractsCacheRpc[BasicContract] =
new ContractsCacheClient[BasicContract](connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@

package fluence.client.grpc

import cats.effect.Effect
import cats.effect.{Effect, IO}
import fluence.client.core.ClientServices
import fluence.contract.BasicContract
import fluence.contract.grpc.client.{ContractAllocatorClient, ContractsCacheClient}
import fluence.contract.protobuf.grpc.{ContractAllocatorGrpc, ContractsCacheGrpc}
import fluence.contract.protocol.{ContractAllocatorRpc, ContractsCacheRpc}
import fluence.crypto.signature.SignAlgo.CheckerFn
import fluence.dataset.grpc.client.DatasetStorageClient
import fluence.dataset.protobuf.grpc.DatasetStorageRpcGrpc
import fluence.dataset.protocol.DatasetStorageRpc
import fluence.grpc.{GrpcConnection, ServiceManager}
import fluence.kad.grpc.client.KademliaClient
import fluence.kad.protobuf.grpc.KademliaGrpc
import fluence.kad.protocol.{Contact, KademliaRpc}
import fluence.transport.grpc.client.GrpcClient
import io.grpc.{CallOptions, ManagedChannel}
import monix.execution.Scheduler
import monix.reactive.Observable
import shapeless.HNil
Expand All @@ -46,11 +51,22 @@ object ClientGrpcServices {
import fluence.contract.grpc.BasicContractCodec.{codec contractCodec}
import fluence.kad.KademliaNodeCodec.{pureCodec nodeCodec}

//TODO is it possible to avoid this?
val services = List(
KademliaGrpc.SERVICE,
ContractsCacheGrpc.SERVICE,
ContractAllocatorGrpc.SERVICE,
DatasetStorageRpcGrpc.SERVICE
)

val serviceManager = ServiceManager(services)
val handlerBuilder: IO[(ManagedChannel, CallOptions)] => GrpcConnection = GrpcConnection.builder(serviceManager)

val client = builder
.add(KademliaClient.register())
.add(ContractsCacheClient.register[BasicContract]())
.add(ContractAllocatorClient.register[BasicContract]())
.add(DatasetStorageClient.register[F]())
.add(handlerBuilder andThen KademliaClient.apply)
.add(handlerBuilder andThen ContractsCacheClient.apply[BasicContract])
.add(handlerBuilder andThen ContractAllocatorClient.apply[BasicContract])
.add(handlerBuilder andThen DatasetStorageClient.apply[F])
.build

contact
Expand Down

This file was deleted.

Loading

0 comments on commit 39c2830

Please sign in to comment.