Skip to content

Commit

Permalink
[split] finagle-core: Improves FailFastFactory documentation Problem …
Browse files Browse the repository at this point in the history
…FailFastFactory behavior when multiple threads are asking for a client simultaneously was nonobvious to me.

Solution
Wrote a test to verify my understanding and added documentation.  I also added a stat and documented it in the user guide.

Result
FailFastFactory is a little better documented.

RB_ID=345617
  • Loading branch information
mosesn authored and CI committed Jun 6, 2014
1 parent cc1dfc5 commit fd3c9d0
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
3 changes: 3 additions & 0 deletions doc/src/sphinx/metrics/FailFast.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FailFastFactory
<<<<<<<<<<<<<<<

**marked_dead**
a counter of how many times the FailFastFactory has been marked dead

**unhealthy_for_ms**
a gauge of how long the FailFastFactory has been retrying for this failure

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,17 @@ case class DefaultClient[Req, Rep](
loadBalancer: WeightedLoadBalancerFactory = DefaultBalancerFactory
) extends Client[Req, Rep] {
com.twitter.finagle.Init()
val globalStatsReceiver = new RollupStatsReceiver(statsReceiver)
private[this] val log = Logger.getLogger(getClass.getName)

/** Bind a socket address to a well-formed stack */
val bindStack: SocketAddress => ServiceFactory[Req, Rep] = sa => {
val hostStats = if (hostStatsReceiver.isNull) globalStatsReceiver else {
val hostStats = if (hostStatsReceiver.isNull) statsReceiver else {
val host = new RollupStatsReceiver(hostStatsReceiver.scope(
sa match {
case ia: InetSocketAddress => "%s:%d".format(ia.getHostName, ia.getPort)
case other => other.toString
}))
BroadcastStatsReceiver(Seq(host, globalStatsReceiver))
BroadcastStatsReceiver(Seq(host, statsReceiver))
}

val lifetimeLimited: Transformer[Req, Rep] = {
Expand Down Expand Up @@ -160,7 +159,7 @@ case class DefaultClient[Req, Rep](
val traced: Transformer[Req, Rep] = new TracingFilter[Req, Rep](tracer, name) andThen _

val observed: Transformer[Req, Rep] =
new StatsFactoryWrapper(_, globalStatsReceiver.scope("service_creation"))
new StatsFactoryWrapper(_, statsReceiver.scope("service_creation"))

val noBrokersException = new NoBrokersAvailableException(name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ private[finagle] object FailFastFactory {
}

/**
* An experimental fail-fast factory that attempts to reduce
* the amount of requests dispatched to endpoints that will
* anyway fail. It works by marking a host dead on failure,
* launching a background process that attempts to reestablish
* the connection with the given backoff schedule. At this time,
* the factory is marked unavailable (and thus the load balancer
* above it will avoid its use). The factory becomes available
* again on success or when the backoff schedule runs out.
* A fail-fast factory that attempts to reduce the amount of requests dispatched
* to endpoints that will anyway fail. It works by marking a host dead on
* failure, launching a background process that attempts to reestablish the
* connection with the given backoff schedule. At this time, the factory is
* marked unavailable (and thus the load balancer above it will avoid its
* use). The factory becomes available again on success or when the backoff
* schedule runs out.
*
* Inflight attempts to connect will continue uninterrupted. However, trying to
* connect *after* being marked dead will fail fast until the background process
* is able to establish a connection.
*/
private[finagle] class FailFastFactory[Req, Rep](
self: ServiceFactory[Req, Rep],
Expand Down Expand Up @@ -77,6 +80,7 @@ private[finagle] class FailFastFactory[Req, Rep](
val wait #:: rest = getBackoffs()
val now = Time.now
val task = timer.schedule(now + wait) { proc ! Observation.Timeout }
markedDeadCounter.incr()
state = Retrying(now, task, 0, rest)

case Observation.TimeoutFail if state != Ok =>
Expand Down Expand Up @@ -127,6 +131,8 @@ private[finagle] class FailFastFactory[Req, Rep](
}
}

private[this] val markedDeadCounter = statsReceiver.counter("marked_dead")

override def apply(conn: ClientConnection) =
if (state != Ok) failedFastExc else {
self(conn) respond {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,22 @@ class FailFastFactoryTest extends FunSuite with MockitoSugar {
assert(failfast.isAvailable === underlying.isAvailable)
}
}

test("fails simultaneous requests properly") {
Time.withCurrentTimeFrozen { tc =>
val ctx = newCtx()
import ctx._

val pp2 = failfast()
val e = new Exception
p() = Throw(e)

assert(pp.poll === Some(Throw(e)))
assert(pp2.poll === Some(Throw(e)))

intercept[FailedFastException] {
failfast().poll.get.get
}
}
}
}

0 comments on commit fd3c9d0

Please sign in to comment.