From 41ed32f3afa67ad95b2bee6376de68d3b1d4aade Mon Sep 17 00:00:00 2001 From: Captain ALM Date: Fri, 19 May 2023 20:17:56 +0100 Subject: [PATCH] Add fragmentation support to NetMarshalClient, fix up NetMarshalClient. --- .../calmnet/marshal/FragmentationOptions.java | 74 +++++ .../lib/calmnet/marshal/NetMarshalClient.java | 267 ++++++++++++++++-- .../marshal/NetMarshalClientWrapped.java | 26 +- .../lib/calmnet/marshal/package-info.java | 1 - .../packet/fragment/FragmentReceiver.java | 23 ++ .../packet/fragment/FragmentSender.java | 12 + 6 files changed, 368 insertions(+), 35 deletions(-) create mode 100644 src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java diff --git a/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java b/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java new file mode 100644 index 0000000..d5036f1 --- /dev/null +++ b/src/com/captainalm/lib/calmnet/marshal/FragmentationOptions.java @@ -0,0 +1,74 @@ +package com.captainalm.lib.calmnet.marshal; + +import com.captainalm.lib.calmnet.packet.fragment.FragmentReceiver; +import com.captainalm.lib.calmnet.packet.fragment.FragmentSender; + +/** + * This class provides fragmentation options for using {@link FragmentSender}s and + * {@link FragmentReceiver}s in this package. + * + * @author Captain ALM + */ +public final class FragmentationOptions { + /** + * The maximum age of fragments for a specified packet in seconds before those fragments are purged. + */ + public int maximumFragmentAge = 30; + /** + * See: + * {@link FragmentSender#setSplitSize(int)} + */ + public int fragmentationSplitSize = 496; + /** + * See: + * {@link FragmentReceiver#setNumberOfEmptySendsTillForcedCompleteOrResend(int)} + */ + public int emptySendsTillForced = 2; + /** + * See: + * {@link FragmentSender#setResponseVerification(boolean)} , + * {@link FragmentReceiver#setResponseVerification(boolean)} + */ + public boolean verifyFragments = false; + /** + * See: + * {@link FragmentSender#setSentDataWillBeAllVerified(boolean)} , + * {@link FragmentReceiver#setSentDataWillBeAllVerified(boolean)} + */ + public boolean equalityVerifyFragments = false; + + /** + * Validates the parameters within this structure. + * + * @throws IllegalArgumentException maximumFragmentAge is less than 2, fragmentationSplitSize is less than 1 or emptySendsTillForced is less than 1. + */ + public void validate() { + if (maximumFragmentAge < 2) throw new IllegalArgumentException("maximumFragmentAge is less than 2"); + if (fragmentationSplitSize < 1) throw new IllegalArgumentException("fragmentationSplitSize is less than 1"); + if (emptySendsTillForced < 1) throw new IllegalArgumentException("emptySendsTillForced is less than 1"); + } + + /** + * Sets-up the provided {@link FragmentSender} with parameters. + * + * @param sender The sender to set up. + * @throws IllegalArgumentException A parameter is incorrect. + */ + public void setupSender(FragmentSender sender) { + sender.setSplitSize(fragmentationSplitSize); + sender.setResponseVerification(verifyFragments); + sender.setSentDataWillBeAllVerified(equalityVerifyFragments); + } + + /** + * Sets-up the provided {@link FragmentReceiver} with parameters. + * + * @param receiver The receiver to set up. + * @throws IllegalArgumentException A parameter is incorrect. + */ + public void setupReceiver(FragmentReceiver receiver) { + receiver.setNumberOfEmptySendsTillForcedCompleteOrResend(emptySendsTillForced); + receiver.setResponseVerification(verifyFragments); + receiver.setSentDataWillBeAllVerified(equalityVerifyFragments); + } +} diff --git a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java index af7c950..b51bd0e 100644 --- a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java +++ b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java @@ -4,6 +4,7 @@ import com.captainalm.lib.calmnet.packet.IPacket; import com.captainalm.lib.calmnet.packet.PacketException; import com.captainalm.lib.calmnet.packet.PacketLoader; import com.captainalm.lib.calmnet.packet.factory.IPacketFactory; +import com.captainalm.lib.calmnet.packet.fragment.*; import com.captainalm.lib.calmnet.ssl.SSLUtilities; import com.captainalm.lib.calmnet.ssl.SSLUtilityException; import com.captainalm.lib.calmnet.stream.NetworkInputStream; @@ -16,9 +17,12 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.Socket; +import java.time.LocalDateTime; +import java.util.HashMap; import java.util.LinkedList; import java.util.Queue; import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * This class provides a managed way of networking on the client side. @@ -27,7 +31,7 @@ import java.util.function.BiConsumer; * @author Captain ALM */ public class NetMarshalClient implements Closeable { - protected boolean running = true; + protected boolean running; protected Socket socket; protected DatagramSocket dsocket; @@ -37,6 +41,7 @@ public class NetMarshalClient implements Closeable { protected OutputStream rootOutputStream; protected BiConsumer receiveBiConsumer; protected BiConsumer receiveExceptionBiConsumer; + protected Consumer closedConsumer; protected final Object slockPacketRead = new Object(); protected boolean disablePacketReading; @@ -50,7 +55,16 @@ public class NetMarshalClient implements Closeable { protected final Queue receivedPackets = new LinkedList<>(); protected final Object slockReceive = new Object(); - private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull) { + protected final FragmentationOptions fragmentationOptions; + protected final FragmentReceiver fragmentReceiver; + protected final HashMap fragmentRMM; + protected final FragmentSender fragmentSender; + protected final HashMap fragmentSMM; + protected final Thread fragmentMonitorThread; + protected final Thread fragmentFinishReceiveMonitorThread; + protected final Thread fragmentFinishSendMonitorThread; + + private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull, FragmentationOptions fragmentationOptions) { if (isSocketNull) throw new NullPointerException("socketIn is null"); if (remoteAddress == null) throw new NullPointerException(((isMulticast) ? "multicastGroupAddress" : "remoteAddress") + " is null"); this.remoteAddress = remoteAddress; @@ -61,27 +75,102 @@ public class NetMarshalClient implements Closeable { this.factory = factory; if (loader == null) throw new NullPointerException("loader is null"); this.loader = loader; - receiveThread = new Thread(() -> { - while (running) receiveThreadExecuted(); - }, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort); + this.fragmentationOptions = fragmentationOptions; + if (fragmentationOptions == null) { + fragmentReceiver = null; + fragmentRMM = null; + fragmentSender = null; + fragmentSMM = null; + fragmentMonitorThread = null; + fragmentFinishReceiveMonitorThread = null; + fragmentFinishSendMonitorThread = null; + receiveThread = new Thread(() -> { + while (running) receiveThreadExecuted(); + }, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort); + } else { + fragmentationOptions.validate(); + fragmentReceiver = new FragmentReceiver(loader, factory); + fragmentationOptions.setupReceiver(fragmentReceiver); + fragmentRMM = new HashMap<>(); + fragmentSender = new FragmentSender(loader); + fragmentationOptions.setupSender(fragmentSender); + fragmentSMM = new HashMap<>(); + fragmentMonitorThread = new Thread(() -> { + int ageCheckTime = this.fragmentationOptions.maximumFragmentAge - 1; + while (running) { + int id = -1; + synchronized (this.fragmentationOptions) { + for (int c : fragmentRMM.keySet()) { + if (!fragmentRMM.get(c).plusSeconds(ageCheckTime).isAfter(LocalDateTime.now())) { + fragmentRMM.remove(id); + fragmentReceiver.deletePacketFromRegistry(c); + } + } + for (int c : fragmentSMM.keySet()) { + if (!fragmentSMM.get(c).plusSeconds(ageCheckTime).isAfter(LocalDateTime.now())) { + fragmentSMM.remove(id); + fragmentSender.deletePacketFromRegistry(c); + } + } + } + try { + Thread.sleep(this.fragmentationOptions.maximumFragmentAge); + } catch (InterruptedException e) { + } + } + fragmentReceiver.clearRegistry(); + fragmentSender.clearRegistry(); + }, "thread_frag_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort); + fragmentFinishReceiveMonitorThread = new Thread(() -> { + while (running) { + int id = -1; + try { + while ((id = fragmentReceiver.getLastIDFinished()) != -1) synchronized (this.fragmentationOptions) { + fragmentRMM.remove(id); + } + } catch (InterruptedException e) { + } + } + fragmentReceiver.clearLastIDFinished(); + }, "thread_frag_fin_recv_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort); + fragmentFinishSendMonitorThread = new Thread(() -> { + while (running) { + int id = -1; + try { + while ((id = fragmentSender.getLastIDFinished()) != -1) synchronized (this.fragmentationOptions) { + fragmentSMM.remove(id); + } + } catch (InterruptedException e) { + } + } + fragmentSender.clearLastIDFinished(); + }, "thread_frag_fin_recv_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort); + receiveThread = new Thread(() -> { + while (running) receiveThreadExecutedWithFragmentation(); + fragmentReceiver.clearWaitingPackets(); + fragmentSender.clearWaitingPackets(); + }, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort); + } } /** - * Constructs a new NetMarshalClient with the specified {@link Socket}, {@link IPacketFactory} and {@link PacketLoader}. + * Constructs a new NetMarshalClient with the specified {@link Socket}, {@link IPacketFactory}, {@link PacketLoader} and {@link FragmentationOptions}. * * @param socketIn The socket to use. * @param factory The packet factory to use. * @param loader The packet loader to use. + * @param fragmentationOptions The fragmentation options, null to disable fragmentation. * @throws NullPointerException socketIn, factory or loader is null. + * @throws IllegalArgumentException Fragmentation options failed validation. */ - public NetMarshalClient(Socket socketIn, IPacketFactory factory, PacketLoader loader) { - this((socketIn == null) ? null : socketIn.getInetAddress(), (socketIn == null) ? -1 : socketIn.getPort(), factory, loader, false, socketIn == null); + public NetMarshalClient(Socket socketIn, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions) { + this((socketIn == null) ? null : socketIn.getInetAddress(), (socketIn == null) ? -1 : socketIn.getPort(), factory, loader, false, socketIn == null, fragmentationOptions); socket = socketIn; setStreams(new NetworkInputStream(socketIn), new NetworkOutputStream(socketIn)); } /** - * Constructs a new NetMarshalClient with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory} and {@link PacketLoader}. + * Constructs a new NetMarshalClient with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory}, {@link PacketLoader} and {@link FragmentationOptions}. * The {@link MulticastSocket} will join the multicast group. * * @param socketIn The multicast socket to use. @@ -89,12 +178,13 @@ public class NetMarshalClient implements Closeable { * @param multicastGroupPort The multicast group port. * @param factory The packet factory to use. * @param loader The packet loader to use. + * @param fragmentationOptions The fragmentation options, null to disable fragmentation. * @throws IOException There is an error joining or multicastGroupAddress is not a multicast address. * @throws NullPointerException socketIn, multicastGroupAddress, factory or loader is null. - * @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535. + * @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535 or fragmentation options failed validation. */ - public NetMarshalClient(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader) throws IOException { - this(multicastGroupAddress, multicastGroupPort, factory, loader, true, socketIn == null); + public NetMarshalClient(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions) throws IOException { + this(multicastGroupAddress, multicastGroupPort, factory, loader, true, socketIn == null, fragmentationOptions); socketIn.joinGroup(multicastGroupAddress); NetworkOutputStream netOut = new NetworkOutputStream(socketIn, 65535); netOut.setDatagramTarget(multicastGroupAddress, multicastGroupPort); @@ -102,7 +192,7 @@ public class NetMarshalClient implements Closeable { } /** - * Constructs a new NetMarshalClient with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory} and {@link PacketLoader}. + * Constructs a new NetMarshalClient with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory}, {@link PacketLoader} and {@link FragmentationOptions}. * * @param socketIn The datagram socket to use. * @param remoteAddress The remote address to send data to. @@ -110,13 +200,14 @@ public class NetMarshalClient implements Closeable { * @param inputStream The receiving input stream. * @param factory The packet factory to use. * @param loader The loader to use. + * @param fragmentationOptions The fragmentation options, null to disable fragmentation. * @throws NullPointerException socketIn, remoteAddress, inputStream, factory or loader is null. - * @throws IllegalArgumentException remotePort is less than 0 or greater than 65535. + * @throws IllegalArgumentException remotePort is less than 0 or greater than 65535 or fragmentation options failed validation. */ - public NetMarshalClient(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader) { - this(remoteAddress, remotePort, factory, loader, false, socketIn == null); + public NetMarshalClient(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions) { + this(remoteAddress, remotePort, factory, loader, false, socketIn == null, fragmentationOptions); if (inputStream == null) throw new NullPointerException("inputStream is null"); - setStreams(null, new NetworkOutputStream(socketIn, 65535)); + setStreams(null, new NetworkOutputStream(socketIn, 65535, remoteAddress, remotePort)); rootInputStream = inputStream; this.inputStream = inputStream; } @@ -128,6 +219,45 @@ public class NetMarshalClient implements Closeable { this.outputStream = rootOutputStream; } + protected void updateMState(HashMap mm, IPacket packet) { + if (packet == null || !packet.isValid()) return; + synchronized (fragmentationOptions) { + if (packet instanceof FragmentAllocationPacket) { + mm.put(((FragmentAllocationPacket) packet).getPacketID(), LocalDateTime.now()); + } else if (packet instanceof FragmentPIDPacket && !(packet instanceof FragmentSendStopPacket)) { + if (mm.containsKey(((FragmentPIDPacket) packet).getPacketID())) + mm.put(((FragmentPIDPacket) packet).getPacketID(), LocalDateTime.now()); + } + } + } + + protected void sendFragmentData() throws PacketException, IOException { + IPacket[] packets = fragmentSender.sendPacket(); + for (IPacket c : packets) if (c != null) { + updateMState(fragmentSMM, c); + loader.writePacket(outputStream, c, true); + } + packets = fragmentReceiver.sendPacket(); + for (IPacket c : packets) if (c != null) { + updateMState(fragmentRMM, c); + loader.writePacket(outputStream, c, true); + } + } + + /** + * Opens the marshal. + */ + public synchronized final void open() { + if (running) return; + running = true; + if (fragmentationOptions != null) { + fragmentMonitorThread.start(); + fragmentFinishReceiveMonitorThread.start(); + fragmentFinishSendMonitorThread.start(); + } + receiveThread.start(); + } + /** * Get the input stream. * @@ -209,6 +339,15 @@ public class NetMarshalClient implements Closeable { return running; } + /** + * Gets the {@link FragmentationOptions} of the client. + * + * @return The fragmentation options or null if fragmentation is disabled. + */ + public FragmentationOptions getFragmentationOptions() { + return fragmentationOptions; + } + /** * Gets if the marshal is ssl upgraded. * @@ -223,14 +362,20 @@ public class NetMarshalClient implements Closeable { * Sends a {@link IPacket}. * * @param packetIn The packet to send. + * @param directSend Whether the packet should be sent directly or through the fragmentation system. * @throws IOException A stream exception has occurred. * @throws PacketException An exception has occurred. * @throws NullPointerException packetIn is null. */ - public synchronized final void sendPacket(IPacket packetIn) throws IOException, PacketException { + public synchronized final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { if (packetIn == null) throw new NullPointerException("packetIn is null"); synchronized ((socket == null) ? dsocket : socket) { - loader.writePacket(outputStream, packetIn, true); + if (fragmentationOptions == null || directSend) { + loader.writePacket(outputStream, packetIn, true); + } else { + fragmentSender.sendPacket(packetIn); + } + if (fragmentationOptions != null) sendFragmentData(); } } @@ -381,6 +526,26 @@ public class NetMarshalClient implements Closeable { receiveExceptionBiConsumer = consumer; } + /** + * Gets the {@link Consumer} closed consumer. + * + * @return The closed or null. + */ + public Consumer getClosedConsumer() { + return closedConsumer; + } + + /** + * Sets the {@link Consumer} closed consumer. + * + * @param consumer The new closed consumer. + * @throws NullPointerException consumer is null. + */ + public void setClosedConsumer(Consumer consumer) { + if (consumer == null) throw new NullPointerException("consumer is null"); + closedConsumer = consumer; + } + /** * Closes the marshal, closing all its streams. * @@ -391,11 +556,23 @@ public class NetMarshalClient implements Closeable { if (running) { running = false; if (Thread.currentThread() != receiveThread) receiveThread.interrupt(); + if (fragmentationOptions != null) { + fragmentMonitorThread.interrupt(); + fragmentFinishReceiveMonitorThread.interrupt(); + fragmentFinishSendMonitorThread.interrupt(); + } receivedPackets.clear(); - inputStream.close(); - outputStream.close(); - socket = null; - dsocket = null; + try { + inputStream.close(); + } finally { + try { + outputStream.close(); + } finally { + socket = null; + dsocket = null; + if (closedConsumer != null) closedConsumer.accept(this); + } + } } } @@ -423,4 +600,48 @@ public class NetMarshalClient implements Closeable { } } } + + protected void receiveThreadExecutedWithFragmentation() { + try { + synchronized (slockPacketRead) { + while (disablePacketReading) slockPacketRead.wait(); + } + IPacket packet = loader.readStreamedPacket(inputStream, factory, null); + synchronized (slockPacketRead) { + if (packet == null || !packet.isValid()) return; + updateMState(fragmentRMM, packet); + boolean packetUsed = fragmentReceiver.receivePacket(packet); + updateMState(fragmentSMM, packet); + packetUsed = fragmentSender.receivePacket(packet) || packetUsed; + if (packetUsed) { + while (fragmentReceiver.arePacketsWaiting()) { + packet = fragmentReceiver.receivePacketPolling(); + if (packet == null || !packet.isValid()) continue; + if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this); + synchronized (slockReceive) { + receivedPackets.add(packet); + slockReceive.notify(); + } + } + synchronized ((socket == null) ? dsocket : socket) { + sendFragmentData(); + } + } else { + if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this); + synchronized (slockReceive) { + receivedPackets.add(packet); + slockReceive.notify(); + } + } + } + } catch (InterruptedException | InterruptedIOException e) { + } catch (PacketException | IOException e) { + if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this); + try { + close(); + } catch (IOException ex) { + if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(ex, this); + } + } + } } diff --git a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClientWrapped.java b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClientWrapped.java index b45a084..069481a 100644 --- a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClientWrapped.java +++ b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClientWrapped.java @@ -25,24 +25,26 @@ public class NetMarshalClientWrapped extends NetMarshalClient { /** * Constructs a new NetMarshalClientWrapped with the specified {@link Socket}, {@link IPacketFactory}, - * {@link PacketLoader}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. + * {@link PacketLoader}, {@link FragmentationOptions}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. * Wrapped streams should close the underlying stream when closed. * * @param socketIn The socket to use. * @param factory The packet factory to use. * @param loader The packet loader to use. + * @param fragmentationOptions The fragmentation options, null to disable fragmentation. * @param inputStreamWrapper The input stream wrapper to use (Can be null). * @param outputStreamWrapper The output stream wrapper to use (Can be null). * @throws NullPointerException socketIn, factory or loader is null. + * @throws IllegalArgumentException Fragmentation options failed validation. */ - public NetMarshalClientWrapped(Socket socketIn, IPacketFactory factory, PacketLoader loader, Function inputStreamWrapper, Function outputStreamWrapper) { - super(socketIn, factory, loader); + public NetMarshalClientWrapped(Socket socketIn, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions, Function inputStreamWrapper, Function outputStreamWrapper) { + super(socketIn, factory, loader, fragmentationOptions); setupWrappers(inputStreamWrapper, outputStreamWrapper); } /** * Constructs a new NetMarshalClientWrapped with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory}, - * {@link PacketLoader}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. + * {@link PacketLoader}, {@link FragmentationOptions}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. * The {@link MulticastSocket} will join the multicast group. * Wrapped streams should close the underlying stream when closed. * @@ -51,20 +53,21 @@ public class NetMarshalClientWrapped extends NetMarshalClient { * @param multicastGroupPort The multicast group port. * @param factory The packet factory to use. * @param loader The packet loader to use. + * @param fragmentationOptions The fragmentation options, null to disable fragmentation. * @param inputStreamWrapper The input stream wrapper to use (Can be null). * @param outputStreamWrapper The output stream wrapper to use (Can be null). * @throws IOException There is an error joining or multicastGroupAddress is not a multicast address. * @throws NullPointerException socketIn, multicastGroupAddress, factory or loader is null. - * @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535. + * @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535 or fragmentation options failed validation. */ - public NetMarshalClientWrapped(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader, Function inputStreamWrapper, Function outputStreamWrapper) throws IOException { - super(socketIn, multicastGroupAddress, multicastGroupPort, factory, loader); + public NetMarshalClientWrapped(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions, Function inputStreamWrapper, Function outputStreamWrapper) throws IOException { + super(socketIn, multicastGroupAddress, multicastGroupPort, factory, loader, fragmentationOptions); setupWrappers(inputStreamWrapper, outputStreamWrapper); } /** * Constructs a new NetMarshalClientWrapped with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory}, - * {@link PacketLoader}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. + * {@link PacketLoader}, {@link FragmentationOptions}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. * Wrapped streams should close the underlying stream when closed. * * @param socketIn The datagram socket to use. @@ -73,13 +76,14 @@ public class NetMarshalClientWrapped extends NetMarshalClient { * @param inputStream The receiving input stream. * @param factory The packet factory to use. * @param loader The loader to use. + * @param fragmentationOptions The fragmentation options, null to disable fragmentation. * @param inputStreamWrapper The input stream wrapper to use (Can be null). * @param outputStreamWrapper The output stream wrapper to use (Can be null). * @throws NullPointerException socketIn, remoteAddress, inputStream, factory or loader is null. - * @throws IllegalArgumentException remotePort is less than 0 or greater than 65535. + * @throws IllegalArgumentException remotePort is less than 0 or greater than 65535 or fragmentation options failed validation. */ - public NetMarshalClientWrapped(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader, Function inputStreamWrapper, Function outputStreamWrapper) { - super(socketIn, remoteAddress, remotePort, inputStream, factory,loader); + public NetMarshalClientWrapped(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions, Function inputStreamWrapper, Function outputStreamWrapper) { + super(socketIn, remoteAddress, remotePort, inputStream, factory, loader, fragmentationOptions); setupWrappers(inputStreamWrapper, outputStreamWrapper); } diff --git a/src/com/captainalm/lib/calmnet/marshal/package-info.java b/src/com/captainalm/lib/calmnet/marshal/package-info.java index f74f0fe..2bace37 100644 --- a/src/com/captainalm/lib/calmnet/marshal/package-info.java +++ b/src/com/captainalm/lib/calmnet/marshal/package-info.java @@ -5,7 +5,6 @@ */ package com.captainalm.lib.calmnet.marshal; /*TODO: -NetMarshalClient(Wrapped) - Fragmentation processing support NetMarshalServer - Has a thread for UDP receiving and has a dictionary of input streams (final, created in constructor) NetMarshalServerWrapped - Constructs NetMarshalClientWrapped instead of NetMarshalClient, stream wrapping support */ \ No newline at end of file diff --git a/src/com/captainalm/lib/calmnet/packet/fragment/FragmentReceiver.java b/src/com/captainalm/lib/calmnet/packet/fragment/FragmentReceiver.java index aba8893..fb1ef0e 100644 --- a/src/com/captainalm/lib/calmnet/packet/fragment/FragmentReceiver.java +++ b/src/com/captainalm/lib/calmnet/packet/fragment/FragmentReceiver.java @@ -56,6 +56,17 @@ public final class FragmentReceiver { } } + /** + * Receives a {@link IPacket} from the FragmentReceiver. + * + * @return The received packet or null. + */ + public IPacket receivePacketPolling() { + synchronized (slockqueue) { + return outputQueue.poll(); + } + } + /** * Sends the current {@link IPacket}s from the FragmentReceiver. * @@ -206,6 +217,18 @@ public final class FragmentReceiver { } } + /** + * Polls the last finished packet ID. + * + * @return The last finished packet ID. + */ + public Integer pollLastIDFinished() { + synchronized (slockfinish) { + Integer polled = finishedIDs.poll(); + return (polled == null) ? -1 : polled; + } + } + /** * Clears all the last finished packet IDs. */ diff --git a/src/com/captainalm/lib/calmnet/packet/fragment/FragmentSender.java b/src/com/captainalm/lib/calmnet/packet/fragment/FragmentSender.java index 7aff8a8..f047bb1 100644 --- a/src/com/captainalm/lib/calmnet/packet/fragment/FragmentSender.java +++ b/src/com/captainalm/lib/calmnet/packet/fragment/FragmentSender.java @@ -179,6 +179,18 @@ public final class FragmentSender { } } + /** + * Polls the last finished packet ID. + * + * @return The last finished packet ID. + */ + public Integer pollLastIDFinished() { + synchronized (slockfinish) { + Integer polled = finishedIDs.poll(); + return (polled == null) ? -1 : polled; + } + } + /** * Clears all the last finished packet IDs. */