Add fragmentation support to NetMarshalClient, fix up NetMarshalClient.

This commit is contained in:
Captain ALM 2023-05-19 20:17:56 +01:00
parent b49dfa0938
commit 41ed32f3af
Signed by: alfred
GPG Key ID: 4E4ADD02609997B1
6 changed files with 368 additions and 35 deletions

View File

@ -0,0 +1,74 @@
package com.captainalm.lib.calmnet.marshal;
import com.captainalm.lib.calmnet.packet.fragment.FragmentReceiver;
import com.captainalm.lib.calmnet.packet.fragment.FragmentSender;
/**
* This class provides fragmentation options for using {@link FragmentSender}s and
* {@link FragmentReceiver}s in this package.
*
* @author Captain ALM
*/
public final class FragmentationOptions {
/**
* The maximum age of fragments for a specified packet in seconds before those fragments are purged.
*/
public int maximumFragmentAge = 30;
/**
* See:
* {@link FragmentSender#setSplitSize(int)}
*/
public int fragmentationSplitSize = 496;
/**
* See:
* {@link FragmentReceiver#setNumberOfEmptySendsTillForcedCompleteOrResend(int)}
*/
public int emptySendsTillForced = 2;
/**
* See:
* {@link FragmentSender#setResponseVerification(boolean)} ,
* {@link FragmentReceiver#setResponseVerification(boolean)}
*/
public boolean verifyFragments = false;
/**
* See:
* {@link FragmentSender#setSentDataWillBeAllVerified(boolean)} ,
* {@link FragmentReceiver#setSentDataWillBeAllVerified(boolean)}
*/
public boolean equalityVerifyFragments = false;
/**
* Validates the parameters within this structure.
*
* @throws IllegalArgumentException maximumFragmentAge is less than 2, fragmentationSplitSize is less than 1 or emptySendsTillForced is less than 1.
*/
public void validate() {
if (maximumFragmentAge < 2) throw new IllegalArgumentException("maximumFragmentAge is less than 2");
if (fragmentationSplitSize < 1) throw new IllegalArgumentException("fragmentationSplitSize is less than 1");
if (emptySendsTillForced < 1) throw new IllegalArgumentException("emptySendsTillForced is less than 1");
}
/**
* Sets-up the provided {@link FragmentSender} with parameters.
*
* @param sender The sender to set up.
* @throws IllegalArgumentException A parameter is incorrect.
*/
public void setupSender(FragmentSender sender) {
sender.setSplitSize(fragmentationSplitSize);
sender.setResponseVerification(verifyFragments);
sender.setSentDataWillBeAllVerified(equalityVerifyFragments);
}
/**
* Sets-up the provided {@link FragmentReceiver} with parameters.
*
* @param receiver The receiver to set up.
* @throws IllegalArgumentException A parameter is incorrect.
*/
public void setupReceiver(FragmentReceiver receiver) {
receiver.setNumberOfEmptySendsTillForcedCompleteOrResend(emptySendsTillForced);
receiver.setResponseVerification(verifyFragments);
receiver.setSentDataWillBeAllVerified(equalityVerifyFragments);
}
}

View File

