Finish core logic:

Finish NetMarshalServer core.
Fix up NetMarshalClient.
Allow FragmentationOptions to be duplicated.
Add CandidateClient methods and equality checking.
This commit is contained in:
Captain ALM 2023-05-20 14:49:21 +01:00
parent 4b139113fd
commit d17af203b9
Signed by: alfred
GPG Key ID: 4E4ADD02609997B1
4 changed files with 227 additions and 25 deletions

View File

@ -1,6 +1,7 @@
package com.captainalm.lib.calmnet.marshal;
import java.net.InetAddress;
import java.util.Objects;
/**
* This class provides a candidate client for {@link NetMarshalServer}s.
@ -26,9 +27,34 @@ public final class CandidateClient {
*
* @param address The remote address of the candidate.
* @param port The remote port of the candidate.
* @throws NullPointerException address is null.
*/
public CandidateClient(InetAddress address, int port) {
if (address == null) throw new NullPointerException("address is null");
this.address = address;
this.port = port;
}
/**
* Checks if this candidate matches an existing {@link NetMarshalClient}.
*
* @param toCheck The client to check against.
* @return If the candidate matches the passed client.
*/
public boolean matchesNetMarshalClient(NetMarshalClient toCheck) {
return toCheck.remoteAddress().equals(address) && toCheck.remotePort() == port;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof CandidateClient)) return false;
CandidateClient that = (CandidateClient) o;
return port == that.port && address.equals(that.address);
}
@Override
public int hashCode() {
return Objects.hash(address, port);
}
}

View File

