Skip to content

Commit

Permalink
[split] Improve the Dtab API
Browse files Browse the repository at this point in the history
Problem

The current API is asymmetric: apply() doesn't refer to the same thing
as update(). This is highly confusing and even dangerous. The
relationship between the base Dtab and the local Dtab is unclear and
confusing. It's a bad API. Further, Java usage of Dtab was littered
with MODULE$ references.

Solution

Deconflate 'base' from 'local'. Instead of Dtab() and Dtab.base, we
have instead Dtab.base and Dtab.local. These are accessed and updated
in the standard way. Dtab users (i.e. BindingFactory, a class in
ConfigBus) must now compose the Dtab explicitly:
Dtab.base++Dtab.local. Also get rid of Dtab.baseDiff() -- protocols
simply encode Dtab.local.

We also ensure that Dtabs are tested with a nonempty Dtab.base, so
that this does not accidentally leak.

RB_ID=363999
  • Loading branch information
mariusae authored and CI committed May 29, 2014
1 parent b70c1e1 commit 1412039
Show file tree
Hide file tree
Showing 22 changed files with 870 additions and 46 deletions.
72 changes: 53 additions & 19 deletions finagle-core/src/main/scala/com/twitter/finagle/Dtab.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ case class Dtab(dentries0: IndexedSeq[Dentry])

def apply(i: Int): Dentry = dentries0(i)
def length = dentries0.length
override def isEmpty = length == 0

