Sunday, August 07, 2011

Reactor Design Pattern - using java nio

While working on nginx, I became very interested in how its architecture addresses the C10K problem. Unlike traditional servers, nginx doesn't rely on threads to handle requests. Instead, it uses a much more scalable event-driven (asynchronous) architecture.

But what is an event-driven (asynchronous) architecture, really? To simplify, let's talk about asynchronous I/O, a term that in this context is often used interchangeably with non-blocking I/O. It is best described by the reactor design pattern.

Wikipedia explains, the reactor design pattern is a concurrent programming pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers. (The Reactor pattern is closely related to the Observer/Observable pattern in this aspect: all dependents are informed when a single subject changes. The Observer pattern is associated with a single source of events, however, whereas the Reactor pattern is associated with multiple sources of events.)

The reactor design pattern is easier to understand through examples and diagrams. In this illustration, I will be using Java NIO.

Until JDK 1.4, the Java platform did not support nonblocking I/O calls. With an almost one-to-one ratio of threads to clients, servers written in the Java language were susceptible to enormous thread overhead, which resulted in both performance problems and lack of scalability.


By this time, you have the basic idea of how the reactor pattern works! The key components are the Selector, the Channels (and Buffers), and the Handler. Let's investigate them one by one.

Channels and Buffers
Channels are like streams in the original I/O package. All data that goes anywhere (or comes from anywhere) must pass through a Channel object. A Buffer is a container object. All data that is sent to a channel must first be placed in a buffer; likewise, any data that is read from a channel is read into a buffer.

A Buffer is an object that holds data that is about to be written or that has just been read. The addition of the Buffer object in NIO marks one of the most significant differences between the new library and the original I/O. In stream-oriented I/O, you wrote data directly to, and read data directly from, Stream objects.

In the NIO library, all data is handled with buffers. When data is read, it is read directly into a buffer. When data is written, it is written into a buffer. Anytime you access data in NIO, you are pulling it out of the buffer.

The most commonly used kind of buffer is the ByteBuffer. A ByteBuffer allows get/set operations (that is, the getting and setting of bytes) on its underlying byte array. (There are other buffers as well: CharBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, and DoubleBuffer.) *NOTE: StringBuffer is not one of them. It lives in java.lang, has been there since JDK 1.0, and is not part of the nio package.

A basic example of reading data from a Channel:

//getting the channel.
FileInputStream fin = new FileInputStream( "readandshow.txt" );
FileChannel fc = fin.getChannel();

//creating a buffer.
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
fc.read( buffer );

You'll notice that we didn't need to tell the channel how much to read into the buffer. Each buffer has a sophisticated internal accounting system that keeps track of how much data has been read and how much room there is for more data.
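
That accounting comes down to three values that every buffer exposes: position(), limit(), and capacity(). Here is a minimal sketch of how they move as you put, flip, and get. The class name BufferAccountingDemo, the helper methods, and the sample bytes are made up for illustration.

```java
import java.nio.ByteBuffer;

class BufferAccountingDemo {

    // Returns {position, limit, capacity} of the given buffer.
    static int[] state(ByteBuffer buffer) {
        return new int[] { buffer.position(), buffer.limit(), buffer.capacity() };
    }

    static void print(String label, ByteBuffer buffer) {
        int[] s = state(buffer);
        System.out.println(label + ": position=" + s[0] + " limit=" + s[1] + " capacity=" + s[2]);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        print("after allocate", buffer);   // position=0 limit=8 capacity=8

        // Each put() advances the position by one byte.
        buffer.put((byte) 10).put((byte) 20).put((byte) 30);
        print("after 3 puts", buffer);     // position=3 limit=8 capacity=8

        // flip() sets limit to the old position and rewinds position to 0,
        // so only the bytes that were actually put can be drained.
        buffer.flip();
        print("after flip", buffer);       // position=0 limit=3 capacity=8

        // Each get() advances the position as well.
        buffer.get();
        print("after 1 get", buffer);      // position=1 limit=3 capacity=8

        // clear() resets position and limit so the buffer can be filled again.
        buffer.clear();
        print("after clear", buffer);      // position=0 limit=8 capacity=8
    }
}
```

This is why fc.read( buffer ) needs no length argument: the channel fills the buffer from position up to limit and moves position forward by the number of bytes it read.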

Writing to a file.
FileOutputStream fout = new FileOutputStream( "writesomebytes.txt" );
FileChannel fc = fout.getChannel();

//create a buffer, and put some data in it.
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
for (int i=0; i<100; ++i) {
    buffer.put( (byte) i );
}

//The flip() method 
//prepares the buffer so that the 
//data just put into it can be written to a channel
buffer.flip();

//write data of the buffer.
fc.write( buffer );

Selector
The central object in asynchronous I/O is called the Selector. A Selector is where you register your interest in various I/O events, and it is the object that tells you when those events occur. Example:
Selector selector = Selector.open();

//another way of getting selector instance.
Selector selector = SelectorProvider.provider().openSelector();

Handler
The handlers are your worker threads. They are responsible for processing the data that you read and for writing your responses. You can pre-define a thread pool to handle all your requests. There's nothing fancy about the handler; check the link I provided below for a detailed example.

Now that you are familiar with Selectors, Channels, Buffers, and Handlers, we need to tie them together. But first, to accept connections from clients, you need a ServerSocketChannel. ServerSocketChannel is the NIO counterpart of ServerSocket, built on channels and buffers. Example:
// Create a new non-blocking server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);

// Bind the server socket to the specified address and port
// NOTE: HOST_ADDRESS is of type InetAddress and PORT is an int.
InetSocketAddress isa = new InetSocketAddress(HOST_ADDRESS, PORT);
serverChannel.socket().bind(isa);

// Register the ServerSocketChannel with the selector.
SelectionKey key = serverChannel.register( selector, SelectionKey.OP_ACCEPT );

The first argument to register() is always the Selector. The second argument, OP_ACCEPT, here specifies that we want to listen for accept events -- that is, the events that occur when a new connection is made. This is the only kind of event that is appropriate for a ServerSocketChannel.

Note the return value of the call to register(). A SelectionKey represents this registration of this channel with this Selector. When a Selector notifies you of an incoming event, it does this by supplying the SelectionKey that corresponds to that event. The SelectionKey can also be used to de-register the channel.
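
To show how the pieces fit together, here is a rough sketch of a selector loop that accepts connections and echoes back whatever it reads. It is a simplified illustration, not the code from the tutorial linked below: the class name EchoReactor, its port() and stop() methods, and the 1024-byte buffer size are all my own assumptions, and for brevity the handlers run inline on the selector thread rather than on a pool of worker threads.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical single-threaded echo server; all names here are illustrative.
class EchoReactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel serverChannel;

    EchoReactor(int port) throws IOException {
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        // Port 0 asks the operating system for any free port.
        serverChannel.socket().bind(new InetSocketAddress(port));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return serverChannel.socket().getLocalPort();
    }

    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select(); // blocks until at least one registered event is ready
                if (!selector.isOpen()) {
                    break;
                }
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove(); // the selector never clears this set itself
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(key);
                    } else if (key.isReadable()) {
                        echo(key);
                    }
                }
            }
        } catch (IOException e) {
            System.err.println("Reactor stopped: " + e);
        } catch (ClosedSelectorException e) {
            // stop() closed the selector while the loop was running.
        }
    }

    // Accept event: take the new connection and watch it for incoming data.
    private void accept(SelectionKey key) throws IOException {
        SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
        if (client == null) {
            return;
        }
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    // Read event: read what is available and write it straight back.
    private void echo(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            int read = client.read(buffer);
            if (read == -1) {   // the peer closed the connection
                client.close(); // closing the channel also cancels its key
                return;
            }
            buffer.flip();
            while (buffer.hasRemaining()) {
                client.write(buffer); // a real server would wait for OP_WRITE instead of looping
            }
        } catch (IOException e) {
            try { client.close(); } catch (IOException ignored) { }
        }
    }

    void stop() throws IOException {
        selector.close();      // wakes up a blocked select() and ends the loop
        serverChannel.close();
    }
}
```

You could start it with something like new Thread(new EchoReactor(9090)).start(), where 9090 is an arbitrary port, and then connect with telnet. Everything that blocks happens in select(), which is the whole point: one thread waits on many channels at once.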

I'm sure by this time, you are ready to see a working sample code to implement this whole theory. I thought of writing an example, but this site did it very well. http://rox-xmlrpc.sourceforge.net/niotut/

Monday, June 06, 2011

Java Concurrency Utilities: using Semaphore

If you haven't done much multi-threaded programming with Java 5, then when you are asked how to prevent concurrency problems when two threads access your data, you would probably think of synchronizing the code by making the methods synchronized.

public class SyncCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

You might also come up with the idea of synchronizing only a block of code instead of the whole method. Note that a synchronized block must lock on an object reference (here, this), not on a primitive such as c.

    public void increment() {
        synchronized(this) {
            c++;
        }
    }
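
A common variation is to lock on a private object that no outside code can reach, so nothing else can interfere with the counter's lock. Here is a minimal sketch; the class name BlockCounter and the runTwoThreads helper are hypothetical and exist only for this illustration.

```java
class BlockCounter {
    // Private lock object: only this class can synchronize on it.
    private final Object lock = new Object();
    private int c = 0;

    public void increment() {
        synchronized (lock) {
            c++;
        }
    }

    public int value() {
        synchronized (lock) {
            return c;
        }
    }

    // Runs two threads that each increment a shared counter perThread times
    // and returns the final value.
    static int runTwoThreads(final int perThread) throws InterruptedException {
        final BlockCounter counter = new BlockCounter();
        Runnable work = new Runnable() {
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        return counter.value();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTwoThreads(10000)); // prints 20000
    }
}
```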

But the concurrency API of Java 5 added ready-made solutions for common threading requirements. In particular, it added the Semaphore.

A Semaphore controls access to a shared resource using a counter. If the counter has a value greater than zero, then access is allowed. If it is zero, then access is denied. What the counter is counting are permits that allow access to the shared resource. Ergo, to access the resource, a thread must be granted a permit from the semaphore.

Semaphore has two constructors:

Semaphore(int num)
Semaphore(int num, boolean how)

num specifies the initial permit count, that is, the number of threads that can access a shared resource at any one time. If the value of num is one, then only one thread can access the resource at any one time. By default, waiting threads are granted permits in no guaranteed order. By setting how to true, you ensure that waiting threads are granted a permit in the order in which they requested access.

To acquire a permit, call the acquire() method, which has these two forms:

void acquire() throws InterruptedException
void acquire(int num) throws InterruptedException

To release a permit, call release(), which has these two forms:

void release()
void release(int num)

The first form releases one permit. The second form releases the number of permits specified by num.
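
To see the permit counter move, here is a small single-threaded sketch that uses both forms of acquire() and release() and checks availablePermits() after each step. The class name PermitDemo and the trace() helper are made up for this illustration; the initial count of 3 is arbitrary.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitDemo {

    // Returns the number of available permits observed after each step.
    static int[] trace() throws InterruptedException {
        Semaphore sem = new Semaphore(3);
        int[] seen = new int[4];

        sem.acquire();                      // takes one permit
        seen[0] = sem.availablePermits();

        sem.acquire(2);                     // takes the remaining two
        seen[1] = sem.availablePermits();

        sem.release();                      // gives one back
        seen[2] = sem.availablePermits();

        sem.release(2);                     // gives the other two back
        seen[3] = sem.availablePermits();

        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(trace())); // prints [2, 0, 1, 3]
    }
}
```

Had another acquire() been called while the count was at zero, the calling thread would have blocked until some other thread released a permit.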

To use a semaphore to control access to a resource, each thread that wants to use that resource must first call acquire() before accessing the resource. When the thread is done with the resource, it must call release().

import java.util.concurrent.*;

class SemaphoreDemo {

    public static void main(String args[]) {
        //instantiate a Semaphore with 1 permit, meaning only 1 thread can acquire it at a time.
        Semaphore sem = new Semaphore(1);
      
        //instantiate 2 threads to access a shared resource at the same time.
        new IncThread(sem, "A");
        new DecThread(sem, "B");
    }

}

// A shared resource.
class Shared {
    static int count = 0;
}

class IncThread implements Runnable {
    String name;
    Semaphore sem;

    IncThread(Semaphore s, String n) {
        sem = s;
        name = n;
        new Thread(this).start();
    }

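    public void run() {
        try {
            //acquiring the permit; blocks until one is available.
            sem.acquire();
        } catch (InterruptedException exc) {
            System.out.println(exc);
            return; //no permit was acquired, so there is nothing to release.
        }
        try {
            System.out.println(name + " gets a permit.");
            for( int i=0; i < 5; i++ ) {
                Shared.count++;
                System.out.println(name + ":" + Shared.count);
                Thread.sleep(10);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc);
        } finally {
            //releasing the permit, even if the loop was interrupted.
            System.out.println(name + " releases the permit.");
            sem.release();
        }
    }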
}

class DecThread implements Runnable {
    String name;
    Semaphore sem;

    DecThread(Semaphore s, String n) {
        sem = s;
        name = n;
        new Thread(this).start();
    }

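    public void run() {
        try {
            //acquiring the permit; blocks until one is available.
            sem.acquire();
        } catch (InterruptedException exc) {
            System.out.println(exc);
            return; //no permit was acquired, so there is nothing to release.
        }
        try {
            System.out.println(name + " gets a permit.");
            for(int i=0; i < 5; i++ ) {
                Shared.count--;
                System.out.println(name + ":" + Shared.count);
            }
        } finally {
            //releasing the permit.
            System.out.println(name + " releases the permit.");
            sem.release();
        }
    }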
}


Notice that the run methods use no synchronized keyword. That is because a semaphore with a single permit acts as a mutual exclusion lock: only the thread that holds the permit can run the guarded code, and the other thread blocks in acquire() until the permit is released.

Without the use of Semaphore, accesses to Shared.count by both threads would have occurred simultaneously, and the increments and decrements would be intermixed.
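
One way to convince yourself is to run many more iterations and check the final value. The sketch below is a separate illustration rather than part of the demo above: GuardedCounter, add(), value(), and race() are hypothetical names, and the permit is taken around each single update instead of around the whole loop.

```java
import java.util.concurrent.Semaphore;

class GuardedCounter {
    private final Semaphore sem = new Semaphore(1);
    private int count = 0;

    // Applies delta to the counter while holding the single permit.
    void add(int delta) throws InterruptedException {
        sem.acquire();
        try {
            count += delta;
        } finally {
            sem.release(); // always give the permit back
        }
    }

    int value() throws InterruptedException {
        sem.acquire();
        try {
            return count;
        } finally {
            sem.release();
        }
    }

    private static Runnable worker(final GuardedCounter counter, final int delta, final int rounds) {
        return new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < rounds; i++) {
                        counter.add(delta);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    // One thread increments and one decrements, rounds times each.
    static int race(int rounds) throws InterruptedException {
        GuardedCounter counter = new GuardedCounter();
        Thread inc = new Thread(worker(counter, +1, rounds));
        Thread dec = new Thread(worker(counter, -1, rounds));
        inc.start();
        dec.start();
        inc.join();
        dec.join();
        return counter.value();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race(100000)); // prints 0
    }
}
```

The try/finally around each update matters: if the guarded code threw an exception without it, the permit would never be returned and every other thread would block forever.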

Monday, May 23, 2011

new try-catch in java

As most of us have experienced, it is difficult to close resources correctly. For example, if you open a file or a socket, it is easy to forget to close it, and your code can easily run out of file handles if this is not properly taken care of.
To make things easier, Java 7 introduced the new "try-with-resources" syntax. It automatically closes any AutoCloseable resources declared in the try statement. For example, instead of manually closing streams ourselves, we can simply do this:

import java.io.*;

public class AutomaticResourceClosing {
  public static void main(String[] args) throws IOException {
    try (
      PrintStream out = new PrintStream (
          new BufferedOutputStream(
              new FileOutputStream("foo.txt")))
    ) {
      out.print("Unable to close resource");
    }
  }
}


Take note that no semicolon is needed after the last resource in the "try ()" declaration (a trailing one is optional). Semicolons are only required to separate multiple resources.
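
When a try statement declares several resources, they are separated by semicolons and closed automatically in the reverse order of their declaration. Here is a small sketch that records the order of events. CloseOrderDemo and its nested Tracked class are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderDemo {

    // A stand-in resource that logs when it is opened and closed.
    static class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<String>();
        try (Tracked first = new Tracked("first", log);
             Tracked second = new Tracked("second", log)) {
            log.add("body");
        }
        // Both resources are already closed at this point.
        return log;
    }

    public static void main(String[] args) {
        // prints [open first, open second, body, close second, close first]
        System.out.println(run());
    }
}
```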

You can read more about other features by going to Java.net site ( http://jdk7.java.net/ ).