Skip to content

Commit

Permalink
[split] Buf, Reader: remove Buf.Eof; end-of-stream is None
Browse files Browse the repository at this point in the history
Problem

The reader interface is complicated substantially by using 'Buf.Eof'
as a sentinel value. It is a value that's confusing and brittle to
compare against (e.g. object equality must be used), and is just
unintuitive.

Solution

Signal EOF using an Option, where None = EOF. This is straightforward
to inspect. This also means we have to make Reader.writable() Closable
in order to signal EOF.

RB_ID=394095
  • Loading branch information
mariusae authored and CI committed Jul 7, 2014
1 parent 44b8e30 commit ae8bcdb
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private[finagle] class DelayedReleaseService[-Req <: Request](
val httpResponse = in
override lazy val reader = new Reader {
def read(n: Int) = in.reader.read(n) respond {
case Return(Buf.Eof) => done()
case Return(None) => done()
case Throw(_) => done()
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.twitter.finagle.http

import com.twitter.io.{Buf, Reader => BufReader, Writer => BufWriter}
import com.twitter.finagle.netty3.ChannelBufferBuf
import com.twitter.util.{Await, Duration}
import com.twitter.util.{Await, Duration, Closable}
import java.io.{InputStream, InputStreamReader, OutputStream, OutputStreamWriter, Reader, Writer}
import java.util.{Iterator => JIterator}
import java.nio.charset.Charset
Expand Down Expand Up @@ -36,7 +36,7 @@ abstract class Message extends HttpMessage {
* A write-only handle to the internal stream of bytes, representing the
* message body. See [[com.twitter.util.Writer]] for more information.
**/
def writer: BufWriter = readerWriter
def writer: BufWriter with Closable = readerWriter

def isRequest: Boolean
def isResponse = !isRequest
Expand Down Expand Up @@ -409,7 +409,7 @@ abstract class Message extends HttpMessage {
}

/** End the response stream. */
def close() = writer.write(Buf.Eof)
def close() = writer.close()

private[this] def writeChunk(buf: ChannelBuffer) {
if (buf.readable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import org.jboss.netty.handler.codec.http.{HttpChunk, DefaultHttpChunk}
import org.jboss.netty.buffer.ChannelBuffers

private[http] object NullReader extends Reader {
def read(n: Int) = ReaderUtils.eof
def read(n: Int) = Future.None
def discard() { }
}

private[http] object ReaderUtils {
val eof = Future.value(Buf.Eof)

/**
* Implement a Reader given a Transport. The Reader represents a byte
* stream, so it is useful to know when the stream has finished. This end of
Expand All @@ -25,58 +23,61 @@ private[http] object ReaderUtils {
def readerFromTransport(trans: Transport[Any, Any], done: Promise[Unit]): Reader =
new Reader {
private[this] val mu = new AsyncMutex
@volatile private[this] var buf = Buf.Empty
@volatile private[this] var buf: Option[Buf] = Some(Buf.Empty) // None = EOF

// We don't want to rely on scheduler semantics for ordering.
def read(n: Int): Future[Buf] = mu.acquire() flatMap { permit =>
val readOp = if (buf eq Buf.Eof) {
eof
} else if (buf.length > 0) {
val f = Future.value(buf.slice(0, n))
buf = buf.slice(n, Int.MaxValue)
f
} else trans.read() flatMap {
private[this] def fill(): Future[Unit] = {
trans.read() flatMap {
// Buf is empty so set it to the result of trans.read()
case chunk: HttpChunk if chunk.isLast =>
done.setDone()
buf = Buf.Eof
eof
buf = None
Future.Done

case chunk: HttpChunk =>
// Read data -- return up to n bytes and save the rest
val cbb = ChannelBufferBuf(chunk.getContent)
val f = Future.value(cbb.slice(0, n))
buf = cbb.slice(n, Int.MaxValue)
f
buf = Some(ChannelBufferBuf(chunk.getContent))
Future.Done

case invalid =>
val exc = new IllegalArgumentException(
"invalid message \"%s\"".format(invalid))
buf = None
Future.exception(exc)
}

readOp onFailure { exc =>
trans.close()
done.updateIfEmpty(Throw(exc))
} ensure { permit.release() }
}

def read(n: Int): Future[Option[Buf]] =
mu.acquire() flatMap { permit =>
def go(): Future[Option[Buf]] = buf match {
case None =>
done.setDone()
Future.None
case Some(buf) if buf.isEmpty =>
fill() before go()
case Some(nonempty) =>
val f = Future.value(Some(nonempty.slice(0, n)))
buf = Some(nonempty.slice(n, Int.MaxValue))
f
}

go() onFailure { exc =>
trans.close()
done.updateIfEmpty(Throw(exc))
} ensure { permit.release() }
}

def discard() {
// Any interrupt to `read` will result in transport closure, but we also
// call `trans.close` here to handle the case where a discard is called
// call `trans.close()` here to handle the case where a discard is called
// without interrupting the `read` operation.
trans.close()
}
}

/**
* Translates a Buf into HttpChunk. Beware: Buf.Empty will have the same
* effect as Buf.Eof, which if used incorrectly can prematurely signal the end
* Translates a Buf into HttpChunk. Beware: an empty buffer indicates end
* of stream.
*/
def chunkOfBuf(buf: Buf): HttpChunk = buf match {
case Buf.Eof =>
HttpChunk.LAST_CHUNK
case cb: ChannelBufferBuf =>
new DefaultHttpChunk(cb.buf)
case buf =>
Expand All @@ -93,11 +94,9 @@ private[http] object ReaderUtils {
bufSize: Int = Int.MaxValue
): Future[Unit] =
r.read(bufSize) flatMap {
// Ignore Buf.Empty as they can prematurely end the stream.
case buf if buf eq Buf.Empty => streamChunks(trans, r)
case buf =>
val chunk = chunkOfBuf(buf)
if (chunk.isLast) trans.write(chunk)
else trans.write(chunk) before streamChunks(trans, r)
case None =>
trans.write(HttpChunk.LAST_CHUNK)
case Some(buf) =>
trans.write(chunkOfBuf(buf)) before streamChunks(trans, r)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DelayedReleaseServiceTest extends FunSuite with MockitoSugar {
verify(service, never).close()
Reader.readAll(response.reader)
}
request.response.close() // writes Buf.Eof
request.response.close() // EOF
verify(service).close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ class EndToEndTest extends FunSuite with BeforeAndAfter {
* Read `n` number of bytes from the bytestream represented by `r`.
*/
def readNBytes(n: Int, r: Reader): Future[Buf] = {
def loop(buf: Buf): Future[Buf] = (n - buf.length) match {
def loop(left: Buf): Future[Buf] = (n - left.length) match {
case x if x > 0 =>
r.read(x) flatMap {
case Buf.Eof => Future.value(buf)
case next => loop(buf concat next)
case Some(right) => loop(left concat right)
case None => Future.value(left)
}
case _ => Future.value(buf)
case _ => Future.value(left)
}

loop(Buf.Empty)
Expand Down Expand Up @@ -205,7 +205,7 @@ class EndToEndTest extends FunSuite with BeforeAndAfter {
val res = Response()
res.setChunked(true)
def go = for {
c <- req.reader.read(Int.MaxValue)
Some(c) <- req.reader.read(Int.MaxValue)
_ <- res.writer.write(c)
_ <- res.close()
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ class HttpClientDispatcherTest extends FunSuite {
assert(!c.isDefined)
req.writer.write(Buf.Utf8("a"))
out.read() flatMap { c => out.write(c) }
assert(Await.result(c) === Buf.Utf8("a"))
assert(Await.result(c) === Some(Buf.Utf8("a")))

val cc = res.reader.read(Int.MaxValue)
assert(!cc.isDefined)
req.writer.write(Buf.Utf8("some other thing"))
out.read() flatMap { c => out.write(c) }
assert(Await.result(cc) === Buf.Utf8("some other thing"))
assert(Await.result(cc) === Some(Buf.Utf8("some other thing")))

val last = res.reader.read(Int.MaxValue)
assert(!last.isDefined)
req.close()
out.read() flatMap { c => out.write(c) }
assert(Await.result(last) === Buf.Eof)
assert(Await.result(last).isEmpty)
}

test("invalid message") {
Expand Down Expand Up @@ -92,14 +92,14 @@ class HttpClientDispatcherTest extends FunSuite {

val c = reader.read(Int.MaxValue)
out.write(chunk("hello"))
assert(Await.result(c) === Buf.Utf8("hello"))
assert(Await.result(c) === Some(Buf.Utf8("hello")))

val cc = reader.read(Int.MaxValue)
out.write(chunk("world"))
assert(Await.result(cc) === Buf.Utf8("world"))
assert(Await.result(cc) === Some(Buf.Utf8("world")))

out.write(HttpChunk.LAST_CHUNK)
assert(Await.result(reader.read(Int.MaxValue)) === Buf.Eof)
assert(Await.result(reader.read(Int.MaxValue)).isEmpty)
}

test("error mid-chunk") {
Expand All @@ -115,7 +115,7 @@ class HttpClientDispatcherTest extends FunSuite {

val c = reader.read(Int.MaxValue)
out.write(chunk("hello"))
assert(Await.result(c) === Buf.Utf8("hello"))
assert(Await.result(c) === Some(Buf.Utf8("hello")))

val cc = reader.read(Int.MaxValue)
out.write("something else")
Expand All @@ -135,7 +135,7 @@ class HttpClientDispatcherTest extends FunSuite {
override val httpMessage = reqIn
lazy val remoteSocketAddress = reqIn.remoteSocketAddress
override val reader = new Reader {
def read(n: Int) = Future.value(Buf.Eof)
def read(n: Int) = Future.value(None)
def discard() {
discardp.setDone()
}
Expand Down

0 comments on commit ae8bcdb

Please sign in to comment.