"""Interactive helper for testing SSH logins and pushing files over SCP.

Author: Oriol Filter
Date 06/03/2021
"""
import argparse
import json
import re
import shutil
import sys
from dataclasses import dataclass, field
from os import get_terminal_size
from typing import Dict, List, Optional, Union

import paramiko
from colored import attr, bg, fg
from paramiko import SSHClient
from paramiko.ssh_exception import AuthenticationException
from scp import SCPClient

version = '0.9'

# Accepted shape for every CONFIG path argument: one path, several, or nothing.
_PathArg = Union[str, List[str], None]

# Progress-bar colours: the bar fades from the first to the second as the
# transfer percentage grows.
_BAR_START_COLOR_VALUES = (68, 121, 84)
_BAR_END_COLOR_VALUES = (0, 255, 68)
_SPINNER_CHARS = "|/-\\"
_MAX_FILENAME_LEN = 75

# Characters that separate several hosts written on one line of a host list.
# NOTE(review): "-" is a separator here, so a hostname that itself contains a
# hyphen (e.g. "web-01") is split into two entries. Kept for compatibility
# with existing host-list files; confirm whether this is intended.
_HOST_SEPARATORS = re.compile(r"[;,\-]")


@dataclass
class RGB:
    """A colour expressed as three integer channels."""

    r: int = 0
    g: int = 0
    b: int = 0


def sent_message(filename: str, sent: bool) -> None:
    """Print a one-line, colour-coded transfer result for *filename*.

    Args:
        filename: Name shown between brackets.
        sent: True when the transfer succeeded.
    """
    # Colour 10 (light green) marks success, colour 9 (light red) failure.
    # The original indexed both tuples with ``sent`` directly, which printed
    # "ERROR" for a successful transfer.
    color, label = (fg(10), "Sent") if sent else (fg(9), "ERROR")
    print(f'\t[{filename}] ==> [{color}{label}{attr("reset")}]')


class LOADING:
    """Empty placeholder class."""


def _blend_channel(start: int, end: int, percent: float = 100) -> int:
    """Return the value *percent* % of the way from *start* to *end*."""
    if start > end:
        return start - int((start - end) * (percent / 100))
    if start < end:
        return start + int((end - start) * (percent / 100))
    return start


def _blend_color(c1: RGB, c2: RGB, percent: float) -> RGB:
    """Blend every channel of *c1* towards *c2* by *percent* %."""
    return RGB(r=_blend_channel(c1.r, c2.r, percent),
               g=_blend_channel(c1.g, c2.g, percent),
               b=_blend_channel(c1.b, c2.b, percent))


def _color_to_hex(color: RGB) -> str:
    """Format *color* as six lowercase hex digits (no leading ``#``)."""
    return f"{int(color.r):02x}{int(color.g):02x}{int(color.b):02x}"


def sending_file_progress(filename: Union[bytes, str], size: int, sent: int) -> None:
    """Draw a single-line progress bar for a file transfer.

    Has the ``(filename, size, sent)`` signature that ``SCPClient`` is given
    as its ``progress`` callback in ``MANAGER.send_files``. The line ends with
    a carriage return so that the next call overwrites it.

    Args:
        filename: Name of the file being transferred, as bytes or str.
        size: Total size of the file in bytes.
        sent: Number of bytes transferred so far.
    """
    if isinstance(filename, bytes):
        name = filename.decode(errors="replace")
    else:
        name = str(filename)
    if len(name) > _MAX_FILENAME_LEN:
        name = name[:_MAX_FILENAME_LEN] + ".."

    # A zero-byte file has nothing left to send; treating it as complete
    # avoids a ZeroDivisionError.
    percent = int(float(sent) / float(size) * 100) if size else 100
    # Keep the value inside 0..100 so the blended colour stays a valid hex code.
    percent = max(0, min(100, percent))

    # shutil.get_terminal_size() falls back to a default size instead of
    # raising OSError when stdout is not attached to a terminal.
    columns = shutil.get_terminal_size().columns
    bar_width = int(columns / 100 * 50)  # the bar may use half of the screen

    spinner = _SPINNER_CHARS[percent % len(_SPINNER_CHARS)]
    padding = " " * int((bar_width / 100) * (100 - percent))
    bar = "=" * int((bar_width / 100) * percent) + '=>'
    start_color = RGB(*_BAR_START_COLOR_VALUES)
    end_color = RGB(*_BAR_END_COLOR_VALUES)
    color = f'#{_color_to_hex(_blend_color(start_color, end_color, percent))}'
    print(
        f'\t[{spinner}] {fg(color)}{bar}{attr("reset")} [{percent}%] {padding} [{name}]',
        end='\r')


