323 lines
10 KiB
Python
323 lines
10 KiB
Python
import argparse
|
|
import re
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
import paramiko
|
|
from paramiko.ssh_exception import AuthenticationException
|
|
from paramiko import SSHClient
|
|
from scp import SCPClient
|
|
from os import get_terminal_size
|
|
from colored import fg, bg, attr
|
|
|
|
# import colors
|
|
"""
|
|
Author: Oriol Filter
|
|
Date 06/03/2021
|
|
"""
|
|
|
|
# Tool version string; interpolated into the argparse description and the
# output of the --version flag.
version = '0.9'
|
|
|
|
|
|
def sent_message(filename: str, sent: bool) -> None:
    """Print a one-line, colour-coded transfer result for ``filename``.

    Args:
        filename: Name of the file the message refers to.
        sent: ``True`` when the transfer succeeded, ``False`` otherwise.
    """
    # Index 0 is the failure case and index 1 the success case, so a truthy
    # ``sent`` selects colour 10 with the label "Sent".  The previous tuple
    # order was inverted and labelled successful transfers as "ERROR".
    succeeded = bool(sent)
    colour = (fg(9), fg(10))[succeeded]
    label = ("ERROR", "Sent")[succeeded]
    print(f'\t[{filename}] ==> [{colour}{label}{attr("reset")}]')
|
|
|
|
|
|
class LOADING:
    """Empty placeholder class; it defines no attributes or behaviour."""
