added docstrings and comments

This commit is contained in:
Nicholas Dyer 2022-12-21 13:00:27 -05:00
parent dcd1d9c6b0
commit a5c47f72ef
No known key found for this signature in database
GPG Key ID: E4E6388793FA2105
2 changed files with 71 additions and 23 deletions

View File

@ -31,9 +31,10 @@ class InterfaceInfo:
def request_server_info(server_ip, server_port): def request_server_info(server_ip, server_port):
""" """
Request server info from a PyPong server given an IP and a port Request server info from a PyPong server given an IP and a port
:param server_ip: Server IP to request info from :param server_ip: Server IP to request info from
:param server_port: Port to connect to server on :param server_port: Port to connect to server on
:return response given by server (or lack thereof): :return: response given by server (or lack thereof)
""" """
request = 'PYPONGREQ;SVRINFO' request = 'PYPONGREQ;SVRINFO'
@ -55,37 +56,63 @@ def request_server_info(server_ip, server_port):
def get_interface_info(): def get_interface_info():
"""
Gets information using psutil about the current machine's interfaces and their networks. This only works for IPv4
networks and will not return any data in regard to IPv6 links.
:return: List of InterfaceInfo objects containing information about all interfaces on the current machine excluding
the loopback interface (lo or 127.0.0.1)
"""
raw_interface_data = psutil.net_if_addrs() raw_interface_data = psutil.net_if_addrs()
interface_data = [] interface_data = []
# Extract needed information from psutil and put it into a InterfaceInfo object
for interface in raw_interface_data: for interface in raw_interface_data:
# Extract needed information from psutil and put it into a InterfaceInfo object
address = (raw_interface_data.get(interface)[0]).address address = (raw_interface_data.get(interface)[0]).address
cidr_suffix = ipaddress.IPv4Network('{0}/{1}'.format(address, raw_interface_data.get(interface)[0].netmask), cidr_suffix = ipaddress.IPv4Network('{0}/{1}'.format(address, raw_interface_data.get(interface)[0].netmask),
strict=False).prefixlen strict=False).prefixlen
# Do not add the loopback interface or IPv6 interfaces
if ':' not in address and address != '127.0.0.1': if ':' not in address and address != '127.0.0.1':
interface_data.append(InterfaceInfo(interface, '{0}/{1}'.format(address, cidr_suffix))) interface_data.append(InterfaceInfo(interface, '{0}/{1}'.format(address, cidr_suffix)))
return interface_data return interface_data
def get_ips_from_network(network): def get_ips_from_network(network):
"""
Generate a list of IP addresses given an address in CIDR notation. This works from smallest to largest, meaning that
it builds a list starting at <ADDRESS>/32 (aka just the current machine) decrementing the CIDR by 1 until the
correct subnet is reached.
:param network:
:return:
"""
network_split = network.split('/') network_split = network.split('/')
address = network_split[0] address = network_split[0]
cidr = int(network_split[1]) cidr = int(network_split[1])
all_addresses = [] all_addresses = []
test_cidr = 32 test_cidr = 32 # Start with the machine itself
while test_cidr >= cidr: while test_cidr >= cidr:
# Generate all addresses inside the current subnet given by 'address/test_cidr'
current_addresses = [str(ip) for ip in ipaddress.IPv4Network('{0}/{1}'.format(address, test_cidr), current_addresses = [str(ip) for ip in ipaddress.IPv4Network('{0}/{1}'.format(address, test_cidr),
strict=False)] strict=False)]
# Do not add the address again if it is already in the list of all addresses
for test_address in current_addresses: for test_address in current_addresses:
if test_address not in all_addresses: if test_address not in all_addresses:
all_addresses.append(test_address) all_addresses.append(test_address)
test_cidr -= 1 test_cidr -= 1
print(test_cidr)
return all_addresses return all_addresses
def check_ip(ip_address): def check_ip(ip_address):
"""
Check a single IP address for a running game server
:param ip_address: IP address to check
:return: response of the server (or timeout if none)
"""
response = request_server_info(ip_address, SERVER_PORT) response = request_server_info(ip_address, SERVER_PORT)
return response return response

View File

