diff --git a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java index 86c43b2..8827fde 100644 --- a/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java +++ b/src/com/captainalm/lib/calmnet/marshal/NetMarshalClient.java @@ -64,6 +64,7 @@ public class NetMarshalClient implements Closeable { protected final Thread fragmentMonitorThread; protected final Thread fragmentFinishReceiveMonitorThread; protected final Thread fragmentFinishSendMonitorThread; + protected final Thread fragmentSendThread; private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull, FragmentationOptions fragmentationOptions) { if (isSocketNull) throw new NullPointerException("socketIn is null"); @@ -85,6 +86,7 @@ public class NetMarshalClient implements Closeable { fragmentMonitorThread = null; fragmentFinishReceiveMonitorThread = null; fragmentFinishSendMonitorThread = null; + fragmentSendThread = null; receiveThread = new Thread(() -> { while (running) receiveThreadExecuted(); }, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort); @@ -148,6 +150,28 @@ public class NetMarshalClient implements Closeable { } fragmentSender.clearLastIDFinished(); }, "thread_frag_fin_send_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort); + fragmentSendThread = new Thread(() -> { + while (running) { + try { + synchronized (slocksock) { + slocksock.wait(); + IPacket[] packets = fragmentSender.sendPacket(); + for (IPacket c : packets) if (c != null) { + updateMState(fragmentSMM, c); + loader.writePacket(outputStream, c, true); + } + packets = fragmentReceiver.sendPacket(); + for (IPacket c : packets) if (c != null) { + updateMState(fragmentRMM, c); + loader.writePacket(outputStream, c, true); + } + } + } catch (InterruptedException e) { + } catch (IOException | PacketException e) { + if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this); + } + } + }, "thread_frag_send_" + remoteAddress.getHostAddress() + ":" + remotePort); receiveThread = new Thread(() -> { while (running) receiveThreadExecutedWithFragmentation(); fragmentReceiver.clearWaitingPackets(); @@ -234,19 +258,6 @@ public class NetMarshalClient implements Closeable { } } - protected void sendFragmentData() throws PacketException, IOException { - IPacket[] packets = fragmentSender.sendPacket(); - for (IPacket c : packets) if (c != null) { - updateMState(fragmentSMM, c); - loader.writePacket(outputStream, c, true); - } - packets = fragmentReceiver.sendPacket(); - for (IPacket c : packets) if (c != null) { - updateMState(fragmentRMM, c); - loader.writePacket(outputStream, c, true); - } - } - /** * Opens the marshal. */ @@ -257,6 +268,7 @@ public class NetMarshalClient implements Closeable { fragmentMonitorThread.start(); fragmentFinishReceiveMonitorThread.start(); fragmentFinishSendMonitorThread.start(); + fragmentSendThread.start(); } receiveThread.start(); } @@ -397,15 +409,17 @@ public class NetMarshalClient implements Closeable { * @throws IOException A stream exception has occurred. * @throws PacketException An exception has occurred. * @throws NullPointerException packetIn is null. + * @throws IllegalStateException sendPacket accessed in the receive thread. */ public final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { if (packetIn == null) throw new NullPointerException("packetIn is null"); + if (Thread.currentThread() == receiveThread) throw new IllegalStateException("sendPacket accessed in the receive thread"); synchronized (slocksock) { if (fragmentationOptions == null || directSend) { loader.writePacket(outputStream, packetIn, true); } else { fragmentSender.sendPacket(packetIn); - sendFragmentData(); + slocksock.notifyAll(); } } } @@ -414,8 +428,10 @@ public class NetMarshalClient implements Closeable { * Flushes the output streams. * * @throws IOException A stream exception has occurred. + * @throws IllegalStateException flush accessed in the receive thread. */ public final void flush() throws IOException { + if (Thread.currentThread() == receiveThread) throw new IllegalStateException("sendPacket accessed in the receive thread"); synchronized (slocksock) { outputStream.flush(); rootOutputStream.flush(); @@ -542,6 +558,7 @@ public class NetMarshalClient implements Closeable { /** * Gets the {@link BiConsumer} receiver consumer. + * WARNING: {@link #sendPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer. * * @return The receiver consumer or null. */ @@ -551,6 +568,7 @@ public class NetMarshalClient implements Closeable { /** * Sets the {@link BiConsumer} receiver consumer. + * WARNING: {@link #sendPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer. * * @param consumer The new receiver consumer. * @throws NullPointerException consumer is null. @@ -614,6 +632,7 @@ public class NetMarshalClient implements Closeable { fragmentMonitorThread.interrupt(); fragmentFinishReceiveMonitorThread.interrupt(); fragmentFinishSendMonitorThread.interrupt(); + fragmentSendThread.interrupt(); } receivedPackets.clear(); synchronized (slockReceive) { @@ -681,7 +700,7 @@ public class NetMarshalClient implements Closeable { } } synchronized (slocksock) { - sendFragmentData(); + slocksock.notifyAll(); } } else { if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this); diff --git a/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java b/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java index ff9cab0..5d01821 100644 --- a/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java +++ b/src/com/captainalm/lib/calmnet/marshal/NetMarshalServer.java @@ -194,6 +194,7 @@ public class NetMarshalServer implements Closeable { * @throws IOException A stream exception has occurred. * @throws PacketException An exception has occurred. * @throws NullPointerException packetIn is null. + * @throws IllegalStateException sendPacket accessed in the receive thread. */ public final void broadcastPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { if (packetIn == null) throw new NullPointerException("packetIn is null"); @@ -207,6 +208,7 @@ public class NetMarshalServer implements Closeable { * Flushes all the output streams on all the clients. * * @throws IOException A stream exception has occurred. + * @throws IllegalStateException flush accessed in the receive thread. */ public final void flush() throws IOException { synchronized (slocksock) { @@ -226,6 +228,7 @@ public class NetMarshalServer implements Closeable { /** * Gets the {@link BiConsumer} receiver consumer. + * WARNING: {@link #broadcastPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer. * * @return The receiver consumer or null. */ @@ -235,6 +238,7 @@ public class NetMarshalServer implements Closeable { /** * Sets the {@link BiConsumer} receiver consumer. + * WARNING: {@link #broadcastPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer. * * @param consumer The new receiver consumer. * @throws NullPointerException consumer is null.