diff --git a/finagle-http/src/main/scala/com/twitter/finagle/http/DelayedReleaseService.scala b/finagle-http/src/main/scala/com/twitter/finagle/http/DelayedReleaseService.scala index 5593949fe7..3ac69c247c 100644 --- a/finagle-http/src/main/scala/com/twitter/finagle/http/DelayedReleaseService.scala +++ b/finagle-http/src/main/scala/com/twitter/finagle/http/DelayedReleaseService.scala @@ -27,7 +27,7 @@ private[finagle] class DelayedReleaseService[-Req <: Request]( val httpResponse = in override lazy val reader = new Reader { def read(n: Int) = in.reader.read(n) respond { - case Return(Buf.Eof) => done() + case Return(None) => done() case Throw(_) => done() case _ => } diff --git a/finagle-http/src/main/scala/com/twitter/finagle/http/Message.scala b/finagle-http/src/main/scala/com/twitter/finagle/http/Message.scala index 87e1ebe3e4..52b8436be7 100644 --- a/finagle-http/src/main/scala/com/twitter/finagle/http/Message.scala +++ b/finagle-http/src/main/scala/com/twitter/finagle/http/Message.scala @@ -2,7 +2,7 @@ package com.twitter.finagle.http import com.twitter.io.{Buf, Reader => BufReader, Writer => BufWriter} import com.twitter.finagle.netty3.ChannelBufferBuf -import com.twitter.util.{Await, Duration} +import com.twitter.util.{Await, Duration, Closable} import java.io.{InputStream, InputStreamReader, OutputStream, OutputStreamWriter, Reader, Writer} import java.util.{Iterator => JIterator} import java.nio.charset.Charset @@ -36,7 +36,7 @@ abstract class Message extends HttpMessage { * A write-only handle to the internal stream of bytes, representing the * message body. See [[com.twitter.util.Writer]] for more information. **/ - def writer: BufWriter = readerWriter + def writer: BufWriter with Closable = readerWriter def isRequest: Boolean def isResponse = !isRequest @@ -409,7 +409,7 @@ abstract class Message extends HttpMessage { } /** End the response stream. */ - def close() = writer.write(Buf.Eof) + def close() = writer.close() private[this] def writeChunk(buf: ChannelBuffer) { if (buf.readable) { diff --git a/finagle-http/src/main/scala/com/twitter/finagle/http/ReaderUtils.scala b/finagle-http/src/main/scala/com/twitter/finagle/http/ReaderUtils.scala index 71f6ecacba..ab93bd5c52 100644 --- a/finagle-http/src/main/scala/com/twitter/finagle/http/ReaderUtils.scala +++ b/finagle-http/src/main/scala/com/twitter/finagle/http/ReaderUtils.scala @@ -9,13 +9,11 @@ import org.jboss.netty.handler.codec.http.{HttpChunk, DefaultHttpChunk} import org.jboss.netty.buffer.ChannelBuffers private[http] object NullReader extends Reader { - def read(n: Int) = ReaderUtils.eof + def read(n: Int) = Future.None def discard() { } } private[http] object ReaderUtils { - val eof = Future.value(Buf.Eof) - /** * Implement a Reader given a Transport. The Reader represents a byte * stream, so it is useful to know when the stream has finished. This end of @@ -25,58 +23,61 @@ private[http] object ReaderUtils { def readerFromTransport(trans: Transport[Any, Any], done: Promise[Unit]): Reader = new Reader { private[this] val mu = new AsyncMutex - @volatile private[this] var buf = Buf.Empty + @volatile private[this] var buf: Option[Buf] = Some(Buf.Empty) // None = EOF - // We don't want to rely on scheduler semantics for ordering. - def read(n: Int): Future[Buf] = mu.acquire() flatMap { permit => - val readOp = if (buf eq Buf.Eof) { - eof - } else if (buf.length > 0) { - val f = Future.value(buf.slice(0, n)) - buf = buf.slice(n, Int.MaxValue) - f - } else trans.read() flatMap { + private[this] def fill(): Future[Unit] = { + trans.read() flatMap { // Buf is empty so set it to the result of trans.read() case chunk: HttpChunk if chunk.isLast => - done.setDone() - buf = Buf.Eof - eof + buf = None + Future.Done case chunk: HttpChunk => // Read data -- return up to n bytes and save the rest - val cbb = ChannelBufferBuf(chunk.getContent) - val f = Future.value(cbb.slice(0, n)) - buf = cbb.slice(n, Int.MaxValue) - f + buf = Some(ChannelBufferBuf(chunk.getContent)) + Future.Done case invalid => val exc = new IllegalArgumentException( "invalid message \"%s\"".format(invalid)) + buf = None Future.exception(exc) } - - readOp onFailure { exc => - trans.close() - done.updateIfEmpty(Throw(exc)) - } ensure { permit.release() } } + def read(n: Int): Future[Option[Buf]] = + mu.acquire() flatMap { permit => + def go(): Future[Option[Buf]] = buf match { + case None => + done.setDone() + Future.None + case Some(buf) if buf.isEmpty => + fill() before go() + case Some(nonempty) => + val f = Future.value(Some(nonempty.slice(0, n))) + buf = Some(nonempty.slice(n, Int.MaxValue)) + f + } + + go() onFailure { exc => + trans.close() + done.updateIfEmpty(Throw(exc)) + } ensure { permit.release() } + } + def discard() { // Any interrupt to `read` will result in transport closure, but we also - // call `trans.close` here to handle the case where a discard is called + // call `trans.close()` here to handle the case where a discard is called // without interrupting the `read` operation. trans.close() } } /** - * Translates a Buf into HttpChunk. Beware: Buf.Empty will have the same - * effect as Buf.Eof, which if used incorrectly can prematurely signal the end + * Translates a Buf into HttpChunk. Beware: an empty buffer indicates end * of stream. */ def chunkOfBuf(buf: Buf): HttpChunk = buf match { - case Buf.Eof => - HttpChunk.LAST_CHUNK case cb: ChannelBufferBuf => new DefaultHttpChunk(cb.buf) case buf => @@ -93,11 +94,9 @@ private[http] object ReaderUtils { bufSize: Int = Int.MaxValue ): Future[Unit] = r.read(bufSize) flatMap { - // Ignore Buf.Empty as they can prematurely end the stream. - case buf if buf eq Buf.Empty => streamChunks(trans, r) - case buf => - val chunk = chunkOfBuf(buf) - if (chunk.isLast) trans.write(chunk) - else trans.write(chunk) before streamChunks(trans, r) + case None => + trans.write(HttpChunk.LAST_CHUNK) + case Some(buf) => + trans.write(chunkOfBuf(buf)) before streamChunks(trans, r) } } diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/DelayedReleaseServiceTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/DelayedReleaseServiceTest.scala index 473f064090..b268f10659 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/DelayedReleaseServiceTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/DelayedReleaseServiceTest.scala @@ -37,7 +37,7 @@ class DelayedReleaseServiceTest extends FunSuite with MockitoSugar { verify(service, never).close() Reader.readAll(response.reader) } - request.response.close() // writes Buf.Eof + request.response.close() // EOF verify(service).close() } diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/EndToEndTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/EndToEndTest.scala index 9297cf335d..f9a2487544 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/EndToEndTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/EndToEndTest.scala @@ -41,13 +41,13 @@ class EndToEndTest extends FunSuite with BeforeAndAfter { * Read `n` number of bytes from the bytestream represented by `r`. */ def readNBytes(n: Int, r: Reader): Future[Buf] = { - def loop(buf: Buf): Future[Buf] = (n - buf.length) match { + def loop(left: Buf): Future[Buf] = (n - left.length) match { case x if x > 0 => r.read(x) flatMap { - case Buf.Eof => Future.value(buf) - case next => loop(buf concat next) + case Some(right) => loop(left concat right) + case None => Future.value(left) } - case _ => Future.value(buf) + case _ => Future.value(left) } loop(Buf.Empty) @@ -205,7 +205,7 @@ class EndToEndTest extends FunSuite with BeforeAndAfter { val res = Response() res.setChunked(true) def go = for { - c <- req.reader.read(Int.MaxValue) + Some(c) <- req.reader.read(Int.MaxValue) _ <- res.writer.write(c) _ <- res.close() } yield () diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/codec/HttpClientDispatcherTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/codec/HttpClientDispatcherTest.scala index ccb94230cd..ffb802e216 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/codec/HttpClientDispatcherTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/codec/HttpClientDispatcherTest.scala @@ -46,19 +46,19 @@ class HttpClientDispatcherTest extends FunSuite { assert(!c.isDefined) req.writer.write(Buf.Utf8("a")) out.read() flatMap { c => out.write(c) } - assert(Await.result(c) === Buf.Utf8("a")) + assert(Await.result(c) === Some(Buf.Utf8("a"))) val cc = res.reader.read(Int.MaxValue) assert(!cc.isDefined) req.writer.write(Buf.Utf8("some other thing")) out.read() flatMap { c => out.write(c) } - assert(Await.result(cc) === Buf.Utf8("some other thing")) + assert(Await.result(cc) === Some(Buf.Utf8("some other thing"))) val last = res.reader.read(Int.MaxValue) assert(!last.isDefined) req.close() out.read() flatMap { c => out.write(c) } - assert(Await.result(last) === Buf.Eof) + assert(Await.result(last).isEmpty) } test("invalid message") { @@ -92,14 +92,14 @@ class HttpClientDispatcherTest extends FunSuite { val c = reader.read(Int.MaxValue) out.write(chunk("hello")) - assert(Await.result(c) === Buf.Utf8("hello")) + assert(Await.result(c) === Some(Buf.Utf8("hello"))) val cc = reader.read(Int.MaxValue) out.write(chunk("world")) - assert(Await.result(cc) === Buf.Utf8("world")) + assert(Await.result(cc) === Some(Buf.Utf8("world"))) out.write(HttpChunk.LAST_CHUNK) - assert(Await.result(reader.read(Int.MaxValue)) === Buf.Eof) + assert(Await.result(reader.read(Int.MaxValue)).isEmpty) } test("error mid-chunk") { @@ -115,7 +115,7 @@ class HttpClientDispatcherTest extends FunSuite { val c = reader.read(Int.MaxValue) out.write(chunk("hello")) - assert(Await.result(c) === Buf.Utf8("hello")) + assert(Await.result(c) === Some(Buf.Utf8("hello"))) val cc = reader.read(Int.MaxValue) out.write("something else") @@ -135,7 +135,7 @@ class HttpClientDispatcherTest extends FunSuite { override val httpMessage = reqIn lazy val remoteSocketAddress = reqIn.remoteSocketAddress override val reader = new Reader { - def read(n: Int) = Future.value(Buf.Eof) + def read(n: Int) = Future.value(None) def discard() { discardp.setDone() }