Get up to 80 % extra points for free! More info:

Lesson 9 - Java Server - Event bus

In the previous lesson, Java Server - Communication Protocol, we designed our communication protocol and finally tested communication between the server and the client successfully.

Today we're going to create a simple event bus to propagate events across the server.

Event Bus

This mechanism allows communication between two different components not knowing each other. One component produces an event and doesn't care how many listener components capture and respond to this event.

Designing Interfaces

In the core package we'll create a new package named event. All classes that we're going to create now will be in this package unless specified otherwise. First we'll design the interfaces and then implement them. We'll create the interface as follows:

  • IEvent - an interface representing the event
  • IEventHandler - an interface to react to the event
  • IEventBus - an interface to manage the events

The IEvent interface will contain a single getEventType() method that will return the unique identifier of the event that has occurred:

public interface IEvent {
    String getEventType();
}

The IEventHandler interface will also contain a single method only - handleEvent() which will be overwritten by everyone who wants to listen to the event:

@FunctionalInterface
public interface IEventHandler {
    void handleEvent(IEvent event);
}

Finally, the IEventBus interface will contain methods for subscribing/un­subscribing to events and a method for raising an event:

public interface IEventBus {
    void registerEventHandler(String messageType, IEventHandler listener);
    void unregisterEventHandler(String messageType, IEventHandler listener);
    void publishEvent(IEvent event);
}

Implementation

The only interface we'll implement in the core package is the IEventBus. We'll use an EventBus class of the same name to do so:

@Singleton
public class EventBus implements IEventBus {

    private final Map<String, List<IEventHandler>> listenerMap = new HashMap<>();

    @Override
    public void registerEventHandler(String messageType, IEventHandler listener) {
        List<IEventHandler> listeners = listenerMap
            .computeIfAbsent(messageType, k -> new ArrayList<>());
        listeners.add(listener);
    }

    @Override
    public void unregisterEventHandler(String messageType,
        IEventHandler listener) {
        final List<IEventHandler> listeners = listenerMap
            .getOrDefault(messageType, Collections.emptyList());
        listeners.remove(listener);
    }

    @Override
    public void publishEvent(IEvent event) {
        final List<IEventHandler> handlers = listenerMap
            .getOrDefault(event.getEventType(), Collections.emptyList());
        handlers.forEach(handler -> handler.handleEvent(event));
    }
}

The class contains one class constant, listenerMap, in which event listeners for individual events will be stored. The map has String keys, which we'll get using the getEventType() method from the IEvent interface. The map value is a collection of listeners so that multiple listeners can be registered for a single event.

Finally, we'll register the event bus to the guice module:

bind(IEventBus.class).to(EventBus.class);

Using the Events

For now, we'll just publish events, specifically that:

  • a user has joined
  • a user has disconnected
  • the server received a message from a user

We'll implement reacting to these events in the future. The ConnectionManager class can publish the information that a user has connected/dis­connected, as it's the only one class that will know that. So we'll add two classes to the connection package to represent the connected and disconnected events:

The ClientConnectedEvent:

public class ClientConnectedEvent implements IEvent {
    public static final String EVENT_TYPE = "client-connected";
    private final Client client;

    ClientConnectedEvent(Client client) {
        this.client = client;
    }

    public Client getClient() {
        return client;
    }

    @Override
    public String getEventType() {
        return EVENT_TYPE;
    }
}

The ClientDisconnectedEvent:

public class ClientDisconnectedEvent implements IEvent {
    public static final String EVENT_TYPE = "client-disonnected";
    private final Client client;

    ClientDisconnectedEvent(Client client) {
        this.client = client;
    }

    public Client getClient() {
        return client;
    }

    @Override
    public String getEventType() {
        return EVENT_TYPE;
    }
}

We'll add a new class constant of the IEventBus type to ConnectionManager and let it initialize in the constructor from a parameter:

private final IEventBus eventBus;

    @Inject
    public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread,
        IEventBus eventBus, ExecutorService pool, int maxClients) {
        this.clientDispatcher = clientDispatcher;
        this.writerThread = writerThread;
        this.eventBus = eventBus;
        this.pool = pool;
        this.maxClients = maxClients;
    }

The events themselves will be raised in the insertClientToListOrQueue() method. We'll publish an event that a client has disconnected as soon as we remove the client from the client list:

client.setConnectionClosedListener(() -> {
    clients.remove(client);
    // Creating a new event
    eventBus.publishEvent(new ClientDisconnectedEvent(client));
    if (clientDispatcher.hasClientInQueue()) {
        this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue());
    }
});

We'll publish the connected client event after adding the client to the threadpool:

pool.submit(client);
eventBus.publishEvent(new ClientConnectedEvent(client));

The last event we're going to publish is the event of the received message from a client. First, we'll create a class to represent this event:

public class MessageReceivedEvent implements IEvent {

    private final IMessage receivedMessage;
    private final Client client;

    MessageReceivedEvent(IMessage receivedMessage, Client client) {
        this.receivedMessage = receivedMessage;
        this.client = client;
    }

    @Override
    public String getEventType() {
        return receivedMessage.getType();
    }

    public IMessage getReceivedMessage() {
        return receivedMessage;
    }

    public Client getClient() {
        return client;
    }
}

Now we'll move to the Client class. Again, we'll create a class constant of the IEventBus type there and initialize it in the constructor using a parameter:

private final IEventBus eventBus;

Client(Socket socket, IWriterThread writerThread, IEventBus eventBus) throws IOException {
    this.socket = socket;
    writer = new ObjectOutputStream(socket.getOutputStream());
    this.writerThread = writerThread;
    this.eventBus = eventBus;
}

Finally, in the run() method, when we receive a message, we publish it:

IMessage received;
while ((received = (IMessage) reader.readObject()) != null) {
    eventBus.publishEvent(new MessageReceivedEvent(received, this));
}

By doing this, we publish events across the entire server.

In the next lesson, Java Server - Plugin System, we'll implement a plug-in system to extend the server with new functionality easily.


 

Previous article
Java Server - Communication Protocol
All articles in this section
Server for Client Applications in Java
Skip article
(not recommended)
Java Server - Plugin System
Article has been written for you by Petr Štechmüller
Avatar
User rating:
2 votes
Activities