def return_list(txt) -> List[str]:
    """Normalise *txt* into a list.

    Args:
        txt: A list/tuple (copied into a new list, elements untouched), a
            single str or int (wrapped as ``[str(txt)]``), or anything else.

    Returns:
        The normalised list; empty for ``None`` and any unsupported type.
    """
    if isinstance(txt, (tuple, list)):
        return list(txt)
    # bool is a subclass of int but was never accepted as a value here.
    if isinstance(txt, (str, int)) and not isinstance(txt, bool):
        return [str(txt)]
    return []


@dataclass
class CONFIG:
    """Paths of the files to send, the host lists and the credential files.

    Every public attribute accepts a single path, a list of paths or None and
    always reads back as a list. Host lists and credential files that end up
    empty fall back to ``./hostlist`` and ``./users.json``.
    """

    # NOTE: each public field name is rebound to a property further down in
    # this class body. The dataclass machinery therefore records that property
    # object as the field default, which makes all three arguments optional;
    # return_list() maps the property object (like None) to an empty list.
    files_to_send: _PathArg
    host_lists: _PathArg
    credentials_files: _PathArg
    # Backing storage; the class-level values double as the fallbacks used by
    # __post_init__.
    _files_to_send: _PathArg = field(default=None, init=False)
    _host_lists: _PathArg = field(default="./hostlist", init=False)
    _credentials_files: _PathArg = field(default="./users.json", init=False)

    def __post_init__(self):
        # Replace anything left empty by the generated __init__ with the
        # class-level default.
        self.files_to_send = self._files_to_send or CONFIG._files_to_send
        self.host_lists = self._host_lists or CONFIG._host_lists
        self.credentials_files = self._credentials_files or CONFIG._credentials_files

    @property
    def files_to_send(self) -> List[str]:
        return self._files_to_send

    @files_to_send.setter
    def files_to_send(self, files):
        self._files_to_send = return_list(files)

    @property
    def host_lists(self) -> List[str]:
        return self._host_lists

    @host_lists.setter
    def host_lists(self, files):
        self._host_lists = return_list(files)

    @property
    def credentials_files(self) -> List[str]:
        return self._credentials_files

    @credentials_files.setter
    def credentials_files(self, files):
        self._credentials_files = return_list(files)


@dataclass
class HOST:
    """A target host together with its connection and transfer state."""

    hostname: str
    # Connected client, or False while no login has succeeded.
    client: Union[SSHClient, bool] = False
    # Maps a local file path to whether it has been delivered to this host.
    files_sent: Dict[str, bool] = field(default_factory=dict)

    def __post_init__(self):
        # Callers may still pass False/None explicitly; normalise to a dict.
        if not self.files_sent:
            self.files_sent = {}

    @property
    def username(self) -> Optional[str]:
        """User name of the active connection, or None when not connected."""
        client = getattr(self, "client", None)
        if not client:
            return None
        # get_transport() returns None once the client has been closed.
        transport = client.get_transport()
        if transport is None:
            return None
        return transport.get_username()