|
|
|
|
|
|
def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
    """Draw a single-line progress bar for a file transfer.

    The line ends with a carriage return so that successive calls overwrite
    each other on the terminal.

    Args:
        filename: Name of the file being transferred (bytes; ``str`` is also
            accepted).
        size: Total size of the file in bytes.
        sent: Number of bytes transferred so far.
    """
    green_scale = [28, 34, 40, 76, 82, 118]
    loading_chars = "|/-\\"

    if isinstance(filename, bytes):
        name = filename.decode(errors="replace")
    else:
        name = str(filename)

    # get_terminal_size() raises OSError when stdout is not a terminal
    # (e.g. output is piped); fall back to a conventional width.
    try:
        columns = get_terminal_size().columns
    except OSError:
        columns = 80
    # The bar takes half of the terminal; never let it collapse to zero.
    bar_width = max(columns // 2, 1)

    if size > 0:
        percent = min(max(int(sent / size * 100), 0), 100)
    else:
        # An empty file has nothing left to send; avoids dividing by zero.
        percent = 100

    spinner = loading_chars[percent % len(loading_chars)]
    filled = percent * bar_width // 100
    load_bar = f'{"=" * filled}=>'
    # Pad the unfilled part of the bar so the percentage and file name stay
    # in the same column while the bar grows.
    space_filling = " " * (bar_width - filled) + " "

    # Map 0..100% onto the colour scale, clamping 100% to the last entry.
    colour_index = min(percent * len(green_scale) // 100, len(green_scale) - 1)
    colour = green_scale[colour_index]

    shown_name = (name[:75] + "..") if len(name) > 75 else name
    print(
        f'\t[{spinner}] {fg(colour)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{shown_name}]',
        end='\r')
|
|
|
|
|
|
def return_list(txt: str) -> [str]:
    """Normalise ``txt`` into a sequence of entries.

    * a ``list`` or ``tuple`` is returned unchanged;
    * a ``str`` or ``int`` is wrapped as a one-element list of its string form;
    * anything else (``None``, ``bool``, ``float``, ...) yields an empty list.

    The checks are on the exact type, so subclasses such as ``bool`` do not
    match their base type.
    """
    kind = type(txt)
    if kind is tuple or kind is list:
        return txt
    if kind is str or kind is int:
        return [str(txt)]
    return []
|
|
|
|
|
|
@dataclass
class CONFIG:
    """Paths used by the tool: files to send, host lists and credential files.

    Every public attribute is exposed through a property whose setter passes
    the assigned value through ``return_list``; the result is kept in the
    matching underscore-prefixed attribute.
    """
    # Public fields.  The properties defined further down reuse these names,
    # so by the time the class body finishes each name is bound to a property
    # object rather than being left without a value.
    # NOTE(review): the dataclass machinery therefore treats the property
    # object as the field default, which is what lets CONFIG() be built with
    # no arguments -- confirm this is intentional.
    files_to_send: str or [str]
    host_lists: str or [str]
    credentials_files: str or [str]

    # Backing storage.  The class-level defaults double as the fallback
    # values read in __post_init__; they are excluded from __init__.
    _files_to_send: str or [str] = field(default=None, init=False)
    _host_lists: str or [str] = field(default="./hostlist", init=False)
    _credentials_files: str or [str] = field(default="./users.json", init=False)

    def __post_init__(self):
        # When the value stored by __init__ is falsy (e.g. an empty list),
        # fall back to the class-level default.  Each assignment goes through
        # the property setter and is normalised by return_list again.
        self.files_to_send = self._files_to_send or CONFIG._files_to_send
        self.host_lists = self._host_lists or CONFIG._host_lists
        self.credentials_files = self._credentials_files or CONFIG._credentials_files

    @property
    def files_to_send(self) -> str or [str]:
        # Files to send, as stored by the setter.
        return self._files_to_send

    @files_to_send.setter
    def files_to_send(self, files):
        self._files_to_send = return_list(files)

    @property
    def host_lists(self) -> str or [str]:
        # Host-list file paths, as stored by the setter.
        return self._host_lists

    @host_lists.setter
    def host_lists(self, files):
        self._host_lists = return_list(files)

    @property
    def credentials_files(self) -> str or [str]:
        # Credentials file paths, as stored by the setter.
        return self._credentials_files

    @credentials_files.setter
    def credentials_files(self, files):
        self._credentials_files = return_list(files)
|
|
|
|
|
|
@dataclass
class HOST:
    """A target host together with its SSH connection and transfer history.

    Attributes:
        hostname: Host name or address to connect to.
        client: Connected ``paramiko.SSHClient``, or ``False`` while no
            connection has been established.
        files_sent: Mapping of file path to transfer status.  The ``False``
            default is a sentinel replaced by a fresh dict per instance in
            ``__post_init__`` (dataclasses reject a mutable ``{}`` default).
    """
    hostname: str
    client: paramiko.SSHClient or bool = False
    files_sent: dict = False

    def __post_init__(self):
        if not self.files_sent:
            self.files_sent = {}

    @property
    def username(self) -> str or None:
        """User name of the active connection, or ``None`` when not connected."""
        client = self.client
        if not client:
            return None
        # get_transport() returns None when the client has no live session
        # (never connected, or closed); treat that as "not connected" instead
        # of failing with AttributeError.
        transport = client.get_transport()
        if transport is None:
            return None
        return transport.get_username()
|
|
|
|
|
|
def return_connection(hostname, user: str, password: str = "") -> SSHClient or bool:
    """Try to open an SSH connection and return the connected client.

    Args:
        hostname: Host name or address to connect to.
        user: User name to authenticate as.
        password: Password for ``user``.

    Returns:
        The connected ``SSHClient`` on success, ``False`` when the connection
        or the authentication failed.
    """
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without any
    # verification, which leaves the connection open to man-in-the-middle
    # attacks -- confirm this is acceptable for the target environment.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname, username=user, password=password, timeout=2)
    except (paramiko.SSHException, OSError):
        # SSHException covers authentication and protocol failures; OSError
        # covers timeouts, refused connections and name-resolution errors.
        # Any of them simply means "this host/user pair is not usable", so
        # release the half-open client and report failure to the caller.
        client.close()
        return False
    return client
|
|
|
|
|
|
class MANAGER:
    """Loads hosts and credentials, tests SSH logins and sends files.

    Attributes:
        host_list: ``HOST`` entries read from the configured host-list files.
        user_dict: Mapping of user name to password read from the configured
            credentials files.
        config: The active ``CONFIG``.
    """
    host_list: "list[HOST]"
    user_dict: dict
    config: "CONFIG"

    # Separators accepted between host names on one line of a host-list file.
    # NOTE(review): "-" is treated as a separator, which also splits host
    # names that contain a hyphen (e.g. "web-01") -- confirm this is intended.
    _HOST_SPLIT = re.compile(";|,|-")

    def __init__(self, config: "CONFIG" = None):
        self.update_config(config)

    def update_config(self, config: "CONFIG" = None):
        """Install ``config`` (if given) and reload hosts and users from disk.

        Called without a configuration, the one already in use is kept so
        that a plain reload re-reads the same files; a default ``CONFIG`` is
        created only when none has been set yet.
        """
        self.host_list = []
        self.user_dict = {}
        if isinstance(config, CONFIG):
            self.config = config
        elif getattr(self, "config", None) is None:
            self.config = CONFIG(**{})
        self.load_hosts()
        self.load_users()

    @property
    def _files_to_send(self) -> "list[str]":
        return self.config.files_to_send

    @property
    def _hostlist_files(self) -> "list[str]":
        return self.config.host_lists

    @property
    def _credentials_files(self) -> "list[str]":
        return self.config.credentials_files

    def load_config(self):
        """Reload hosts and users using the current configuration."""
        self.load_hosts()
        self.load_users()

    def load_hosts(self):
        """Rebuild ``host_list`` from every configured host-list file.

        Existing ``HOST`` objects (and the connections they hold) are
        replaced by fresh, unconnected ones.
        """
        hosts = []
        for file in self._hostlist_files:
            with open(file or "hostlist") as f:
                lines = f.read().splitlines()
            for line in lines:
                for entry in self._HOST_SPLIT.split(line):
                    entry = entry.strip()
                    # Blank lines and doubled separators produce empty
                    # entries; those are not hosts.
                    if entry:
                        hosts.append(HOST(hostname=entry))
        self.host_list = hosts

    def load_users(self):
        """Rebuild ``user_dict`` from every configured credentials file.

        Each file is expected to hold a JSON object of ``{"user": "password"}``
        pairs; on duplicate user names the file read last wins.
        """
        users = {}
        for file in self._credentials_files:
            with open(file or "users.json") as f:
                data = json.load(f)
            for user in data:
                users[user] = data[user]
        self.user_dict = users

    def print_connections(self):
        """Print every host together with the user it is connected as."""
        print("\nPrinting connections:")
        print("\tHOSTNAME\tUSER")
        for host in self.host_list:
            print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}")

    def test_connections(self, _print=True):
        """Try each known user on each host and keep the first working login.

        Args:
            _print: When true, print the connection summary afterwards.
        """
        print("Attempting to login:")
        for host in self.host_list:
            host.client = False
            print(f"{host.hostname}:", end="\t")
            for user in self.user_dict:
                connection = return_connection(hostname=host.hostname, user=user,
                                               password=self.user_dict[user])
                if connection:
                    host.client = connection
                    print("YES")
                    # Stop at the first working login; trying the remaining
                    # users would open (and leak) further connections.
                    break
            if not host.client:
                print("NO")
        if _print:
            self.print_connections()

    @property
    def _accesible_hosts(self) -> "list[HOST]":
        return [host for host in self.host_list if host.username]

    @property
    def _not_accesible_hosts(self) -> "list[HOST]":
        return [host for host in self.host_list if not host.username]

    def _send_file(self, scp_session: "SCPClient", file: str, remote_path: str = "/tmp") -> bool:
        """Upload ``file`` to ``remote_path``; return whether it succeeded."""
        try:
            scp_session.put(file, remote_path=remote_path)
            # Finish the progress line, which was printed with end='\r'.
            print()
            return True
        except Exception as e:
            # Best-effort boundary: one failed upload must not abort the
            # remaining files/hosts, so report the error and carry on.
            print()
            print(e)
            return False

    def send_files(self, load_hosts=False):
        """Send every configured file to every accessible host.

        Files already recorded as sent for a host are skipped.

        Args:
            load_hosts: Reload the host list first.  Reloading discards the
                existing connections, so ``test_connections`` has to be run
                again before any host counts as accessible.
        """
        if load_hosts:
            self.load_hosts()

        accessible_hosts = self._accesible_hosts
        if not accessible_hosts:
            # Previously this branch called send_files(load_hosts=True) again,
            # which could never make a host accessible and recursed until
            # RecursionError.
            print("No accessible hosts available")
            return

        files = self._files_to_send
        for host in accessible_hosts:
            print(f"{host.hostname}:")
            if not files:
                print("No files to send")
                continue
            for file in files:
                if host.files_sent.get(file):
                    continue
                host.files_sent[file] = False
                scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress)
                try:
                    sent = self._send_file(scp_session=scp_session, file=file)
                finally:
                    scp_session.close()
                print(
                    f'\t{file} -------------> {["No", "Yes"][sent]}')
                # Record the real outcome so a failed file is retried later.
                host.files_sent[file] = sent
