Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Added handlers to ParrotTransport #45

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/main/scala/com/twitter/parrot/server/ParrotTransport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import scala.collection.mutable
import scala.util.Random
import com.twitter.finagle.Service
import com.twitter.logging.Logger
import com.twitter.parrot.util.IgnorantHostnameVerifier
import com.twitter.parrot.util.IgnorantTrustManager
import com.twitter.parrot.util.{IgnorantHostnameVerifier, IgnorantTrustManager}
import com.twitter.util.Future
import com.twitter.util.Time
import com.twitter.util.Try
Expand All @@ -31,12 +30,18 @@ import com.twitter.util.Await

trait ParrotTransport[Req <: ParrotRequest, Rep] extends Service[Req, Rep] {
val log = Logger.get(getClass.getName)
private[this] val handlers = new mutable.ListBuffer[Try[Rep] => Unit]()
override def apply(request: Req): Future[Rep] =
private[this] val responseHandlers = new mutable.ListBuffer[Try[Rep] => Unit]()
private[this] val requestHandlers = new mutable.ListBuffer[Req => Unit]()
private[this] val responseReqHandlers = new mutable.ListBuffer[(Try[Rep], Req) => Unit]()
override def apply(request: Req): Future[Rep] = {
requestHandlers foreach { _(request) }

sendRequest(request) respond { k =>
log.debug("Response: " + k.toString)
handlers foreach { _(k) }
responseHandlers foreach { _(k) }
responseReqHandlers foreach { _(k, request) }
}
}

protected[server] def sendRequest(request: Req): Future[Rep]

Expand All @@ -47,7 +52,15 @@ trait ParrotTransport[Req <: ParrotRequest, Rep] extends Service[Req, Rep] {
def stats(response: Rep): Seq[String] = Nil

def respond(f: Try[Rep] => Unit) {
handlers += f
responseHandlers += f
}

def request(f: Req => Unit) {
requestHandlers += f
}

def respondReq(f: (Try[Rep], Req) => Unit) {
responseReqHandlers += f
}

def start() {
Expand Down