Lesson 6 - Java Server - Client Dispatcher

In the previous lesson, Java Server - Connection Manager, we implemented the connection manager. Today we're going to look at managing clients that have been moved to the waiting queue because the server's maximum capacity has been reached.

Client Dispatcher

This class will have a simple goal. It'll try to maintain an active connection between the server and the client. It'll be going through the clients in the waiting queue at a specified time interval and sending them a simple (for now) text message with information about the current number of clients in the queue. If the message cannot be delivered, the connection to the client is terminated and the client is removed from the waiting queue. The whole process will happen only if there are clients in the queue.

Methods

I've already described the features in the paragraph above; now we'll summarize them as points:

  • queue a client
  • get a client from the queue
  • return whether there's a client in the queue

In the core package, we'll create a new package named dispatcher, in which we'll create an IClientDispatcher interface. The interface will define the dispatcher methods:

public interface IClientDispatcher extends IThreadControl {
    boolean hasClientInQueue();
    Client getClientFromQueue();
    boolean addClientToQueue(Client client);
}

The interface inherits from IThreadControl so that we can control the thread in which the dispatcher will run.

The Class Implementation

We'll name the class that will implement the interface simply ClientDispatcher, let it inherit from the Thread class, and implement the IClientDispatcher interface:

public class ClientDispatcher extends Thread implements IClientDispatcher

We'll define one class constant for the interval, in milliseconds, at which communication with the clients in the queue will be repeated:

private static final int SLEEP_TIME = 5000;

Instance variables follow:

// volatile: shutdown() sets this flag from a different thread than the one running run()
private volatile boolean interupt = false;
private final Semaphore semaphore = new Semaphore(0);
private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>();
private final Collection<Client> clientsToRemove = new ArrayList<>();
private final int waitingQueueSize;

The interupt variable controls the thread: as long as it's false, the thread keeps running. The semaphore puts the thread to sleep while the queue is empty. The thread waits at the semaphore until a client is queued, then passes through and does its job. Once all the clients have been removed from the queue, the thread goes back to waiting. Two collections follow: waitingQueue keeps the queued clients, and clientsToRemove collects clients that have closed the connection and must be removed from the queue. The waitingQueueSize variable contains the maximum number of queued clients.
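The way a semaphore created with zero permits can put a thread to sleep until work arrives can be tried out in isolation. The following is only a minimal sketch: the SemaphoreWakeUpDemo class and its method are hypothetical names made up for this illustration, and a String stands in for a Client.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical demo class; none of these names are part of the lesson's code.
class SemaphoreWakeUpDemo {

    // Starts a worker that sleeps at the semaphore until an item is queued,
    // then returns the item the worker picked up.
    static String produceAndConsume(String item) {
        Semaphore semaphore = new Semaphore(0); // no permits: acquire() blocks
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        String[] received = new String[1];

        Thread worker = new Thread(() -> {
            try {
                semaphore.acquire();        // waits here until release() is called
                received[0] = queue.poll(); // the item was added before release()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        queue.add(item);     // 1. publish the work
        semaphore.release(); // 2. wake the worker
        try {
            worker.join();   // after join(), received[0] is visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume("client-1")); // prints "client-1"
    }
}
```

The order matters: the item is added first and the permit released second, so the worker always finds something in the queue when it wakes up.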

The class constructor will require a single parameter. It'll be the maximum number of clients to wait in the queue:

public ClientDispatcher(int waitingQueueSize) {
    this.waitingQueueSize = waitingQueueSize;
}

Methods Implementation

We'll start implementing methods from the IClientDispatcher interface:

@Override
public boolean hasClientInQueue() {
    return !waitingQueue.isEmpty();
}

@Override
public Client getClientFromQueue() {
    return waitingQueue.poll();
}

@Override
public boolean addClientToQueue(Client client) {
    if (waitingQueue.size() < waitingQueueSize) {
        waitingQueue.add(client);
        semaphore.release();
        return true;
    }

    return false;
}

The first two methods are trivial and need no comment. Let's look at the method that adds clients to the queue. First, we determine whether the queue can take another client. If it can, the client is added, the semaphore is released, and true is returned. Otherwise false is returned and nothing more happens.
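To see the capacity check in action without the rest of the server, here is a small sketch. BoundedQueueDemo is a hypothetical class written for this illustration; it uses Strings in place of Client objects and leaves the semaphore out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the dispatcher's queue; String replaces Client.
class BoundedQueueDemo {
    private final Queue<String> waitingQueue = new ConcurrentLinkedQueue<>();
    private final int waitingQueueSize;

    BoundedQueueDemo(int waitingQueueSize) {
        this.waitingQueueSize = waitingQueueSize;
    }

    // Same shape as addClientToQueue(): accept only while below the limit.
    boolean add(String client) {
        if (waitingQueue.size() < waitingQueueSize) {
            waitingQueue.add(client);
            return true;
        }
        return false;
    }

    // Same shape as getClientFromQueue(): returns null if the queue is empty.
    String poll() {
        return waitingQueue.poll();
    }

    public static void main(String[] args) {
        BoundedQueueDemo queue = new BoundedQueueDemo(2);
        System.out.println(queue.add("a")); // true
        System.out.println(queue.add("b")); // true
        System.out.println(queue.add("c")); // false, the queue is full
        queue.poll();                       // frees one slot
        System.out.println(queue.add("c")); // true
    }
}
```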

Thread Logic Code

Now we'll implement the most important method, run(), in which all the logic will take place:

@Override
public void run() {
    while(!interupt) {
        while(waitingQueue.isEmpty() && !interupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }

        if (interupt) {
            clientsToRemove.addAll(waitingQueue);
        } else {
            final int count = waitingQueue.size();
            // Iterator has no forEach(); iterate over the queue (an Iterable) directly
            waitingQueue.forEach(client -> {
                try {
                    client.writer.write(("count: " + count + "\n").getBytes());
                    client.writer.flush();
                } catch (IOException e) {
                    clientsToRemove.add(client);
                }
            });
        }

        waitingQueue.removeAll(clientsToRemove);
        for (Client client : clientsToRemove) {
            client.close();
        }
        clientsToRemove.clear();

        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException ignored) {}
    }
}

First, we declare a loop that keeps running until the interupt variable is set. Inside it, another loop waits at the semaphore. It's safer to wait at a semaphore in a loop than in a single condition. The difference between these two snippets:

if (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

and:

while (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

is in one word only (if versus while), but the meaning is very different. An InterruptedException may be thrown while the thread waits at the semaphore. With if, the thread would then go on and execute its logic even though there's nothing to do. Therefore, it's important to wait for the semaphore in a loop. Two things are checked in the loop:

  1. queue length
  2. the interupt variable

If the thread wakes up at the semaphore while the queue is still empty and the interupt flag is still false, the thread will be put to sleep again. It's important to have the interupt variable. Otherwise, we wouldn't be able to terminate the thread while shutting down the server.
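The difference between the two variants can be reproduced with a small experiment. This is only a sketch under stated assumptions: WaitLoopDemo and its methods are hypothetical names, a String stands in for a Client, and the early wake-up is forced by setting the thread's interrupted status before waiting, which makes acquire() throw an InterruptedException immediately.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical demo; the names are illustrative and not part of the lesson's code.
class WaitLoopDemo {

    // Waits once, like the `if` variant. Returns true if the caller
    // continued although the queue was still empty.
    static boolean waitWithIf(Semaphore semaphore, Queue<String> queue) {
        if (queue.isEmpty()) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }
        return queue.isEmpty();
    }

    // Re-checks the queue after every wake-up, like the `while` variant.
    static boolean waitWithWhile(Semaphore semaphore, Queue<String> queue) {
        while (queue.isEmpty()) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }
        return queue.isEmpty();
    }

    // Runs one scenario and reports whether the waiting code
    // proceeded with an empty queue.
    static boolean run(boolean useWhile) {
        Semaphore semaphore = new Semaphore(0);
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        Thread.currentThread().interrupt(); // the next acquire() throws at once
        boolean emptyOnWakeUp;
        if (useWhile) {
            // A producer queues a client so that the loop can eventually exit.
            new Thread(() -> {
                queue.add("client");
                semaphore.release();
            }).start();
            emptyOnWakeUp = waitWithWhile(semaphore, queue);
        } else {
            emptyOnWakeUp = waitWithIf(semaphore, queue);
        }
        Thread.interrupted(); // clear the flag in case acquire() was never reached
        return emptyOnWakeUp;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // true: `if` continued with nothing to do
        System.out.println(run(true));  // false: `while` waited for a real client
    }
}
```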

Once we're past the semaphore, we check whether the thread is about to terminate. If it is, all clients in the queue are added to the collection of clients to be removed. In the standard scenario, a simple message is sent to all queued clients. If the message can't be delivered to a client, that client is added to the collection to be removed as well, because it has probably lost the connection.

Finally, the clients in the collection are removed from the queue and their connections are closed. Then the thread sleeps for the specified time and the whole loop starts again from the beginning.
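The collect-then-remove step described above can be sketched on its own. Everything in the following example is an assumption made for illustration: BroadcastDemo and the FakeClient interface are hypothetical, and FakeClient.send() merely mimics a write that may throw an IOException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo of sending to all queued clients and dropping the dead ones.
class BroadcastDemo {

    // Hypothetical stand-in for Client: sending may fail with an IOException.
    interface FakeClient {
        void send(String message) throws IOException;
    }

    // Sends the message to every queued client, removes those whose send
    // failed, and returns how many clients were removed.
    static int broadcast(Queue<FakeClient> waitingQueue, String message) {
        Collection<FakeClient> clientsToRemove = new ArrayList<>();
        for (FakeClient client : waitingQueue) {
            try {
                client.send(message);
            } catch (IOException e) {
                clientsToRemove.add(client); // remember it, remove after the loop
            }
        }
        waitingQueue.removeAll(clientsToRemove);
        return clientsToRemove.size();
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        Queue<FakeClient> queue = new ConcurrentLinkedQueue<>();
        queue.add(message -> delivered.add(message)); // healthy client
        queue.add(message -> {                        // client with a dead connection
            throw new IOException("connection lost");
        });

        int removed = broadcast(queue, "count: 2\n");
        System.out.println(removed);          // 1
        System.out.println(queue.size());     // 1
        System.out.println(delivered.size()); // 1
    }
}
```

Collecting the failed clients first and removing them after the iteration keeps the sending loop simple and makes it easy to close every dropped connection in one place.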

Terminating the Thread

Finally, we'll implement the shutdown() method required by the IThreadControl interface:

@Override
public void shutdown() {
    interupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) {}
}

In this method we do three things:

  1. set the interupt variable to true
  2. release the semaphore
  3. wait for the thread to terminate

Setting the interupt variable to true tells the loop to stop, and releasing the semaphore wakes the dispatcher thread if it's waiting. Because the flag is set, all clients from the queue are added to the list of clients to disconnect. After their connections are closed and they are removed from the queue, the loop condition no longer holds and the thread terminates safely. Note that the loop still finishes its final sleep first, so shutdown() may block for up to SLEEP_TIME milliseconds.
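The three shutdown steps can be tried on a stripped-down thread. The following is a minimal sketch, not the dispatcher itself: StoppableWorker and stopRequested are hypothetical names, and the flag is declared volatile here so that the write made in shutdown() is guaranteed to be visible to the worker thread.

```java
import java.util.concurrent.Semaphore;

// Hypothetical worker that only waits at a semaphore until it is shut down.
class StoppableWorker extends Thread {
    private volatile boolean stopRequested = false;
    private final Semaphore semaphore = new Semaphore(0);

    @Override
    public void run() {
        while (!stopRequested) {
            try {
                semaphore.acquire(); // sleeps until shutdown() releases a permit
            } catch (InterruptedException ignored) {}
        }
    }

    void shutdown() {
        stopRequested = true; // 1. tell the loop to stop
        semaphore.release();  // 2. wake the thread if it is waiting
        try {
            join();           // 3. wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        StoppableWorker worker = new StoppableWorker();
        worker.start();
        worker.shutdown();
        System.out.println(worker.isAlive()); // false
    }
}
```

The flag has to be set before the permit is released. Otherwise the worker could wake up, see the flag still unset, and go back to waiting.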

Wiring the Logic

In the second part of today's article we'll use the client dispatcher in the ConnectionManager class. We'll start by adding a new instance constant of the IClientDispatcher type and we'll also add a parameter of the same type to the ConnectionManager class constructor to initialize the constant:

public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool,
    int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.pool = pool;
    this.maxClients = maxClients;
}

Next, we'll finish the implementation of the insertClientToListOrQueue() method. We'll modify connectionClosedListener so that the server tries to retrieve a client from the queue and add it to the active clients:

client.setConnectionClosedListener(() -> {
    clients.remove(client);
    if (clientDispatcher.hasClientInQueue()) {
        this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue());
    }
});

Instead of the second TODO we'll add the client to the queue:

if (!clientDispatcher.addClientToQueue(client)) {
    client.close();
}

Here we use the return value of the addClientToQueue() method, which is true if it added the client to the queue successfully. If the queue is full, the method returns false and we disconnect the client.
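The hand-over between the active clients and the waiting queue can be modelled in a few lines. This is a simplified, single-threaded sketch: PromotionDemo is a hypothetical class, Strings stand in for Client objects, and the body of insertClientToListOrQueue() is only an assumption about how the method from the previous lesson behaves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model; String replaces Client.
class PromotionDemo {
    private final List<String> clients = new ArrayList<>();
    private final Queue<String> waitingQueue = new ArrayDeque<>();
    private final int maxClients;
    private final int waitingQueueSize;

    PromotionDemo(int maxClients, int waitingQueueSize) {
        this.maxClients = maxClients;
        this.waitingQueueSize = waitingQueueSize;
    }

    // Assumed behaviour: activate the client if there is room, otherwise
    // queue it, otherwise reject it (the server would close the connection).
    String insertClientToListOrQueue(String client) {
        if (clients.size() < maxClients) {
            clients.add(client);
            return "active";
        }
        if (waitingQueue.size() < waitingQueueSize) {
            waitingQueue.add(client);
            return "queued";
        }
        return "rejected";
    }

    // Mirrors the connection-closed listener: free the slot, then promote
    // the longest-waiting client, if there is one.
    void connectionClosed(String client) {
        clients.remove(client);
        if (!waitingQueue.isEmpty()) {
            insertClientToListOrQueue(waitingQueue.poll());
        }
    }

    List<String> activeClients() {
        return clients;
    }

    public static void main(String[] args) {
        PromotionDemo server = new PromotionDemo(1, 1);
        System.out.println(server.insertClientToListOrQueue("a")); // active
        System.out.println(server.insertClientToListOrQueue("b")); // queued
        System.out.println(server.insertClientToListOrQueue("c")); // rejected
        server.connectionClosed("a");
        System.out.println(server.activeClients());                // [b]
    }
}
```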

Now we only need to run the dispatcher thread. To start it, we'll use the onServerStart() method of the ConnectionManager class, where we'll call:

clientDispatcher.start();

In the onServerStop() method, we'll terminate the dispatcher:

clientDispatcher.shutdown();

We call the shutdown() method of the dispatcher to set the interupt variable and wake up the thread. After a while, the dispatcher thread terminates.

Finally, we'll create a dispatcher factory and register it. So we'll create an IClientDispatcherFactory interface that will have a single getClientDispatcher() method that will accept the maximum number of queued clients as a parameter:

public interface IClientDispatcherFactory {
    IClientDispatcher getClientDispatcher(int waitingQueueSize);
}

The implementation of this interface will be very simple. We'll create a ClientDispatcherFactory class that will implement this interface and implement its only getClientDispatcher() method:

public class ClientDispatcherFactory implements IClientDispatcherFactory {
    @Override
    public IClientDispatcher getClientDispatcher(int waitingQueueSize) {
        return new ClientDispatcher(waitingQueueSize);
    }
}

We'll register the factory in the ServerModule class as usual:

bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);

Almost everything is in place now, except for ConnectionManagerFactory. We changed the constructor signature of the ConnectionManager class by adding a parameter of the IClientDispatcher type. So we'll create a new instance constant of the IClientDispatcherFactory type in this factory. The connection manager factory will get this constant in the constructor:

private final IClientDispatcherFactory clientDispatcherFactory;

@Inject
ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) {
    this.clientDispatcherFactory = clientDispatcherFactory;
}

Now, nothing stops us from modifying the getConnectionManager() method. To create a new connection manager instance we'll use the client dispatcher factory:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher(waitingQueueSize), pool, maxClients);

By doing this, we should have finished the part managing clients waiting in the queue. Next time, in the lesson Java Server - Writing Thread, we'll create a thread that will send messages from the server to clients asynchronously.


 

Article has been written for you by Petr Štechmüller