Lesson 18 - Java Chat - Client - Server Connection Part 2
In the previous lesson, Java Chat - Client - Server Connection Part 1, we created an interface for a client communicator, among other things.
In today's Java tutorial we're going to implement the communicator.
Client Communicator
In the service package, we'll create a new ClientCommunicationService class and let it implement the IClientCommunicationService interface:
public class ClientCommunicationService implements IClientCommunicationService { }
Variables and Constants
We'll add the following variables and constants to the class:
private final ObjectProperty<Socket> socket = new SimpleObjectProperty<>(this, "socket", null);
private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(this, "connectionState", ConnectionState.DISCONNECTED);
private final HashMap<String, List<OnDataReceivedListener>> listeners = new HashMap<>();
private final StringProperty host = new SimpleStringProperty(this, "host", null);
private final IntegerProperty port = new SimpleIntegerProperty(this, "port", -1);
private final StringProperty connectedServerName = new SimpleStringProperty(this, "connectedServerName", null);
private final ObjectProperty<ServerStatus> serverStatus = new SimpleObjectProperty<>(this, "serverStatus", ServerStatus.EMPTY);
private final Queue<Request> requests = new LinkedBlockingQueue<>();
private ReaderThread readerThread;
private WriterThread writerThread;
Most of the constants are self-explanatory, so we won't comment on each of them. The socket constant is worth mentioning: it's wrapped in the ObjectProperty class, which lets us observe changes to its value. The requests queue is also interesting, since we'll use it to implement request-response communication. We'll create the Request class later. The readerThread and writerThread variables will hold the reader and writer threads. We won't initialize these variables until we try to create a new connection.
Constructor
The class constructor won't require any parameters. In it, we'll register a listener on the socket and bind the server name to a string in the "host:port" format:
public ClientCommunicationService() {
    socket.addListener(this::socketListener);
    connectedServerName.bind(Bindings.createStringBinding(
        () -> String.format("%s:%d", host.get(), port.get()),
        host, port, connectionState));
}
Socket Status Change Listener
We'll create a private socketListener() method, which we registered in the constructor. In this method we'll create or discard the reader and writer threads:
private void socketListener(ObservableValue<? extends Socket> observableValue, Socket oldSocket, Socket newSocket) {
    // The connection was terminated: drop the old threads.
    if (newSocket == null) {
        readerThread = null;
        writerThread = null;
        unregisterMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener);
        return;
    }

    // A new connection was created: start fresh reader/writer threads.
    try {
        readerThread = new ReaderThread(newSocket.getInputStream(), listener, this::disconnect);
        writerThread = new WriterThread(newSocket.getOutputStream(), this::disconnect);
        readerThread.start();
        writerThread.start();
        registerMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener);
    } catch (IOException e) {
        System.out.println("A server communication error occurred.");
    }
}
The method consists of two parts. The first part handles the case when the connection was terminated for some reason and it's necessary to remove the old reader/writer thread instances. The rest of the method assumes that the newSocket variable contains a valid socket for the newly created connection. New instances of the reader and writer threads are created and started by calling start(). We'll explain the registerMessageObserver() and unregisterMessageObserver() methods when we implement them.
Delegating Received Messages Processing
We'll add another instance constant to the class. It holds a lambda that forwards received messages to the registered observers:
private final OnDataReceivedListener listener = message -> {
    if (message.isResponce()) {
        final Request poll = requests.poll();
        if (poll != null) {
            poll.onResponce(message);
        }
        return;
    }

    final List<OnDataReceivedListener> listenerList = listeners.get(message.getType());
    if (listenerList == null) {
        return;
    }

    for (OnDataReceivedListener listener : listenerList) {
        listener.onDataReceived(message);
    }
};
At the beginning of the lambda, we check whether the received message is a response to a request. If so, we take the oldest pending request from the requests queue and call its onResponce() handler with the message. If it's a regular message, we get all the listeners registered for its type from the listeners map and let them process the received message.
Subscribe/Unsubscribe To Messages
The next methods that the interface requires us to implement are the ones for subscribing and unsubscribing. These methods will modify the listeners map:
@Override
public synchronized void registerMessageObserver(String messageType, OnDataReceivedListener listener) {
    List<OnDataReceivedListener> listenerList = listeners.computeIfAbsent(messageType, k -> new ArrayList<>());
    listenerList.add(listener);
}

@Override
public synchronized void unregisterMessageObserver(String messageType, OnDataReceivedListener listener) {
    List<OnDataReceivedListener> listenerList = listeners.get(messageType);
    if (listenerList == null) {
        return;
    }
    listenerList.remove(listener);
}
When registering the listener, we use the computeIfAbsent() method. It looks into the map, creates and stores a new value if there's none under the specified key, and returns the value stored under that key.
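The registration logic above can be tried out in isolation. The following is only a minimal sketch, not the lesson's class: ListenerRegistry and its dispatch() method are hypothetical names, and a plain Consumer<String> stands in for OnDataReceivedListener so that the example runs without the rest of the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the listeners map of the communicator.
// Consumer<String> replaces OnDataReceivedListener to keep the sketch self-contained.
class ListenerRegistry {

    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    synchronized void register(String messageType, Consumer<String> listener) {
        // Creates the list on first use and returns the list stored under the key.
        listeners.computeIfAbsent(messageType, k -> new ArrayList<>()).add(listener);
    }

    synchronized void unregister(String messageType, Consumer<String> listener) {
        List<Consumer<String>> listenerList = listeners.get(messageType);
        if (listenerList != null) {
            listenerList.remove(listener);
        }
    }

    // Passes the payload to every listener of the given type
    // and returns how many listeners were notified.
    synchronized int dispatch(String messageType, String payload) {
        List<Consumer<String>> listenerList = listeners.get(messageType);
        if (listenerList == null) {
            return 0;
        }
        // Iterate over a copy so that a listener may unregister itself safely.
        List<Consumer<String>> snapshot = new ArrayList<>(listenerList);
        for (Consumer<String> listener : snapshot) {
            listener.accept(payload);
        }
        return snapshot.size();
    }
}
```

Note that unregistering only works with the same listener instance that was registered, which is why the communicator keeps its serverStatusListener in a field.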
Establishing the Connection
Finally, we come to the most important methods of the whole communicator. Let's start by implementing the connect() method. We're going to use the CompletableFuture class for the first time:
@Override
public CompletableFuture<Void> connect(String host, int port) {
    if (isConnected()) {
        throw new RuntimeException("The connection already exists.");
    }

    changeState(ConnectionState.CONNECTING);

    return CompletableFuture.supplyAsync(() -> {
        final Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), 3000);
            return socket;
        } catch (IOException e) {
            return null;
        }
    }, ThreadPool.COMMON_EXECUTOR)
    .thenApplyAsync(socket -> {
        this.socket.set(socket);
        if (socket != null) {
            this.host.set(host);
            this.port.set(port);
        } else {
            changeState(ConnectionState.DISCONNECTED);
            this.host.set(null);
            this.port.set(-1);
        }
        if (socket == null) {
            throw new RuntimeException("Unable to create the connection.");
        }
        return null;
    }, ThreadPool.JAVAFX_EXECUTOR);
}
The method is divided into logical parts again. In the first part we check whether we're already connected to the server. If so, we throw an exception. We don't even have to handle the RuntimeException; it'll just be written to our console. The important thing is that the application won't close. The changeState() method tells the others that we're trying to connect to the server.
In the second part of the method we create a future in which we try to establish a connection with the server by calling the socket.connect() method with a timeout of 3000 ms. By passing the ThreadPool.COMMON_EXECUTOR constant, we run the connection attempt on a separate thread. If we connect to the server successfully, we return the socket; otherwise we return null. The thenApplyAsync() method then takes the socket and turns it into the result of the whole future.
In the third part we store the socket by calling this.socket.set(socket). Among other things, this triggers the socketListener() method, which creates or removes the reader and writer threads. The whole third part must run on the JavaFX thread. That's because we'll later bind graphical components to some of the observable properties, and we already know these can only be updated on the JavaFX thread, otherwise an exception will be thrown.
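The hand-off between the two executors can be observed in a small standalone sketch. Everything in it is an assumption made for illustration: the TwoStageConnect class, the "background" and "ui" single-thread executors (standing in for ThreadPool.COMMON_EXECUTOR and ThreadPool.JAVAFX_EXECUTOR), and the string that plays the role of the socket. It only shows which thread runs which stage and how a failure in the second stage reaches the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo of a two-stage future; no real socket is opened.
class TwoStageConnect {

    // Creates a single-thread executor whose thread carries the given name.
    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // don't keep the JVM alive because of the demo
            return thread;
        });
    }

    static String connect(boolean reachable) {
        ExecutorService background = named("background"); // stands in for COMMON_EXECUTOR
        ExecutorService ui = named("ui");                 // stands in for JAVAFX_EXECUTOR
        try {
            return CompletableFuture
                // Stage 1: the blocking work, null signals a failed attempt.
                .supplyAsync(() -> reachable
                    ? "socket@" + Thread.currentThread().getName()
                    : null, background)
                // Stage 2: state updates, always executed by the second executor.
                .thenApplyAsync(socket -> {
                    if (socket == null) {
                        throw new IllegalStateException("Unable to create the connection.");
                    }
                    return socket + " stored on " + Thread.currentThread().getName();
                }, ui)
                .join();
        } finally {
            background.shutdown();
            ui.shutdown();
        }
    }
}
```

Calling TwoStageConnect.connect(true) returns "socket@background stored on ui". With connect(false), the exception thrown in the second stage is rethrown by join() wrapped in a CompletionException, so callers of a future like this can react to a failed connection with exceptionally() or a try/catch around join().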
Terminating the Connection
We'll terminate the connection with a disconnect() method. The method's goal will be to terminate the reader and writer threads properly:
public CompletableFuture<Boolean> disconnect() {
    if (!isConnected()) {
        return CompletableFuture.completedFuture(false);
    }

    return CompletableFuture.supplyAsync(() -> {
        try {
            socket.get().close();
            readerThread.shutdown();
            writerThread.shutdown();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }, ThreadPool.COMMON_EXECUTOR)
    .thenApplyAsync(success -> {
        if (success) {
            this.socket.set(null);
            changeState(ConnectionState.DISCONNECTED);
        }
        return success;
    }, ThreadPool.JAVAFX_EXECUTOR);
}
If the connection is terminated successfully, the this.socket.set(null) call removes the reader and writer threads (through the socket listener) and the communicator enters the DISCONNECTED state.
Sending Messages
We're going to send two types of messages:
- not waiting for the result
- waiting for the result
The method not waiting for the result will be very simple. It'll take the message, pass it to the writer thread, and just won't care anymore:
public synchronized void sendMessage(IMessage message) {
    if (writerThread != null) {
        writerThread.addMessageToQueue(message);
    }
}
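To illustrate why the caller can hand a message over and forget about it, here is a rough sketch of a queue-backed writer. It is not the WriterThread from this course: QueueWriter, its writeAll() helper, the newline-terminated text format and the empty Optional used as a stop signal are all assumptions made for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified writer thread: messages are plain strings
// written as UTF-8 lines; an empty Optional tells the thread to stop.
class QueueWriter extends Thread {

    private final BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
    private final OutputStream out;

    QueueWriter(OutputStream out) {
        this.out = out;
        setDaemon(true);
    }

    // Returns immediately; the actual write happens on the writer thread.
    void addMessageToQueue(String message) {
        queue.add(Optional.of(message));
    }

    // Messages queued before this call are still written.
    void shutdown() {
        queue.add(Optional.empty());
    }

    @Override
    public void run() {
        try {
            while (true) {
                Optional<String> next = queue.take(); // blocks until something is queued
                if (!next.isPresent()) {
                    return;
                }
                out.write((next.get() + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush();
            }
        } catch (IOException | InterruptedException e) {
            // In this sketch a broken stream or an interrupt simply ends the thread.
        }
    }

    // Helper for trying the writer out against an in-memory stream.
    static String writeAll(String... messages) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        QueueWriter writer = new QueueWriter(sink);
        writer.start();
        for (String message : messages) {
            writer.addMessageToQueue(message);
        }
        writer.shutdown();
        try {
            writer.join(); // wait until the queue has been drained
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the queue is drained by a single thread, messages reach the stream in the order in which they were queued, regardless of which thread called addMessageToQueue().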
Sending a message and waiting for the result raises one problem that we need to solve, namely how to wait for the server's response:
public synchronized CompletableFuture<IMessage> sendMessageFuture(IMessage message) {
    return CompletableFuture.supplyAsync(() -> {
        sendMessage(message);
        return null;
    })
    .thenCompose(ignored -> {
        Request request = new Request();
        requests.add(request);
        return request.getFuture();
    });
}
The method that sends a message and waits for the response returns a future that will eventually contain the response. First, the message is sent as usual, and then the thenCompose() method is called. This method says that the result of our future comes from another CompletableFuture instance. We get this other instance by calling the getFuture() method on a Request class instance. We'll declare the Request class in a moment.
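The difference between thenCompose() and thenApply() is easiest to see on a toy example. In the sketch below, ComposeDemo and its awaitReply() method are made up for illustration; awaitReply() plays the role of request.getFuture() and simply produces a string asynchronously.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo; strings stand in for messages.
class ComposeDemo {

    // Plays the role of request.getFuture(): the reply arrives asynchronously.
    static CompletableFuture<String> awaitReply(String request) {
        return CompletableFuture.supplyAsync(() -> "reply to " + request);
    }

    // thenCompose() flattens the inner future, so the caller gets
    // a CompletableFuture<String> directly.
    static CompletableFuture<String> sendAndAwait(String request) {
        return CompletableFuture
            .supplyAsync(() -> request.trim())
            .thenCompose(ComposeDemo::awaitReply);
    }

    // With thenApply() the same chain would produce a future of a future.
    static CompletableFuture<CompletableFuture<String>> nested(String request) {
        return CompletableFuture
            .supplyAsync(() -> request.trim())
            .thenApply(ComposeDemo::awaitReply);
    }
}
```

ComposeDemo.sendAndAwait(" ping ").join() yields "reply to ping", while the nested variant needs join().join() to reach the same value.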
Request-Response Messages
We'll create a utility class that lets us wait for a server response. It'll be the Request class, and with it everything should fall into place:
class Request {

    private final Object lock = new Object();
    // Both fields are guarded by lock.
    private boolean waiting = true;
    private IMessage responce;

    CompletableFuture<IMessage> getFuture() {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (lock) {
                // The flag is checked while holding the lock, so a notify()
                // can't slip in between the check and the call to wait().
                while (waiting) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {}
                }
                return responce;
            }
        });
    }

    void onResponce(IMessage message) {
        synchronized (lock) {
            this.responce = message;
            waiting = false;
            lock.notify();
        }
    }
}
The class has only two methods: getFuture() and onResponce(). The first method creates a future in which the thread is put to sleep by calling the wait() method. The only one who can wake up this future is the onResponce() method, which is called when a response is received from the server. This simple trick creates the impression of request-response communication.
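As a point of comparison, the same request-response effect can also be sketched without wait() and notify(), by completing a CompletableFuture by hand. This is an alternative under stated assumptions, not the approach used in this lesson: PendingRequests, send() and onResponse() are hypothetical names, messages are plain strings, and a Consumer<String> stands in for the writer thread. The sketch also registers the pending request before the message is sent, so even a very fast response finds its request in the queue.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical alternative to the Request class: every pending request
// is an incomplete future that the reader side completes later.
class PendingRequests {

    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // synchronized keeps "register, then send" atomic, so the queue order
    // matches the order in which the messages leave the client.
    synchronized CompletableFuture<String> send(String message, Consumer<String> transport) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.add(reply);        // register first...
        transport.accept(message); // ...then hand the message to the writer
        return reply;
    }

    // Called when a response arrives; completes the oldest pending request.
    // Returns false if no request was waiting for a response.
    boolean onResponse(String response) {
        CompletableFuture<String> oldest = pending.poll();
        return oldest != null && oldest.complete(response);
    }
}
```

No thread sleeps while a request is pending here; the future simply stays incomplete until onResponse() is called. Like the lesson's queue, the sketch assumes that the server answers requests in the order in which they were sent.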
Finally, we'll just add the implementation of the remaining methods, required by the interface:
@Override
public ConnectionState getConnectionState() {
    return connectionState.get();
}

@Override
public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() {
    return connectionState.getReadOnlyProperty();
}

@Override
public String getConnectedServerName() {
    return connectedServerName.get();
}
That would be all for this lesson.
Next time, in the lesson Java Chat - Client - Server Connection Part 3, we'll connect to the server.