@ -6,14 +6,16 @@ import concurrent.futures
import pypong.networking.client as client import pypong.networking.client as client
global running global running
global listbox
def main(): def main():
executor = concurrent.futures.ThreadPoolExecutor(max_workers=512)
""" """
Runs the game launcher. Runs the game launcher.
""" """
# Executor used for scanning local network for servers
executor = concurrent.futures.ThreadPoolExecutor(max_workers=512)
main_window = tk.Tk() main_window = tk.Tk()
main_window.title('PyPong Launcher') main_window.title('PyPong Launcher')
main_window.resizable(width=False, height=False) main_window.resizable(width=False, height=False)
@ -26,32 +28,39 @@ def main():
""" """
global running global running
running = False running = False
# Stop executor from creating new scanning tasks
executor.shutdown(wait=False, cancel_futures=True) executor.shutdown(wait=False, cancel_futures=True)
def check_game(*game_ip): def check_game(game_ip):
game_ip = ''.join(game_ip) """
Checks to see if there is a game bein ghosted at a given IP
:param game_ip: IP address to check for a host
:return: Response from the server (or timeout if nothing found)
"""
result = client.check_ip(game_ip) result = client.check_ip(game_ip)
return result return result
def get_games(network): def get_games(network):
""" """
Refresh the games list. Refreshes the game list.
:param network: The network to scan for games on
""" """
results = []
listbox.delete(0, tk.END) listbox.delete(0, tk.END)
listbox.insert(0, ' Building IP range...') listbox.insert(0, ' Building IP range...')
main_window.update() main_window.update()
possible_ips = client.get_ips_from_network(network.get().split(' ')[0]) possible_ips = client.get_ips_from_network(network.get().split(' ')[0])
listbox.delete(0, tk.END) listbox.delete(0, tk.END)
listbox.insert(0, 'this might take some time...') listbox.insert(0, 'this might take some time...')
listbox.insert(0, 'Refreshing server listing,') listbox.insert(0, 'Refreshing server listing,')
main_window.update() main_window.update()
# possible_ips = ['192.168.12.19']
futures = []
futures = []
# Use the executor to create threads to search for games
for ip in possible_ips: for ip in possible_ips:
if ip == '192.168.12.19':
print(ip)
future = executor.submit(check_game, ip) future = executor.submit(check_game, ip)
futures.append(future) futures.append(future)
@ -59,30 +68,42 @@ def main():
for future in futures: for future in futures:
result = future.result() result = future.result()
main_window.update()
if 'PYPONGRST;SVRINFO' in result: if 'PYPONGRST;SVRINFO' in result: # Check to make sure response has the right header and type
print(result) raw_response = result.split(';') # Split response by block (denoted by semicolons)
raw_response = result.split(';')
response = [] response = []
# Split up each block by chunks (<COMMAND>:<RESPONSE>)
# e.g. the block 'NAME:Nicholas Dyer' -> ['NAME', 'Nicholas Dyer']
for block in raw_response: for block in raw_response:
print(block)
chunks = block.split(':') chunks = block.split(':')
for chunk in chunks: for chunk in chunks:
response.append(chunk) response.append(chunk)
print(response)
# Make sure the response has the NAME command
if 'NAME' in response: if 'NAME' in response:
# Session name is the next index after the command
session_name = response[response.index('NAME') + 1] session_name = response[response.index('NAME') + 1]
if not game_found: if not game_found: # If this is the first game found, clear out the list
listbox.delete(0, tk.END) listbox.delete(0, tk.END)
listbox.insert(0, session_name)
game_found = True game_found = True
main_window.update() listbox.insert(0, session_name)
main_window.update() # Update screen while finding games (laggy, but works)
if not game_found: if not game_found:
listbox.delete(0, tk.END) listbox.delete(0, tk.END)
listbox.insert(0, 'No games found!') listbox.insert(0, 'No games found!')
def get_addresses(): def get_addresses():
"""
Get the current machine's local interfaces and their corresponding addresses in CIDR notation
:return: Address in CIDR notation along with the interface
Example:
['172.20.126.4/24 [eth0]', 192.168.2.43/16 [wlan0]']
"""
interface_info = client.get_interface_info() interface_info = client.get_interface_info()
shortened_info = [] shortened_info = []