Jetty I/O Architecture

The Jetty libraries (both client and server) use Java NIO to handle I/O, so that at its core Jetty I/O is completely non-blocking.

Jetty I/O: SelectorManager

The main class of the Jetty I/O library is SelectorManager.

SelectorManager internally manages a configurable number of ManagedSelectors. Each ManagedSelector wraps an instance of java.nio.channels.Selector, which in turn manages a number of java.nio.channels.SocketChannel instances.


SocketChannel instances are typically created by Jetty itself: on the client-side when connecting to a server, and on the server-side when accepting connections from clients. In both cases, the SocketChannel instance is passed to SelectorManager (which passes it to ManagedSelector and eventually to java.nio.channels.Selector) to be registered for use within Jetty.

It is also possible for an application to create SocketChannel instances outside Jetty, optionally perform some initial network traffic outside Jetty as well (for example, for authentication purposes), and then pass the SocketChannel instance to SelectorManager for use within Jetty.

This example shows how a client can connect to a server:

public void connect(SelectorManager selectorManager, Map<String, Object> context) throws IOException
{
    String host = "host";
    int port = 8080;

    // Create an unconnected SocketChannel.
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    // Connect and register to Jetty.
    if (socketChannel.connect(new InetSocketAddress(host, port)))
        selectorManager.accept(socketChannel, context);
    else
        selectorManager.connect(socketChannel, context);
}

This example shows how a server accepts a client connection:

public void accept(ServerSocketChannel acceptor, SelectorManager selectorManager) throws IOException
{
    // Wait until a client connects.
    SocketChannel socketChannel = acceptor.accept();
    socketChannel.configureBlocking(false);

    // Accept and register to Jetty.
    Object attachment = null;
    selectorManager.accept(socketChannel, attachment);
}

Jetty I/O: EndPoint and Connection

SocketChannels that are passed to SelectorManager are wrapped into two related components: an EndPoint and a Connection.

EndPoint is the Jetty abstraction for a SocketChannel or a DatagramChannel: you can read bytes from an EndPoint, write bytes to an EndPoint, close an EndPoint, etc.

Connection is the Jetty abstraction responsible for reading bytes from the EndPoint and deserializing them into objects. For example, an HTTP/1.1 server-side Connection implementation deserializes HTTP/1.1 request bytes into an HTTP request object. Conversely, an HTTP/1.1 client-side Connection implementation deserializes HTTP/1.1 response bytes into an HTTP response object.

Connection is the abstraction that implements the reading side of a specific protocol such as HTTP/1.1, or HTTP/2, or HTTP/3, or WebSocket: it is able to read and parse incoming bytes in that protocol.

The writing side of a specific protocol may be implemented in the Connection, but it may also be implemented in other components; either way, the bytes are eventually written through the EndPoint.

While there are primarily only two implementations of EndPoint (SocketChannelEndPoint for TCP and DatagramChannelEndPoint for UDP, both used on the client-side and on the server-side), there are many implementations of Connection, typically two for each protocol (one for the client-side and one for the server-side).

EndPoint and Connection pairs can be chained, for example for encrypted communication using the TLS protocol. The first pair in the chain is the TLS pair, where the EndPoint reads the encrypted bytes from the socket and the Connection decrypts them; next in the chain there is another EndPoint and Connection pair, where the EndPoint "reads" the decrypted bytes (provided by the previous Connection) and the Connection deserializes them into protocol-specific objects (for example, HTTP/2 frame objects).
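To make the chaining idea concrete, here is a JDK-only toy sketch. It is not Jetty code: the Layer interface and both classes are hypothetical, and a trivial XOR transformation stands in for real TLS decryption. The first layer "decrypts" the raw bytes and hands the result to the next layer, which plays the role of the protocol Connection.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical abstraction: a layer receives bytes from the layer below it.
interface Layer
{
    void onBytes(ByteBuffer bytes);
}

// Toy stand-in for the TLS Connection: "decrypts" by XOR-ing each byte,
// then passes the plain bytes to the next layer in the chain.
class XorDecryptingLayer implements Layer
{
    private final byte key;
    private final Layer next;

    XorDecryptingLayer(byte key, Layer next)
    {
        this.key = key;
        this.next = next;
    }

    @Override
    public void onBytes(ByteBuffer encrypted)
    {
        ByteBuffer plain = ByteBuffer.allocate(encrypted.remaining());
        while (encrypted.hasRemaining())
            plain.put((byte)(encrypted.get() ^ key));
        plain.flip();
        // The next layer only ever sees decrypted bytes.
        next.onBytes(plain);
    }
}

// Toy stand-in for the protocol Connection: collects the decoded text.
class TextProtocolLayer implements Layer
{
    final StringBuilder received = new StringBuilder();

    @Override
    public void onBytes(ByteBuffer plain)
    {
        received.append(StandardCharsets.US_ASCII.decode(plain));
    }
}
```

A real TLS layer must also buffer partial records across reads and handle handshakes, which this toy ignores; the point is only that each layer consumes the output of the layer below it.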

Certain protocols, such as WebSocket, start the communication with the server using one protocol (for example, HTTP/1.1), but then switch the communication to another protocol (for example, WebSocket). EndPoint supports changing the Connection object on-the-fly via EndPoint.upgrade(Connection). This makes it possible to use the HTTP/1.1 Connection during the initial communication and later replace it with a WebSocket Connection.
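The following JDK-only sketch models the idea behind EndPoint.upgrade(Connection): the endpoint keeps a reference to its current connection, and that connection can replace itself. All names here (ToyEndPoint, ToyConnection, InitialProtocol, UpgradedProtocol and the "UPGRADE" line) are hypothetical and are not Jetty APIs; the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol handler that receives lines from its endpoint.
interface ToyConnection
{
    void onLine(ToyEndPoint endPoint, String line);
}

// Hypothetical endpoint that delegates input to a replaceable connection.
class ToyEndPoint
{
    private ToyConnection connection;

    ToyEndPoint(ToyConnection initial)
    {
        this.connection = initial;
    }

    // Swaps the connection on-the-fly: subsequent input goes to the new one.
    void upgrade(ToyConnection newConnection)
    {
        this.connection = newConnection;
    }

    void receive(String line)
    {
        connection.onLine(this, line);
    }
}

// Initial protocol: switches to another protocol when it sees "UPGRADE".
class InitialProtocol implements ToyConnection
{
    private final ToyConnection upgradeTo;

    InitialProtocol(ToyConnection upgradeTo)
    {
        this.upgradeTo = upgradeTo;
    }

    @Override
    public void onLine(ToyEndPoint endPoint, String line)
    {
        if ("UPGRADE".equals(line))
            endPoint.upgrade(upgradeTo);
    }
}

// Upgraded protocol: records every line it receives.
class UpgradedProtocol implements ToyConnection
{
    final List<String> lines = new ArrayList<>();

    @Override
    public void onLine(ToyEndPoint endPoint, String line)
    {
        lines.add(line);
    }
}
```

Feeding "hello", "UPGRADE" and "frame1" to a ToyEndPoint that starts with an InitialProtocol delivers only "frame1" to the UpgradedProtocol, because the swap happens between the second and third input.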

SelectorManager is an abstract class because, while it knows how to create concrete EndPoint instances, it does not know how to create protocol-specific Connection instances.

Creating Connection instances is performed on the server-side by ConnectionFactorys and on the client-side by ClientConnectionFactorys.

On the server-side, the component that aggregates a SelectorManager with a set of ConnectionFactorys is ServerConnector for TCP sockets, QuicServerConnector for QUIC sockets, and UnixDomainServerConnector for Unix-Domain sockets (see the server-side architecture section for more information).

On the client-side, the components that aggregate a SelectorManager with a set of ClientConnectionFactorys are HttpClientTransport subclasses (see the client-side architecture section for more information).

Jetty I/O: EndPoint

The Jetty I/O library uses Java NIO to handle I/O, so that I/O is non-blocking.

At the Java NIO level, in order to be notified when a SocketChannel or DatagramChannel has data to be read, the SelectionKey.OP_READ flag must be set.

In the Jetty I/O library, you can call EndPoint.fillInterested(Callback) to declare interest in the "read" (also called "fill") event, and the Callback parameter is the object that is notified when such an event occurs.
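For comparison, this is roughly what declaring read interest looks like with plain Java NIO, the kind of detail that EndPoint.fillInterested(Callback) hides. The sketch uses a java.nio.channels.Pipe instead of a SocketChannel so that it runs without a network; the class and method names are hypothetical, and the thread simply blocks in select() rather than being called back.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class NioReadInterest
{
    // Registers the pipe source with OP_READ, waits for the read event, then reads.
    static String readWhenReady(Pipe pipe, String toSend) throws IOException
    {
        try (Selector selector = Selector.open())
        {
            Pipe.SourceChannel source = pipe.source();
            // A channel must be non-blocking to be registered with a Selector.
            source.configureBlocking(false);
            // Declare interest in the "read" event.
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            // The other peer writes some bytes (the sink is in blocking mode).
            pipe.sink().write(ByteBuffer.wrap(toSend.getBytes(StandardCharsets.UTF_8)));

            // Block until the registered channel is ready.
            selector.select();
            if (!key.isReadable())
                throw new IllegalStateException("read event expected");
            selector.selectedKeys().clear();

            // The read event fired: read without blocking.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            source.read(buffer);
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }
}
```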

At the Java NIO level, a SocketChannel or DatagramChannel is always writable, unless it becomes congested. In order to be notified when a congested channel becomes writable again, the SelectionKey.OP_WRITE flag must be set.

In the Jetty I/O library, you can call EndPoint.write(Callback, ByteBuffer...) to write the ByteBuffers, and the Callback parameter is the object that is notified when the whole write is finished (i.e. when all ByteBuffers have been fully written, even if the write was delayed by congestion).
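The sketch below shows, with plain Java NIO, the congestion handling that EndPoint.write(Callback, ByteBuffer...) takes care of: when a non-blocking write leaves bytes unwritten, OP_WRITE is set and the writer waits until the channel is writable again. It is a hypothetical helper, not Jetty code, and it deliberately simplifies one thing: it blocks in select(), whereas Jetty returns immediately and completes the Callback later from a selector thread. A Pipe stands in for a SocketChannel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class NioWriteFully
{
    // Writes every byte of 'buffer' to a non-blocking sink, using OP_WRITE
    // to wait whenever the sink is congested. Returns the bytes written.
    static long writeFully(Pipe.SinkChannel sink, ByteBuffer buffer) throws IOException
    {
        long written = 0;
        try (Selector selector = Selector.open())
        {
            sink.configureBlocking(false);
            // Registered with no interest: while not congested, no events are needed.
            SelectionKey key = sink.register(selector, 0);
            while (buffer.hasRemaining())
            {
                written += sink.write(buffer);
                if (buffer.hasRemaining())
                {
                    // Congested: ask to be notified when writable again.
                    key.interestOps(SelectionKey.OP_WRITE);
                    selector.select();
                    selector.selectedKeys().clear();
                    // Writable again: drop the interest and retry the write.
                    key.interestOps(0);
                }
            }
        }
        return written;
    }
}
```

Writing a few megabytes while another thread drains the pipe exercises the congested path, because the pipe's internal buffer is much smaller than the data.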

The EndPoint APIs abstract out the Java NIO details by providing non-blocking, Callback-based APIs for I/O operations. The EndPoint APIs are typically called by Connection implementations (see the Jetty I/O: Connection section).

Jetty I/O: Connection

Connection is the abstraction that deserializes incoming bytes into objects, for example an HTTP request object or a WebSocket frame object, that can be used by more abstract layers.

Connection instances have two lifecycle methods:

  • Connection.onOpen(), invoked when the Connection is associated with the EndPoint.

  • Connection.onClose(Throwable), invoked when the Connection is disassociated from the EndPoint, where the Throwable parameter indicates whether the disassociation was normal (when the parameter is null) or was due to an error (when the parameter is not null).

When a Connection is first created, it is not registered for any Java NIO event. It is therefore typical to implement onOpen() to call EndPoint.fillInterested(Callback), so that the Connection declares interest in read events and is notified (via the Callback) when a read event happens.

The abstract class AbstractConnection partially implements Connection and provides simpler APIs. The example below shows a typical implementation that extends AbstractConnection:

// Extend AbstractConnection to inherit basic implementation.
class MyConnection extends AbstractConnection
{
    public MyConnection(EndPoint endPoint, Executor executor)
    {
        super(endPoint, executor);
    }

    @Override
    public void onOpen()
    {
        super.onOpen();

        // Declare interest for fill events.
        // When the fill event happens, method onFillable() below is invoked.
        fillInterested();
    }

    @Override
    public void onFillable()
    {
        // Invoked when a fill event happens.
    }
}

Jetty I/O: Connection.Listener

The Jetty I/O library allows applications to register event listeners for the Connection events "opened" and "closed" via the interface Connection.Listener.

This is useful in many cases, for example:

  • Gather statistics about connection lifecycle, such as time of creation and duration.

  • Gather statistics about the number of concurrent connections, and take action if a threshold is exceeded.

  • Gather statistics about the number of bytes read and written, and the number of "messages" read and written, where "messages" may mean HTTP/1.1 requests or responses, or WebSocket frames, or HTTP/2 frames, etc.

  • Gather statistics about the different types of connections being opened (TLS, HTTP/1.1, HTTP/2, WebSocket, etc.).

  • Etc.

Connection.Listener implementations must be added as beans to a server-side Connector, or to a client-side HttpClient, WebSocketClient, HTTP2Client or HTTP3Client. You can add multiple Connection.Listener objects as beans, each with its own logic, so that you can separate different concerns into different Connection.Listener implementations.

The Jetty I/O library provides useful Connection.Listener implementations, such as ConnectionStatistics (used in the example below), that you should evaluate before writing your own.

Here is a simple example of a Connection.Listener used both on the client and on the server:

class ThresholdConnectionListener implements Connection.Listener
{
    private final AtomicInteger connections = new AtomicInteger();
    // Atomic because onOpened() and onClosed() may be
    // invoked concurrently by different threads.
    private final AtomicBoolean notified = new AtomicBoolean();
    private final int threshold;

    public ThresholdConnectionListener(int threshold)
    {
        this.threshold = threshold;
    }

    @Override
    public void onOpened(Connection connection)
    {
        int count = connections.incrementAndGet();
        // compareAndSet() ensures that only one thread logs the warning.
        if (count > threshold && notified.compareAndSet(false, true))
            System.getLogger("connection.threshold").log(System.Logger.Level.WARNING, "Connection threshold exceeded");
    }

    @Override
    public void onClosed(Connection connection)
    {
        int count = connections.decrementAndGet();
        // Reset the alert when we are below 90% of the threshold.
        if (count < threshold * 0.9F)
            notified.set(false);
    }
}

// Configure server-side connectors with Connection.Listeners.
Server server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
// Add statistics.
connector.addBean(new ConnectionStatistics());
// Add your own Connection.Listener.
connector.addBean(new ThresholdConnectionListener(2048));
server.start();

// Configure client-side HttpClient with Connection.Listeners.
HttpClient httpClient = new HttpClient();
// Add statistics.
httpClient.addBean(new ConnectionStatistics());
// Add your own Connection.Listener.
httpClient.addBean(new ThresholdConnectionListener(512));
httpClient.start();

Jetty I/O: TCP Network Echo

With the concepts above, it is now possible to write a simple, fully non-blocking Connection implementation that echoes the bytes it reads back to the other peer.

A naive, but wrong, implementation may be the following:

class WrongEchoConnection extends AbstractConnection implements Callback
{
    public WrongEchoConnection(EndPoint endPoint, Executor executor)
    {
        super(endPoint, executor);
    }

    @Override
    public void onOpen()
    {
        super.onOpen();

        // Declare interest for fill events.
        fillInterested();
    }

    @Override
    public void onFillable()
    {
        try
        {
            ByteBuffer buffer = BufferUtil.allocate(1024);
            int filled = getEndPoint().fill(buffer);
            if (filled > 0)
            {
                // Filled some bytes, echo them back.
                getEndPoint().write(this, buffer);
            }
            else if (filled == 0)
            {
                // No more bytes to fill, declare
                // again interest for fill events.
                fillInterested();
            }
            else
            {
                // The other peer closed the
                // connection, close it back.
                getEndPoint().close();
            }
        }
        catch (Exception x)
        {
            getEndPoint().close(x);
        }
    }

    @Override
    public void succeeded()
    {
        // The write is complete, fill again.
        onFillable();
    }

    @Override
    public void failed(Throwable x)
    {
        getEndPoint().close(x);
    }
}

The implementation above is wrong and leads to StackOverflowError.

The problem with this implementation is that if the writes always complete synchronously (i.e. without being delayed by TCP congestion), you end up with this sequence of calls:

Connection.onFillable()
  EndPoint.write()
    Connection.succeeded()
      Connection.onFillable()
        EndPoint.write()
          Connection.succeeded()
          ...

which leads to StackOverflowError.
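The problem can be reproduced with plain Java, without Jetty. In this hypothetical sketch, SyncWriter.write() stands in for an EndPoint write that always completes synchronously by invoking its callback before returning, and RecursiveEcho.onFillable() recurses through that callback exactly like WrongEchoConnection does.

```java
// A write that always completes synchronously: the callback runs
// on the caller's stack, before write() returns.
class SyncWriter
{
    void write(Runnable callback)
    {
        callback.run();
    }
}

class RecursiveEcho
{
    private final SyncWriter writer = new SyncWriter();
    private long remaining;
    long iterations;

    RecursiveEcho(long remaining)
    {
        this.remaining = remaining;
    }

    // Mirrors WrongEchoConnection: each completed write calls onFillable()
    // again, so every echo adds stack frames instead of looping.
    void onFillable()
    {
        if (remaining-- == 0)
            return;
        iterations++;
        writer.write(this::onFillable);
    }
}
```

With a small count (say 100) the code works; with a large enough count it throws StackOverflowError, because stack depth grows with the number of synchronous writes rather than staying constant.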

This is a typical side effect of asynchronous programming using non-blocking APIs, and happens in the Jetty I/O library as well.

The callback is invoked synchronously for efficiency reasons. Submitting the invocation of the callback to an Executor to be invoked in a different thread would cause a context switch and make simple writes extremely inefficient.

This side effect of asynchronous programming leading to StackOverflowError is so common that the Jetty libraries have a generic solution for it: a specialized Callback implementation named org.eclipse.jetty.util.IteratingCallback that turns recursion into iteration, therefore avoiding the StackOverflowError.

IteratingCallback is a Callback implementation that should be passed to non-blocking APIs such as EndPoint.write(Callback, ByteBuffer...) when they are performed in a loop.

IteratingCallback works by starting the loop with IteratingCallback.iterate(). In turn, this calls IteratingCallback.process(), an abstract method that must be implemented with the code to execute at each iteration of the loop.

Method process() must return:

  • Action.SCHEDULED, to indicate that the loop has performed a non-blocking, possibly asynchronous, operation

  • Action.IDLE, to indicate that the loop should temporarily be suspended, to be resumed later

  • Action.SUCCEEDED, to indicate that the loop exited successfully

Any exception thrown within process() exits the loop with a failure.
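To illustrate how recursion can be turned into iteration, here is a deliberately simplified, single-threaded model. It is not Jetty's IteratingCallback, which also deals with concurrency, failures and concurrent iterate() calls; SimpleIteratingCallback, its Action enum and SyncEchoLoop are hypothetical and only mirror the SCHEDULED/IDLE/SUCCEEDED contract described above. The key idea: when succeeded() is called while process() is still on the stack, it only records the completion, and the loop calls process() again instead of recursing.

```java
// Simplified, single-threaded model of recursion-to-iteration.
// NOT thread-safe and NOT Jetty's IteratingCallback.
abstract class SimpleIteratingCallback
{
    enum Action { SCHEDULED, IDLE, SUCCEEDED }

    private boolean processing;      // true while process() is on the stack
    private boolean completedInline; // succeeded() was called during process()
    private boolean done;

    protected abstract Action process();

    protected void onCompleteSuccess()
    {
    }

    // Starts the loop, or resumes it after an IDLE.
    public void iterate()
    {
        loop();
    }

    // Completion of the operation started by process().
    public void succeeded()
    {
        if (processing)
            completedInline = true; // Synchronous completion: the loop continues.
        else
            loop(); // Asynchronous completion: resume the loop.
    }

    private void loop()
    {
        if (done)
            return;
        processing = true;
        try
        {
            while (true)
            {
                completedInline = false;
                Action action = process();
                if (action == Action.SCHEDULED && completedInline)
                    continue; // Iterate instead of recursing.
                if (action == Action.SUCCEEDED)
                {
                    done = true;
                    onCompleteSuccess();
                }
                // Pending SCHEDULED, IDLE or SUCCEEDED: leave the loop.
                return;
            }
        }
        finally
        {
            processing = false;
        }
    }
}

// Usage: a loop of simulated writes that all complete synchronously.
class SyncEchoLoop extends SimpleIteratingCallback
{
    private long remaining;
    long writes;
    boolean completed;

    SyncEchoLoop(long remaining)
    {
        this.remaining = remaining;
    }

    @Override
    protected Action process()
    {
        if (remaining == 0)
            return Action.SUCCEEDED;
        remaining--;
        writes++;
        succeeded(); // The simulated write completes synchronously.
        return Action.SCHEDULED;
    }

    @Override
    protected void onCompleteSuccess()
    {
        completed = true;
    }
}
```

Unlike the recursive version, a million synchronous completions here run at constant stack depth.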

Now that you know how IteratingCallback works, a correct implementation for the echo Connection is the following:

class EchoConnection extends AbstractConnection
{
    private final IteratingCallback callback = new EchoIteratingCallback();

    public EchoConnection(EndPoint endp, Executor executor)
    {
        super(endp, executor);
    }

    @Override
    public void onOpen()
    {
        super.onOpen();

        // Declare interest for fill events.
        fillInterested();
    }

    @Override
    public void onFillable()
    {
        // Start the iteration loop that reads and echoes back.
        callback.iterate();
    }

    class EchoIteratingCallback extends IteratingCallback
    {
        private ByteBuffer buffer;

        @Override
        protected Action process() throws Throwable
        {
            // Obtain a buffer if we don't already have one.
            if (buffer == null)
                buffer = BufferUtil.allocate(1024);

            int filled = getEndPoint().fill(buffer);
            if (filled > 0)
            {
                // We have filled some bytes, echo them back.
                getEndPoint().write(this, buffer);

                // Signal that the iteration should resume
                // when the write() operation is completed.
                return Action.SCHEDULED;
            }
            else if (filled == 0)
            {
                // We don't need the buffer anymore, so
                // don't keep it around while we are idle.
                buffer = null;

                // No more bytes to read, declare
                // again interest for fill events.
                fillInterested();

                // Signal that the iteration is now IDLE.
                return Action.IDLE;
            }
            else
            {
                // The other peer closed the connection,
                // the iteration completed successfully.
                return Action.SUCCEEDED;
            }
        }

        @Override
        protected void onCompleteSuccess()
        {
            // The iteration completed successfully.
            getEndPoint().close();
        }

        @Override
        protected void onCompleteFailure(Throwable cause)
        {
            // The iteration completed with a failure.
            getEndPoint().close(cause);
        }

        @Override
        public InvocationType getInvocationType()
        {
            return InvocationType.NON_BLOCKING;
        }
    }
}

When onFillable() is called, for example the first time that bytes are available from the network, the iteration is started. Starting the iteration calls process(), where a buffer is allocated and filled with bytes read from the network via EndPoint.fill(ByteBuffer); the buffer is then written back via EndPoint.write(Callback, ByteBuffer...). Note that the callback passed to EndPoint.write() is this, i.e. the IteratingCallback itself. Finally, process() returns Action.SCHEDULED.

At this point, the call to EndPoint.write(Callback, ByteBuffer...) may have completed synchronously; IteratingCallback knows that and calls process() again. Within process(), the buffer has already been allocated, so it is reused, saving further allocations; the buffer is filled and possibly written again, and process() returns Action.SCHEDULED again.

Alternatively, the call to EndPoint.write(Callback, ByteBuffer...) may not have completed synchronously, so IteratingCallback does not call process() again; the processing thread is free to return to the Jetty I/O system, where it may be put back into the thread pool. If this was the only active network connection, the system would now be idle, with no threads blocked waiting for the write() to complete. This thread-less wait is one of the most important features that make non-blocking asynchronous servers more scalable: they use fewer resources.

Eventually, the Jetty I/O system notifies the IteratingCallback that the write() completed, and the IteratingCallback resumes the loop by calling process() again.

When process() is called, it is possible that zero bytes are read from the network. In this case, you want to drop the buffer, since the other peer may never send more bytes for the Connection to read, or may send them after a long pause; in both cases you do not want to retain the memory allocated by the buffer. Next, you want to call fillInterested() to declare again interest in read events, and return Action.IDLE, since there is nothing to write back and therefore the loop may be suspended. When more bytes are available to be read from the network, onFillable() is called again, which starts the iteration again.

Another possibility is that during process() the read returns -1 indicating that the other peer has closed the connection; this means that there will not be more bytes to read and the loop can be exited, so you return Action.SUCCEEDED; IteratingCallback will then call onCompleteSuccess() where you can close the EndPoint.

The last case is that an exception is thrown during process(), for example by EndPoint.fill(ByteBuffer) or, in more advanced implementations, by code that parses the bytes that have been read and finds them unacceptable. Any exception thrown within process() is caught by IteratingCallback, which exits the loop with a failure and calls onCompleteFailure(Throwable) with the exception that was thrown; there you can close the EndPoint, passing the exception as the reason for closing it prematurely.

Asynchronous programming is hard.

Rely on the Jetty classes to implement Connection to avoid mistakes that will be difficult to diagnose and reproduce.

Content.Source

The high-level abstraction that Jetty offers to read bytes is org.eclipse.jetty.io.Content.Source.

Content.Source offers a non-blocking demand/read model where a read returns a Content.Chunk (see also the Content.Chunk section).

A Content.Chunk groups the following information:

  • A ByteBuffer with the bytes that have been read; it may be empty.

  • Whether the read reached end-of-file, via its last flag.

  • A failure that might have happened during the read, via its getFailure() method.

The Content.Chunk returned from Content.Source.read() can either be a normal chunk (a chunk containing a ByteBuffer and a null failure), or a failure chunk (a chunk containing an empty ByteBuffer and a non-null failure).

A failure chunk also indicates (via the last flag) whether the failure is a fatal (when last=true) or transient (when last=false) failure.

A transient failure is a temporary failure that happened during the read; it may be ignored, and it is recoverable: it is possible to call read() again and obtain a normal chunk (or a null chunk). A typical case of transient failure is an idle timeout, where the read timed out but the application may decide to keep reading until some other event happens. The application may convert a transient failure into a fatal failure by calling Content.Source.fail(Throwable).

A Content.Source must be fully consumed by reading all its content, or failed by calling Content.Source.fail(Throwable) to signal that the reader is not interested in reading anymore, otherwise it may leak underlying resources.

Fully consuming a Content.Source means reading from it until it returns a Content.Chunk whose last flag is true. Reading or demanding from an already fully consumed Content.Source is always immediately serviced with the last state of the Content.Source: a Content.Chunk with the last flag set to true, either an end-of-file chunk, or a failure chunk.

Once failed, a Content.Source is considered fully consumed. Further attempts to read from a failed Content.Source return a failure chunk whose getFailure() method returns the exception passed to Content.Source.fail(Throwable).

When reading a normal chunk, its ByteBuffer is typically a slice of a different ByteBuffer that has been read by a lower layer. There may be multiple layers between the bottom layer (where the initial read typically happens) and the application layer that calls Content.Source.read().

By slicing the ByteBuffer (rather than copying its bytes), there is no copy of the bytes between the layers, which yields greater performance. However, this comes with the cost that the ByteBuffer, and the associated Content.Chunk, have an intrinsic lifecycle: the final consumer of a Content.Chunk at the application layer must indicate when it has consumed the chunk, so that the bottom layer may reuse/recycle the ByteBuffer.

Consuming the chunk means that the bytes in the ByteBuffer are read (or ignored), and that the application will not look at or reference that ByteBuffer ever again.

Content.Chunk offers a retain/release model to deal with the ByteBuffer lifecycle, with a simple rule:

A Content.Chunk returned by a call to Content.Source.read() must be released, except for Content.Chunks that are failure chunks. Failure chunks may be released, but they do not need to be.
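The retain/release rule is a form of reference counting. The sketch below is a hypothetical, JDK-only model (it is not Jetty's Content.Chunk or its buffer pool, and its pool is not thread-safe): the count starts at 1, as if the chunk had just been returned by read(); each retain() increments it, each release() decrements it, and the ByteBuffer goes back to the pool only when the count reaches 0.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, single-threaded pool that recycles ByteBuffers.
class ToyBufferPool
{
    final Deque<ByteBuffer> free = new ArrayDeque<>();

    ByteBuffer acquire()
    {
        ByteBuffer buffer = free.poll();
        return buffer != null ? buffer : ByteBuffer.allocate(1024);
    }
}

// Hypothetical reference-counted chunk.
class ToyChunk
{
    // Starts at 1: the implicit retain of the read() that produced it.
    private final AtomicInteger references = new AtomicInteger(1);
    private final ToyBufferPool pool;
    private final ByteBuffer buffer;

    ToyChunk(ToyBufferPool pool)
    {
        this.pool = pool;
        this.buffer = pool.acquire();
    }

    void retain()
    {
        // A fully released chunk cannot be retained again.
        if (references.getAndUpdate(c -> c > 0 ? c + 1 : c) <= 0)
            throw new IllegalStateException("already released");
    }

    // Returns true when this call released the last reference.
    boolean release()
    {
        int count = references.decrementAndGet();
        if (count < 0)
            throw new IllegalStateException("released too many times");
        if (count == 0)
        {
            // No references left: the buffer may now be reused by others.
            buffer.clear();
            pool.free.offer(buffer);
            return true;
        }
        return false;
    }

    int references()
    {
        return references.get();
    }
}
```

One retain() followed by two release() calls takes the count from 1 to 2, then 1, then 0, which is the same sequence that the ChunksToString example walks through; only the last release() returns the buffer to the pool.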

The example below is the idiomatic way of reading from a Content.Source:

public void read(Content.Source source)
{
    // Read from the source in a loop.
    while (true)
    {
        // Read a chunk, must be eventually released.
        Content.Chunk chunk = source.read(); // (1)

        // If no chunk, demand to be called back when there are more chunks.
        if (chunk == null)
        {
            source.demand(() -> read(source));
            return;
        }

        // If there is a failure reading, handle it.
        if (Content.Chunk.isFailure(chunk))
        {
            boolean fatal = chunk.isLast();
            if (fatal)
            {
                // A fatal failure, such as a network failure.
                handleFatalFailure(chunk.getFailure());
                // No recovery is possible, stop reading
                // by returning without demanding.
                return;
            }
            else
            {
                // A transient failure such as a read timeout.
                handleTransientFailure(chunk.getFailure());
                // Recovery is possible, try to read again.
                continue;
            }
        }

        // A normal chunk of content, consume it.
        consume(chunk);

        // Release the chunk.
        chunk.release(); // (2)

        // Stop reading if EOF was reached.
        if (chunk.isLast())
            return;

        // Loop around to read another chunk.
    }
}
(1) The read() that must be paired with a release().
(2) The release() that pairs the read().

Note how the reads happen in a loop, consuming the Content.Source as soon as it has content available to be read, and therefore no backpressure is applied to the reads.

Calling Content.Chunk.release() must be done only after the bytes in the ByteBuffer returned by Content.Chunk.getByteBuffer() have been consumed. When the Content.Chunk is released, the implementation may reuse the ByteBuffer and overwrite its bytes with different bytes; if the application looks at the ByteBuffer after having released the Content.Chunk, it may see other, unrelated, bytes.
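The hazard can be shown with plain JDK ByteBuffers. In this sketch, parent stands in for a pooled buffer owned by a lower layer and chunkBytes for the slice handed to the application; the names are illustrative only. Because the slice shares memory with the parent (no bytes are copied), once the parent is "recycled" and refilled, the slice shows the new bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SliceAfterRelease
{
    // Returns what the slice shows before and after the parent is recycled.
    static String[] demo()
    {
        // Lower layer: a pooled buffer filled with bytes from the network.
        ByteBuffer parent = ByteBuffer.allocate(16);
        parent.put("first".getBytes(StandardCharsets.US_ASCII));
        parent.flip();

        // The slice shares memory with the parent: no bytes are copied.
        ByteBuffer chunkBytes = parent.slice();
        String before = StandardCharsets.US_ASCII.decode(chunkBytes.duplicate()).toString();

        // "Release": the lower layer recycles the buffer and refills it.
        parent.clear();
        parent.put("OTHER".getBytes(StandardCharsets.US_ASCII));
        parent.flip();

        // Looking at the slice after the release shows unrelated bytes.
        String after = StandardCharsets.US_ASCII.decode(chunkBytes.duplicate()).toString();
        return new String[]{before, after};
    }
}
```

Calling SliceAfterRelease.demo() yields "first" and then "OTHER": the application's view of the bytes changed without the application touching its own buffer.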

An alternative way to read from a Content.Source, suitable when the chunk is consumed asynchronously and you do not want to read again until the Content.Chunk has been consumed, is the following:

public void read(Content.Source source)
{
    // Read a chunk, must be eventually released.
    Content.Chunk chunk = source.read(); // (1)

    // If no chunk, demand to be called back when there are more chunks.
    if (chunk == null)
    {
        source.demand(() -> read(source));
        return;
    }

    // If there is a failure reading, always treat it as fatal.
    if (Content.Chunk.isFailure(chunk))
    {
        // If the failure is transient, fail the source
        // to indicate that there will be no more reads.
        if (!chunk.isLast())
            source.fail(chunk.getFailure());

        // Handle the failure and stop reading by not demanding.
        handleFatalFailure(chunk.getFailure());
        return;
    }

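    // Consume the chunk asynchronously, and do not
    // read more chunks until this has been consumed.
    // consumeAsync() must retain the chunk if it still needs
    // its bytes after returning, since the chunk is released below.
    CompletableFuture<Void> consumed = consumeAsync(chunk);

    // Release the chunk.
    chunk.release(); // (2)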

    // Only when the chunk has been consumed try to read more.
    consumed.whenComplete((result, failure) ->
    {
        if (failure == null)
        {
            // Continue reading if EOF was not reached.
            if (!chunk.isLast())
                source.demand(() -> read(source));
        }
        else
        {
            // If there is a failure reading, handle it,
            // and stop reading by not demanding.
            handleFatalFailure(failure);
        }
    });
}
(1) The read() that must be paired with a release().
(2) The release() that pairs the read().

Note how the reads do not happen in a loop, and therefore backpressure is applied to the reads, because there is not a next read until the chunk from the previous read has been consumed (and this may take time).

Since the Content.Chunk is consumed asynchronously, you may need to retain it to extend its lifecycle, as explained in the Content.Chunk section.

You can use the Content.Source static methods to conveniently read content (in a blocking or non-blocking way), for example via Content.Source.asStringAsync(Content.Source, Charset), or via an InputStream using Content.Source.asInputStream(Content.Source).

Refer to the Content.Source javadocs for further details.

Content.Chunk

Content.Chunk offers a retain/release API to control the lifecycle of its ByteBuffer.

When Content.Chunks are consumed synchronously, no additional retain/release API call is necessary, for example:

public void consume(Content.Chunk chunk) throws IOException
{
    // Consume the chunk synchronously within this method.

    // For example, parse the bytes into other objects,
    // or copy the bytes elsewhere (e.g. the file system).
    fileChannel.write(chunk.getByteBuffer());

    if (chunk.isLast())
        fileChannel.close();
}

On the other hand, if the Content.Chunk is not consumed immediately, then it must be retained, and you must arrange for the Content.Chunk to be released at a later time, thus pairing the retain. For example, you may accumulate the Content.Chunks in a List to convert them to a String when all the Content.Chunks have been read.

Since reading from a Content.Source is asynchronous, the String result is produced via a CompletableFuture:

// CompletableTask is-a CompletableFuture.
public class ChunksToString extends CompletableTask<String>
{
    private final List<Content.Chunk> chunks = new ArrayList<>();
    private final Content.Source source;

    public ChunksToString(Content.Source source)
    {
        this.source = source;
    }

    @Override
    public void run()
    {
        while (true)
        {
            // Read a chunk, must be eventually released.
            Content.Chunk chunk = source.read(); // (1)

            if (chunk == null)
            {
                source.demand(this);
                return;
            }

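            if (Content.Chunk.isFailure(chunk))
            {
                // Treat a transient failure as fatal: fail the
                // source to signal that there will be no more reads.
                if (!chunk.isLast())
                    source.fail(chunk.getFailure());
                // Release the chunks retained so far in consume().
                chunks.forEach(Content.Chunk::release);
                chunks.clear();
                handleFatalFailure(chunk.getFailure());
                // Complete this CompletableFuture exceptionally,
                // otherwise its users would wait forever.
                completeExceptionally(chunk.getFailure());
                return;
            }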

            // A normal chunk of content, consume it.
            consume(chunk);

            // Release the chunk.
            // This pairs the call to read() above.
            chunk.release(); // (2)

            if (chunk.isLast())
            {
                // Produce the result.
                String result = getResult();

                // Complete this CompletableFuture with the result.
                complete(result);

                // The reading is complete.
                return;
            }
        }
    }

    public void consume(Content.Chunk chunk)
    {
        // The chunk is not consumed within this method, but
        // stored away for later use, so it must be retained.
        chunk.retain(); // (3)
        chunks.add(chunk);
    }

    public String getResult()
    {
        Utf8StringBuilder builder = new Utf8StringBuilder();
        // Iterate over the chunks, copying and releasing.
        for (Content.Chunk chunk : chunks)
        {
            // Copy the chunk bytes into the builder.
            builder.append(chunk.getByteBuffer());

            // The chunk has been consumed, release it.
            // This pairs the retain() in consume().
            chunk.release(); // (4)
        }
        return builder.toCompleteString();
    }
}
(1) The read() that must be paired with a release().
(2) The release() that pairs the read().
(3) The retain() that must be paired with a release().
(4) The release() that pairs the retain().

Note how method consume(Content.Chunk) retains the Content.Chunk because it does not consume it, but rather stores it away for later use. With this additional retain, the retain count is now 2: one implicitly from the read() that returned the Content.Chunk, and one explicit in consume(Content.Chunk).

However, just after returning from consume(Content.Chunk) the Content.Chunk is released (pairing the implicit retain from read()), so that the retain count goes to 1, and an additional release is still necessary.

Method getResult() releases all the accumulated Content.Chunks, pairing the retains done in consume(Content.Chunk), so that the retain count of each Content.Chunk finally drops to 0.
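The retain count bookkeeping described above can be modeled with a plain counter. The sketch below is a minimal, self-contained illustration that assumes a hypothetical RefCountedChunk class; it is not Jetty's Content.Chunk, whose internals may differ, and it only tracks the count that the retain() and release() calls in the example above manipulate.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted chunk, used only to model the retain count;
// it is not Jetty's Content.Chunk.
class RefCountedChunk
{
    // A chunk returned by read() starts with one (implicit) retain.
    private final AtomicInteger references = new AtomicInteger(1);

    void retain()
    {
        // Increment only if the chunk is still live.
        int previous = references.getAndUpdate(c -> c > 0 ? c + 1 : c);
        if (previous <= 0)
            throw new IllegalStateException("retain() after the last release()");
    }

    boolean release()
    {
        // Decrement only if the chunk is still live.
        int previous = references.getAndUpdate(c -> c > 0 ? c - 1 : c);
        if (previous <= 0)
            throw new IllegalStateException("release() without a matching retain");
        // True when this call dropped the count to 0, so the buffer could be recycled.
        return previous == 1;
    }

    int count()
    {
        return references.get();
    }
}

public class RetainReleaseDemo
{
    public static void main(String[] args)
    {
        RefCountedChunk chunk = new RefCountedChunk(); // as returned by read()
        chunk.retain();                                 // consume() stores it away
        System.out.println(chunk.count());              // prints 2
        chunk.release();                                // pairs the read()
        System.out.println(chunk.count());              // prints 1
        boolean recyclable = chunk.release();           // getResult() pairs the retain()
        System.out.println(chunk.count() + " " + recyclable); // prints "0 true"
    }
}
```

A release() without a matching retain fails fast in this model, which mirrors why every read() and retain() must be paired with exactly one release().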

Content.Sink

The high-level abstraction that Jetty offers to write bytes is org.eclipse.jetty.io.Content.Sink.

The primary method to use is Content.Sink.write(boolean, ByteBuffer, Callback), which performs a non-blocking write of the given ByteBuffer; the boolean parameter indicates whether this is the last write.

When the write is complete, the Callback parameter is completed, either successfully or with a failure, possibly asynchronously by a different thread.

Your application can typically perform zero or more non-last writes, and one final last write.

However, because writes may complete asynchronously, you cannot start the next write before the previous write has completed.

This code is wrong:

public void wrongWrite(Content.Sink sink, ByteBuffer content1, ByteBuffer content2)
{
    // Initiate a first write.
    sink.write(false, content1, Callback.NOOP);

    // WRONG! Cannot initiate a second write before the first is complete.
    sink.write(true, content2, Callback.NOOP);
}

You must initiate a second write only when the first is finished, for example:

public void manyWrites(Content.Sink sink, ByteBuffer content1, ByteBuffer content2)
{
    // Initiate a first write.
    Callback.Completable resultOfWrites = Callback.Completable.with(callback1 -> sink.write(false, content1, callback1))
        // Chain a second write only when the first is complete.
        .compose(callback2 -> sink.write(true, content2, callback2));

    // Use the resulting Callback.Completable as you would use a CompletableFuture.
    // For example:
    resultOfWrites.whenComplete((ignored, failure) ->
    {
        if (failure == null)
            System.getLogger("sink").log(INFO, "writes completed successfully");
        else
            System.getLogger("sink").log(INFO, "writes failed", failure);
    });
}
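The rule that a write must not start before the previous one has completed can be demonstrated without Jetty. The following sketch assumes a hypothetical OnePendingWriteSink that accepts only one pending write at a time and completes it only when completePending() is called, to model a write that finishes asynchronously. It rejects overlapping writes with java.nio.channels.WritePendingException (what a real Content.Sink does in that case is not specified here), and it uses CompletableFuture.thenCompose() in place of Callback.Completable.compose() to chain the second write to the completion of the first.

```java
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical sink allowing at most one pending write at a time.
// Writes complete only when completePending() is called.
class OnePendingWriteSink
{
    private final StringBuilder content = new StringBuilder();
    private CompletableFuture<Void> pending;

    // The last flag is accepted only to mirror the shape of Content.Sink.write().
    CompletableFuture<Void> write(boolean last, ByteBuffer buffer)
    {
        if (pending != null)
            throw new WritePendingException();
        content.append(StandardCharsets.UTF_8.decode(buffer));
        pending = new CompletableFuture<>();
        return pending;
    }

    void completePending()
    {
        CompletableFuture<Void> current = pending;
        if (current == null)
            throw new IllegalStateException("no pending write");
        // Clear the pending write first: completing it may start the next write.
        pending = null;
        current.complete(null);
    }

    String content()
    {
        return content.toString();
    }
}

public class ChainedWritesDemo
{
    static ByteBuffer utf8(String text)
    {
        return StandardCharsets.UTF_8.encode(text);
    }

    public static void main(String[] args)
    {
        // WRONG: the second write starts while the first is still pending.
        OnePendingWriteSink wrong = new OnePendingWriteSink();
        wrong.write(false, utf8("hello "));
        try
        {
            wrong.write(true, utf8("world"));
        }
        catch (WritePendingException x)
        {
            System.out.println("overlapping write rejected");
        }

        // RIGHT: the second write is chained to the completion of the first.
        OnePendingWriteSink right = new OnePendingWriteSink();
        CompletableFuture<Void> writes = right.write(false, utf8("hello "))
            .thenCompose(ignored -> right.write(true, utf8("world")));
        right.completePending(); // completes the first write, which starts the second
        right.completePending(); // completes the second write
        System.out.println(right.content() + ", done=" + writes.isDone()); // prints "hello world, done=true"
    }
}
```

The composed future completes only after both writes have completed, which is the same guarantee the Callback.Completable chain provides in the Jetty example.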

When you need to perform an unknown number of writes, you must use an IteratingCallback (described in its own section) to avoid StackOverflowErrors.

For example, to copy from a Content.Source to a Content.Sink, you should use the convenience method Content.copy(Content.Source, Content.Sink, Callback). For illustrative purposes, below you can find an implementation of copy(Content.Source, Content.Sink, Callback) that uses an IteratingCallback:

@SuppressWarnings("InnerClassMayBeStatic")
class Copy extends IteratingCallback
{
    private final Content.Source source;
    private final Content.Sink sink;
    private final Callback callback;
    private Content.Chunk chunk;

    public Copy(Content.Source source, Content.Sink sink, Callback callback)
    {
        this.source = source;
        this.sink = sink;
        // The callback to notify when the copy is completed.
        this.callback = callback;
    }

    @Override
    protected Action process() throws Throwable
    {
        // If the last write completed, succeed this IteratingCallback,
        // causing onCompleteSuccess() to be invoked.
        if (chunk != null && chunk.isLast())
            return Action.SUCCEEDED;

        // Read a chunk.
        chunk = source.read();

        // No chunk, demand to be called back when there will be more chunks.
        if (chunk == null)
        {
            source.demand(this::iterate);
            return Action.IDLE;
        }

        // The read failed, re-throw the failure
        // causing onCompleteFailure() to be invoked.
        if (Content.Chunk.isFailure(chunk))
            throw chunk.getFailure();

        // Copy the chunk.
        sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
        return Action.SCHEDULED;
    }

    @Override
    public void succeeded()
    {
        // After every successful write, release the chunk.
        chunk.release();
        super.succeeded();
    }

    @Override
    public void failed(Throwable x)
    {
        super.failed(x);
    }

    @Override
    protected void onCompleteSuccess()
    {
        // The copy is succeeded, succeed the callback.
        callback.succeeded();
    }

    @Override
    protected void onCompleteFailure(Throwable failure)
    {
        // In case of a failure, either on the read
        // or on the write, release the chunk, if any.
        if (chunk != null)
            chunk.release();

        // The copy is failed, fail the callback.
        callback.failed(failure);
    }

    @Override
    public InvocationType getInvocationType()
    {
        return InvocationType.NON_BLOCKING;
    }
}
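To see why an unknown number of writes calls for an IteratingCallback, consider a sink that completes every write synchronously, running the callback before write() returns. If each completion directly starts the next write, every write adds frames to the stack. The sketch below compares that naive approach with a simplified, single-threaded model of the iteration idea; SyncSink, IteratingWriter and writeRecursively() are hypothetical names, and this is not Jetty's IteratingCallback, which additionally handles concurrent completions, failures and more.

```java
// Hypothetical callback-style sink: onSuccess is run when the write completes,
// possibly before write() returns.
interface SyncSink
{
    void write(boolean last, int chunk, Runnable onSuccess);
}

// Simplified, single-threaded sketch of the iteration idea: a completion that
// happens while the loop is running only sets a flag, and the loop then starts
// the next write, so the stack depth stays constant.
class IteratingWriter
{
    private final SyncSink sink;
    private final int total;
    private int next;
    private boolean iterating;
    private boolean completedDuringWrite;
    boolean done;

    IteratingWriter(SyncSink sink, int total)
    {
        this.sink = sink;
        this.total = total;
    }

    void iterate()
    {
        iterating = true;
        try
        {
            while (true)
            {
                if (next == total)
                {
                    done = true;
                    return;
                }
                int chunk = next++;
                completedDuringWrite = false;
                sink.write(chunk == total - 1, chunk, this::succeeded);
                // The write is still pending: succeeded() will resume iterating later.
                if (!completedDuringWrite)
                    return;
            }
        }
        finally
        {
            iterating = false;
        }
    }

    void succeeded()
    {
        if (iterating)
            completedDuringWrite = true; // synchronous completion: the loop continues
        else
            iterate(); // asynchronous completion: resume the loop
    }
}

public class IterateDemo
{
    // Naive approach: each completion directly starts the next write,
    // so synchronous completions nest ever deeper on the stack.
    static void writeRecursively(SyncSink sink, int chunk, int total)
    {
        if (chunk == total)
            return;
        sink.write(chunk == total - 1, chunk, () -> writeRecursively(sink, chunk + 1, total));
    }

    public static void main(String[] args)
    {
        int chunks = 1_000_000;
        int[] writes = new int[1];
        // Every write completes immediately, on the caller's stack.
        SyncSink sink = (last, chunk, onSuccess) ->
        {
            writes[0]++;
            onSuccess.run();
        };
        try
        {
            writeRecursively(sink, 0, chunks);
            System.out.println("recursive: completed");
        }
        catch (StackOverflowError x)
        {
            System.out.println("recursive: StackOverflowError after " + writes[0] + " writes");
        }
        writes[0] = 0;
        IteratingWriter writer = new IteratingWriter(sink, chunks);
        writer.iterate();
        // prints "iterating: done=true, writes=1000000"
        System.out.println("iterating: done=" + writer.done + ", writes=" + writes[0]);
    }
}
```

With default thread stack sizes the recursive variant is expected to overflow long before a million writes, while the iterating variant runs them all in a loop of constant depth; when a write completes later instead, succeeded() simply re-enters iterate().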

Non-blocking writes can easily be turned into blocking writes. This may lead to code that is simpler to read, but it comes at a price: greater resource usage (the calling thread is blocked for the whole duration of each write), which may reduce scalability and performance.

public void blockingWrite(Content.Sink sink, ByteBuffer content1, ByteBuffer content2) throws IOException
{
    // First blocking write, returns only when the write is complete.
    Content.Sink.write(sink, false, content1);

    // Second blocking write, returns only when the write is complete.
    // It is legal to perform the writes sequentially, since they are blocking.
    Content.Sink.write(sink, true, content2);
}
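Conceptually, a blocking write is a non-blocking write followed by waiting for its callback. The sketch below assumes a hypothetical AsyncSink interface whose callback receives null on success or the failure otherwise; blockingWrite() parks the calling thread on a CompletableFuture until the callback fires, which is exactly the resource cost mentioned above. It is an illustration only, not how Jetty implements Content.Sink.write(Content.Sink, boolean, ByteBuffer).

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical non-blocking sink: write() returns immediately and the callback
// is invoked later, possibly by another thread, with null on success or the failure.
interface AsyncSink
{
    void write(boolean last, ByteBuffer buffer, Consumer<Throwable> callback);
}

public class BlockingWriteDemo
{
    // Blocks the calling thread until the non-blocking write completes.
    static void blockingWrite(AsyncSink sink, boolean last, ByteBuffer buffer) throws IOException
    {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        sink.write(last, buffer, failure ->
        {
            if (failure == null)
                completion.complete(null);
            else
                completion.completeExceptionally(failure);
        });
        try
        {
            // The thread is parked here, doing nothing, until the callback fires.
            completion.get();
        }
        catch (InterruptedException x)
        {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("interrupted while waiting for the write");
        }
        catch (ExecutionException x)
        {
            // Rethrow the write failure as an IOException.
            Throwable cause = x.getCause();
            if (cause instanceof IOException)
                throw (IOException)cause;
            throw new IOException(cause);
        }
    }

    public static void main(String[] args) throws IOException
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StringBuilder received = new StringBuilder();
        // Completes every write on another thread, as a non-blocking sink may do.
        AsyncSink sink = (last, buffer, callback) -> executor.execute(() ->
        {
            received.append(StandardCharsets.UTF_8.decode(buffer));
            callback.accept(null);
        });
        try
        {
            // Sequential writes are safe: each call returns only once its write is complete.
            blockingWrite(sink, false, StandardCharsets.UTF_8.encode("hello "));
            blockingWrite(sink, true, StandardCharsets.UTF_8.encode("world"));
        }
        finally
        {
            executor.shutdown();
        }
        System.out.println(received); // prints "hello world"
    }
}
```

Completing the future in the callback establishes a happens-before relation with get(), so the main thread safely observes what the executor thread appended.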