@ -4,6 +4,7 @@ import com.captainalm.lib.calmnet.packet.IPacket;
import com.captainalm.lib.calmnet.packet.PacketException; import com.captainalm.lib.calmnet.packet.PacketException;
import com.captainalm.lib.calmnet.packet.PacketLoader; import com.captainalm.lib.calmnet.packet.PacketLoader;
import com.captainalm.lib.calmnet.packet.factory.IPacketFactory; import com.captainalm.lib.calmnet.packet.factory.IPacketFactory;
import com.captainalm.lib.calmnet.packet.fragment.*;
import com.captainalm.lib.calmnet.ssl.SSLUtilities; import com.captainalm.lib.calmnet.ssl.SSLUtilities;
import com.captainalm.lib.calmnet.ssl.SSLUtilityException; import com.captainalm.lib.calmnet.ssl.SSLUtilityException;
import com.captainalm.lib.calmnet.stream.NetworkInputStream; import com.captainalm.lib.calmnet.stream.NetworkInputStream;
@ -16,9 +17,12 @@ import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MulticastSocket; import java.net.MulticastSocket;
import java.net.Socket; import java.net.Socket;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
/** /**
* This class provides a managed way of networking on the client side. * This class provides a managed way of networking on the client side.
@ -27,7 +31,7 @@ import java.util.function.BiConsumer;
* @author Captain ALM * @author Captain ALM
*/ */
public class NetMarshalClient implements Closeable { public class NetMarshalClient implements Closeable {
protected boolean running = true; protected boolean running;
protected Socket socket; protected Socket socket;
protected DatagramSocket dsocket; protected DatagramSocket dsocket;
@ -37,6 +41,7 @@ public class NetMarshalClient implements Closeable {
protected OutputStream rootOutputStream; protected OutputStream rootOutputStream;
protected BiConsumer<IPacket, NetMarshalClient> receiveBiConsumer; protected BiConsumer<IPacket, NetMarshalClient> receiveBiConsumer;
protected BiConsumer<Exception, NetMarshalClient> receiveExceptionBiConsumer; protected BiConsumer<Exception, NetMarshalClient> receiveExceptionBiConsumer;
protected Consumer<NetMarshalClient> closedConsumer;
protected final Object slockPacketRead = new Object(); protected final Object slockPacketRead = new Object();
protected boolean disablePacketReading; protected boolean disablePacketReading;
@ -50,7 +55,16 @@ public class NetMarshalClient implements Closeable {
protected final Queue<IPacket> receivedPackets = new LinkedList<>(); protected final Queue<IPacket> receivedPackets = new LinkedList<>();
protected final Object slockReceive = new Object(); protected final Object slockReceive = new Object();
private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull) { protected final FragmentationOptions fragmentationOptions;
protected final FragmentReceiver fragmentReceiver;
protected final HashMap<Integer, LocalDateTime> fragmentRMM;
protected final FragmentSender fragmentSender;
protected final HashMap<Integer, LocalDateTime> fragmentSMM;
protected final Thread fragmentMonitorThread;
protected final Thread fragmentFinishReceiveMonitorThread;
protected final Thread fragmentFinishSendMonitorThread;
private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull, FragmentationOptions fragmentationOptions) {
if (isSocketNull) throw new NullPointerException("socketIn is null"); if (isSocketNull) throw new NullPointerException("socketIn is null");
if (remoteAddress == null) throw new NullPointerException(((isMulticast) ? "multicastGroupAddress" : "remoteAddress") + " is null"); if (remoteAddress == null) throw new NullPointerException(((isMulticast) ? "multicastGroupAddress" : "remoteAddress") + " is null");
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
@ -61,27 +75,102 @@ public class NetMarshalClient implements Closeable {
this.factory = factory; this.factory = factory;
if (loader == null) throw new NullPointerException("loader is null"); if (loader == null) throw new NullPointerException("loader is null");
this.loader = loader; this.loader = loader;
receiveThread = new Thread(() -> { this.fragmentationOptions = fragmentationOptions;
while (running) receiveThreadExecuted(); if (fragmentationOptions == null) {
}, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort); fragmentReceiver = null;
fragmentRMM = null;
fragmentSender = null;
fragmentSMM = null;
fragmentMonitorThread = null;
fragmentFinishReceiveMonitorThread = null;
fragmentFinishSendMonitorThread = null;
receiveThread = new Thread(() -> {
while (running) receiveThreadExecuted();
}, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort);
} else {
fragmentationOptions.validate();
fragmentReceiver = new FragmentReceiver(loader, factory);
fragmentationOptions.setupReceiver(fragmentReceiver);
fragmentRMM = new HashMap<>();
fragmentSender = new FragmentSender(loader);
fragmentationOptions.setupSender(fragmentSender);
fragmentSMM = new HashMap<>();
fragmentMonitorThread = new Thread(() -> {
int ageCheckTime = this.fragmentationOptions.maximumFragmentAge - 1;
while (running) {
int id = -1;
synchronized (this.fragmentationOptions) {
for (int c : fragmentRMM.keySet()) {
if (!fragmentRMM.get(c).plusSeconds(ageCheckTime).isAfter(LocalDateTime.now())) {
fragmentRMM.remove(id);
fragmentReceiver.deletePacketFromRegistry(c);
}
}
for (int c : fragmentSMM.keySet()) {
if (!fragmentSMM.get(c).plusSeconds(ageCheckTime).isAfter(LocalDateTime.now())) {
fragmentSMM.remove(id);
fragmentSender.deletePacketFromRegistry(c);
}
}
}
try {
Thread.sleep(this.fragmentationOptions.maximumFragmentAge);
} catch (InterruptedException e) {
}
}
fragmentReceiver.clearRegistry();
fragmentSender.clearRegistry();
}, "thread_frag_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort);
fragmentFinishReceiveMonitorThread = new Thread(() -> {
while (running) {
int id = -1;
try {
while ((id = fragmentReceiver.getLastIDFinished()) != -1) synchronized (this.fragmentationOptions) {
fragmentRMM.remove(id);
}
} catch (InterruptedException e) {
}
}
fragmentReceiver.clearLastIDFinished();
}, "thread_frag_fin_recv_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort);
fragmentFinishSendMonitorThread = new Thread(() -> {
while (running) {
int id = -1;
try {
while ((id = fragmentSender.getLastIDFinished()) != -1) synchronized (this.fragmentationOptions) {
fragmentSMM.remove(id);
}
} catch (InterruptedException e) {
}
}
fragmentSender.clearLastIDFinished();
}, "thread_frag_fin_recv_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort);
receiveThread = new Thread(() -> {
while (running) receiveThreadExecutedWithFragmentation();
fragmentReceiver.clearWaitingPackets();
fragmentSender.clearWaitingPackets();
}, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort);
}
} }
/** /**
* Constructs a new NetMarshalClient with the specified {@link Socket}, {@link IPacketFactory} and {@link PacketLoader}. * Constructs a new NetMarshalClient with the specified {@link Socket}, {@link IPacketFactory}, {@link PacketLoader} and {@link FragmentationOptions}.
* *
* @param socketIn The socket to use. * @param socketIn The socket to use.
* @param factory The packet factory to use. * @param factory The packet factory to use.
* @param loader The packet loader to use. * @param loader The packet loader to use.
* @param fragmentationOptions The fragmentation options, null to disable fragmentation.
* @throws NullPointerException socketIn, factory or loader is null. * @throws NullPointerException socketIn, factory or loader is null.
* @throws IllegalArgumentException Fragmentation options failed validation.
*/ */
public NetMarshalClient(Socket socketIn, IPacketFactory factory, PacketLoader loader) { public NetMarshalClient(Socket socketIn, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions) {
this((socketIn == null) ? null : socketIn.getInetAddress(), (socketIn == null) ? -1 : socketIn.getPort(), factory, loader, false, socketIn == null); this((socketIn == null) ? null : socketIn.getInetAddress(), (socketIn == null) ? -1 : socketIn.getPort(), factory, loader, false, socketIn == null, fragmentationOptions);
socket = socketIn; socket = socketIn;
setStreams(new NetworkInputStream(socketIn), new NetworkOutputStream(socketIn)); setStreams(new NetworkInputStream(socketIn), new NetworkOutputStream(socketIn));
} }
/** /**
* Constructs a new NetMarshalClient with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory} and {@link PacketLoader}. * Constructs a new NetMarshalClient with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory}, {@link PacketLoader} and {@link FragmentationOptions}.
* The {@link MulticastSocket} will join the multicast group. * The {@link MulticastSocket} will join the multicast group.
* *
* @param socketIn The multicast socket to use. * @param socketIn The multicast socket to use.
@ -89,12 +178,13 @@ public class NetMarshalClient implements Closeable {
* @param multicastGroupPort The multicast group port. * @param multicastGroupPort The multicast group port.
* @param factory The packet factory to use. * @param factory The packet factory to use.
* @param loader The packet loader to use. * @param loader The packet loader to use.
* @param fragmentationOptions The fragmentation options, null to disable fragmentation.
* @throws IOException There is an error joining or multicastGroupAddress is not a multicast address. * @throws IOException There is an error joining or multicastGroupAddress is not a multicast address.
* @throws NullPointerException socketIn, multicastGroupAddress, factory or loader is null. * @throws NullPointerException socketIn, multicastGroupAddress, factory or loader is null.
* @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535. * @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535 or fragmentation options failed validation.
*/ */
public NetMarshalClient(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader) throws IOException { public NetMarshalClient(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions) throws IOException {
this(multicastGroupAddress, multicastGroupPort, factory, loader, true, socketIn == null); this(multicastGroupAddress, multicastGroupPort, factory, loader, true, socketIn == null, fragmentationOptions);
socketIn.joinGroup(multicastGroupAddress); socketIn.joinGroup(multicastGroupAddress);
NetworkOutputStream netOut = new NetworkOutputStream(socketIn, 65535); NetworkOutputStream netOut = new NetworkOutputStream(socketIn, 65535);
netOut.setDatagramTarget(multicastGroupAddress, multicastGroupPort); netOut.setDatagramTarget(multicastGroupAddress, multicastGroupPort);
@ -102,7 +192,7 @@ public class NetMarshalClient implements Closeable {
} }
/** /**
* Constructs a new NetMarshalClient with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory} and {@link PacketLoader}. * Constructs a new NetMarshalClient with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory}, {@link PacketLoader} and {@link FragmentationOptions}.
* *
* @param socketIn The datagram socket to use. * @param socketIn The datagram socket to use.
* @param remoteAddress The remote address to send data to. * @param remoteAddress The remote address to send data to.
@ -110,13 +200,14 @@ public class NetMarshalClient implements Closeable {
* @param inputStream The receiving input stream. * @param inputStream The receiving input stream.
* @param factory The packet factory to use. * @param factory The packet factory to use.
* @param loader The loader to use. * @param loader The loader to use.
* @param fragmentationOptions The fragmentation options, null to disable fragmentation.
* @throws NullPointerException socketIn, remoteAddress, inputStream, factory or loader is null. * @throws NullPointerException socketIn, remoteAddress, inputStream, factory or loader is null.
* @throws IllegalArgumentException remotePort is less than 0 or greater than 65535. * @throws IllegalArgumentException remotePort is less than 0 or greater than 65535 or fragmentation options failed validation.
*/ */
public NetMarshalClient(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader) { public NetMarshalClient(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions) {
this(remoteAddress, remotePort, factory, loader, false, socketIn == null); this(remoteAddress, remotePort, factory, loader, false, socketIn == null, fragmentationOptions);
if (inputStream == null) throw new NullPointerException("inputStream is null"); if (inputStream == null) throw new NullPointerException("inputStream is null");
setStreams(null, new NetworkOutputStream(socketIn, 65535)); setStreams(null, new NetworkOutputStream(socketIn, 65535, remoteAddress, remotePort));
rootInputStream = inputStream; rootInputStream = inputStream;
this.inputStream = inputStream; this.inputStream = inputStream;
} }
@ -128,6 +219,45 @@ public class NetMarshalClient implements Closeable {
this.outputStream = rootOutputStream; this.outputStream = rootOutputStream;
} }
protected void updateMState(HashMap<Integer, LocalDateTime> mm, IPacket packet) {
if (packet == null || !packet.isValid()) return;
synchronized (fragmentationOptions) {
if (packet instanceof FragmentAllocationPacket) {
mm.put(((FragmentAllocationPacket) packet).getPacketID(), LocalDateTime.now());
} else if (packet instanceof FragmentPIDPacket && !(packet instanceof FragmentSendStopPacket)) {
if (mm.containsKey(((FragmentPIDPacket) packet).getPacketID()))
mm.put(((FragmentPIDPacket) packet).getPacketID(), LocalDateTime.now());
}
}
}
protected void sendFragmentData() throws PacketException, IOException {
IPacket[] packets = fragmentSender.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentSMM, c);
loader.writePacket(outputStream, c, true);
}
packets = fragmentReceiver.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentRMM, c);
loader.writePacket(outputStream, c, true);
}
}
/**
* Opens the marshal.
*/
public synchronized final void open() {
if (running) return;
running = true;
if (fragmentationOptions != null) {
fragmentMonitorThread.start();
fragmentFinishReceiveMonitorThread.start();
fragmentFinishSendMonitorThread.start();
}
receiveThread.start();
}
/** /**
* Get the input stream. * Get the input stream.
* *
@ -209,6 +339,15 @@ public class NetMarshalClient implements Closeable {
return running; return running;
} }
/**
* Gets the {@link FragmentationOptions} of the client.
*
* @return The fragmentation options or null if fragmentation is disabled.
*/
public FragmentationOptions getFragmentationOptions() {
return fragmentationOptions;
}
/** /**
* Gets if the marshal is ssl upgraded. * Gets if the marshal is ssl upgraded.
* *
@ -223,14 +362,20 @@ public class NetMarshalClient implements Closeable {
* Sends a {@link IPacket}. * Sends a {@link IPacket}.
* *
* @param packetIn The packet to send. * @param packetIn The packet to send.
* @param directSend Whether the packet should be sent directly or through the fragmentation system.
* @throws IOException A stream exception has occurred. * @throws IOException A stream exception has occurred.
* @throws PacketException An exception has occurred. * @throws PacketException An exception has occurred.
* @throws NullPointerException packetIn is null. * @throws NullPointerException packetIn is null.
*/ */
public synchronized final void sendPacket(IPacket packetIn) throws IOException, PacketException { public synchronized final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException {
if (packetIn == null) throw new NullPointerException("packetIn is null"); if (packetIn == null) throw new NullPointerException("packetIn is null");
synchronized ((socket == null) ? dsocket : socket) { synchronized ((socket == null) ? dsocket : socket) {
loader.writePacket(outputStream, packetIn, true); if (fragmentationOptions == null || directSend) {
loader.writePacket(outputStream, packetIn, true);
} else {
fragmentSender.sendPacket(packetIn);
}
if (fragmentationOptions != null) sendFragmentData();
} }
} }
@ -381,6 +526,26 @@ public class NetMarshalClient implements Closeable {
receiveExceptionBiConsumer = consumer; receiveExceptionBiConsumer = consumer;
} }
/**
* Gets the {@link Consumer} closed consumer.
*
* @return The closed or null.
*/
public Consumer<NetMarshalClient> getClosedConsumer() {
return closedConsumer;
}
/**
* Sets the {@link Consumer} closed consumer.
*
* @param consumer The new closed consumer.
* @throws NullPointerException consumer is null.
*/
public void setClosedConsumer(Consumer<NetMarshalClient> consumer) {
if (consumer == null) throw new NullPointerException("consumer is null");
closedConsumer = consumer;
}
/** /**
* Closes the marshal, closing all its streams. * Closes the marshal, closing all its streams.
* *
@ -391,11 +556,23 @@ public class NetMarshalClient implements Closeable {
if (running) { if (running) {
running = false; running = false;
if (Thread.currentThread() != receiveThread) receiveThread.interrupt(); if (Thread.currentThread() != receiveThread) receiveThread.interrupt();
if (fragmentationOptions != null) {
fragmentMonitorThread.interrupt();
fragmentFinishReceiveMonitorThread.interrupt();
fragmentFinishSendMonitorThread.interrupt();
}
receivedPackets.clear(); receivedPackets.clear();
inputStream.close(); try {
outputStream.close(); inputStream.close();
socket = null; } finally {
dsocket = null; try {
outputStream.close();
} finally {
socket = null;
dsocket = null;
if (closedConsumer != null) closedConsumer.accept(this);
}
}
} }
} }
@ -423,4 +600,48 @@ public class NetMarshalClient implements Closeable {
} }
} }
} }
protected void receiveThreadExecutedWithFragmentation() {
try {
synchronized (slockPacketRead) {
while (disablePacketReading) slockPacketRead.wait();
}
IPacket packet = loader.readStreamedPacket(inputStream, factory, null);
synchronized (slockPacketRead) {
if (packet == null || !packet.isValid()) return;
updateMState(fragmentRMM, packet);
boolean packetUsed = fragmentReceiver.receivePacket(packet);
updateMState(fragmentSMM, packet);
packetUsed = fragmentSender.receivePacket(packet) || packetUsed;
if (packetUsed) {
while (fragmentReceiver.arePacketsWaiting()) {
packet = fragmentReceiver.receivePacketPolling();
if (packet == null || !packet.isValid()) continue;
if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this);
synchronized (slockReceive) {
receivedPackets.add(packet);
slockReceive.notify();
}
}
synchronized ((socket == null) ? dsocket : socket) {
sendFragmentData();
}
} else {
if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this);
synchronized (slockReceive) {
receivedPackets.add(packet);
slockReceive.notify();
}
}
}
} catch (InterruptedException | InterruptedIOException e) {
} catch (PacketException | IOException e) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this);
try {
close();
} catch (IOException ex) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(ex, this);
}
}
}
} }

View File

@ -25,24 +25,26 @@ public class NetMarshalClientWrapped extends NetMarshalClient {
/** /**
* Constructs a new NetMarshalClientWrapped with the specified {@link Socket}, {@link IPacketFactory}, * Constructs a new NetMarshalClientWrapped with the specified {@link Socket}, {@link IPacketFactory},
* {@link PacketLoader}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. * {@link PacketLoader}, {@link FragmentationOptions}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream.
* Wrapped streams should close the underlying stream when closed. * Wrapped streams should close the underlying stream when closed.
* *
* @param socketIn The socket to use. * @param socketIn The socket to use.
* @param factory The packet factory to use. * @param factory The packet factory to use.
* @param loader The packet loader to use. * @param loader The packet loader to use.
* @param fragmentationOptions The fragmentation options, null to disable fragmentation.
* @param inputStreamWrapper The input stream wrapper to use (Can be null). * @param inputStreamWrapper The input stream wrapper to use (Can be null).
* @param outputStreamWrapper The output stream wrapper to use (Can be null). * @param outputStreamWrapper The output stream wrapper to use (Can be null).
* @throws NullPointerException socketIn, factory or loader is null. * @throws NullPointerException socketIn, factory or loader is null.
* @throws IllegalArgumentException Fragmentation options failed validation.
*/ */
public NetMarshalClientWrapped(Socket socketIn, IPacketFactory factory, PacketLoader loader, Function<InputStream, InputStream> inputStreamWrapper, Function<OutputStream, OutputStream> outputStreamWrapper) { public NetMarshalClientWrapped(Socket socketIn, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions, Function<InputStream, InputStream> inputStreamWrapper, Function<OutputStream, OutputStream> outputStreamWrapper) {
super(socketIn, factory, loader); super(socketIn, factory, loader, fragmentationOptions);
setupWrappers(inputStreamWrapper, outputStreamWrapper); setupWrappers(inputStreamWrapper, outputStreamWrapper);
} }
/** /**
* Constructs a new NetMarshalClientWrapped with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory}, * Constructs a new NetMarshalClientWrapped with the specified {@link MulticastSocket}, multicast group {@link InetAddress}, multicast port, {@link IPacketFactory},
* {@link PacketLoader}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. * {@link PacketLoader}, {@link FragmentationOptions}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream.
* The {@link MulticastSocket} will join the multicast group. * The {@link MulticastSocket} will join the multicast group.
* Wrapped streams should close the underlying stream when closed. * Wrapped streams should close the underlying stream when closed.
* *
@ -51,20 +53,21 @@ public class NetMarshalClientWrapped extends NetMarshalClient {
* @param multicastGroupPort The multicast group port. * @param multicastGroupPort The multicast group port.
* @param factory The packet factory to use. * @param factory The packet factory to use.
* @param loader The packet loader to use. * @param loader The packet loader to use.
* @param fragmentationOptions The fragmentation options, null to disable fragmentation.
* @param inputStreamWrapper The input stream wrapper to use (Can be null). * @param inputStreamWrapper The input stream wrapper to use (Can be null).
* @param outputStreamWrapper The output stream wrapper to use (Can be null). * @param outputStreamWrapper The output stream wrapper to use (Can be null).
* @throws IOException There is an error joining or multicastGroupAddress is not a multicast address. * @throws IOException There is an error joining or multicastGroupAddress is not a multicast address.
* @throws NullPointerException socketIn, multicastGroupAddress, factory or loader is null. * @throws NullPointerException socketIn, multicastGroupAddress, factory or loader is null.
* @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535. * @throws IllegalArgumentException multicastGroupPort is less than 0 or greater than 65535 or fragmentation options failed validation.
*/ */
public NetMarshalClientWrapped(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader, Function<InputStream, InputStream> inputStreamWrapper, Function<OutputStream, OutputStream> outputStreamWrapper) throws IOException { public NetMarshalClientWrapped(MulticastSocket socketIn, InetAddress multicastGroupAddress, int multicastGroupPort, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions, Function<InputStream, InputStream> inputStreamWrapper, Function<OutputStream, OutputStream> outputStreamWrapper) throws IOException {
super(socketIn, multicastGroupAddress, multicastGroupPort, factory, loader); super(socketIn, multicastGroupAddress, multicastGroupPort, factory, loader, fragmentationOptions);
setupWrappers(inputStreamWrapper, outputStreamWrapper); setupWrappers(inputStreamWrapper, outputStreamWrapper);
} }
/** /**
* Constructs a new NetMarshalClientWrapped with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory}, * Constructs a new NetMarshalClientWrapped with the specified {@link DatagramSocket}, remote {@link InetAddress}, remote port, {@link IPacketFactory},
* {@link PacketLoader}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream. * {@link PacketLoader}, {@link FragmentationOptions}, {@link Function} for wrapping the input stream and the {@link Function} for wrapping the output stream.
* Wrapped streams should close the underlying stream when closed. * Wrapped streams should close the underlying stream when closed.
* *
* @param socketIn The datagram socket to use. * @param socketIn The datagram socket to use.
@ -73,13 +76,14 @@ public class NetMarshalClientWrapped extends NetMarshalClient {
* @param inputStream The receiving input stream. * @param inputStream The receiving input stream.
* @param factory The packet factory to use. * @param factory The packet factory to use.
* @param loader The loader to use. * @param loader The loader to use.
* @param fragmentationOptions The fragmentation options, null to disable fragmentation.
* @param inputStreamWrapper The input stream wrapper to use (Can be null). * @param inputStreamWrapper The input stream wrapper to use (Can be null).
* @param outputStreamWrapper The output stream wrapper to use (Can be null). * @param outputStreamWrapper The output stream wrapper to use (Can be null).
* @throws NullPointerException socketIn, remoteAddress, inputStream, factory or loader is null. * @throws NullPointerException socketIn, remoteAddress, inputStream, factory or loader is null.
* @throws IllegalArgumentException remotePort is less than 0 or greater than 65535. * @throws IllegalArgumentException remotePort is less than 0 or greater than 65535 or fragmentation options failed validation.
*/ */
public NetMarshalClientWrapped(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader, Function<InputStream, InputStream> inputStreamWrapper, Function<OutputStream, OutputStream> outputStreamWrapper) { public NetMarshalClientWrapped(DatagramSocket socketIn, InetAddress remoteAddress, int remotePort, InputStream inputStream, IPacketFactory factory, PacketLoader loader, FragmentationOptions fragmentationOptions, Function<InputStream, InputStream> inputStreamWrapper, Function<OutputStream, OutputStream> outputStreamWrapper) {
super(socketIn, remoteAddress, remotePort, inputStream, factory,loader); super(socketIn, remoteAddress, remotePort, inputStream, factory, loader, fragmentationOptions);
setupWrappers(inputStreamWrapper, outputStreamWrapper); setupWrappers(inputStreamWrapper, outputStreamWrapper);
} }

View File

@ -5,7 +5,6 @@
*/ */
package com.captainalm.lib.calmnet.marshal; package com.captainalm.lib.calmnet.marshal;
/*TODO: /*TODO:
NetMarshalClient(Wrapped) - Fragmentation processing support
NetMarshalServer - Has a thread for UDP receiving and has a dictionary of input streams (final, created in constructor) NetMarshalServer - Has a thread for UDP receiving and has a dictionary of input streams (final, created in constructor)
NetMarshalServerWrapped - Constructs NetMarshalClientWrapped instead of NetMarshalClient, stream wrapping support NetMarshalServerWrapped - Constructs NetMarshalClientWrapped instead of NetMarshalClient, stream wrapping support
*/ */

View File

@ -56,6 +56,17 @@ public final class FragmentReceiver {
} }
} }
/**
* Receives a {@link IPacket} from the FragmentReceiver.
*
* @return The received packet or null.
*/
public IPacket receivePacketPolling() {
synchronized (slockqueue) {
return outputQueue.poll();
}
}
/** /**
* Sends the current {@link IPacket}s from the FragmentReceiver. * Sends the current {@link IPacket}s from the FragmentReceiver.
* *
@ -206,6 +217,18 @@ public final class FragmentReceiver {
} }
} }
/**
* Polls the last finished packet ID.
*
* @return The last finished packet ID.
*/
public Integer pollLastIDFinished() {
synchronized (slockfinish) {
Integer polled = finishedIDs.poll();
return (polled == null) ? -1 : polled;
}
}
/** /**
* Clears all the last finished packet IDs. * Clears all the last finished packet IDs.
*/ */

View File

@ -179,6 +179,18 @@ public final class FragmentSender {
} }
} }
/**
* Polls the last finished packet ID.
*
* @return The last finished packet ID.
*/
public Integer pollLastIDFinished() {
synchronized (slockfinish) {
Integer polled = finishedIDs.poll();
return (polled == null) ? -1 : polled;
}
}
/** /**
* Clears all the last finished packet IDs. * Clears all the last finished packet IDs.
*/ */