def return_connection(hostname, user: str, password: str = "") -> Union[SSHClient, bool]:
    """Try to open an SSH session on *hostname*.

    Args:
        hostname: Host to connect to.
        user: Login name.
        password: Password for *user*.

    Returns:
        The connected ``SSHClient``, or False when the login failed for any
        SSH-level or network-level reason.
    """
    client = paramiko.SSHClient()
    try:
        client.load_system_host_keys()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # asking, which leaves the connection open to man-in-the-middle
        # attacks. Kept because the tool relies on it; consider RejectPolicy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname, username=user, password=password, timeout=2)
    except (paramiko.SSHException, OSError):
        # SSHException covers AuthenticationException; OSError covers
        # timeouts, refused connections and name-resolution failures.
        client.close()
        return False
    return client


class MANAGER:
    """Loads hosts and credentials, tests logins and sends files."""

    host_list: List[HOST]
    user_dict: Dict[str, str]
    config: CONFIG

    def __init__(self, config: CONFIG = None):
        self.update_config(config)

    def _close_connections(self) -> None:
        """Close every SSH client currently held by a loaded host."""
        for host in getattr(self, "host_list", []):
            if host.client:
                host.client.close()
                host.client = False

    def update_config(self, config: CONFIG = None):
        """Install *config* (when given) and reload hosts and credentials.

        Called without a ``CONFIG`` instance it keeps the configuration that
        is already installed, so it can be used as a plain "reload"; a default
        ``CONFIG`` is created only when none exists yet.

        Raises:
            OSError: A host list or credentials file cannot be opened.
            ValueError: A credentials file is not valid JSON or not an object.
        """
        self._close_connections()
        self.host_list = []
        self.user_dict = {}
        if isinstance(config, CONFIG):
            self.config = config
        elif not isinstance(getattr(self, "config", None), CONFIG):
            self.config = CONFIG()
        self.load_hosts()
        self.load_users()

    @property
    def _files_to_send(self) -> List[str]:
        return self.config.files_to_send

    @property
    def _hostlist_files(self) -> List[str]:
        return self.config.host_lists

    @property
    def _credentials_files(self) -> List[str]:
        return self.config.credentials_files

    def load_config(self):
        """Reload hosts and credentials from the current configuration."""
        self.load_hosts()
        self.load_users()

    def load_hosts(self):
        """Rebuild ``host_list`` from every configured host-list file.

        A line may hold several hosts separated by ``;``, ``,`` or ``-``.
        Surrounding whitespace is stripped and empty entries are ignored.
        """
        hosts: List[HOST] = []
        for path in self._hostlist_files:
            with open(path or "hostlist") as f:
                lines = f.read().splitlines()
            for line in lines:
                for entry in _HOST_SEPARATORS.split(line):
                    entry = entry.strip()
                    if entry:
                        hosts.append(HOST(hostname=entry))
        # The previous hosts are about to be dropped; release their sessions.
        self._close_connections()
        self.host_list = hosts

    def load_users(self):
        """Rebuild ``user_dict`` from every configured credentials file.

        Each file must contain a JSON object; later files override users
        already defined by earlier ones.
        """
        users: Dict[str, str] = {}
        for path in self._credentials_files:
            with open(path or "users.json") as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError(f"{path}: expected a JSON object of user/password pairs")
            users.update(data)
        self.user_dict = users

    def print_connections(self):
        """Print a table with every host and the user logged in on it."""
        print("\nPrinting connections:")
        print("\tHOSTNAME\tUSER")
        for host in self.host_list:
            print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}")

    def test_connections(self, _print=True):
        """Try the loaded credentials on each host, keeping the first that works.

        Args:
            _print: When true, print the connection table afterwards.
        """
        print("Attempting to login:")
        for host in self.host_list:
            # Release a session left over from an earlier test run.
            if host.client:
                host.client.close()
            host.client = False
            print(f"{host.hostname}:", end="\t")
            for user, password in self.user_dict.items():
                connection = return_connection(hostname=host.hostname, user=user, password=password)
                if connection:
                    host.client = connection
                    print("YES")
                    # Stop here: trying further users would replace, and
                    # thereby leak, the session that was just opened.
                    break
            if not host.client:
                print("NO")
        if _print:
            self.print_connections()

    @property
    def _accesible_hosts(self) -> List[HOST]:
        """Hosts that currently have a logged-in user."""
        return [host for host in self.host_list if host.username]

    @property
    def _not_accesible_hosts(self) -> List[HOST]:
        """Hosts without a logged-in user."""
        return [host for host in self.host_list if not host.username]

    def _send_file(self, scp_session: SCPClient, file: str, remote_path: str = "/tmp") -> bool:
        """Upload *file* into *remote_path*; return True on success.

        Any error is printed rather than raised so that one failed transfer
        does not abort the remaining ones.
        """
        try:
            scp_session.put(file, remote_path=remote_path)
        except Exception as e:
            print()
            print(e)
            return False
        # Move past the progress bar line, which ends with a carriage return.
        print()
        return True

    def send_files(self, load_hosts=False):
        """Send every configured file to every accessible host.

        Args:
            load_hosts: Reload the host lists first. Freshly loaded hosts
                carry no connection, so the logins are tested again as well.
        """
        if load_hosts:
            self.load_hosts()
            self.test_connections(_print=False)

        accessible_hosts = self._accesible_hosts
        if not accessible_hosts:
            print("No accessible hosts available")
            return

        files = self._files_to_send
        for host in accessible_hosts:
            print(f"{host.hostname}:")
            if not files:
                print("No files to send")
                continue
            for file in files:
                # A fresh SCP client per file, so a failed transfer cannot
                # leave a half-used channel behind for the next one.
                scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress)
                try:
                    sent = self._send_file(scp_session=scp_session, file=file)
                finally:
                    scp_session.close()
                print(
                    f'\t{file} -------------> {["No", "Yes"][sent]}')
                # Record the real outcome; an earlier success is not undone.
                host.files_sent[file] = bool(host.files_sent.get(file)) or sent