def lookup(path: Path): Activity[NameTree[Name]] = {
val matches = dentries collect {
Expand All @@ -40,11 +41,23 @@ case class Dtab(dentries0: IndexedSeq[Dentry])
def +(dentry: Dentry): Dtab =
Dtab(dentries0 :+ dentry)

/**
* Java API for '+'
*/
def append(dentry: Dentry): Dtab = this + dentry

/**
* Construct a new Dtab with the given dtab appended.
*/
def ++(dtab: Dtab): Dtab =
Dtab(dentries0 ++ dtab.dentries0)
def ++(dtab: Dtab): Dtab = {
if (dtab.isEmpty) this
else Dtab(dentries0 ++ dtab.dentries0)
}

/**
* Java API for '++'
*/
def concat(dtab: Dtab): Dtab = this ++ dtab

/**
* Efficiently removes prefix `prefix` from `dtab`.
Expand Down Expand Up @@ -110,36 +123,57 @@ object Dentry {
val nop: Dentry = Dentry(Path.Utf8("/"), NameTree.Neg)
}

/**
* Object Dtab manages 'base' and 'local' Dtabs.
*/
object Dtab {
/**
* An empty delegation table.
*/
val empty: Dtab = Dtab(Vector.empty)

/**
* The base, or "system", or "global", delegation table applies to
* every request in this process. It is generally set at process
* startup, and not changed thereafter.
*/
@volatile var base: Dtab = empty

/**
* Java API for ``base_=``
*/
def setBase(dtab: Dtab) { base = dtab }

private[this] val l = new Local[Dtab]

def apply(): Dtab = l() getOrElse base
def update(dtab: Dtab) { l() = dtab }
def clear() { l.clear() }
/**
* The local, or "per-request", delegation table applies to the
* current [[com.twitter.util.Local Local]] scope which is usually
* defined on a per-request basis. Finagle uses the Dtab
* ``Dtab.base ++ Dtab.local`` to bind
* [[com.twitter.finagle.Name.Path Paths]].
*
* Local's scope is dictated by [[com.twitter.util.Local Local]].
*
* The local dtab is serialized into outbound requests when
* supported protocols are used. (Http, Thrift via TTwitter, Mux,
* and ThriftMux are among these.) The upshot is that ``local`` is
* defined for the entire request graph, so that a local dtab
* defined here will apply to downstream services as well.
*/
def local: Dtab = l() getOrElse Dtab.empty
def local_=(dtab: Dtab) { l() = dtab }

/**
* Java API for ``local_=``
*/
def setLocal(dtab: Dtab) { local = dtab }

def unwind[T](f: => T): T = {
val save = l()
try f finally l.set(save)
}

def delegate(dentry: Dentry) {
this() = this() + dentry
}

def delegate(dtab: Dtab) {
this() = this() ++ dtab
}

/**
* Retrieve the difference between the base dtab
* and the current local dtab.
*/
def baseDiff(): Dtab = this().stripPrefix(base)

/**
* Parse a Dtab from string `s` with concrete syntax
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private[finagle] class BindingFactory[Req, Rep](
}

def apply(conn: ClientConnection): Future[Service[Req, Rep]] =
dtabCache(Dtab(), conn)
dtabCache(Dtab.base ++ Dtab.local, conn)

def close(deadline: Time) =
Closable.sequence(dtabCache, nameCache).close(deadline)
Expand Down
19 changes: 19 additions & 0 deletions finagle-core/src/test/java/com/twitter/finagle/MyTestDtab.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.twitter.finagle;

/**
* A Java compilation test for Dtab manipulation.
*/

public class MyTestDtab {
static {
Dtab d = Dtab.empty();
d = Dtab.local();
d = Dtab.base();
Dtab.setLocal(d);
Dtab base = Dtab.base();
Dtab.setBase(Dtab.empty());
Dtab.setBase(base);
d = Dtab.local().concat(Dtab.base());
d = Dtab.local().append(Dentry.read("/foo=>/bar"));
}
}
13 changes: 13 additions & 0 deletions finagle-core/src/test/scala/com/twitter/finagle/DtabTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,26 @@ class DtabTest extends FunSuite {
val d1 = Dtab.read("/foo => /bar")
val d2 = Dtab.read("/foo=>/biz;/biz=>/$/inet//8080;/bar=>/$/inet//9090")

assert(d1++d2 === Dtab.read("""
/foo=>/bar;
/foo=>/biz;
/biz=>/$/inet//8080;
/bar=>/$/inet//9090
"""))

def assertEval(dtab: Dtab, path: Path, expect: Name*) {
assert((dtab orElse Namer.global).bind(NameTree.Leaf(path)).sample().eval === Some(expect.toSet))
}

assertEval(d1++d2, Path.read("/foo"), Name.bound(new InetSocketAddress(8080)))
assertEval(d2++d1, Path.read("/foo"), Name.bound(new InetSocketAddress(9090)))
}

test("d1 ++ Dtab.empty") {
val d1 = Dtab.read("/foo=>/bar;/biz=>/baz")

assert(d1 ++ Dtab.empty === d1)
}

test("Dtab.stripPrefix") {
val d1, d2 = Dtab.read("/foo=>/bar;/baz=>/xxx/yyy")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,32 @@ package com.twitter.finagle.factory

import com.twitter.finagle._
import com.twitter.util.{Await, Future, Time, Var, Activity, Promise, Throw, Return}
import java.net.SocketAddress
import java.net.{InetSocketAddress, SocketAddress}
import org.junit.runner.RunWith
import org.mockito.Matchers.any
import org.mockito.ArgumentCaptor
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.stubbing.Answer
import org.mockito.invocation.InvocationOnMock
import org.scalatest.FunSuite
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.junit.JUnitRunner
import org.scalatest.mock.MockitoSugar
import com.twitter.finagle.stats._

@RunWith(classOf[JUnitRunner])
class BindingFactoryTest extends FunSuite with MockitoSugar {
class BindingFactoryTest extends FunSuite with MockitoSugar with BeforeAndAfter {
var saveBase: Dtab = Dtab.empty
before {
saveBase = Dtab.base
Dtab.base = Dtab.read("""
/test1010=>/$/inet//1010
""")
}

after {
Dtab.base = saveBase
}

def anonNamer() = new Namer {
def lookup(path: Path): Activity[NameTree[Name]] =
Activity.value(NameTree.Neg)
Expand Down Expand Up @@ -50,11 +62,20 @@ class BindingFactoryTest extends FunSuite with MockitoSugar {

def newWith(localDtab: Dtab): Service[Unit, Var[Addr]] = {
Dtab.unwind {
Dtab() = localDtab
Dtab.local = localDtab
Await.result(factory())
}
}
}

test("Uses Dtab.base") (new Ctx {
val n1 = Dtab.read("/foo/bar=>/test1010")
val s1 = newWith(n1)
val v1 = Await.result(s1(()))
assert(v1.sample() === Addr.Bound(new InetSocketAddress(1010)))

s1.close()
})

test("Caches namers") (new Ctx {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private[finagle] class DtabHttpDispatcher(
import GenSerialClientDispatcher.wrapWriteException

protected def dispatch(req: HttpRequest, p: Promise[HttpResponse]): Future[Unit] = {
HttpDtab.write(Dtab.baseDiff(), req)
HttpDtab.write(Dtab.local, req)
trans.write(req) rescue(wrapWriteException) flatMap { _ =>
trans.read() flatMap {
case res: HttpResponse =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class HttpClientDispatcher[Req <: HttpRequest](
// It's kind of nasty to modify the request inline like this, but it's
// in-line with what we already do in finagle-http. For example:
// the body buf gets read without slicing.
HttpDtab.write(Dtab.baseDiff(), req)
HttpDtab.write(Dtab.local, req)
trans.write(req) rescue(wrapWriteException) before
trans.read() flatMap {
case res: HttpResponse if !res.isChunked =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class DtabFilter[Req <: HttpMessage, Rep <: HttpMessage]
def apply(req: Req, service: Service[Req, Rep]) = {
val dtab = HttpDtab.read(req)
if (dtab.isEmpty) service(req) else Dtab.unwind {
Dtab.delegate(dtab)
Dtab.local ++= dtab
HttpDtab.clear(req)
service(req)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,21 @@ import java.net.InetSocketAddress
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.handler.codec.http._
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class EndToEndTest extends FunSuite {
class EndToEndTest extends FunSuite with BeforeAndAfter {
var saveBase: Dtab = Dtab.empty
before {
saveBase = Dtab.base
Dtab.base = Dtab.read("/foo=>/bar; /baz=>/biz")
}

after {
Dtab.base = saveBase
}

type HttpService = Service[HttpRequest, HttpResponse]
type RichHttpService = Service[Request, Response]

Expand Down Expand Up @@ -60,7 +70,7 @@ class EndToEndTest extends FunSuite {
def apply(request: HttpRequest) = {
val stringer = new StringWriter
val printer = new PrintWriter(stringer)
Dtab.baseDiff().print(printer)
Dtab.local.print(printer)
val response = Response(request)
response.contentString = stringer.toString
Future.value(response)
Expand All @@ -70,7 +80,7 @@ class EndToEndTest extends FunSuite {
val client = connect(service)

Dtab.unwind {
Dtab.delegate(Dtab.read("/a=>/b; /c=>/d"))
Dtab.local ++= Dtab.read("/a=>/b; /c=>/d")

val res = Response(Await.result(client(Request("/"))))
assert(res.contentString === "Dtab(2)\n\t/a => /b\n\t/c => /d\n")
Expand All @@ -79,6 +89,26 @@ class EndToEndTest extends FunSuite {
client.close()
}

test(name + ": (no) dtab") {
val service = new HttpService {
def apply(request: HttpRequest) = {
val stringer = new StringWriter
val printer = new PrintWriter(stringer)

val response = Response(request)
response.contentString = "%d".format(Dtab.local.length)
Future.value(response)
}
}

val client = connect(service)

val res = Response(Await.result(client(Request("/"))))
assert(res.contentString === "0")

client.close()
}

test(name + ": stream") {
def service(r: Reader) = new HttpService {
def apply(request: HttpRequest) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private[finagle] class ClientDispatcher (
val contexts = Context.emit() map { case (k, v) =>
(BufChannelBuffer(k), BufChannelBuffer(v))
}
Tdispatch(tag, contexts.toSeq, "", Dtab.baseDiff(), req)
Tdispatch(tag, contexts.toSeq, "", Dtab.local, req)
}

if (traceWrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private[finagle] class ServerDispatcher(
Context.handle(ChannelBufferBuf(k), ChannelBufferBuf(v))
Trace.record(Annotation.ServerRecv())
if (dtab.length > 0)
Dtab.delegate(dtab)
Dtab.local ++= dtab
val f = service(req)
pending.put(tag, f)
f respond {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ClientDispatcher private[exp](
val contexts = Context.emit().map({ case (k, v) =>
(BufChannelBuffer(k), BufChannelBuffer(v))
}).toSeq
trans.write(encode(Tdispatch(MarkerTag, contexts, "", Dtab.baseDiff(), BufChannelBuffer(buf))))
trans.write(encode(Tdispatch(MarkerTag, contexts, "", Dtab.local, BufChannelBuffer(buf))))
}

/**
Expand Down Expand Up @@ -135,7 +135,7 @@ class ClientDispatcher private[exp](
val contexts = Context.emit().map({ case (k, v) =>
(BufChannelBuffer(k), BufChannelBuffer(v))
}).toSeq
val dtab = Dtab.baseDiff()
val dtab = Dtab.local
val traceId = Some(Trace.id)

def terminal(content: Buf): ChannelBuffer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class Session private[finagle](
for ((k, v) <- contexts)
Context.handle(ChannelBufferBuf(k), ChannelBufferBuf(v))
if (dtab.length > 0)
Dtab.delegate(dtab)
Dtab.local ++= dtab

service.send(ChannelBufferBuf(buf))

Expand All @@ -109,7 +109,7 @@ class Session private[finagle](
for ((k, v) <- contexts)
Context.handle(ChannelBufferBuf(k), ChannelBufferBuf(v))
if (dtab.length > 0)
Dtab.delegate(dtab)
Dtab.local ++= dtab

val source = incoming.getOrElseUpdate(masked, {
val source = new SpoolSource[Buf]
Expand Down
Loading

0 comments on commit 1412039

Please sign in to comment.