Update net test to use the latest calmnetlib via using marshals.

This commit is contained in:
Captain ALM 2023-05-21 01:29:47 +01:00
parent 04e22a7dc7
commit be3dff489d
Signed by: alfred
GPG Key ID: 4E4ADD02609997B1
4 changed files with 259 additions and 588 deletions

View File

@ -1,6 +1,6 @@
BSD 3-Clause License
Copyright (c) 2022, Captain ALM
Copyright (c) 2023, Captain ALM
All rights reserved.
Redistribution and use in source and binary forms, with or without

View File

@ -1,6 +1,6 @@
# Captain ALM Network Library Tester (Java)
This is the tester for the [calmnetlib](https://code.mrmelon54.xyz/alfred/calmnetlib) java library.
This is the tester for the [calmnetlib](https://code.mrmelon54.com/alfred/calmnetlib) java library.
There is a keystore file with the password keystore containing a self-signed localhost certificate for testing.

View File

@ -1,5 +1,9 @@
package com.captainalm.test.calmnet;
import com.captainalm.lib.calmnet.marshal.FragmentationOptions;
import com.captainalm.lib.calmnet.marshal.NetMarshalClient;
import com.captainalm.lib.calmnet.marshal.NetMarshalServer;
import com.captainalm.lib.calmnet.packet.PacketLoader;
import com.captainalm.lib.calmnet.ssl.*;
import com.captainalm.lib.calmnet.packet.IPacket;
import com.captainalm.lib.calmnet.packet.PacketException;
@ -12,22 +16,26 @@ import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.Queue;
import java.util.*;
public final class Main {
private static NetworkRuntime runtime;
private static Thread recvThread;
private static boolean akned = false;
public static final MyPacketFactory factory = new MyPacketFactory(new PacketLoader());
private static NetMarshalServer server;
private static NetMarshalClient client;
private static final Map<NetMarshalClient, Thread> recvThreads = Collections.synchronizedMap(new HashMap<>());
private final static Queue<DataPacket> packetQueue = new LinkedList<>();
private static SSLContext sslContext;
private static boolean isClient;
private static String sslHost;
private static boolean sslUpgraded;
private static final Object slockAckned = new Object();
private static boolean ackn;
private static int sendLoopsRemainingSetting;
private static int sendLoopWaitTime;
private static final Object slock = new Object();
public static void main(String[] args) {
@ -86,7 +94,8 @@ public final class Main {
}
private static void start() {
if (runtime != null && runtime.isProcessing()) return;
if (server != null && server.isRunning()) return;
if (client != null && client.isRunning()) return;
Console.writeLine("Socket Setup:");
Console.writeLine("IP Address:");
InetAddress address = null;
@ -113,6 +122,14 @@ public final class Main {
fverifyp = (fopt == 'Y' || fopt == 'y');
}
FragmentationOptions fragOpts;
if (fragmentation) {
fragOpts = new FragmentationOptions();
fragOpts.verifyFragments = fverifyp;
fragOpts.equalityVerifyFragments = fverifyp;
} else {
fragOpts = null;
}
Console.writeLine("Select Socket Mode:");
Console.writeLine("0) TCP Listen");
Console.writeLine("1) TCP Client");
@ -127,14 +144,17 @@ public final class Main {
char opt = Console.readCharacter();
Console.writeLine("Starting Socket...");
runtime = null;
server = null;
client = null;
sslUpgraded = false;
boolean connectFromServer = false;
switch (opt) {
case '0':
sendLoopsRemainingSetting = 1;
sendLoopWaitTime = 50;
try (ServerSocket serverSocket = new ServerSocket(port, 1, address)) {
Socket socket = serverSocket.accept();
runtime = new NetworkRuntime(socket, fragmentation, fverifyp);
try {
ServerSocket serverSocket = new ServerSocket(port, 1, address);
server = new NetMarshalServer(serverSocket, factory, factory.getPacketLoader(), fragOpts);
isClient = false;
} catch (IOException e) {
e.printStackTrace();
@ -145,7 +165,7 @@ public final class Main {
sendLoopWaitTime = 50;
try {
Socket socket = new Socket(address, port);
runtime = new NetworkRuntime(socket, fragmentation, fverifyp);
client = new NetMarshalClient(socket, factory, factory.getPacketLoader(), fragOpts);
isClient = true;
} catch (IOException e) {
e.printStackTrace();
@ -155,7 +175,7 @@ public final class Main {
requestSendSettings();
try {
DatagramSocket socket = new DatagramSocket(port, address);
runtime = new NetworkRuntime(socket, fragmentation, fverifyp, null, -1);
server = new NetMarshalServer(socket, factory, factory.getPacketLoader(), fragOpts);
isClient = false;
} catch (SocketException e) {
e.printStackTrace();
@ -165,9 +185,10 @@ public final class Main {
requestSendSettings();
try {
DatagramSocket socket = new DatagramSocket();
runtime = new NetworkRuntime(socket, fragmentation, fverifyp, address, port);
server = new NetMarshalServer(socket, factory, factory.getPacketLoader(), fragOpts);
connectFromServer = true;
isClient = true;
} catch (SocketException e) {
} catch (IOException e) {
e.printStackTrace();
}
break;
@ -176,8 +197,7 @@ public final class Main {
try {
MulticastSocket socket = new MulticastSocket(port);
if (!socket.getLoopbackMode()) socket.setLoopbackMode(true);
socket.joinGroup(address);
runtime = new NetworkRuntime(socket, fragmentation, fverifyp, address, port);
client = new NetMarshalClient(socket, address, port, factory, factory.getPacketLoader(), fragOpts);
isClient = false;
} catch (IOException e) {
e.printStackTrace();
@ -187,9 +207,10 @@ public final class Main {
requestSendSettings();
try {
DatagramSocket socket = new DatagramSocket(port, address);
runtime = new NetworkRuntime(socket, fragmentation, fverifyp, address, port);
server = new NetMarshalServer(socket, factory, factory.getPacketLoader(), fragOpts);
connectFromServer = true;
isClient = true;
} catch (SocketException e) {
} catch (IOException e) {
e.printStackTrace();
}
break;
@ -198,8 +219,7 @@ public final class Main {
try {
MulticastSocket socket = new MulticastSocket(port);
if (socket.getLoopbackMode()) socket.setLoopbackMode(false);
socket.joinGroup(address);
runtime = new NetworkRuntime(socket, fragmentation, fverifyp, address, port);
client = new NetMarshalClient(socket, address, port, factory, factory.getPacketLoader(), fragOpts);
isClient = false;
} catch (IOException e) {
e.printStackTrace();
@ -209,11 +229,12 @@ public final class Main {
sendLoopsRemainingSetting = 1;
sendLoopWaitTime = 50;
if (sslContext == null) break;
try (SSLServerSocket serverSocket = SSLUtilities.getSSLServerSocket(sslContext, port, 1, address)) {
Socket socket = serverSocket.accept();
runtime = new NetworkRuntime(socket, fragmentation, fverifyp);
try {
SSLServerSocket serverSocket = SSLUtilities.getSSLServerSocket(sslContext, port, 1, address);
server = new NetMarshalServer(serverSocket, factory, factory.getPacketLoader(), fragOpts);
isClient = false;
} catch (IOException | SSLUtilityException e) {
sslUpgraded = true;
} catch (SSLUtilityException e) {
e.printStackTrace();
}
break;
@ -223,58 +244,75 @@ public final class Main {
if (sslContext == null) break;
try {
Socket socket = SSLUtilities.getSSLClientSocket(sslContext, sslHost, port);
runtime = new NetworkRuntime(socket, fragmentation, fverifyp);
client = new NetMarshalClient(socket, factory, factory.getPacketLoader(), fragOpts);
isClient = true;
sslUpgraded = true;
} catch (SSLUtilityException e) {
e.printStackTrace();
}
break;
}
if (runtime == null) {
if (client == null && server == null) {
Console.writeLine("!FAILED TO START!");
} else {
while (runtime.notReadyToSend()) {
if (server != null) {
server.setAcceptExceptionBiConsumer(Main::errH);
server.setReceiveExceptionBiConsumer(Main::errH);
server.setReceiveBiConsumer(Main::sslUpgUnit);
server.setOpenedConsumer(Main::connectH);
server.setClosedConsumer(Main::closeH);
server.open();
if (connectFromServer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
server.connect(address, port, 0);
} catch (IOException e) {
e.printStackTrace();
}
}
createAndStartRecvThread();
}
if (client != null) {
client.setReceiveExceptionBiConsumer(Main::errH);
client.setReceiveBiConsumer(Main::sslUpgUnit);
client.setClosedConsumer(Main::closeH);
client.open();
connectH(client);
}
Console.writeLine("Socket Started.");
}
}
private static void requestSendSettings() {
Console.writeLine("Enter number of send retries:");
Integer num = Console.readInt();
if (num == null || num < 0) num = 0;
Console.writeLine("Send Retires set to: " + num);
sendLoopsRemainingSetting = num + 1;
Console.writeLine("Enter timeout before send retry (Milliseconds) :");
num = Console.readInt();
if (num == null || num < 50) num = 50;
Console.writeLine("Retry Send Timeout set to: " + num);
sendLoopWaitTime = num;
}
private static void createAndStartRecvThread() {
recvThread = new Thread(() -> {
private static void connectH(NetMarshalClient client) {
Thread recvThread = new Thread(() -> {
PacketType nextType = null;
Path nextPath = null;
while (runtime != null && runtime.isProcessing()) {
if (sslUpgraded) {
try {
client.sendPacket(new NetworkSSLUpgradePacket(false), true);
} catch (IOException | PacketException e) {
e.printStackTrace();
}
}
while (client.isRunning()) {
try {
IPacket packet;
while ((packet = runtime.receiveLastPacket()) != null) {
while ((packet = client.receivePacket()) != null) {
if (!packet.isValid()) continue;
if (packet instanceof AKNPacket) {
synchronized (slockAckned) {
ackn = true;
slockAckned.notifyAll();
}
}
if (packet instanceof TypePacket && nextType == null) {
nextType = ((TypePacket) packet).type;
runtime.sendPacket(new AKNPacket(), false);
client.sendPacket(new AKNPacket(), false);
}
if (packet instanceof DataPacket && nextType == PacketType.Message) {
nextType = null;
synchronized (slock) {
packetQueue.add((DataPacket) packet);
}
runtime.sendPacket(new AKNPacket(), false);
client.sendPacket(new AKNPacket(), false);
}
if (packet instanceof DataPacket && nextType == PacketType.Name) {
nextType = null;
@ -283,7 +321,7 @@ public final class Main {
} catch (PacketException e) {
e.printStackTrace();
}
runtime.sendPacket(new AKNPacket(), false);
client.sendPacket(new AKNPacket(), false);
}
if (packet instanceof StreamedDataPacket && nextType == PacketType.Data && nextPath != null) {
nextType = null;
@ -299,62 +337,120 @@ public final class Main {
nextPath = null;
}
}
if (packet instanceof NetworkSSLUpgradePacket && sslContext != null) {
if (((NetworkSSLUpgradePacket) packet).isAcknowledgement()) {
runtime.sslUpgrade(sslContext, sslHost, isClient);
} else {
runtime.sendPacket(new NetworkSSLUpgradePacket(true), true);
runtime.sslUpgrade(sslContext, sslHost, isClient);
}
} catch (InterruptedException e) {
} catch (IOException | PacketException e) {
e.printStackTrace();
}
}
}, "recv_thread_"+client.remoteAddress()+":"+client.remotePort());
recvThread.start();
recvThreads.put(client, recvThread);
}
private static void closeH(NetMarshalClient client) {
Thread waitOn = recvThreads.remove(client);
if (waitOn != null) {
try {
Thread.sleep(500);
waitOn.join();
} catch (InterruptedException e) {
}
}
}, "main_recv_thread");
recvThread.start();
}
private static void sslUpgUnit(IPacket packet, NetMarshalClient client) {
try {
if (packet instanceof NetworkSSLUpgradePacket && sslContext != null) {
if (!((NetworkSSLUpgradePacket) packet).isAcknowledgement()) {
client.sendPacket(new NetworkSSLUpgradePacket(true), true);
}
if (isClient) client.sslUpgradeClientSide(sslContext, sslHost);
else client.sslUpgradeServerSide(sslContext);
sslUpgraded = true;
}
} catch (PacketException | IOException | SSLUtilityException e) {
e.printStackTrace();
try {
client.close();
} catch (IOException ex) {
e.printStackTrace();
}
}
}
private static void errH(Exception ex, NetMarshalServer server) {
ex.printStackTrace();
}
private static void errH(Exception ex, NetMarshalClient client) {
ex.printStackTrace();
}
private static void requestSendSettings() {
Console.writeLine("Enter number of send retries:");
Integer num = Console.readInt();
if (num == null || num < 0) num = 0;
Console.writeLine("Send Retires set to: " + num);
sendLoopsRemainingSetting = num + 1;
Console.writeLine("Enter timeout before send retry (Milliseconds) :");
num = Console.readInt();
if (num == null || num < 50) num = 50;
Console.writeLine("Retry Send Timeout set to: " + num);
sendLoopWaitTime = num;
}
private static void stop() {
if (runtime == null || !runtime.isProcessing()) return;
if ((server == null || !server.isRunning()) && (client == null || !client.isRunning())) return;
Console.writeLine("Socket Stopping...");
runtime.stopProcessing();
if (server != null) {
try {
recvThread.join();
} catch (InterruptedException e) {
server.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
server = null;
}
}
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
client = null;
}
}
runtime = null;
isClient = false;
sslUpgraded = false;
Console.writeLine("Socket Stopped.");
}
private static boolean waitForAKN(IPacket packet) {
if (packet instanceof AKNPacket && packet.isValid()) {
akned = true;
return false;
}
return true;
}
private static void doAKNWait(IPacket packet) {
akned = false;
runtime.setPacketReceiveCallback(Main::waitForAKN);
ackn = false;
int i = 0;
while (!akned) {
runtime.sendPacket(packet, false);
while (++i <= sendLoopsRemainingSetting && !ackn) {
try {
Thread.sleep(sendLoopWaitTime);
if (server != null) {
server.broadcastPacket(packet, false);
}
if (client != null) {
client.sendPacket(packet, false);
}
} catch (IOException | PacketException e) {
e.printStackTrace();
}
try {
synchronized (slockAckned) {
slockAckned.wait(sendLoopWaitTime);
}
} catch (InterruptedException e) {
}
if (++i >= sendLoopsRemainingSetting) akned = true;
}
runtime.setPacketReceiveCallback(null);
}
private static void message() {
if (runtime == null || !runtime.isProcessing()) return;
if ((server == null || !server.isRunning()) && (client == null || !client.isRunning())) return;
Console.writeLine("Message To Send:");
String message = Console.readString();
doAKNWait(new TypePacket(PacketType.Message));
@ -364,7 +460,7 @@ public final class Main {
}
private static void send() {
if (runtime == null || !runtime.isProcessing()) return;
if ((server == null || !server.isRunning()) && (client == null || !client.isRunning())) return;
Console.writeLine("Path of File To Send:");
File file = new File(Console.readString());
if (file.exists()) {
@ -398,13 +494,14 @@ public final class Main {
}
private static void sslSetup() {
if (runtime != null && runtime.isSSLUpgraded()) return;
if (sslUpgraded) return;
Console.writeLine("SSL Setup:");
Console.writeLine("SSL Host Name (Enter empty to set to null):");
sslHost = Console.readString();
if (sslHost.equals("")) sslHost = null;
Console.writeLine("SSL Keystore Path:");
String kpath = Console.readString();
if (!kpath.equals("")) {
Console.writeLine("SSL Keystore Password:");
String kpass = Console.readString();
if (kpass.equals("")) kpass = "changeit";
@ -415,27 +512,53 @@ public final class Main {
e.printStackTrace();
Console.writeLine("SSL Setup Failed!");
}
} else {
sslContext = null;
Console.writeLine("SSL Setup Cleared!");
}
}
private static void upgrade() {
if (runtime == null || !runtime.isProcessing() || sslContext == null) return;
if (sslContext == null) return;
if (server != null) {
Console.writeLine("Upgrading Connections to SSL...");
try {
server.broadcastPacket(new NetworkSSLUpgradePacket(false), true);
} catch (IOException | PacketException e) {
e.printStackTrace();
}
}
if (client != null) {
Console.writeLine("Upgrading Connection to SSL...");
runtime.sendPacket(new NetworkSSLUpgradePacket(false), true);
try {
client.sendPacket(new NetworkSSLUpgradePacket(false), true);
} catch (IOException | PacketException e) {
e.printStackTrace();
}
}
}
private static void info() {
Console.writeLine("INFORMATION:");
Console.writeLine("Local Socket: " + ((runtime != null && runtime.isProcessing()) ? runtime.getLocalAddress() + ":" + runtime.getLocalPort() : ":"));
Console.writeLine("Remote Socket: " + ((runtime != null && runtime.isProcessing()) ? runtime.getTargetAddress() + ":" + runtime.getTargetPort() : ":"));
Console.writeLine("Is Active: " + ((runtime != null && runtime.isProcessing()) ? "Yes" : "No"));
Console.writeLine("Number Of Packets To Process: " + (((runtime == null) ? 0 : runtime.numberOfQueuedReceivedPackets()) + packetQueue.size()));
Console.writeLine("SSL Upgrade Status: " + ((runtime != null && runtime.isSSLUpgraded()) ? "Upgraded" : "Not Upgraded"));
if (server != null) {
Console.writeLine("Local Socket: " + ((server.isRunning()) ? server.localAddress() + ":" + server.localPort() : ":"));
Console.writeLine("Client Count: " + server.getConnectedClients().length);
Console.writeLine("Is Active: Yes");
}
if (client != null) {
Console.writeLine("Local Socket: " + ((client.isRunning()) ? client.localAddress() + ":" + client.localPort() : ":"));
Console.writeLine("Remote Socket: " + ((client.isRunning()) ? client.remoteAddress() + ":" + client.remotePort() : ":"));
Console.writeLine("Is Active: Yes");
}
if (client == null && server == null) Console.writeLine("Is Active: No");
Console.writeLine("Number Of Packets To Process: " + packetQueue.size());
Console.writeLine("SSL Upgrade Status: " + ((sslUpgraded) ? "Upgraded" : "Not Upgraded"));
Console.writeLine("SSL Host Name: " + ((sslHost == null) ? "<null>" : sslHost));
Console.writeLine("SSL Context Status: " + ((sslContext == null) ? "Unavailable" : "Available"));
}
private static void header() {
Console.writeLine("C-ALM Net Test (C) Captain ALM 2022");
Console.writeLine("C-ALM Net Test (C) Captain ALM 2023");
Console.writeLine("Under The BSD 3-Clause License");
}

View File

@ -1,452 +0,0 @@
package com.captainalm.test.calmnet;
import com.captainalm.lib.calmnet.ssl.*;
import com.captainalm.lib.calmnet.packet.*;
import com.captainalm.lib.calmnet.packet.core.NetworkSSLUpgradePacket;
import com.captainalm.lib.calmnet.packet.fragment.*;
import com.captainalm.lib.calmnet.stream.NetworkInputStream;
import com.captainalm.lib.calmnet.stream.NetworkOutputStream;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Function;
/**
* This class is the network runtime class.
*
* @author Captain ALM
*/
public final class NetworkRuntime {
public final MyPacketFactory factory = new MyPacketFactory(new PacketLoader());
private NetworkInputStream inputStream;
private NetworkOutputStream outputStream;
private FragmentReceiver fragmentReceiver;
private final HashMap<Integer, LocalDateTime> fragmentRMM = new HashMap<>();
private FragmentSender fragmentSender;
private final HashMap<Integer, LocalDateTime> fragmentSMM = new HashMap<>();
private boolean processing = true;
private InetAddress targetAddress;
private int targetPort = -1;
private final Queue<IPacket> packetQueue = new LinkedList<>();
private Function<IPacket, Boolean> packetReceiveCallback;
private final Object slock = new Object();
private final Object slocksend = new Object();
private final Object slockfmon = new Object();
private final Object slockupg = new Object();
/**
* Constructs a new NetworkRuntime with the specified parameters.
*
* @param socketIn The socket to use.
* @param useFragmentation If fragmentation should be used.
* @param verifyFragmentPayloads If a fragment payload should be verified.
*/
public NetworkRuntime(Socket socketIn, boolean useFragmentation, boolean verifyFragmentPayloads) {
inputStream = new NetworkInputStream(socketIn);
outputStream = new NetworkOutputStream(socketIn);
init(useFragmentation, verifyFragmentPayloads);
}
/**
* Constructs a new NetworkRuntime with the specified parameters.
*
* @param socketIn The datagram socket to use.
* @param useFragmentation If fragmentation should be used.
* @param verifyFragmentPayloads If a fragment payload should be verified.
* @param address The target address (Can be null).
* @param port The target port.
*/
public NetworkRuntime(DatagramSocket socketIn, boolean useFragmentation, boolean verifyFragmentPayloads, InetAddress address, int port) {
inputStream = new NetworkInputStream(socketIn);
outputStream = new NetworkOutputStream(socketIn);
targetAddress = address;
targetPort = port;
if (address != null && port >= 0) {
try {
outputStream.setDatagramTarget(address, port);
} catch (IOException e) {
}
}
try {
outputStream.setDatagramBufferSize(65535);
} catch (IOException e) {
}
init(useFragmentation, verifyFragmentPayloads);
}
private void init(boolean useFragmentation, boolean verifyFragmentPayloads) {
fragmentReceiver = (useFragmentation) ? new FragmentReceiver(factory.getPacketLoader(), factory) : null;
if (fragmentReceiver != null) {
fragmentReceiver.setResponseVerification(verifyFragmentPayloads);
fragmentReceiver.setSentDataWillBeAllVerified(verifyFragmentPayloads);
}
fragmentSender = (useFragmentation) ? new FragmentSender(factory.getPacketLoader()) : null;
if (fragmentSender != null) {
fragmentSender.setResponseVerification(verifyFragmentPayloads);
fragmentSender.setSentDataWillBeAllVerified(verifyFragmentPayloads);
}
receiveThread.start();
if (useFragmentation) {
fragmentMonitorThread.start();
fragmentFinishRecvMonitorThread.start();
fragmentFinishSendMonitorThread.start();
fragmentReceiveThread.start();
fragmentSendThread.start();
}
}
private final Thread receiveThread = new Thread(() -> {
while (processing) {
try {
IPacket packet = factory.getPacketLoader().readStreamedPacket(inputStream, factory, null);
if (packet == null) continue;
if (inputStream.getDatagramSocket() != null && (targetAddress == null || targetPort < 0)) {
targetAddress = inputStream.getAddress();
targetPort = inputStream.getPort();
if (targetAddress != null && targetPort >= 0) {
try {
outputStream.setDatagramTarget(targetAddress, targetPort);
} catch (IOException e) {
}
}
}
if (fragmentReceiver != null) {
updateMState(fragmentRMM, packet);
fragmentReceiver.receivePacket(packet);
}
if (fragmentSender != null) {
updateMState(fragmentSMM, packet);
fragmentSender.receivePacket(packet);
}
synchronized (slock) {
if (packetReceiveCallback == null || packetReceiveCallback.apply(packet)) packetQueue.add(packet);
}
if (packet.isValid() && packet instanceof NetworkSSLUpgradePacket) {
synchronized (slockupg) {
int timeout = 4;
while (!isSSLUpgraded() && inputStream.getDatagramSocket() == null && timeout-- > 0) slockupg.wait();
}
}
synchronized (slocksend) {
slocksend.notifyAll();
}
} catch (PacketException | IOException e) {
e.printStackTrace();
stopProcessing();
} catch (InterruptedException e) {
}
}
}, "recv_thread");
private final Thread fragmentReceiveThread = new Thread(() -> {
while (processing) {
try {
IPacket packet = fragmentReceiver.receivePacket();
if (packet == null) continue;
synchronized (slock) {
if (packetReceiveCallback == null || packetReceiveCallback.apply(packet)) packetQueue.add(packet);
}
} catch (InterruptedException e) {
}
}
}, "frag_recv_thread");
private final Thread fragmentSendThread = new Thread(() -> {
while (processing) {
try {
synchronized (slocksend) {
slocksend.wait();
IPacket[] packets = fragmentSender.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentSMM, c);
factory.getPacketLoader().writePacket(outputStream, c, true);
}
packets = fragmentReceiver.sendPacket();
for (IPacket c : packets) if (c != null) {
updateMState(fragmentRMM, c);
factory.getPacketLoader().writePacket(outputStream, c, true);
}
}
} catch (PacketException | IOException e) {
e.printStackTrace();
stopProcessing();
} catch (InterruptedException e) {
}
}
}, "frag_send_thread");
private final Thread fragmentMonitorThread = new Thread(() -> {
while (processing) {
int id = -1;
synchronized (slockfmon) {
for (int c : fragmentRMM.keySet()) {
if (!fragmentRMM.get(c).plusSeconds(29).isAfter(LocalDateTime.now())) {
fragmentRMM.remove(id);
fragmentReceiver.deletePacketFromRegistry(c);
}
}
for (int c : fragmentSMM.keySet()) {
if (!fragmentSMM.get(c).plusSeconds(29).isAfter(LocalDateTime.now())) {
fragmentSMM.remove(id);
fragmentSender.deletePacketFromRegistry(c);
}
}
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
}
}, "frag_mntr_thread");
private final Thread fragmentFinishRecvMonitorThread = new Thread(() -> {
while (processing) {
int id = -1;
try {
while ((id = fragmentReceiver.getLastIDFinished()) != -1) synchronized (slockfmon) {
fragmentRMM.remove(id);
}
} catch (InterruptedException e) {
}
}
}, "frag_fin_recv_mntr_thread");
private final Thread fragmentFinishSendMonitorThread = new Thread(() -> {
while (processing) {
int id = -1;
try {
while ((id = fragmentSender.getLastIDFinished()) != -1) synchronized (slockfmon) {
fragmentSMM.remove(id);
}
} catch (InterruptedException e) {
}
}
}, "frag_fin_send_mntr_thread");
private void updateMState(HashMap<Integer, LocalDateTime> mm, IPacket packet) {
if (packet == null || !packet.isValid()) return;
synchronized (slockfmon) {
if (packet instanceof FragmentAllocationPacket) {
mm.put(((FragmentAllocationPacket) packet).getPacketID(), LocalDateTime.now());
} else if (packet instanceof FragmentPIDPacket && !(packet instanceof FragmentSendStopPacket)) {
if (mm.containsKey(((FragmentPIDPacket) packet).getPacketID()))
mm.put(((FragmentPIDPacket) packet).getPacketID(), LocalDateTime.now());
}
}
}
/**
* Gets the current remote endpoint address or null.
*
* @return The remote address or null.
*/
public InetAddress getTargetAddress() {
if (!processing) return null;
return (targetAddress == null) ? inputStream.getAddress() : targetAddress;
}
/**
* Gets the current target port or -1.
*
* @return The target port or -1.
*/
public int getTargetPort() {
if (!processing) return -1;
return (targetPort < 0) ? inputStream.getPort() : targetPort;
}
/**
* Gets the current remote endpoint address or null.
*
* @return The remote address or null.
*/
public InetAddress getLocalAddress() {
if (!processing) return null;
return inputStream.getLocalAddress();
}
/**
* Gets the current local port or -1.
*
* @return The local port or -1.
*/
public int getLocalPort() {
if (!processing) return -1;
return inputStream.getLocalPort();
}
/**
* Performs an SSL Upgrade on a TCP Connection.
*
* @param context The SSL Context to use.
* @param host The host to check the remote certificate using (Set to null on server).
* @param isClientSide If the caller is directly connected (Not using an accepted socket).
*/
public void sslUpgrade(SSLContext context, String host, boolean isClientSide) {
if (!processing || inputStream.getSocket() == null || inputStream.getSocket() instanceof SSLSocket) return;
synchronized (slock) {
synchronized (slocksend) {
synchronized (slockupg) {
Socket original = inputStream.getSocket();
try {
inputStream.setSocket(SSLUtilities.upgradeClientSocketToSSL(context, inputStream.getSocket(), host, inputStream.getPort(), true, isClientSide));
outputStream.setSocket(inputStream.getSocket());
} catch (SSLUtilityException | IOException e) {
e.printStackTrace();
try {
inputStream.setSocket(original);
outputStream.setSocket(original);
} catch (IOException ex) {
}
}
slockupg.notifyAll();
}
}
}
}
/**
* Is the runtime SSL Upgraded.
*
* @return If the runtime is upgraded.
*/
public boolean isSSLUpgraded() {
if (!processing) return false;
return inputStream.getSocket() instanceof SSLSocket;
}
/**
* Sends a packet.
*
* @param packet The packet to send.
* @param forceNormalSend Forces a normal send operation without fragmentation.
*/
public void sendPacket(IPacket packet, boolean forceNormalSend) {
if (packet == null || notReadyToSend()) return;
synchronized (slocksend) {
if (fragmentSender == null || forceNormalSend) {
try {
factory.getPacketLoader().writePacket(outputStream, packet, true);
slocksend.notifyAll();
} catch (IOException | PacketException e) {
e.printStackTrace();
stopProcessing();
}
} else {
fragmentSender.sendPacket(packet);
slocksend.notifyAll();
}
}
}
/**
* Receives the last packet.
*
* @return Receives the last packet.
*/
public IPacket receiveLastPacket() {
if (!processing) return null;
synchronized (slock) {
return packetQueue.poll();
}
}
/**
* Gets the number of queued received packets.
*
* @return The number of queued received packets.
*/
public int numberOfQueuedReceivedPackets() {
synchronized (slock) {
return packetQueue.size();
}
}
/**
* Gets the packet receive callback.
* The return value of the passed function is whether to add the packet to the receive queue.
*
* @return The packet receive callback.
*/
public Function<IPacket, Boolean> getPacketReceiveCallback() {
return packetReceiveCallback;
}
/**
* Sets the packet receive callback.
* The return value of the passed function is whether to add the packet to the receive queue.
*
* @param callback The new packet receive callback.
*/
public void setPacketReceiveCallback(Function<IPacket, Boolean> callback) {
synchronized (slock) {
packetReceiveCallback = callback;
}
}
/**
* Is the runtime not ready to send.
*
* @return Cannot send.
*/
public boolean notReadyToSend() {
return !processing || ((outputStream.getSocket() == null) && (outputStream.getDatagramSocket() == null || targetAddress == null || targetPort < 0));
}
/**
* Gets if the runtime is processing.
*
* @return Is the runtime processing.
*/
public boolean isProcessing() {
return processing;
}
/**
* Stops processing.
*/
public void stopProcessing() {
if (processing) {
processing = false;
if (Thread.currentThread() != fragmentSendThread && fragmentSendThread.isAlive()) fragmentSendThread.interrupt();
if (Thread.currentThread() != fragmentReceiveThread && fragmentReceiveThread.isAlive()) fragmentReceiveThread.interrupt();
if (Thread.currentThread() != fragmentMonitorThread && fragmentMonitorThread.isAlive()) fragmentMonitorThread.interrupt();
if (Thread.currentThread() != fragmentFinishRecvMonitorThread && fragmentFinishRecvMonitorThread.isAlive()) fragmentFinishRecvMonitorThread.interrupt();
if (Thread.currentThread() != fragmentFinishSendMonitorThread && fragmentFinishSendMonitorThread.isAlive()) fragmentFinishSendMonitorThread.interrupt();
try {
inputStream.close();
} catch (IOException e) {
}
inputStream = null;
try {
outputStream.close();
} catch (IOException e) {
}
outputStream = null;
if (fragmentSender != null) {
fragmentSender.clearLastIDFinished();
fragmentSender.clearRegistry();
fragmentSender.clearWaitingPackets();
fragmentSender = null;
fragmentSMM.clear();
}
if (fragmentReceiver != null) {
fragmentReceiver.clearLastIDFinished();
fragmentReceiver.clearRegistry();
fragmentReceiver.clearWaitingPackets();
fragmentReceiver = null;
fragmentRMM.clear();
}
}
}
}