Lesson 9 - Java Server - Event bus
In the previous lesson, Java Server - Communication Protocol, we designed our communication protocol and finally tested communication between the server and the client successfully.
Today we're going to create a simple event bus to propagate events across the server.
Event Bus
This mechanism allows communication between two different components not knowing each other. One component produces an event and doesn't care how many listener components capture and respond to this event.
Designing Interfaces
In the core
package we'll create a new package named
event
. All classes that we're going to create now will be in this
package unless specified otherwise. First we'll design the interfaces and then
implement them. We'll create the interface as follows:
IEvent
- an interface representing the eventIEventHandler
- an interface to react to the eventIEventBus
- an interface to manage the events
The IEvent
interface will contain a single
getEventType()
method that will return the unique identifier of the
event that has occurred:
public interface IEvent { String getEventType(); }
The IEventHandler
interface will also contain a single method
only - handleEvent()
which will be overwritten by everyone who
wants to listen to the event:
@FunctionalInterface public interface IEventHandler { void handleEvent(IEvent event); }
Finally, the IEventBus
interface will contain methods for
subscribing/unsubscribing to events and a method for raising an event:
public interface IEventBus { void registerEventHandler(String messageType, IEventHandler listener); void unregisterEventHandler(String messageType, IEventHandler listener); void publishEvent(IEvent event); }
Implementation
The only interface we'll implement in the core
package is the
IEventBus
. We'll use an EventBus
class of the same
name to do so:
@Singleton public class EventBus implements IEventBus { private final Map<String, List<IEventHandler>> listenerMap = new HashMap<>(); @Override public void registerEventHandler(String messageType, IEventHandler listener) { List<IEventHandler> listeners = listenerMap .computeIfAbsent(messageType, k -> new ArrayList<>()); listeners.add(listener); } @Override public void unregisterEventHandler(String messageType, IEventHandler listener) { final List<IEventHandler> listeners = listenerMap .getOrDefault(messageType, Collections.emptyList()); listeners.remove(listener); } @Override public void publishEvent(IEvent event) { final List<IEventHandler> handlers = listenerMap .getOrDefault(event.getEventType(), Collections.emptyList()); handlers.forEach(handler -> handler.handleEvent(event)); } }
The class contains one class constant, listenerMap
, in which
event listeners for individual events will be stored. The map has
String
keys, which we'll get using the getEventType()
method from the IEvent
interface. The map value is a collection of
listeners so that multiple listeners can be registered for a single event.
Finally, we'll register the event bus to the guice module:
bind(IEventBus.class).to(EventBus.class);
Using the Events
For now, we'll just publish events, specifically that:
- a user has joined
- a user has disconnected
- the server received a message from a user
We'll implement reacting to these events in the future. The
ConnectionManager
class can publish the information that a user has
connected/disconnected, as it's the only one class that will know that. So
we'll add two classes to the connection
package to represent the
connected and disconnected events:
The ClientConnectedEvent
:
public class ClientConnectedEvent implements IEvent { public static final String EVENT_TYPE = "client-connected"; private final Client client; ClientConnectedEvent(Client client) { this.client = client; } public Client getClient() { return client; } @Override public String getEventType() { return EVENT_TYPE; } }
The ClientDisconnectedEvent
:
public class ClientDisconnectedEvent implements IEvent { public static final String EVENT_TYPE = "client-disonnected"; private final Client client; ClientDisconnectedEvent(Client client) { this.client = client; } public Client getClient() { return client; } @Override public String getEventType() { return EVENT_TYPE; } }
We'll add a new class constant of the IEventBus
type to
ConnectionManager
and let it initialize in the constructor from a
parameter:
private final IEventBus eventBus; @Inject public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread, IEventBus eventBus, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.writerThread = writerThread; this.eventBus = eventBus; this.pool = pool; this.maxClients = maxClients; }
The events themselves will be raised in the
insertClientToListOrQueue()
method. We'll publish an event that a
client has disconnected as soon as we remove the client from the client
list:
client.setConnectionClosedListener(() -> { clients.remove(client); // Creating a new event eventBus.publishEvent(new ClientDisconnectedEvent(client)); if (clientDispatcher.hasClientInQueue()) { this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue()); } });
We'll publish the connected client event after adding the client to the threadpool:
pool.submit(client);
eventBus.publishEvent(new ClientConnectedEvent(client));
The last event we're going to publish is the event of the received message from a client. First, we'll create a class to represent this event:
public class MessageReceivedEvent implements IEvent { private final IMessage receivedMessage; private final Client client; MessageReceivedEvent(IMessage receivedMessage, Client client) { this.receivedMessage = receivedMessage; this.client = client; } @Override public String getEventType() { return receivedMessage.getType(); } public IMessage getReceivedMessage() { return receivedMessage; } public Client getClient() { return client; } }
Now we'll move to the Client
class. Again, we'll create a class
constant of the IEventBus
type there and initialize it in the
constructor using a parameter:
private final IEventBus eventBus; Client(Socket socket, IWriterThread writerThread, IEventBus eventBus) throws IOException { this.socket = socket; writer = new ObjectOutputStream(socket.getOutputStream()); this.writerThread = writerThread; this.eventBus = eventBus; }
Finally, in the run()
method, when we receive a message, we
publish it:
IMessage received; while ((received = (IMessage) reader.readObject()) != null) { eventBus.publishEvent(new MessageReceivedEvent(received, this)); }
By doing this, we publish events across the entire server.
In the next lesson, Java Server - Plugin System, we'll implement a plug-in system to extend the server with new functionality easily.