Fix read / write full buffer deadlock in NetMarshalClient.

Deny sending within the receive thread.
This commit is contained in:
Captain ALM 2023-05-21 01:04:55 +01:00
parent 746148c1f3
commit 100407dfc5
Signed by: alfred
GPG Key ID: 4E4ADD02609997B1
2 changed files with 38 additions and 15 deletions

View File

@ -64,6 +64,7 @@ public class NetMarshalClient implements Closeable {
protected final Thread fragmentMonitorThread; protected final Thread fragmentMonitorThread;
protected final Thread fragmentFinishReceiveMonitorThread; protected final Thread fragmentFinishReceiveMonitorThread;
protected final Thread fragmentFinishSendMonitorThread; protected final Thread fragmentFinishSendMonitorThread;
protected final Thread fragmentSendThread;
private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull, FragmentationOptions fragmentationOptions) { private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull, FragmentationOptions fragmentationOptions) {
if (isSocketNull) throw new NullPointerException("socketIn is null"); if (isSocketNull) throw new NullPointerException("socketIn is null");
@ -85,6 +86,7 @@ public class NetMarshalClient implements Closeable {
fragmentMonitorThread = null; fragmentMonitorThread = null;
fragmentFinishReceiveMonitorThread = null; fragmentFinishReceiveMonitorThread = null;
fragmentFinishSendMonitorThread = null; fragmentFinishSendMonitorThread = null;
fragmentSendThread = null;
receiveThread = new Thread(() -> { receiveThread = new Thread(() -> {
while (running) receiveThreadExecuted(); while (running) receiveThreadExecuted();
}, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort); }, "thread_receive_" + remoteAddress.getHostAddress() + ":" + remotePort);
@ -148,6 +150,28 @@ public class NetMarshalClient implements Closeable {
} }
fragmentSender.clearLastIDFinished(); fragmentSender.clearLastIDFinished();
}, "thread_frag_fin_send_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort); }, "thread_frag_fin_send_monitor_" + remoteAddress.getHostAddress() + ":" + remotePort);
fragmentSendThread = new Thread(() -> {
while (running) {
try {
synchronized (slocksock) {
slocksock.wait();
IPacket[] packets = fragmentSender.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentSMM, c);
loader.writePacket(outputStream, c, true);
}
packets = fragmentReceiver.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentRMM, c);
loader.writePacket(outputStream, c, true);
}
}
} catch (InterruptedException e) {
} catch (IOException | PacketException e) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this);
}
}
}, "thread_frag_send_" + remoteAddress.getHostAddress() + ":" + remotePort);
receiveThread = new Thread(() -> { receiveThread = new Thread(() -> {
while (running) receiveThreadExecutedWithFragmentation(); while (running) receiveThreadExecutedWithFragmentation();
fragmentReceiver.clearWaitingPackets(); fragmentReceiver.clearWaitingPackets();
@ -234,19 +258,6 @@ public class NetMarshalClient implements Closeable {
} }
} }
protected void sendFragmentData() throws PacketException, IOException {
IPacket[] packets = fragmentSender.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentSMM, c);
loader.writePacket(outputStream, c, true);
}
packets = fragmentReceiver.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentRMM, c);
loader.writePacket(outputStream, c, true);
}
}
/** /**
* Opens the marshal. * Opens the marshal.
*/ */
@ -257,6 +268,7 @@ public class NetMarshalClient implements Closeable {
fragmentMonitorThread.start(); fragmentMonitorThread.start();
fragmentFinishReceiveMonitorThread.start(); fragmentFinishReceiveMonitorThread.start();
fragmentFinishSendMonitorThread.start(); fragmentFinishSendMonitorThread.start();
fragmentSendThread.start();
} }
receiveThread.start(); receiveThread.start();
} }
@ -397,15 +409,17 @@ public class NetMarshalClient implements Closeable {
* @throws IOException A stream exception has occurred. * @throws IOException A stream exception has occurred.
* @throws PacketException An exception has occurred. * @throws PacketException An exception has occurred.
* @throws NullPointerException packetIn is null. * @throws NullPointerException packetIn is null.
* @throws IllegalStateException sendPacket accessed in the receive thread.
*/ */
public final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { public final void sendPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException {
if (packetIn == null) throw new NullPointerException("packetIn is null"); if (packetIn == null) throw new NullPointerException("packetIn is null");
if (Thread.currentThread() == receiveThread) throw new IllegalStateException("sendPacket accessed in the receive thread");
synchronized (slocksock) { synchronized (slocksock) {
if (fragmentationOptions == null || directSend) { if (fragmentationOptions == null || directSend) {
loader.writePacket(outputStream, packetIn, true); loader.writePacket(outputStream, packetIn, true);
} else { } else {
fragmentSender.sendPacket(packetIn); fragmentSender.sendPacket(packetIn);
sendFragmentData(); slocksock.notifyAll();
} }
} }
} }
@ -414,8 +428,10 @@ public class NetMarshalClient implements Closeable {
* Flushes the output streams. * Flushes the output streams.
* *
* @throws IOException A stream exception has occurred. * @throws IOException A stream exception has occurred.
* @throws IllegalStateException flush accessed in the receive thread.
*/ */
public final void flush() throws IOException { public final void flush() throws IOException {
if (Thread.currentThread() == receiveThread) throw new IllegalStateException("sendPacket accessed in the receive thread");
synchronized (slocksock) { synchronized (slocksock) {
outputStream.flush(); outputStream.flush();
rootOutputStream.flush(); rootOutputStream.flush();
@ -542,6 +558,7 @@ public class NetMarshalClient implements Closeable {
/** /**
* Gets the {@link BiConsumer} receiver consumer. * Gets the {@link BiConsumer} receiver consumer.
* WARNING: {@link #sendPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer.
* *
* @return The receiver consumer or null. * @return The receiver consumer or null.
*/ */
@ -551,6 +568,7 @@ public class NetMarshalClient implements Closeable {
/** /**
* Sets the {@link BiConsumer} receiver consumer. * Sets the {@link BiConsumer} receiver consumer.
* WARNING: {@link #sendPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer.
* *
* @param consumer The new receiver consumer. * @param consumer The new receiver consumer.
* @throws NullPointerException consumer is null. * @throws NullPointerException consumer is null.
@ -614,6 +632,7 @@ public class NetMarshalClient implements Closeable {
fragmentMonitorThread.interrupt(); fragmentMonitorThread.interrupt();
fragmentFinishReceiveMonitorThread.interrupt(); fragmentFinishReceiveMonitorThread.interrupt();
fragmentFinishSendMonitorThread.interrupt(); fragmentFinishSendMonitorThread.interrupt();
fragmentSendThread.interrupt();
} }
receivedPackets.clear(); receivedPackets.clear();
synchronized (slockReceive) { synchronized (slockReceive) {
@ -681,7 +700,7 @@ public class NetMarshalClient implements Closeable {
} }
} }
synchronized (slocksock) { synchronized (slocksock) {
sendFragmentData(); slocksock.notifyAll();
} }
} else { } else {
if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this); if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this);

View File

@ -194,6 +194,7 @@ public class NetMarshalServer implements Closeable {
* @throws IOException A stream exception has occurred. * @throws IOException A stream exception has occurred.
* @throws PacketException An exception has occurred. * @throws PacketException An exception has occurred.
* @throws NullPointerException packetIn is null. * @throws NullPointerException packetIn is null.
* @throws IllegalStateException sendPacket accessed in the receive thread.
*/ */
public final void broadcastPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException { public final void broadcastPacket(IPacket packetIn, boolean directSend) throws IOException, PacketException {
if (packetIn == null) throw new NullPointerException("packetIn is null"); if (packetIn == null) throw new NullPointerException("packetIn is null");
@ -207,6 +208,7 @@ public class NetMarshalServer implements Closeable {
* Flushes all the output streams on all the clients. * Flushes all the output streams on all the clients.
* *
* @throws IOException A stream exception has occurred. * @throws IOException A stream exception has occurred.
* @throws IllegalStateException flush accessed in the receive thread.
*/ */
public final void flush() throws IOException { public final void flush() throws IOException {
synchronized (slocksock) { synchronized (slocksock) {
@ -226,6 +228,7 @@ public class NetMarshalServer implements Closeable {
/** /**
* Gets the {@link BiConsumer} receiver consumer. * Gets the {@link BiConsumer} receiver consumer.
* WARNING: {@link #broadcastPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer.
* *
* @return The receiver consumer or null. * @return The receiver consumer or null.
*/ */
@ -235,6 +238,7 @@ public class NetMarshalServer implements Closeable {
/** /**
* Sets the {@link BiConsumer} receiver consumer. * Sets the {@link BiConsumer} receiver consumer.
* WARNING: {@link #broadcastPacket(IPacket, boolean)} and {@link #flush()} cannot be called within the consumer.
* *
* @param consumer The new receiver consumer. * @param consumer The new receiver consumer.
* @throws NullPointerException consumer is null. * @throws NullPointerException consumer is null.