class MENU:
    """Empty placeholder class."""


def print_help():
    """Print a description of every menu option."""
    menu_dict = {
        "0": "Exits the script",
        "1": "Attempts to login using the loaded credentials",
        "2": "Attemps to send the files",
        "-1": "Prints help",
        "-2": "Reload files"
    }
    for key, description in menu_dict.items():
        print(f'{key}:\n\t{description}')


def menu(manager: MANAGER):
    """Run the interactive menu until the user exits or stdin is closed.

    Args:
        manager: The ``MANAGER`` whose actions the menu triggers.
    """
    actions = {
        "0": sys.exit,
        "1": manager.test_connections,
        "2": manager.send_files,
        "-1": print_help,
        "-2": manager.update_config
    }
    msg = ("\nSelect 1 of the following options:\n"
           "1) Test connections\n"
           "2) Send files\n"
           "-1) Help\n"
           "-2) Reload files\n"
           "0) Exit")
    while True:
        print(msg)
        try:
            choice = input().strip()
        except EOFError:
            # stdin was closed: there is nothing more to read, leave the menu.
            return
        action = actions.get(choice)
        if action is not None:
            action()


def main():
    """Parse the command line and start the interactive menu."""
    # Get ARGV
    parser = argparse.ArgumentParser(
        description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}')
    parser.add_argument('-v', '--version', action='store_true', help='Prints the version')
    parser.add_argument('-hl', '--hosts_lists', action='append',
                        help='File that contains a list of hosts (multiple usage allowed)')
    parser.add_argument('-f', '--file', action='append', help='File to send through SSH (multiple usage allowed)')
    parser.add_argument('-c', '--credentials_file', action='append',
                        help='Json file that contains a list of users to use and their password ie: '
                             '{"u1":"p1","u2":"p2"}')
    parser.add_argument('-thi', '--threading_instances', action='store',
                        help='Number of threading instances (NOT '
                             'ENABLED)')
    args = parser.parse_args()

    if args.version:
        print(f'Connection tester: v{version}')
        return

    conf = CONFIG(files_to_send=args.file,
                  host_lists=args.hosts_lists,
                  credentials_files=args.credentials_file)
    manager = MANAGER(config=conf)
    menu(manager=manager)


if __name__ == '__main__':
    main()