From d17af203b955ab64d10c82d4da9edea050d93635 Mon Sep 17 00:00:00 2001 From: Captain ALM Date: Sat, 20 May 2023 14:49:21 +0100 Subject: [PATCH] Finish core logic: Finish NetMarshalServer core. Fix up NetMarshalClient. Allow FragmentationOptions to be duplicated. Add CandidateClient methods and equality checking. --- .../lib/calmnet/marshal/CandidateClient.java | 26 +++ .../calmnet/marshal/FragmentationOptions.java | 17 ++ .../lib/calmnet/marshal/NetMarshalClient.java | 17 +- .../lib/calmnet/marshal/NetMarshalServer.java | 192 ++++++++++++++++-- 4 files changed, 227 insertions(+), 25 deletions(-) diff --git a/src/com/captainalm/lib/calmnet/marshal/CandidateClient.java b/src/com/captainalm/lib/calmnet/marshal/CandidateClient.java index e57860a..44eef9f 100644 --- a/src/com/captainalm/lib/calmnet/marshal/CandidateClient.java +++ b/src/com/captainalm/lib/calmnet/marshal/CandidateClient.java @@ -1,6 +1,7 @@ package com.captainalm.lib.calmnet.marshal; import java.net.InetAddress; +import java.util.Objects; /** * This class provides a candidate client for {@link NetMarshalServer}s. @@ -26,9 +27,34 @@ public final class CandidateClient { * * @param address The remote address of the candidate. * @param port The remote port of the candidate. + * @throws NullPointerException address is null. */ public CandidateClient(InetAddress address, int port) { + if (address == null) throw new NullPointerException("address is null"); this.address = address; this.port = port; } + + /** + * Checks if this candidate matches an existing {@link NetMarshalClient}. + * + * @param toCheck The client to check against. + * @return If the candidate matches the passed client. + */ + public boolean matchesNetMarshalClient(NetMarshalClient toCheck) { + return toCheck.remoteAddress().equals(address) && toCheck.remotePort() == port; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CandidateClient)) return false; + CandidateClient that = (CandidateClient) o; + return port == that.port && address.equals(that.address); + } + + @Override + public int hashCode() { + return Objects.hash(address, port); + } } diff --git a/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java b/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java index a7b2ddc..c1aef87 100644 --- a/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java +++ b/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java @@ -37,6 +37,23 @@ public final class FragmentationOptions { */ public boolean equalityVerifyFragments = false; + public FragmentationOptions() {} + + /** + * Creates a copy of the provided FragmentationOptions. + * + * @param toCopy The options to copy. + * @throws NullPointerException toCopy is null. + */ + public FragmentationOptions(FragmentationOptions toCopy) { + if (toCopy == null) throw new NullPointerException("toCopy is null"); + maximumFragmentAge = toCopy.maximumFragmentAge; + fragmentationSplitSize = toCopy.fragmentationSplitSize; + emptySendsTillForced = toCopy.emptySendsTillForced; + verifyFragments = toCopy.verifyFragments; + equalityVerifyFragments = toCopy.equalityVerifyFragments; + } + /** * Validates the parameters within this structure. * diff --git a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java index 58eb13c..b41f2c7 100644 --- a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java +++ b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java @@ -33,6 +33,7 @@ import java.util.function.Consumer; public class NetMarshalClient implements Closeable { protected boolean running; + protected final Object slocksock = new Object(); protected Socket socket; protected DatagramSocket dsocket; protected InputStream inputStream; @@ -75,7 +76,7 @@ public class NetMarshalClient implements Closeable { this.factory = factory; if (loader == null) throw new NullPointerException("loader is null"); this.loader = loader; - this.fragmentationOptions = fragmentationOptions; + this.fragmentationOptions = (fragmentationOptions == null) ? null : new FragmentationOptions(fragmentationOptions); if (fragmentationOptions == null) { fragmentReceiver = null; fragmentRMM = null; @@ -397,7 +398,7 @@ public class NetMarshalClient implements Closeable { */ public synchronized final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { if (packetIn == null) throw new NullPointerException("packetIn is null"); - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { if (fragmentationOptions == null || directSend) { loader.writePacket(outputStream, packetIn, true); } else { @@ -413,7 +414,7 @@ public class NetMarshalClient implements Closeable { * @throws IOException A stream exception has occurred. */ public synchronized final void flush() throws IOException { - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { outputStream.flush(); rootOutputStream.flush(); } @@ -516,8 +517,8 @@ public class NetMarshalClient implements Closeable { if (context == null) throw new NullPointerException("context is null"); if (!disablePacketReading && Thread.currentThread() != receiveThread) throw new IllegalStateException("sslUpgrade methods should be called in a BiConsumer (for setReceiveBiConsumer) within the target NetMarshalClient" + " or when reading packets (arePacketsBeingRead) is disabled on the NetMarshalClient"); - Socket originalSocket = socket; - synchronized (originalSocket) { + synchronized (slocksock) { + Socket originalSocket = socket; try { socket = SSLUtilities.upgradeClientSocketToSSL(context, socket, remoteHostName, socket.getPort(), true, remoteHostName != null); if (rootInputStream instanceof NetworkInputStream) ((NetworkInputStream) rootInputStream).setSocket(socket); @@ -642,7 +643,7 @@ public class NetMarshalClient implements Closeable { } } } catch (InterruptedException | InterruptedIOException e) { - } catch (PacketException | IOException e) { + } catch (Exception e) { if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this); try { close(); @@ -674,7 +675,7 @@ public class NetMarshalClient implements Closeable { slockReceive.notify(); } } - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { sendFragmentData(); } } else { @@ -686,7 +687,7 @@ public class NetMarshalClient implements Closeable { } } } catch (InterruptedException | InterruptedIOException e) { - } catch (PacketException | IOException e) { + } catch (Exception e) { if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this); try { close(); diff --git a/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java b/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java index 1c561a0..df5d29b 100644 --- a/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java +++ b/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java @@ -6,13 +6,8 @@ import com.captainalm.lib.calmnet.packet.PacketLoader; import com.captainalm.lib.calmnet.packet.factory.IPacketFactory; import com.captainalm.lib.calmnet.stream.NetworkInputStream; -import java.io.Closeable; -import java.io.IOException; -import java.io.PipedOutputStream; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; +import java.io.*; +import java.net.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,10 +24,12 @@ import java.util.function.Consumer; public class NetMarshalServer implements Closeable { protected boolean running; + protected final Object slocksock = new Object(); protected ServerSocket socket; protected DatagramSocket dsocket; protected final NetworkInputStream dInputStream; - protected final Map outputs; + protected final Map outputs; + protected final Object slockOutputs; protected final List clients = new ArrayList<>(); protected BiConsumer receiveBiConsumer; @@ -62,17 +59,23 @@ public class NetMarshalServer implements Closeable { this.factory = factory; if (loader == null) throw new NullPointerException("loader is null"); this.loader = loader; - this.fragmentationOptions = fragmentationOptions; - if (fragmentationOptions != null) fragmentationOptions.validate(); + if (fragmentationOptions == null) { + this.fragmentationOptions = null; + } else { + this.fragmentationOptions = new FragmentationOptions(fragmentationOptions); + this.fragmentationOptions.validate(); + } if (dsock == null) { dInputStream = null; outputs = null; + slockOutputs = null; acceptThread = new Thread(() -> { while (running) acceptThreadExecutedSocket(); }, "thread_accept_" + localAddress.getHostAddress() + ":" + localPort); } else { dInputStream = new NetworkInputStream(dsock); outputs = new HashMap<>(); + slockOutputs = new Object(); acceptThread = new Thread(() -> { while (running) acceptThreadExecutedDSocket(); }, "thread_accept_" + localAddress.getHostAddress() + ":" + localPort); @@ -178,7 +181,7 @@ public class NetMarshalServer implements Closeable { * @return An array of connected clients. */ public synchronized final NetMarshalClient[] getConnectedClients() { - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { return clients.toArray(new NetMarshalClient[0]); } } @@ -194,7 +197,7 @@ public class NetMarshalServer implements Closeable { */ public synchronized final void broadcastPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { if (packetIn == null) throw new NullPointerException("packetIn is null"); - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { for (NetMarshalClient c : clients) if (c.isRunning()) c.sendPacket(packetIn, directSend); } @@ -206,7 +209,7 @@ public class NetMarshalServer implements Closeable { * @throws IOException A stream exception has occurred. */ public synchronized final void flush() throws IOException { - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { for (NetMarshalClient c : clients) if (c.isRunning()) c.flush(); } @@ -222,21 +225,79 @@ public class NetMarshalServer implements Closeable { } private void disconnectAllInternal() throws IOException { - synchronized ((socket == null) ? dsocket : socket) { + synchronized (slocksock) { for (NetMarshalClient c : clients) if (c.isRunning()) c.close(); } } + protected NetMarshalClient generateClientSocket(Socket socketIn) { + return new NetMarshalClient(socketIn, factory, loader, fragmentationOptions); + } + + protected NetMarshalClient generateClientDSocket(CandidateClient candidate,PipedInputStream inputStream) { + return new NetMarshalClient(dsocket, candidate.address, candidate.port, inputStream, factory, loader, fragmentationOptions); + } + + protected void applyClientEvents(NetMarshalClient client) { + client.setReceiveBiConsumer(this::onClientReceive); + client.setReceiveExceptionBiConsumer(this::onClientReceiveException); + client.setClosedConsumer(this::onClientClose); + } + /** * Connects to a remote endpoint. * * @param remoteAddress The remote address to connect to. * @param remotePort The remote port to connect to. + * @param timeout The timeout of the connection attempt (0 for infinite timeout). * @return A NetMarshalClient instance or null for non-accepted connection. * @throws IOException A connection error has occurred. */ - public synchronized final NetMarshalClient connect(InetAddress remoteAddress, int remotePort) throws IOException { + public synchronized final NetMarshalClient connect(InetAddress remoteAddress, int remotePort, int timeout) throws IOException { + if (remoteAddress == null) throw new NullPointerException("remoteAddress is null"); + if (remotePort < 0) throw new IllegalArgumentException("remotePort is less than 0"); + if (remotePort > 65535) throw new IllegalArgumentException("remotePort is greater than 65535"); + CandidateClient candidateClient = new CandidateClient(remoteAddress, remotePort); + if (acceptanceBiConsumer != null) acceptanceBiConsumer.accept(candidateClient, this); + if (candidateClient.accept) { + NetMarshalClient found = null; + synchronized (slocksock) { + for (NetMarshalClient c : clients) + if (candidateClient.matchesNetMarshalClient(c)) { + found = c; + break; + } + if (found == null) { + if (socket == null) { + PipedInputStream inputPipe = new PipedInputStream(65535); + PipedOutputStream outputPipe = new PipedOutputStream(inputPipe); + found = generateClientDSocket(candidateClient, inputPipe); + synchronized (slockOutputs) { + outputs.put(candidateClient, outputPipe); + } + } else { + Socket clientSocket = new Socket(); + clientSocket.connect(new InetSocketAddress(remoteAddress, remotePort), timeout); + found = generateClientSocket(clientSocket); + } + try { + applyClientEvents(found); + clients.add(found); + } catch (Exception e) { + clients.remove(found); + if (socket == null) { + synchronized (slockOutputs) { + outputs.remove(new CandidateClient(found.remoteAddress(), found.remotePort())); + } + } + throw e; + } + } + } + found.open(); + return found; + } return null; } @@ -259,11 +320,108 @@ public class NetMarshalServer implements Closeable { } } - protected void acceptThreadExecutedSocket() { + protected void onClientReceive(IPacket packet, NetMarshalClient client) { + if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, client); + } + protected void onClientReceiveException(Exception e, NetMarshalClient client) { + if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, client); + } + + protected void onClientClose(NetMarshalClient closed) { + synchronized (slocksock) { + clients.remove(closed); + if (socket == null) { + CandidateClient candidate = new CandidateClient(closed.remoteAddress(), closed.remotePort()); + synchronized (slockOutputs) { + PipedOutputStream outputPipe = outputs.get(candidate); + if (outputPipe != null) { + try { + outputPipe.close(); + } catch (IOException e) { + onClientReceiveException(e, closed); + } + } + outputs.remove(candidate); + } + } + } + if (closedConsumer != null) closedConsumer.accept(closed); + } + + protected void acceptThreadExecutedSocket() { + try { + Socket clientSocket = socket.accept(); + CandidateClient candidateClient = new CandidateClient(clientSocket.getInetAddress(), clientSocket.getPort()); + try { + if (acceptanceBiConsumer != null) acceptanceBiConsumer.accept(candidateClient, this); + if (candidateClient.accept) { + NetMarshalClient client = generateClientSocket(clientSocket); + applyClientEvents(client); + synchronized (slocksock) { + clients.add(client); + } + client.open(); + } else { + clientSocket.close(); + } + } catch (Exception e) { + clientSocket.close(); + throw e; + } + } catch (InterruptedIOException e) { + } catch (Exception e) { + if (acceptExceptionBiConsumer != null) acceptExceptionBiConsumer.accept(e, this); + } } protected void acceptThreadExecutedDSocket() { - + try { + byte[] dPacket = dInputStream.readPacket(); + CandidateClient candidateClient = new CandidateClient(dInputStream.getAddress(), dInputStream.getPort()); + PipedOutputStream outputPipe; + synchronized (slockOutputs) { + outputPipe = outputs.get(candidateClient); + } + if (outputPipe == null) { + synchronized (slocksock) { + NetMarshalClient found = null; + for (NetMarshalClient c : clients) + if (candidateClient.matchesNetMarshalClient(c)) { + found = c; + break; + } + if (found == null) { + try { + if (acceptanceBiConsumer != null) acceptanceBiConsumer.accept(candidateClient, this); + if (candidateClient.accept) { + PipedInputStream inputPipe = new PipedInputStream(65535); + outputPipe = new PipedOutputStream(inputPipe); + synchronized (slockOutputs) { + outputs.put(candidateClient, outputPipe); + } + NetMarshalClient client = generateClientDSocket(candidateClient, inputPipe); + applyClientEvents(client); + clients.add(client); + client.open(); + } + } catch (Exception e) { + synchronized (slockOutputs) { + outputs.remove(candidateClient); + } + throw e; + } + } else { + synchronized (slockOutputs) { + outputPipe = outputs.get(candidateClient); + } + } + } + } + if (outputPipe != null) outputPipe.write(dPacket); + } catch (InterruptedIOException e) { + } catch (Exception e) { + if (acceptExceptionBiConsumer != null) acceptExceptionBiConsumer.accept(e, this); + } } }