|
|
|
|
|
|
class MENU:
    """Empty placeholder class; it defines no attributes or behaviour."""
|
|
|
|
|
|
def print_help():
    """Print each menu option key followed by its indented description."""
    descriptions = (
        ("0", "Exits the script"),
        ("1", "Attempts to login using the loaded credentials"),
        ("2", "Attemps to send the files"),
        ("-1", "Prints help"),
        ("-2", "Reload files"),
    )
    for option, text in descriptions:
        print(f'{option}:\n\t{text}')
|
|
|
|
|
|
def menu(manager: MANAGER):
    """Run the interactive menu loop until the user exits.

    Args:
        manager: The ``MANAGER`` whose actions the menu dispatches to.
    """
    # Local import: sys.exit is always available, whereas the bare ``exit``
    # builtin is only installed by the ``site`` module.
    import sys

    actions = {
        "0": sys.exit,
        "1": manager.test_connections,
        "2": manager.send_files,
        "-1": print_help,
        "-2": manager.update_config,
    }
    msg = (
        "\nSelect 1 of the following options:\n"
        "1) Test connections\n"
        "2) Send files\n"
        "-1) Help\n"
        "-2) Reload files\n"
        "0) Exit"
    )
    while True:
        print(msg)
        try:
            choice = input().strip()
        except EOFError:
            # stdin was closed (e.g. piped input ran out): leave the menu
            # quietly instead of crashing with a traceback.
            return
        action = actions.get(choice)
        # Unknown input is ignored and the menu is shown again.
        if action is not None:
            action()
|
|
|
|
|
|
def main(argv=None):
    """Parse the command line and start the interactive menu.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}')
    parser.add_argument('-v', '--version', action='store_true', help='Prints the version')
    parser.add_argument('-hl', '--hosts_lists', action='append',
                        help='File that contains a list of hosts (multiple usage allowed)')
    parser.add_argument('-f', '--file', action='append',
                        help='File to send through SSH (multiple usage allowed)')
    parser.add_argument('-c', '--credentials_file', action='append',
                        help='Json file that contains a list of users to use and their password ie: '
                             '{"u1":"p1","u2":"p2"}')
    parser.add_argument('-thi', '--threading_instances', action='store',
                        help='Number of threading instances (NOT ENABLED)')
    args = parser.parse_args(argv)

    if args.version:
        print(f'Connection tester: v{version}')
        return

    # The keys must match CONFIG's field names exactly; the former
    # "host_list" key made CONFIG(**...) raise TypeError on every start.
    conf = CONFIG(
        files_to_send=args.file,
        host_lists=args.hosts_lists,
        credentials_files=args.credentials_file,
    )
    manager = MANAGER(config=conf)
    menu(manager=manager)
|
|
|
|
|
|
# Start the tool only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|