""" Networking client to connect to a server and find server hosts """ import ipaddress import random import psutil import socket SERVER_PORT = 29987 SERVER_BUFFER = 1024 SCAN_TIMEOUT = 3 class InterfaceInfo: """ Class used for storing interface information to make it easier to use """ def __init__(self, interface_name, network): self.interface_name = interface_name self.network = network def __str__(self): return 'Interface:{0}, Network:{1}'.format(self.interface_name, self.network) def __repr__(self): return '(Interface:{0}, Network:{1})'.format(self.interface_name, self.network) def request_server_info(server_ip, server_port): """ Request server info from a PyPong server given an IP and a port :param server_ip: Server IP to request info from :param server_port: Port to connect to server on :return: response given by server (or lack thereof) """ request = 'PYPONGREQ;SVRINFO' udp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Timeout is set to SCAN_TIMEOUT plus a random number between 0 and SCAN_TIMEOUT so that threads can # start asynchronously to avoid a lot of requests being sent to machines at once # Average time is SCAN_TIMEOUT*1.5 udp_client_socket.settimeout(SCAN_TIMEOUT + (SCAN_TIMEOUT * random.random())) try: udp_client_socket.sendto(str.encode(request, 'UTF-8'), (server_ip, server_port)) except OSError: pass try: server_response = udp_client_socket.recvfrom(SERVER_BUFFER)[0].decode('UTF-8') except TimeoutError: server_response = '{0};TIMEOUT'.format(server_ip) except UnicodeDecodeError: server_response = '{0};MANGLED_RESPONSE'.format(server_ip) udp_client_socket.close() return server_response def get_interface_info(): """ Gets information using psutil about the current machine's interfaces and their networks. This only works for IPv4 networks and will not return any data in regard to IPv6 links. :return: List of InterfaceInfo objects containing information about all interfaces on the current machine excluding the loopback interface (lo or 127.0.0.1) """ raw_interface_data = psutil.net_if_addrs() interface_data = [] for interface in raw_interface_data: # Extract needed information from psutil and put it into a InterfaceInfo object address = (raw_interface_data.get(interface)[0]).address cidr_suffix = ipaddress.IPv4Network('{0}/{1}'.format(address, raw_interface_data.get(interface)[0].netmask), strict=False).prefixlen # Do not add the loopback interface or IPv6 interfaces if ':' not in address and address != '127.0.0.1': interface_data.append(InterfaceInfo(interface, '{0}/{1}'.format(address, cidr_suffix))) return interface_data def get_ips_from_network(network): """ Generate a list of IP addresses given an address in CIDR notation. This works from smallest to largest, meaning that it builds a list starting at
/32 (aka just the current machine) decrementing the CIDR by 1 until the correct subnet is reached. :param network: :return: """ network_split = network.split('/') address = network_split[0] cidr = int(network_split[1]) all_addresses = [] test_cidr = 32 # Start with the machine itself while test_cidr >= cidr: # Generate all addresses inside the current subnet given by 'address/test_cidr' current_addresses = [str(ip) for ip in ipaddress.IPv4Network('{0}/{1}'.format(address, test_cidr), strict=False)] # Do not add the address again if it is already in the list of all addresses for test_address in current_addresses: if test_address not in all_addresses: all_addresses.append(test_address) test_cidr -= 1 return all_addresses def check_ip(ip_address): """ Check a single IP address for a running game server :param ip_address: IP address to check :return: response of the server (or timeout if none) """ response = request_server_info(ip_address, SERVER_PORT) return response