@ -37,6 +37,23 @@ public final class FragmentationOptions {
*/
public boolean equalityVerifyFragments = false;
public FragmentationOptions() {}
/**
* Creates a copy of the provided FragmentationOptions.
*
* @param toCopy The options to copy.
* @throws NullPointerException toCopy is null.
*/
public FragmentationOptions(FragmentationOptions toCopy) {
if (toCopy == null) throw new NullPointerException("toCopy is null");
maximumFragmentAge = toCopy.maximumFragmentAge;
fragmentationSplitSize = toCopy.fragmentationSplitSize;
emptySendsTillForced = toCopy.emptySendsTillForced;
verifyFragments = toCopy.verifyFragments;
equalityVerifyFragments = toCopy.equalityVerifyFragments;
}
/**
* Validates the parameters within this structure.
*

View File

@ -33,6 +33,7 @@ import java.util.function.Consumer;
public class NetMarshalClient implements Closeable {
protected boolean running;
protected final Object slocksock = new Object();
protected Socket socket;
protected DatagramSocket dsocket;
protected InputStream inputStream;
@ -75,7 +76,7 @@ public class NetMarshalClient implements Closeable {
this.factory = factory;
if (loader == null) throw new NullPointerException("loader is null");
this.loader = loader;
this.fragmentationOptions = fragmentationOptions;
this.fragmentationOptions = (fragmentationOptions == null) ? null : new FragmentationOptions(fragmentationOptions);
if (fragmentationOptions == null) {
fragmentReceiver = null;
fragmentRMM = null;
@ -397,7 +398,7 @@ public class NetMarshalClient implements Closeable {
*/
public synchronized final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException {
if (packetIn == null) throw new NullPointerException("packetIn is null");
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
if (fragmentationOptions == null || directSend) {
loader.writePacket(outputStream, packetIn, true);
} else {
@ -413,7 +414,7 @@ public class NetMarshalClient implements Closeable {
* @throws IOException A stream exception has occurred.
*/
public synchronized final void flush() throws IOException {
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
outputStream.flush();
rootOutputStream.flush();
}
@ -516,8 +517,8 @@ public class NetMarshalClient implements Closeable {
if (context == null) throw new NullPointerException("context is null");
if (!disablePacketReading && Thread.currentThread() != receiveThread) throw new IllegalStateException("sslUpgrade methods should be called in a BiConsumer (for setReceiveBiConsumer) within the target NetMarshalClient" +
" or when reading packets (arePacketsBeingRead) is disabled on the NetMarshalClient");
Socket originalSocket = socket;
synchronized (originalSocket) {
synchronized (slocksock) {
Socket originalSocket = socket;
try {
socket = SSLUtilities.upgradeClientSocketToSSL(context, socket, remoteHostName, socket.getPort(), true, remoteHostName != null);
if (rootInputStream instanceof NetworkInputStream) ((NetworkInputStream) rootInputStream).setSocket(socket);
@ -642,7 +643,7 @@ public class NetMarshalClient implements Closeable {
}
}
} catch (InterruptedException | InterruptedIOException e) {
} catch (PacketException | IOException e) {
} catch (Exception e) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this);
try {
close();
@ -674,7 +675,7 @@ public class NetMarshalClient implements Closeable {
slockReceive.notify();
}
}
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
sendFragmentData();
}
} else {
@ -686,7 +687,7 @@ public class NetMarshalClient implements Closeable {
}
}
} catch (InterruptedException | InterruptedIOException e) {
} catch (PacketException | IOException e) {
} catch (Exception e) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this);
try {
close();

View File

@ -6,13 +6,8 @@ import com.captainalm.lib.calmnet.packet.PacketLoader;
import com.captainalm.lib.calmnet.packet.factory.IPacketFactory;
import com.captainalm.lib.calmnet.stream.NetworkInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PipedOutputStream;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.*;
import java.net.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -29,10 +24,12 @@ import java.util.function.Consumer;
public class NetMarshalServer implements Closeable {
protected boolean running;
protected final Object slocksock = new Object();
protected ServerSocket socket;
protected DatagramSocket dsocket;
protected final NetworkInputStream dInputStream;
protected final Map<NetMarshalClient, PipedOutputStream> outputs;
protected final Map<CandidateClient, PipedOutputStream> outputs;
protected final Object slockOutputs;
protected final List<NetMarshalClient> clients = new ArrayList<>();
protected BiConsumer<IPacket, NetMarshalClient> receiveBiConsumer;
@ -62,17 +59,23 @@ public class NetMarshalServer implements Closeable {
this.factory = factory;
if (loader == null) throw new NullPointerException("loader is null");
this.loader = loader;
this.fragmentationOptions = fragmentationOptions;
if (fragmentationOptions != null) fragmentationOptions.validate();
if (fragmentationOptions == null) {
this.fragmentationOptions = null;
} else {
this.fragmentationOptions = new FragmentationOptions(fragmentationOptions);
this.fragmentationOptions.validate();
}
if (dsock == null) {
dInputStream = null;
outputs = null;
slockOutputs = null;
acceptThread = new Thread(() -> {
while (running) acceptThreadExecutedSocket();
}, "thread_accept_" + localAddress.getHostAddress() + ":" + localPort);
} else {
dInputStream = new NetworkInputStream(dsock);
outputs = new HashMap<>();
slockOutputs = new Object();
acceptThread = new Thread(() -> {
while (running) acceptThreadExecutedDSocket();
}, "thread_accept_" + localAddress.getHostAddress() + ":" + localPort);
@ -178,7 +181,7 @@ public class NetMarshalServer implements Closeable {
* @return An array of connected clients.
*/
public synchronized final NetMarshalClient[] getConnectedClients() {
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
return clients.toArray(new NetMarshalClient[0]);
}
}
@ -194,7 +197,7 @@ public class NetMarshalServer implements Closeable {
*/
public synchronized final void broadcastPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException {
if (packetIn == null) throw new NullPointerException("packetIn is null");
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
for (NetMarshalClient c : clients)
if (c.isRunning()) c.sendPacket(packetIn, directSend);
}
@ -206,7 +209,7 @@ public class NetMarshalServer implements Closeable {
* @throws IOException A stream exception has occurred.
*/
public synchronized final void flush() throws IOException {
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
for (NetMarshalClient c : clients)
if (c.isRunning()) c.flush();
}
@ -222,21 +225,79 @@ public class NetMarshalServer implements Closeable {
}
private void disconnectAllInternal() throws IOException {
synchronized ((socket == null) ? dsocket : socket) {
synchronized (slocksock) {
for (NetMarshalClient c : clients)
if (c.isRunning()) c.close();
}
}
protected NetMarshalClient generateClientSocket(Socket socketIn) {
return new NetMarshalClient(socketIn, factory, loader, fragmentationOptions);
}
protected NetMarshalClient generateClientDSocket(CandidateClient candidate,PipedInputStream inputStream) {
return new NetMarshalClient(dsocket, candidate.address, candidate.port, inputStream, factory, loader, fragmentationOptions);
}
protected void applyClientEvents(NetMarshalClient client) {
client.setReceiveBiConsumer(this::onClientReceive);
client.setReceiveExceptionBiConsumer(this::onClientReceiveException);
client.setClosedConsumer(this::onClientClose);
}
/**
* Connects to a remote endpoint.
*
* @param remoteAddress The remote address to connect to.
* @param remotePort The remote port to connect to.
* @param timeout The timeout of the connection attempt (0 for infinite timeout).
* @return A NetMarshalClient instance or null for non-accepted connection.
* @throws IOException A connection error has occurred.
*/
public synchronized final NetMarshalClient connect(InetAddress remoteAddress, int remotePort) throws IOException {
public synchronized final NetMarshalClient connect(InetAddress remoteAddress, int remotePort, int timeout) throws IOException {
if (remoteAddress == null) throw new NullPointerException("remoteAddress is null");
if (remotePort < 0) throw new IllegalArgumentException("remotePort is less than 0");
if (remotePort > 65535) throw new IllegalArgumentException("remotePort is greater than 65535");
CandidateClient candidateClient = new CandidateClient(remoteAddress, remotePort);
if (acceptanceBiConsumer != null) acceptanceBiConsumer.accept(candidateClient, this);
if (candidateClient.accept) {
NetMarshalClient found = null;
synchronized (slocksock) {
for (NetMarshalClient c : clients)
if (candidateClient.matchesNetMarshalClient(c)) {
found = c;
break;
}
if (found == null) {
if (socket == null) {
PipedInputStream inputPipe = new PipedInputStream(65535);
PipedOutputStream outputPipe = new PipedOutputStream(inputPipe);
found = generateClientDSocket(candidateClient, inputPipe);
synchronized (slockOutputs) {
outputs.put(candidateClient, outputPipe);
}
} else {
Socket clientSocket = new Socket();
clientSocket.connect(new InetSocketAddress(remoteAddress, remotePort), timeout);
found = generateClientSocket(clientSocket);
}
try {
applyClientEvents(found);
clients.add(found);
} catch (Exception e) {
clients.remove(found);
if (socket == null) {
synchronized (slockOutputs) {
outputs.remove(new CandidateClient(found.remoteAddress(), found.remotePort()));
}
}
throw e;
}
}
}
found.open();
return found;
}
return null;
}
@ -259,11 +320,108 @@ public class NetMarshalServer implements Closeable {
}
}
protected void acceptThreadExecutedSocket() {
protected void onClientReceive(IPacket packet, NetMarshalClient client) {
if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, client);
}
protected void onClientReceiveException(Exception e, NetMarshalClient client) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, client);
}
protected void onClientClose(NetMarshalClient closed) {
synchronized (slocksock) {
clients.remove(closed);
if (socket == null) {
CandidateClient candidate = new CandidateClient(closed.remoteAddress(), closed.remotePort());
synchronized (slockOutputs) {
PipedOutputStream outputPipe = outputs.get(candidate);
if (outputPipe != null) {
try {
outputPipe.close();
} catch (IOException e) {
onClientReceiveException(e, closed);
}
}
outputs.remove(candidate);
}
}
}
if (closedConsumer != null) closedConsumer.accept(closed);
}
protected void acceptThreadExecutedSocket() {
try {
Socket clientSocket = socket.accept();
CandidateClient candidateClient = new CandidateClient(clientSocket.getInetAddress(), clientSocket.getPort());
try {
if (acceptanceBiConsumer != null) acceptanceBiConsumer.accept(candidateClient, this);
if (candidateClient.accept) {
NetMarshalClient client = generateClientSocket(clientSocket);
applyClientEvents(client);
synchronized (slocksock) {
clients.add(client);
}
client.open();
} else {
clientSocket.close();
}
} catch (Exception e) {
clientSocket.close();
throw e;
}
} catch (InterruptedIOException e) {
} catch (Exception e) {
if (acceptExceptionBiConsumer != null) acceptExceptionBiConsumer.accept(e, this);
}
}
protected void acceptThreadExecutedDSocket() {
try {
byte[] dPacket = dInputStream.readPacket();
CandidateClient candidateClient = new CandidateClient(dInputStream.getAddress(), dInputStream.getPort());
PipedOutputStream outputPipe;
synchronized (slockOutputs) {
outputPipe = outputs.get(candidateClient);
}
if (outputPipe == null) {
synchronized (slocksock) {
NetMarshalClient found = null;
for (NetMarshalClient c : clients)
if (candidateClient.matchesNetMarshalClient(c)) {
found = c;
break;
}
if (found == null) {
try {
if (acceptanceBiConsumer != null) acceptanceBiConsumer.accept(candidateClient, this);
if (candidateClient.accept) {
PipedInputStream inputPipe = new PipedInputStream(65535);
outputPipe = new PipedOutputStream(inputPipe);
synchronized (slockOutputs) {
outputs.put(candidateClient, outputPipe);
}
NetMarshalClient client = generateClientDSocket(candidateClient, inputPipe);
applyClientEvents(client);
clients.add(client);
client.open();
}
} catch (Exception e) {
synchronized (slockOutputs) {
outputs.remove(candidateClient);
}
throw e;
}
} else {
synchronized (slockOutputs) {
outputPipe = outputs.get(candidateClient);
}
}
}
}
if (outputPipe != null) outputPipe.write(dPacket);
} catch (InterruptedIOException e) {
} catch (Exception e) {
if (acceptExceptionBiConsumer != null) acceptExceptionBiConsumer.accept(e, this);
}
}
}