Fix up NetMarshalClient.

This commit is contained in:
Captain ALM 2023-05-19 01:01:00 +01:00
parent 790426eb3e
commit 5cfdcbdbfa
Signed by: alfred
GPG Key ID: 4E4ADD02609997B1

View File

@ -11,10 +11,7 @@ import com.captainalm.lib.calmnet.stream.NetworkOutputStream;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
import java.io.Closeable; import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MulticastSocket; import java.net.MulticastSocket;
@ -25,6 +22,7 @@ import java.util.function.BiConsumer;
/** /**
* This class provides a managed way of networking on the client side. * This class provides a managed way of networking on the client side.
* NOTE: Methods that are synchronised are used here, do NOT use instances of these classes as monitors.
* *
* @author Captain ALM * @author Captain ALM
*/ */
@ -38,7 +36,7 @@ public class NetMarshalClient implements Closeable {
protected InputStream rootInputStream; protected InputStream rootInputStream;
protected OutputStream rootOutputStream; protected OutputStream rootOutputStream;
protected BiConsumer<IPacket, NetMarshalClient> receiveBiConsumer; protected BiConsumer<IPacket, NetMarshalClient> receiveBiConsumer;
protected static final BiConsumer<IPacket, NetMarshalClient> dummy = (packet, netMarshalClient) -> {}; protected BiConsumer<Exception, NetMarshalClient> receiveExceptionBiConsumer;
protected final Object slockPacketRead = new Object(); protected final Object slockPacketRead = new Object();
protected boolean disablePacketReading; protected boolean disablePacketReading;
@ -52,7 +50,7 @@ public class NetMarshalClient implements Closeable {
protected final Queue<IPacket> receivedPackets = new LinkedList<>(); protected final Queue<IPacket> receivedPackets = new LinkedList<>();
protected final Object slockReceive = new Object(); protected final Object slockReceive = new Object();
private NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull) { protected NetMarshalClient(InetAddress remoteAddress, int remotePort, IPacketFactory factory, PacketLoader loader, boolean isMulticast, boolean isSocketNull) {
if (isSocketNull) throw new NullPointerException("socketIn is null"); if (isSocketNull) throw new NullPointerException("socketIn is null");
if (remoteAddress == null) throw new NullPointerException(((isMulticast) ? "multicastGroupAddress" : "remoteAddress") + " is null"); if (remoteAddress == null) throw new NullPointerException(((isMulticast) ? "multicastGroupAddress" : "remoteAddress") + " is null");
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
@ -277,6 +275,7 @@ public class NetMarshalClient implements Closeable {
*/ */
public void setPacketsShouldBeRead(boolean shouldRead) { public void setPacketsShouldBeRead(boolean shouldRead) {
synchronized (slockPacketRead) { synchronized (slockPacketRead) {
if (receiveThread.isAlive()) receiveThread.interrupt();
disablePacketReading = !shouldRead; disablePacketReading = !shouldRead;
if (!disablePacketReading) slockPacketRead.notify(); if (!disablePacketReading) slockPacketRead.notify();
} }
@ -319,19 +318,22 @@ public class NetMarshalClient implements Closeable {
protected synchronized void sslUpgrade(SSLContext context, String remoteHostName) throws SSLUtilityException, IOException { protected synchronized void sslUpgrade(SSLContext context, String remoteHostName) throws SSLUtilityException, IOException {
if (!running || socket == null || socket instanceof SSLSocket) return; if (!running || socket == null || socket instanceof SSLSocket) return;
if (context == null) throw new NullPointerException("context is null"); if (context == null) throw new NullPointerException("context is null");
if (Thread.currentThread() != receiveThread || !disablePacketReading) throw new IllegalStateException("sslUpgrade methods should be called in a BiConsumer (for setReceiveBiConsumer) within the target NetMarshalClient" + if (!disablePacketReading && Thread.currentThread() != receiveThread) throw new IllegalStateException("sslUpgrade methods should be called in a BiConsumer (for setReceiveBiConsumer) within the target NetMarshalClient" +
" or when reading packets (arePacketsBeingRead) is disabled on the NetMarshalClient"); " or when reading packets (arePacketsBeingRead) is disabled on the NetMarshalClient");
synchronized (socket) {
Socket originalSocket = socket; Socket originalSocket = socket;
synchronized (originalSocket) {
try { try {
socket = SSLUtilities.upgradeClientSocketToSSL(context, socket, remoteHostName, socket.getPort(), true, remoteHostName != null); socket = SSLUtilities.upgradeClientSocketToSSL(context, socket, remoteHostName, socket.getPort(), true, remoteHostName != null);
if (rootInputStream instanceof NetworkInputStream) ((NetworkInputStream)rootInputStream).setSocket(socket); if (rootInputStream instanceof NetworkInputStream) ((NetworkInputStream) rootInputStream).setSocket(socket);
if (rootOutputStream instanceof NetworkOutputStream) ((NetworkOutputStream)rootOutputStream).setSocket(socket); if (rootOutputStream instanceof NetworkOutputStream) ((NetworkOutputStream) rootOutputStream).setSocket(socket);
} catch (SSLUtilityException | IOException e) { } catch (SSLUtilityException | IOException e) {
socket = originalSocket; socket = originalSocket;
try { try {
if (rootInputStream instanceof NetworkInputStream) ((NetworkInputStream)rootInputStream).setSocket(socket); if (rootInputStream instanceof NetworkInputStream) ((NetworkInputStream) rootInputStream).setSocket(socket);
if (rootOutputStream instanceof NetworkOutputStream) ((NetworkOutputStream)rootOutputStream).setSocket(socket); } catch (IOException ex) {
}
try {
if (rootOutputStream instanceof NetworkOutputStream) ((NetworkOutputStream) rootOutputStream).setSocket(socket);
} catch (IOException ex) { } catch (IOException ex) {
} }
throw e; throw e;
@ -352,6 +354,7 @@ public class NetMarshalClient implements Closeable {
* Sets the {@link BiConsumer} receiver consumer. * Sets the {@link BiConsumer} receiver consumer.
* *
* @param consumer The new receiver consumer. * @param consumer The new receiver consumer.
* @throws NullPointerException consumer is null.
*/ */
public void setReceiveBiConsumer(BiConsumer<IPacket, NetMarshalClient> consumer) { public void setReceiveBiConsumer(BiConsumer<IPacket, NetMarshalClient> consumer) {
if (consumer == null) throw new NullPointerException("consumer is null"); if (consumer == null) throw new NullPointerException("consumer is null");
@ -359,12 +362,23 @@ public class NetMarshalClient implements Closeable {
} }
/** /**
* Gets a dummy receiver consumer that does nothing. * Gets the {@link BiConsumer} receive exception consumer.
* *
* @return The dummy receiver consumer. * @return The exception consumer or null.
*/ */
public static BiConsumer<IPacket, NetMarshalClient> getDummyReceiver() { public BiConsumer<Exception, NetMarshalClient> getReceiveExceptionBiConsumer() {
return dummy; return receiveExceptionBiConsumer;
}
/**
* Sets the {@link BiConsumer} receive exception consumer.
*
* @param consumer The new exception consumer.
* @throws NullPointerException consumer is null.
*/
public void setReceiveExceptionBiConsumer(BiConsumer<Exception, NetMarshalClient> consumer) {
if (consumer == null) throw new NullPointerException("consumer is null");
receiveExceptionBiConsumer = consumer;
} }
/** /**
@ -391,16 +405,21 @@ public class NetMarshalClient implements Closeable {
while (disablePacketReading) slockPacketRead.wait(); while (disablePacketReading) slockPacketRead.wait();
} }
IPacket packet = loader.readStreamedPacket(inputStream, factory, null); IPacket packet = loader.readStreamedPacket(inputStream, factory, null);
synchronized (slockPacketRead) {
if (packet == null || !packet.isValid()) return; if (packet == null || !packet.isValid()) return;
if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this); if (receiveBiConsumer != null) receiveBiConsumer.accept(packet, this);
synchronized (slockReceive) { synchronized (slockReceive) {
receivedPackets.add(packet); receivedPackets.add(packet);
slockReceive.notify(); slockReceive.notify();
} }
} catch (InterruptedException | PacketException | IOException e) { }
} catch (InterruptedException | InterruptedIOException e) {
} catch (PacketException | IOException e) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(e, this);
try { try {
close(); close();
} catch (IOException ex) { } catch (IOException ex) {
if (receiveExceptionBiConsumer != null) receiveExceptionBiConsumer.accept(ex, this);
} }
} }
} }