LearningPy/networker.py

240 lines
8.4 KiB
Python

#BSD 3-Clause, (C) Alfred Manville 2022
import pickle
import json
import socket
import time
from threading import Thread
import base64
#import traceback
#Defines a message class that has a type, header and a body.
class Message:
def __init__(self, mtype, header, content):
self.mtype = mtype
self.header = header
if mtype == MTYPE_File:
try:
f = open(header, "rb")
try:
self.content = f.read()
except:
print("An issue writing the message for \"" + self.header + "\" occured.")
f.close()
except:
print("An issue when opening a file for reading: \"" + self.header + "\" occured.")
#print(traceback.format_exc())
else:
self.content = content
def saveContent(self):
if self.mtype != MTYPE_File: pass
try:
f = open(str(self.header), "wb")
try:
if type(self.content) == bytes or type(self.content) == bytearray:
f.write(bytes(self.content))
else:
f.write(bytes(self.content, encoding='utf-8'))
except:
print("An issue writing the message for \"" + str(self.header) + "\" occured.")
#print(traceback.format_exc())
f.close()
except:
print("An issue when opening a file for writing: \"" + str(self.header) + "\" occured.")
def toDict(self):
toReturn = {"mtype":self.mtype, "header":self.header, "ident__":"Message"}
if type(self.content) == bytes or type(self.content) == bytearray:
toReturn["contentb64"] = True
toReturn["content"] = base64.b64encode(bytes(self.content)).decode()
else:
toReturn["contentb64"] = False
toReturn["content"] = self.content
return toReturn
#Message from dict:
def MessageFromDict(d):
if type(d) != None and d.get("ident__") == "Message":
if d.get("contentb64") != None:
if d.get("contentb64"):
return Message(int(d.get("mtype")),d.get("header"), base64.b64decode(d.get("content")))
else:
return Message(int(d.get("mtype")),d.get("header"), d.get("content"))
print("Invalid message dictionary!")
return None
#mtype Definitions:
MTYPE_Text = 0;
MTYPE_File = 1;
#Pickle Translator for Message to and from bytes.
class PickleTranslate:
def toString(self, m):
try:
return pickle.dumps(m)
except:
#print(traceback.format_exc())
return None
def fromString(self, b):
try:
return pickle.loads(b)
except:
#print(traceback.format_exc())
return None
#JSON Translator for Message to and from bytes.
class JSONTranslate:
def toString(self, m):
try:
return json.dumps(m.toDict())
except:
#print(traceback.format_exc())
return None
def fromString(self, b):
try:
return MessageFromDict(json.loads(b))
except:
#print(traceback.format_exc())
return None
#Connection class
class Connection:
active = True
sockets = dict()
threads = dict()
actives = dict()
def __init__(self, binder, translator, onconn, onrecv, onend):
if binder != None:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(binder)
self.socket.listen(8)
else:
self.socket = None
self.translator = translator
self.onconn = onconn
self.onrecv = onrecv
self.onend = onend
def listener(self):
if self.socket == None:
return
while self.active:
s, a = self.socket.accept()
ac = a[0] + ":" + str(a[1])
try:
self.onconn(str(ac))
except:
print("Failure to call onconn for: " + str(ac))
self.sockets[str(ac)] = s
self.actives[str(ac)] = True
self.threads[str(ac)] = Thread(target=self.processor, args=(str(ac),))
self.threads[str(ac)].start();
self.socket.close()
def processor(self, addr):
while self.active and self.actives[addr]:
try:
head = self.sockets[addr].recv(2)
if type(head) == None or len(head) != 2:
print("An issue reading the packet header has occured for: " + addr)
break
dataSize = 0
try:
dataSize = int(head[0]) * 256 + int(head[1])
except:
print("An issue reading the packet header has occured for: " + addr)
break
if dataSize < 1:
continue
data = bytearray()
while dataSize > 0:
dataL = self.sockets[addr].recv(dataSize)
dataSize -= len(dataL)
data.extend(dataL)
try:
self.onrecv(addr, self.translator.fromString(data))
except:
print("Failure to call onrecv for: " + addr)
except:
print("A network issue has occured for: " + addr)
self.actives[addr] = False
break
self.onend(addr)
def send(self, addr, m):
if self.active and self.actives[addr]:
try:
toSendData = self.translator.toString(m)
toSend = []
if type(toSendData) == bytes or type(toSendData) == bytearray:
toSend = bytes(self.translator.toString(m))
else:
toSend = bytes(self.translator.toString(m), encoding='utf-8')
sendLength = len(toSend)
if sendLength < 1:
print("Message for: " + addr + " empty?")
return
if sendLength > 65535:
print("Message for: " + addr + " too big!")
return
head = bytearray(2)
head[0] = sendLength//256
sendLength -= int(head[0])*256
head[1] = sendLength
sendLength = -2
try:
sendLength += self.sockets[addr].send(bytes(head))
sendLength += self.sockets[addr].send(toSend)
except:
try:
if sendLength >= 0:
self.sockets[addr].send(bytes(len(toSend) - sendLength))
except:
print("A network issue has occured for: " + addr)
self.actives[addr] = False
except:
print("A send failure has occured for: " + addr)
#print(traceback.format_exc())
def close(self):
self.active = False
for x in self.actives: self.actives[x] = False
for x in self.sockets: self.sockets[x].close()
ndone = True
while ndone:
ndone = False
for x in self.threads:
if self.threads[x].is_alive():
ndone = True
break
time.sleep(0.0001)
self.threads.clear()
if self.socket != None: self.socket.close()
def addresses(self):
if self.active:
tL = []
for x in self.actives:
if self.actives[x]:
tL.append(x)
return tL
else:
return []
def connect(self, target):
ac = target[0]+":"+str(target[1])
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(target)
try:
self.onconn(str(ac))
except:
print("Failure to call onconn for: " + str(ac))
self.sockets[str(ac)] = s
self.actives[str(ac)] = True
self.threads[str(ac)] = Thread(target=self.processor, args=(str(ac),))
self.threads[str(ac)].start()
except:
print("A connection error has occured for: " + ac)