Lesson 6 - Java Server - Client Dispatcher
In the previous lesson, Java Server - Connection Manager, we implemented the connection manager. Today we're going to look at managing clients that have been moved to the waiting queue because the server's maximum capacity has been reached.
Client Dispatcher
This class will have a simple goal. It'll try to maintain an active connection between the server and the client. It'll be going through the clients in the waiting queue at a specified time interval and sending them a simple (for now) text message with information about the current number of clients in the queue. If the message cannot be delivered, the connection to the client is terminated and the client is removed from the waiting queue. The whole process will happen only if there are clients in the queue.
Methods
I've already described the features in the paragraph above, now we'll summarize them as points:
- queue a client
- get a client from the queue
- return whether there's a client in the queued
In the core
package, we'll create a new package named
dispatcher
, in which we'll create a IClientDispatcher
interface. The interface will define the dispatcher methods:
public interface IClientDispatcher extends IThreadControl { boolean hasClientInQueue(); Client getClientFromQueue(); boolean addClientToQueue(Client client); }
The interface inherits from IThreadControl
so that we can
control the thread in which the dispatcher will run.
The Class Implementation
We'll name the class that will implement the interface simply
ClientDispatcher
, let it inherit from the Thread
class, and implement the IClientDispatcher
interface:
public class ClientDispatcher extends Thread implements IClientDispatcher
We'll define one class constant to represent the interval at which communication with clients in the queue will be repeated:
private static final int SLEEP_TIME = 5000;
Instance variables follow:
private boolean interupt = false; private final Semaphore semaphore = new Semaphore(0); private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>(); private final Collection<Client> clientsToRemove = new ArrayList<>(); private final int waitingQueueSize;
The interrupt
variable will control the thread. As long as it's
false
, the thread will run. The semaphore will manage the queue.
The thread will wait at the semaphore until there are clients in the queue. Once
a connected client queues, the thread will pass through the semaphore and do its
job. Once all the clients are removed from the queue, the thread is put back to
sleep. Two collections follow. waitingQueue
will keep the clients
and clientsToRemove
will contain clients that have closed the
connection and must be removed from the queue. The waitingQueueSize
variable contains the maximum number of queued clients.
The class constructor will require a single parameter. It'll be the maximum number of clients to wait in the queue:
public ClientDispatcher(int waitingQueueSize) { this.waitingQueueSize = waitingQueueSize; }
Methods Implementation
We'll start implementing methods from the IClientDispatcher
interface:
@Override public boolean hasClientInQueue() { return !waitingQueue.isEmpty(); } @Override public Client getClientFromQueue() { return waitingQueue.poll(); } @Override public boolean addClientToQueue(Client client) { if (waitingQueue.size() < waitingQueueSize) { waitingQueue.add(client); semaphore.release(); return true; } return false; }
The first two methods are trivial and don't need to be commented. Let's talk
about the method adding clients to the queue. First, we must determine if the
queue can handle another client. If it does, the semaphore is released and
true
returned, otherwise false
is returned and nothing
more happens.
Thread Logic Code
Now we'll implement the most important method, run()
, in which
all the logic will take place:
@Override public void run() { while(!interupt) { while(waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} } if (interupt) { clientsToRemove.addAll(waitingQueue); } else { final int count = waitingQueue.size(); waitingQueue.iterator().forEach(client -> { try { client.writer.write(("count: " + count + "\n").getBytes()); client.writer.flush(); } catch (IOException e) { clientsToRemove.add(client); } }); } waitingQueue.removeAll(clientsToRemove); for (Client client : clientsToRemove) { client.close(); } clientsToRemove.clear(); try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException ignored) {} } }
First, an infinite loop that will depend on the interupt
variable is declared. Then another loop which will depend on the semaphore
follows. It's always better to have a semaphore waiting in a loop than in one
condition. The difference between the codes:
if (waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} }
and:
while (waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} }
is in one word only (if
and
while
), but the meaning is very different. While
waiting for the semaphore, an InterruptedException
may be thrown.
If we were waiting for the semaphore using if
,
then the thread would start to execute its logic unnecessarily. Therefore, it's
important to wait for the semaphore in a loop. Two things are checked in the
loop:
- queue length
- the
interupt
variable
If the thread wakes up at a semaphore and the queue is empty, or the
interupt
flag is false
, the thread will be put to
sleep again. It's important to have the interupt
variable.
Otherwise, we wouldn't be able to terminate the thread while shutting down the
server.
When we leave the semaphore part, there's an evaluation of whether or not the thread will terminate. If the thread is about to terminate, all clients from the queue will be added to the collection of clients to be removed from the queue. In the case of the standard scenario, a simple message is sent to all clients. If the message fails to deliver, the client is added in the collection to be removed too because the client might not have maintained the connection.
In the end, the connection with all users who were in the collection of clients to be removed is terminated, and we wait for a specified time and the whole loop will start from the beginning.
Terminating the Thread
Finally, we'll implement the shutdown()
method required by the
IThreadControl
interface:
@Override public void shutdown() { interupt = true; semaphore.release(); try { join(); } catch (InterruptedException ignored) { } }
In this method we do three things:
- set the
interupt
variable totrue
- release the semaphore
- wait for the thread to terminate
Releasing the semaphore triggers the dispatcher thread. By setting the
interupt
variable to true
, all clients from the queue
will be added to the list of clients to disconnect. After the connection with
the client is closed and the client removed from the queue, the infinite loop
condition no longer applies and the thread terminates safely.
Wiring the Logic
In the second part of today's article we'll use the client dispatcher in the
ConnectionManager
class. We'll start by adding a new class constant
of the IClientDispatcher
type and we'll also add a parameter of the
same type to the ConnectionManager
class constructor to initialize
the constant:
public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.pool = pool; this.maxClients = maxClients; }
Next, we'll finish the implementation of the
insertClientToListOrQueue()
method. We'll modify
connectionClosedListener
so that the server tries to retrieve a
client from the queue and add it to the active clients:
client.setConnectionClosedListener(() - > { clients.remove(client); if (clientDispatcher.hasClientInQueue()) { this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue()); } });
Instead of the second TODO we'll add the client to the queue:
if (!clientDispatcher.addClientToQueue(client)) {
client.close();
}
Here we use the return value of the addClientToQueue()
method,
which is true
if it added the client to the queue successfully. If
the queue is full, the method returns false
and we disconnect the
client.
Now we only need to run the dispatcher thread. To start it, we'll use the
onServerStart()
method of the ConnectionManager
class,
where we'll call:
clientDispatcher.start();
In the onServerStop()
method, we'll terminate the
dispatcher:
clientDispatcher.shutdown();
We call the shutdown()
method of the dispatcher to set the
interupt
variable and wake up the thread. After a while, the
dispatcher thread terminates.
Finally, we'll create a dispatcher factory and register it. So we'll create
an IClientDispatcherFactory
interface that will have a single
getClientDispatcher()
method that will accept the maximum number of
queued clients as a parameter:
public interface IClientDispatcherFactory { IClientDispatcher getClientDispatcher(int waitingQueueSize); }
The implementation of this interface will be very simple. We'll create a
ClientDispatcherFactory
class that will implement this interface
and implement its only getClientDispatcher()
method:
public class ClientDispatcherFactory implements IClientDispatcherFactory { @Override public IClientDispatcher getClientDispatcher(int waitingQueueSize) { return new ClientDispatcher(waitingQueueSize); } }
We'll register the factory in the ServerModule
class as
usual:
bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);
Everything is almost fine except for ConnectionManagerFactory
.
We changed the constructor signature of the ConnectionManager
class
by adding a parameter of the IClientDispatcher
type. So we'll
create a new instance constant of the IClientDispatcherFactory
type
in this factory. The connection manager factory will get this constant in the
constructor:
private final IClientDispatcherFactory clientDispatcherFactory; @Inject ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) { this.clientDispatcherFactory = clientDispatcherFactory; }
Now, nothing stops us from modifying the getConnectionManager()
method. To create a new connection manager instance we'll use the client
dispatcher factory:
return new ConnectionManager(clientDispatcherFactory.getClientDispatcher( waitingQueueSize), pool, maxClients);
By doing this, we should have finished the part managing clients waiting in the queue. Next time, in the lesson Java Server - Writing Thread, we'll create a thread that will send messages from the server to clients asynchronously.