From 6c268b36082549da8ce6241e0a227b310ee2ac17 Mon Sep 17 00:00:00 2001 From: Oriol Filter Anson Date: Sat, 5 Mar 2022 23:49:35 +0100 Subject: [PATCH 1/2] upload --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 38c94b4..8cecac0 100644 --- a/README.md +++ b/README.md @@ -25,4 +25,7 @@ Json example "user1":"passwd", "user2":"passwd" } -``` \ No newline at end of file +``` + + +... \ No newline at end of file From 857ea8b36851c57c4658792f7006c75e9ad53d55 Mon Sep 17 00:00:00 2001 From: ofilter Date: Sun, 6 Mar 2022 22:39:11 +0100 Subject: [PATCH 2/2] Rename host files to hostlist --- main.py | 373 +++++++++++++++++++++++++++++++++++------------ main_factory.py | 331 ----------------------------------------- requirements.txt | Bin 254 -> 286 bytes t4.py | 6 +- 4 files changed, 284 insertions(+), 426 deletions(-) delete mode 100644 main_factory.py diff --git a/main.py b/main.py index 9ee3d54..76b16c9 100644 --- a/main.py +++ b/main.py @@ -1,129 +1,322 @@ -import paramiko -from dataclasses import dataclass +import argparse +import re import json +from dataclasses import dataclass, field +import paramiko from paramiko.ssh_exception import AuthenticationException from paramiko import SSHClient from scp import SCPClient -from os import path +from os import get_terminal_size +from colored import fg, bg, attr + +# import colors +""" +Author: Oriol Filter +Date 06/03/2021 +""" + +version = '0.9' +def sent_message(filename: str, sent: bool) -> None: + print(f'\t[{filename}] ==> [{(fg(10), fg(9))[sent]}{("Sent", "ERROR")[sent]}{attr("reset")}]') + +class LOADING: + pass + + +def sending_file_progress(filename: bytes, size: int, sent: int) -> None: + # float(sent) / float(size) * 100 + green_scale = [28, 34, 40, 76, 82, 118] + loading_chars = "|/-\\" + _filename = filename.decode() + screen_size = get_terminal_size() + cols = int(screen_size.columns / 100 * 50) + + percent = int(float(sent) / float(size) * 100) + _n = percent % len(loading_chars) + + _color: int + _color_picker = int((percent / 10) * (len(green_scale) / 10)) + + space_filling = " " * int(100 - percent / (100 * cols)) + " " + load_bar = f'{"=" * int(percent / 100 * cols)}=>' + + if int(percent) >= 100: + _color = green_scale[len(green_scale) - 1] + else: + _color = green_scale[_color_picker] + print( + f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]', + end='\r') + + +def return_list(txt: str) -> [str]: + _list = [] + if type(txt) in (tuple, list): + _list = txt + elif type(txt) in (str, int): + _list = [str(txt)] + return _list + + +@dataclass class CONFIG: - scp_dest="/tmp" - files_folder = 'C:/Users/OriolFilterAnson/PycharmProjects/sshtest' - files_to_copy = ['t1'] + files_to_send: str or [str] + host_lists: str or [str] + credentials_files: str or [str] -@dataclass -class USER: - name: str - password: str - - # def __init__(self, name, password): - # self.name = name - # self.password = password - - -@dataclass -class VALIDATIONS: - loged_in: bool = False + _files_to_send: str or [str] = field(default=None, init=False) + _host_lists: str or [str] = field(default="./hostlist", init=False) + _credentials_files: str or [str] = field(default="./users.json", init=False) def __post_init__(self): - pass + self.files_to_send = self._files_to_send or CONFIG._files_to_send + self.host_lists = self._host_lists or CONFIG._host_lists + self.credentials_files = self._credentials_files or CONFIG._credentials_files + + @property + def files_to_send(self) -> str or [str]: + return self._files_to_send + + @files_to_send.setter + def files_to_send(self, files): + self._files_to_send = return_list(files) + + @property + def host_lists(self) -> str or [str]: + return self._host_lists + + @host_lists.setter + def host_lists(self, files): + self._host_lists = return_list(files) + + @property + def credentials_files(self) -> str or [str]: + return self._credentials_files + + @credentials_files.setter + def credentials_files(self, files): + self._credentials_files = return_list(files) @dataclass -class ITEM: +class HOST: hostname: str - client: paramiko.SSHClient - # login: USER - validations: VALIDATIONS = None + client: paramiko.SSHClient or bool = False + files_sent: dict = False - # def __init__(self, hostname, login): - # self.hostname = hostname - # self.login = login def __post_init__(self): - self.validations = VALIDATIONS() + if not self.files_sent: self.files_sent = {} + + @property + def username(self) -> str or None: + if hasattr(self, "client") and self.client: + return self.client.get_transport().get_username() -def attempt_to_login(hostname, user: USER) -> SSHClient or bool: +def return_connection(hostname, user: str, password: str = "") -> SSHClient or bool: try: client = paramiko.SSHClient() client.load_system_host_keys() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - client.connect(hostname, username=user.name, password=user.password) - # ssh_stdin, ssh_stdout, ssh_stderr = client.exec_command("uptime") - # print(ssh_stdout.read().decode()) + client.connect(hostname, username=user, password=password, timeout=2) return client - except AuthenticationException as e: + except (AuthenticationException, TimeoutError) as e: pass - # ssh = paramiko.SSHClient() return False -config=CONFIG() +class MANAGER: + host_list: [HOST] + user_dict: {} + config: CONFIG -bs2 = USER(name='bs2cloud', password='xxxx') -ibmuser = USER(name='ibmuser', password='xxxx') -home = USER(name='testing', password='testing') -pi = USER(name='pifail', password='pifail') + def __init__(self, config: CONFIG = None): + self.update_config(config) -## found= user[] -result_dic = { - "found": {}, - "discards": [], -} + def update_config(self, config: CONFIG = None): + self.host_list = [] + self.user_dict = {} + if config and type(config) is CONFIG: + self.config = config + else: + self.config = CONFIG(**{}) + self.load_hosts() + self.load_users() -user_list: [USER] = [home, bs2, ibmuser, pi] -host_list = ['192.168.1.3', '192.168.1.2'] + @property + def _files_to_send(self) -> [str]: + return self.config.files_to_send + + @property + def _hostlist_files(self) -> [str]: + return self.config.host_lists + + @property + def _credentials_files(self) -> [str]: + return self.config.credentials_files + + def load_config(self): + self.load_hosts() + self.load_users() + + def load_hosts(self): + host_list: MANAGER.host_list = [] + for file in self._hostlist_files: + with open(file or "hostlist") as f: + lines = f.read().splitlines() + split_chars = ";,-" + regex_rule = "|".join(split_chars) + for line in lines: + result = re.split(regex_rule, line) + if result: + for entry in result: + host_list.append(HOST(hostname=entry)) + self.host_list = host_list + + def load_users(self): + user_dict = {} + for file in self._credentials_files: + with open(file or "users.json") as f: + data = json.load(f) + for user in data: + user_dict[user] = data[user] + self.user_dict = user_dict + + def print_connections(self): + print("\nPrinting connections:") + print("\tHOSTNAME\tUSER") + for host in self.host_list: + print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}") + + def test_connections(self, _print=True): + print("Attempting to login:") + for host in self.host_list: + host.client = False + print(f"{host.hostname}:", end="\t") + for user in self.user_dict: + connection = return_connection(hostname=host.hostname, user=user, password=self.user_dict[user]) + if connection: + host.client = connection + print("YES") + if not host.client: print("NO") + if _print: self.print_connections() + + @property + def _accesible_hosts(self) -> [HOST]: + list = [host for host in self.host_list if host.username] + return list + + @property + def _not_accesible_hosts(self) -> [HOST]: + list = [host for host in self.host_list if not host.username] + return list + + def _send_file(self, scp_session: SCPClient, file: str, remote_path: str = "/tmp") -> bool: + try: + scp_session.put(file, remote_path=remote_path) + print() + return True + except Exception as e: + print() + print(e) + return False + + def send_files(self, load_hosts=False): + if load_hosts: + self.load_hosts() + + accessible_hosts = self._accesible_hosts + + if len(accessible_hosts) < 1 and load_hosts: + self.send_files(load_hosts=True) + elif len(accessible_hosts) < 1 and not load_hosts: + print("No accessible hosts available") + for host in accessible_hosts: + print(f"{host.hostname}:") + if not self._files_to_send or len(self._files_to_send) < 1: + print("No files to send") + else: + for file in self._files_to_send: + host.files_sent: dict + if not host.files_sent.get(file): + host.files_sent[file] = False + scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress) + x = self._send_file(scp_session=scp_session, file=file) + print(x) + print( + f'\t{file} -------------> {["No", "Yes"][x]}') + host.files_sent[file] = True - -for user in user_list: - result_dic['found'][user.name] = [] - -for host in host_list: - for user in user_list: - result = attempt_to_login(hostname=host, user=user) - if result: - session = ITEM(hostname=host, client=result) - result_dic['found'][user.name].append(session) - host_list.remove(host) - session.validations.loged_in = True - -print(result_dic['found']) - -result_dic['discards'] = set(host_list) - -print('discards') -print(result_dic['discards']) - -print('SCP') - -for user in user_list: - for session in result_dic['found'][user.name]: - session: ITEM - - scp = SCPClient(session.client.get_transport()) - for file in config.files_to_copy: - print(f"\t{file}") - scp.put(file, remote_path='/tmp') - - -def test_connections(): +class MENU: pass -def menu(): + +def print_help(): + menu_dict = { + "0": "Exits the script", + "1": "Attempts to login using the loaded credentials", + "2": "Attemps to send the files", + "-1": "Prints help", + "-2": "Reload files" + } + for key in menu_dict: + print(f'{key}:\n\t{menu_dict[key]}') pass + +def menu(manager: MANAGER): + while True: + menu_dict = { + "0": exit, + "1": manager.test_connections, + "2": manager.send_files, + "-1": print_help, + "-2": manager.update_config + } + msg = "\nSelect 1 of the following options:\n\ + 1) Test connections\n\ + 2) Send files\n\ + -1) Help\n\ + -2) Reload files\n\ + 0) Exit" + print(msg) + _input = input() or "" + if _input in menu_dict: + option = menu_dict[_input] + option() + else: + pass + + def main(): - pass -# print(json.dumps(result_dic, indent=2, sort_keys=True)) + # Get ARGV + parser = argparse.ArgumentParser( + description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}') + parser.add_argument('-v', '--version', action='store_true', help='Prints the version') + parser.add_argument('-hl', '--hosts_lists', action='append', + help='File that contains a list of hosts (multiple usage allowed)') + parser.add_argument('-f', '--file', action='append', + help='File to send through SSH (multiple usage allowed)') + parser.add_argument('-c', '--credentials_file', action='append', + help='Json file that contains a list of users to use and their password ie: ' \ + '{"u1":"p1","u2":"p2"}') + parser.add_argument('-thi', '--threading_instances', action='store', help='Number of threading instances (NOT ' \ + 'ENABLED)') + args = parser.parse_args() + if args.version: + print(f'Connection tester: v{version}') + else: + args_dic = {"files_to_send": args.file, "host_list": args.hosts_lists, + "credentials_files": args.credentials_file} + conf = CONFIG(**args_dic) + manager = MANAGER(config=conf) -# server = "192.168.1.3" -# username = "fadm" -# password = "" -# ssh = paramiko.SSHClient() -# ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) -# ssh.connect(server, username=username, password=password) -# ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command("uptime") -# print(ssh_stdout.read().decode()) + menu(manager=manager) + + +if __name__ == '__main__': + main() diff --git a/main_factory.py b/main_factory.py deleted file mode 100644 index b1b4361..0000000 --- a/main_factory.py +++ /dev/null @@ -1,331 +0,0 @@ -import argparse -import re -import json -from dataclasses import dataclass, field -import paramiko -from paramiko.ssh_exception import AuthenticationException -from paramiko import SSHClient -from scp import SCPClient -from os import get_terminal_size -from colored import fg, bg, attr - -# import colors -""" -Author: Oriol Filter -Date 01/03/2021 -""" - -version = '0.8.9.3' - - -def sent_message(filename: str, sent: bool) -> None: - print(f'\t[{filename}] ==> [{(fg(10), fg(9))[sent]}{("Sent", "ERROR")[sent]}{attr("reset")}]') - - -# def sending_file_progress(filename, size, sent): -# print("%s\'s progress: %.2f%% \r" % (filename, float(sent) / float(size) * 100)) - -class LOADING: - pass - -def sending_file_progress(filename: bytes, size: int, sent: int) -> None: - # float(sent) / float(size) * 100 - green_scale = [28, 34, 40, 76, 82, 118] - loading_chars = "|/-\\" - _filename = filename.decode() - screen_size = get_terminal_size() - cols = int(screen_size.columns / 100 * 50) - - percent = int(float(sent) / float(size) * 100) - _n = percent % len(loading_chars) - - _color: int - _color_picker = int((percent / 10) * (len(green_scale) / 10)) - - - space_filling = " " * int(100 - percent / 100 * cols) + " " - load_bar = f'{"=" * int(percent / 100 * cols)}=>' - - if int(percent) == 100: - _color = green_scale[len(green_scale)-1] - else: - _color = green_scale[_color_picker] - # print(int(percent / 100 * cols),end="\r") - # print("+"*int(percent / 100 * cols),end="") - # print(" ", end="") - # print("+" * int(100 - percent / 100 * cols), end="\r") - print(f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]', - end='\r') -# - -def return_list(txt: str) -> [str]: - _list = [] - if type(txt) in (tuple, list): - _list = txt - elif type(txt) in (str, int): - _list = [str(txt)] - return _list - - -@dataclass -class CONFIG: - files_to_send: str or [str] - host_files: str or [str] - credentials_files: str or [str] - - _files_to_send: str or [str] = field(default=None, init=False) - _host_files: str or [str] = field(default="./hostlist", init=False) - _credentials_files: str or [str] = field(default="./users.json", init=False) - - def __post_init__(self): - self.files_to_send = self._files_to_send or CONFIG._files_to_send - self.host_files = self._host_files or CONFIG._host_files - self.credentials_files = self._credentials_files or CONFIG._credentials_files - - @property - def files_to_send(self) -> str or [str]: - return self._files_to_send - - @files_to_send.setter - def files_to_send(self, files): - self._files_to_send = return_list(files) - - @property - def host_files(self) -> str or [str]: - return self._host_files - - @host_files.setter - def host_files(self, files): - self._host_files = return_list(files) - - @property - def credentials_files(self) -> str or [str]: - return self._credentials_files - - @credentials_files.setter - def credentials_files(self, files): - self._credentials_files = return_list(files) - - -@dataclass -class HOST: - hostname: str - client: paramiko.SSHClient or bool = False - files_sent: dict = False - - def __post_init__(self): - if not self.files_sent: self.files_sent = {} - - @property - def username(self) -> str or None: - if hasattr(self, "client") and self.client: - return self.client.get_transport().get_username() - - -def return_connection(hostname, user: str, password: str = "") -> SSHClient or bool: - try: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - client.connect(hostname, username=user, password=password, timeout=2) - return client - except (AuthenticationException, TimeoutError) as e: - pass - return False - - -class MANAGER: - host_list: [HOST] - user_dict: {} - config: CONFIG - - def __init__(self, config: CONFIG = None): - self.update_config(config) - - def update_config(self, config: CONFIG = None): - self.host_list = [] - self.user_dict = {} - if config and type(config) is CONFIG: - self.config = config - else: - self.config = CONFIG(**{}) - self.load_hosts() - self.load_users() - - @property - def _files_to_send(self) -> [str]: - return self.config.files_to_send - - @property - def _hostlist_files(self) -> [str]: - return self.config.host_files - - @property - def _credentials_files(self) -> [str]: - return self.config.credentials_files - - def load_config(self): - self.load_hosts() - self.load_users() - - def load_hosts(self): - host_list: MANAGER.host_list = [] - for file in self._hostlist_files: - with open(file or "hostlist") as f: - lines = f.read().splitlines() - split_chars = ";,-" - regex_rule = "|".join(split_chars) - for line in lines: - result = re.split(regex_rule, line) - if result: - for entry in result: - host_list.append(HOST(hostname=entry)) - self.host_list = host_list - - def load_users(self): - user_dict = {} - for file in self._credentials_files: - with open(file or "users.json") as f: - data = json.load(f) - for user in data: - user_dict[user] = data[user] - self.user_dict = user_dict - - def print_connections(self): - print("\nPrinting connections:") - print("\tHOSTNAME\tUSER") - for host in self.host_list: - print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}") - - def test_connections(self, _print=True): - print("Attempting to login:") - for host in self.host_list: - host.client = False - print(f"{host.hostname}:", end="\t") - for user in self.user_dict: - connection = return_connection(hostname=host.hostname, user=user, password=self.user_dict[user]) - if connection: - host.client = connection - print("YES") - if not host.client: print("NO") - if _print: self.print_connections() - - @property - def _accesible_hosts(self) -> [HOST]: - list = [host for host in self.host_list if host.username] - return list - - @property - def _not_accesible_hosts(self) -> [HOST]: - list = [host for host in self.host_list if not host.username] - return list - - def _send_file(self, scp_session: SCPClient, file: str, remote_path: str = "/tmp") -> bool: - try: - scp_session.put(file, remote_path=remote_path) - print() - return True - except Exception as e: - print() - print(e) - return False - - def send_files(self, load_hosts=False): - if load_hosts: self.load_hosts() - - accessible_hosts = self._accesible_hosts - - if len(accessible_hosts) < 1 and load_hosts: - self.send_files(load_hosts=True) - elif len(accessible_hosts) < 1 and not load_hosts: - print("No accessible hosts available") - for host in accessible_hosts: - print(f"{host.hostname}:") - if not self._files_to_send or len(self._files_to_send) < 1: - print("No files to send") - else: - for file in self._files_to_send: - host.files_sent: dict - if not host.files_sent.get(file): - host.files_sent[file] = False - scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress) - x=self._send_file(scp_session=scp_session, file=file) - print(x) - print( - f'\t{file} -------------> {["No", "Yes"][x]}') - host.files_sent[file] = True - - -class MENU: - pass - - -def print_help(): - menu_dict = { - 0: "Exits the script", - 1: "Attempts to login using the loaded credentials", - 2: "Attemps to send the files", - -1: "Prints help", - -2: "Reload files" - } - for key in menu_dict: - print(f'{key}:\n\t{menu_dict[key]}') - pass - - -def menu(manager: MANAGER): - while True: - menu_dict = { - 0: exit, - 1: manager.test_connections, - 2: manager.send_files, - -1: print_help, - -2: manager.update_config - } - msg = "\nSelect 1 of the following options:\n\ - 1) Test connections\n\ - 2) Send files\n\ - -1) Help\n\ - -2) Reload files\n\ - 0) Exit" - print(msg) - _input = int(input() or 999999) - if _input in menu_dict: - option = menu_dict[_input] - option() - else: - pass - - -def main(): - # Get ARGV - parser = argparse.ArgumentParser( - description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}') - parser.add_argument('-v', '--version', action='store_true', help='Prints the version') - parser.add_argument('-hf', '--hosts_file', action='append', - help='File that contains a list of hosts (multiple usage allowed)') - parser.add_argument('-f', '--file', action='append', - help='File to send through SSH (multiple usage allowed)') - parser.add_argument('-c', '--credentials_file', action='append', - help='Json file that contains a list of users to use and their password ie: ' \ - '{"u1":"p1","u2":"p2"}') - parser.add_argument('-thi', '--threading_instances', action='store', help='Number of threading instances (NOT ' \ - 'ENABLED)') - args = parser.parse_args() - if args.version: - print(f'Connection tester: v{version}') - else: - args_dic = {"files_to_send": args.file, "host_files": args.hosts_file, - "credentials_files": args.credentials_file} - conf = CONFIG(**args_dic) - manager = MANAGER(config=conf) - # manager.test_connections() - - menu(manager=manager) - - -if __name__ == '__main__': - # screen_size = get_terminal_size() - # print("|"*screen_size.columns) - main() - # sending_file_progress(filename=b"HA", size=10, sent=10) diff --git a/requirements.txt b/requirements.txt index 9e9a00bc7c8e3b3227f491c4294591b15e860a87..f92318ab970c5b53e52016f38741de90d0e21dc2 100644 GIT binary patch delta 35 ncmeyzIFD(9on$^k4nsae5ko3N3WF^W8ZziHm;kZyMDybSp9lyE delta 9 QcmbQo^pA0Z-Nd*9027u39RL6T diff --git a/t4.py b/t4.py index 5eb6233..46b7681 100644 --- a/t4.py +++ b/t4.py @@ -18,17 +18,13 @@ def sending_file_progress(filename: bytes, size: int, sent: int) -> None: _color: int _color_picker = int((percent / 10) * (len(green_scale) / 10)) - space_filling = " " * int(100 - percent / 100 * cols) + " " + space_filling = " " * int(100 - percent / (100 * cols)) + " " load_bar = f'{"=" * int(percent / 100 * cols)}=>' if int(percent) >= 100: _color = green_scale[len(green_scale) - 1] else: _color = green_scale[_color_picker] - # print(int(percent / 100 * cols),end="\r") - # print("+"*int(percent / 100 * cols),end="") - # print(" ", end="") - # print("+" * int(100 - percent / 100 * cols), end="\r") print( f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]', end='\r')