python中的简单P2P服务器



对等网络或简称为P2P网络是指所有用户均平等且拥有平等权利的网络这种网络与传统网络的一个显着特征是,它们没有用户连接的单个服务器,而是彼此连接这种网络有多种混合形式,其中有一个服务器仅执行协调工作。



今天,我想为python中的这种网络提供P2P服务器的简单实现。



背景



1- . . , python, . .



. , 2 , . , P2P .





.



import socket
import rsa
from threading import Thread
from time import sleep
import datetime


# P2P 
class P2P:
    def __init__(self, _port: int, _max_clients: int = 1):
        #   
        self.running = True
        #  
        self.port = _port
        #  - 
        self.max_clients = _max_clients
        #  
        self.clients_ip = ["" for i in range(self.max_clients)]
        #    
        self.incoming_requests = {}
        #  
        self.clients_logs = [Log for i in range(self.max_clients)]
        #  
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        #  
        for i in self.client_sockets:
            i.settimeout(0.2)
        #     
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        #     
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        #   
        self.socket_busy = [False for i in range(self.max_clients)]
        #  
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        #  
        self.server_socket = socket.socket()
        #  
        self.server_socket.settimeout(0.2)
        #  
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")

    # server control

    #     
    def create_session(self, _address: str):
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        try:
            self.__add_user(_address)
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            thread.join(0)
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            self.__del_user(_address)
            return
        my_key = rsa.newkeys(512)
        self.raw_send(_address, my_key[0].save_pkcs1())
        key = connection.recv(162).decode()
        self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
        key = rsa.PublicKey.load_pkcs1(key)
        self.__add_keys(_address, key, my_key[1])
        while self.running and self.socket_busy[ind]:
            try:
                data = connection.recv(2048)
            except socket.timeout:
                continue
            except OSError:
                self.close_connection(_address)
                return
            if data:
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        try:
            self.close_connection(_address)
        except TypeError or KeyError:
            pass

    #   
    def __connect(self, _address: str, *args):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False

    #  
    def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False

    #  
    def close_connection(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)

    #  
    def kill_server(self):
        self.running = False
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                pass

    #    
    def send(self, _address: str, _message: str):
        ind = self.__get_ind_by_address(_address)
        try:
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.client_sockets[ind].send(rsa.encrypt(_message.encode(), self.keys[ind]))
            self.log.save_data("Send message to {}".format(_address))
        except OSError:
            self.log.save_data("Can`t send message to {}".format(_address))

    #    
    def raw_send(self, _address: str, _message: bytes):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].send(_message)
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.log.save_data("Raw send message to {}".format(_address))
        except OSError:
            self.log.save_data("Raw send to {} Failed".format(_address))
    # add

    #  
    def __add_user(self, _address: str):
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))

    #       
    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return

    #     
    def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))

    # get

    #     
    # if self.__get_free_socket() is not None: *
    def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None

    #   ,    
    def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None

    #     
    def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data

    # check

    #      
    # if self.check_request(_address): *
    def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))

    # return True if you already connected to _address else False
    def check_address(self, _address: str):
        return True if _address in self.clients_ip else False

    # del

    #  
    def __del_user(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))

    #  
    def __del_key(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey

    # others

    #    
    def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num

    #        
    def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False


class Log:
    def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()

    #    
    def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()

    #       
    @staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")

    #  
    def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()


. , :



  • add
  • del
  • check
  • get
  • server control




init
    def __init__(self, _port: int, _max_clients: int = 1):
        #   
        self.running = True
        #  
        self.port = _port
        #  - 
        self.max_clients = _max_clients
        #  
        self.clients_ip = ["" for i in range(self.max_clients)]
        #    
        self.incoming_requests = {}
        #  
        self.clients_logs = [Log for i in range(self.max_clients)]
        #  
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        #  
        for i in self.client_sockets:
            i.settimeout(0.2)
        #     
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        #     
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        #   
        self.socket_busy = [False for i in range(self.max_clients)]
        #  
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        #  
        self.server_socket = socket.socket()
        #  
        self.server_socket.settimeout(0.2)
        #  
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")


, - , 1. :



  • -


:



  • Ip


, "127.0.0.1" " " ( localhost ), , . - listen().



add



. .



add_user , .



add_user
    def __add_user(self, _address: str):
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))


add_keys .



add_keys
    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return


add_request .



add_request
    def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))


del



. , , .



del_user add_user. , , .



del_user
    def __del_user(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))


del_key .



del_key
    def __del_key(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey


check



.



check_request True False .



check_request
    def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))


check_address True, False, .



check_address
    def check_address(self, _address: str):
        return True if _address in self.clients_ip else False


get



get_free_socket , , .



get_free_socket
    def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None


get_ind_by_address , , , .



get_ind_by_address
    def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None


get_request . , .



get_request
    def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data


server control



, .



— create_session — . , , , .



create_session
    def create_session(self, _address: str):
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        try:
            self.__add_user(_address)
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            thread.join(0)
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            self.__del_user(_address)
            return
        my_key = rsa.newkeys(512)
        self.raw_send(_address, my_key[0].save_pkcs1())
        key = connection.recv(162).decode()
        self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
        key = rsa.PublicKey.load_pkcs1(key)
        self.__add_keys(_address, key, my_key[1])
        while self.running and self.socket_busy[ind]:
            try:
                data = connection.recv(2048)
            except socket.timeout:
                continue
            except OSError:
                self.close_connection(_address)
                return
            if data:
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        try:
            self.close_connection(_address)
        except TypeError or KeyError:
            pass


connect True False . .



connect
    def __connect(self, _address: str, *args):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False


close_connection .



close_connection
    def close_connection(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)


kill_server .



kill_server
    def kill_server(self):
        self.running = False
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                pass


reload_socket, , .



reload_socket
    def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False




bool True, - , False, .



bool
    def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False


len .



len
    def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num




, . , . , .



, .



. , , . .



init
    def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()


save_data .



save_data
    def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()


静态函数read_and_return_list不需要使用该类的对象,但是对于其操作,它需要从中获取所有信息并以工作表形式返回的文件名。



阅读并返回清单
    @staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")


最后一个函数kill_log将日志停止时间写入文件并关闭文件。



kill_log
    def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()


结论



编写对等服务器并不困难,但是另一方面,还有移动的空间。您可以实现发送文件和图像,一次从多个用户中分批下载它们。您可以改善日志或添加加密以发送密钥进行加密和解密。



如果对此问题有任何建议或改进代码的方法,我将很高兴阅读评论。




All Articles