Send_files/main.py
2022-03-10 06:36:52 +01:00

347 lines
11 KiB
Python

import argparse
import re
import json
from dataclasses import dataclass, field
import paramiko
from paramiko.ssh_exception import AuthenticationException
from paramiko import SSHClient
from scp import SCPClient
from os import get_terminal_size
from colored import fg, bg, attr
# import colors
"""
Author: Oriol Filter
Date 06/03/2021
"""
# Script version string; main() shows it in the argparse description and for --version.
version = '0.9'
@dataclass
class RGB:
    """Colour expressed as three integer channels, each defaulting to 0."""

    r: int = 0  # red channel
    g: int = 0  # green channel
    b: int = 0  # blue channel
def sent_message(filename: str, sent: bool) -> None:
    """Print a one-line, colour-coded transfer status for *filename*.

    Args:
        filename: Name shown between the first pair of brackets.
        sent: True when the transfer succeeded, False otherwise.
    """
    # The previous tuple indexing was inverted: a truthy ``sent`` selected
    # index 1, i.e. colour 9 together with the "ERROR" label.
    if sent:
        color, label = fg(10), "Sent"
    else:
        color, label = fg(9), "ERROR"
    print(f'\t[{filename}] ==> [{color}{label}{attr("reset")}]')
class LOADING:
    """Empty class: it declares no attributes and no methods."""
def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
    """Draw a single-line, colour-graded progress bar for a file transfer.

    This function is handed to ``SCPClient`` as its ``progress`` callback in
    this file. The line ends with a carriage return, so every call
    overwrites the previous one on the terminal.

    Args:
        filename: Name of the file being transferred (bytes; a ``str`` is
            accepted as well).
        size: Total number of bytes to transfer.
        sent: Number of bytes transferred so far.
    """

    def return_diff_color(c1: RGB, c2: RGB, percent: int) -> RGB:
        """Blend from ``c1`` (0 %) towards ``c2`` (100 %), channel by channel."""

        def return_diff(n1, n2, percent=100) -> int:
            # Linear interpolation starting at n1. The earlier version
            # started from n2 whenever n2 > n1, so rising channels ran
            # backwards (the green channel went 255 -> 121 instead of
            # 121 -> 255).
            return n1 + int((n2 - n1) * (percent / 100))

        return RGB(
            r=return_diff(c1.r, c2.r, percent),
            g=return_diff(c1.g, c2.g, percent),
            b=return_diff(c1.b, c2.b, percent),
        )

    def color_to_hex(color: RGB) -> str:
        """Render ``color`` as six lowercase hex digits (no leading '#')."""

        def return_hex_number(n: int):
            return f'{str(hex(int(n))).replace("0x", "").zfill(2)}'

        return f"{return_hex_number(color.r)}{return_hex_number(color.g)}{return_hex_number(color.b)}"

    base_color = RGB(r=68, g=121, b=84)
    end_color = RGB(r=0, g=255, b=68)
    loading_chars = "|/-\\"

    if isinstance(filename, bytes):
        # Never let an oddly-encoded file name abort the transfer.
        _filename = filename.decode(errors="replace")
    else:
        _filename = str(filename)

    try:
        columns = get_terminal_size().columns
    except OSError:
        # No terminal attached (output piped or redirected): use a fixed
        # width instead of raising from inside the transfer callback.
        columns = 80
    available_colums = int(columns / 100 * 50)

    if size > 0:
        percent = int(float(sent) / float(size) * 100)
        percent = max(0, min(100, percent))
    else:
        # Nothing to transfer (empty file): report it as complete rather
        # than dividing by zero.
        percent = 100

    _n = percent % len(loading_chars)
    space_filling = " " * int((available_colums / 100) * (100 - percent))
    load_bar = "=" * int((available_colums / 100) * percent) + '=>'
    _color = f'#{color_to_hex(return_diff_color(base_color, end_color, percent))}'
    shown_name = (_filename[:75] + "..") if len(_filename) > 75 else _filename
    print(
        f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{shown_name}]',
        end='\r')
def return_list(txt: str) -> list:
    """Normalise *txt* into a new list.

    Args:
        txt: A list or tuple (its items are copied as-is), or a single
            ``str``/``int`` (wrapped as a one-element list of its string
            form).

    Returns:
        A fresh ``list``. Any other input, including ``None``, yields an
        empty list.
    """
    if isinstance(txt, (tuple, list)):
        # Always hand back a real, independent list: previously a tuple was
        # returned unchanged and a list was returned by reference.
        return list(txt)
    if isinstance(txt, (str, int)):
        return [str(txt)]
    return []
@dataclass
class CONFIG:
    """Paths the script works with: files to send, host lists and credential files.

    Each public attribute is a property whose setter passes the assigned
    value through ``return_list`` and stores the result in the matching
    underscore-prefixed field.
    """
    # NOTE(review): these three names are re-bound to properties further down
    # in this class body, so the dataclass presumably records the property
    # object as each field's default (which is what lets ``CONFIG(**{})`` be
    # called without arguments elsewhere in this file) - confirm before
    # reordering or renaming anything in this class.
    files_to_send: str or [str]
    host_lists: str or [str]
    credentials_files: str or [str]
    # Backing fields, excluded from __init__; their defaults double as the
    # fallback values read through ``CONFIG.<name>`` in __post_init__.
    _files_to_send: str or [str] = field(default=None, init=False)
    _host_lists: str or [str] = field(default="./hostlist", init=False)
    _credentials_files: str or [str] = field(default="./users.json", init=False)

    def __post_init__(self):
        """Re-assign each property, using the class-level default when the stored value is falsy."""
        self.files_to_send = self._files_to_send or CONFIG._files_to_send
        self.host_lists = self._host_lists or CONFIG._host_lists
        self.credentials_files = self._credentials_files or CONFIG._credentials_files

    @property
    def files_to_send(self) -> str or [str]:
        """Value stored by the ``files_to_send`` setter."""
        return self._files_to_send

    @files_to_send.setter
    def files_to_send(self, files):
        # Normalise whatever was assigned through return_list().
        self._files_to_send = return_list(files)

    @property
    def host_lists(self) -> str or [str]:
        """Value stored by the ``host_lists`` setter."""
        return self._host_lists

    @host_lists.setter
    def host_lists(self, files):
        # Normalise whatever was assigned through return_list().
        self._host_lists = return_list(files)

    @property
    def credentials_files(self) -> str or [str]:
        """Value stored by the ``credentials_files`` setter."""
        return self._credentials_files

    @credentials_files.setter
    def credentials_files(self, files):
        # Normalise whatever was assigned through return_list().
        self._credentials_files = return_list(files)
@dataclass
class HOST:
    """A target machine together with its SSH client and per-file send results.

    Attributes:
        hostname: Name or address used to connect.
        client: Connected client object, or ``False`` while there is none.
        files_sent: Maps a file path to whether it has been sent.
    """

    hostname: str
    client: "paramiko.SSHClient | bool" = False
    files_sent: dict = field(default_factory=dict)

    def __post_init__(self):
        # Keep accepting an explicit falsy value (the former default was
        # ``False``) by turning it into a fresh dict.
        if not self.files_sent:
            self.files_sent = {}

    @property
    def username(self) -> "str | None":
        """User name of the current connection, or ``None`` when there is none."""
        if not self.client:
            return None
        transport = self.client.get_transport()
        if transport is None:
            # A client without a transport has no session to ask; report
            # "not logged in" instead of raising AttributeError.
            return None
        return transport.get_username()
def return_connection(hostname, user: str, password: str = "") -> "SSHClient | bool":
    """Try to open an SSH session and return the connected client.

    Args:
        hostname: Host to connect to.
        user: Login name.
        password: Password for ``user``.

    Returns:
        The connected ``paramiko.SSHClient``, or ``False`` when the login or
        the connection itself fails.
    """
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    # NOTE(review): AutoAddPolicy accepts host keys that are not yet known -
    # confirm this is acceptable for the networks this script is used on.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname, username=user, password=password, timeout=2)
    except (paramiko.SSHException, OSError):
        # SSHException covers authentication/protocol failures and OSError
        # covers timeouts and other socket-level errors, so one unreachable
        # host no longer aborts the caller's loop. Release the half-open
        # client before reporting the failure.
        client.close()
        return False
    return client
class MANAGER:
    """Loads hosts and credentials, probes SSH logins and sends files over SCP.

    Attributes:
        host_list: HOST entries built from the configured host-list files.
        user_dict: Mapping of user name to password read from the
            credentials files.
        config: CONFIG instance providing the file paths.
    """

    host_list: "list[HOST]"
    user_dict: dict
    config: "CONFIG"

    def __init__(self, config: "CONFIG" = None):
        self.update_config(config)

    def update_config(self, config: "CONFIG" = None):
        """Install *config* (if given) and reload hosts and users from disk.

        When called without a CONFIG instance the configuration already in
        use is kept, so the menu's "Reload files" action re-reads the same
        files instead of silently falling back to the defaults. A default
        CONFIG is only created when there is no configuration yet.
        """
        self.host_list = []
        self.user_dict = {}
        if isinstance(config, CONFIG):
            self.config = config
        elif getattr(self, "config", None) is None:
            self.config = CONFIG()
        self.load_hosts()
        self.load_users()

    @property
    def _files_to_send(self) -> "list[str]":
        return self.config.files_to_send

    @property
    def _hostlist_files(self) -> "list[str]":
        return self.config.host_lists

    @property
    def _credentials_files(self) -> "list[str]":
        return self.config.credentials_files

    def load_config(self):
        """Reload hosts and users using the current configuration."""
        self.load_hosts()
        self.load_users()

    def load_hosts(self):
        """Rebuild ``host_list`` from every configured host-list file.

        Each line is split on ';', ',' and '-'; surrounding whitespace is
        stripped and empty entries (blank lines, trailing separators) are
        skipped so no HOST with an empty hostname is created.
        """
        # NOTE(review): '-' is one of the separators, so a hostname that
        # contains a hyphen is split into several entries - confirm that
        # this is intended.
        split_chars = ";,-"
        splitter = re.compile("|".join(split_chars))
        hosts = []
        for file in self._hostlist_files:
            with open(file or "hostlist") as f:
                lines = f.read().splitlines()
            for line in lines:
                for entry in splitter.split(line):
                    entry = entry.strip()
                    if entry:
                        hosts.append(HOST(hostname=entry))
        self.host_list = hosts

    def load_users(self):
        """Rebuild ``user_dict`` by merging every configured credentials file."""
        users = {}
        for file in self._credentials_files:
            with open(file or "users.json") as f:
                users.update(json.load(f))
        self.user_dict = users

    def print_connections(self):
        """Print each host next to the user it is logged in as."""
        print("\nPrinting connections:")
        print("\tHOSTNAME\tUSER")
        for host in self.host_list:
            print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}")

    def test_connections(self, _print=True):
        """Try the loaded credentials against every host.

        The first credential that logs in is kept as ``host.client`` and the
        remaining ones are not tried, so no extra sessions are opened.

        Args:
            _print: When true, print the connection table afterwards.
        """
        print("Attempting to login:")
        for host in self.host_list:
            if host.client:
                # Drop the session from a previous run before re-testing.
                host.client.close()
            host.client = False
            print(f"{host.hostname}:", end="\t")
            for user, password in self.user_dict.items():
                connection = return_connection(hostname=host.hostname, user=user, password=password)
                if connection:
                    host.client = connection
                    print("YES")
                    break
            else:
                print("NO")
        if _print:
            self.print_connections()

    @property
    def _accesible_hosts(self) -> "list[HOST]":
        return [host for host in self.host_list if host.username]

    @property
    def _not_accesible_hosts(self) -> "list[HOST]":
        return [host for host in self.host_list if not host.username]

    def _send_file(self, scp_session: "SCPClient", file: str, remote_path: str = "/tmp") -> bool:
        """Upload *file* through *scp_session*; return True on success.

        Any exception raised by the upload is printed and reported as False
        so one failing file does not stop the remaining transfers.
        """
        try:
            scp_session.put(file, remote_path=remote_path)
        except Exception as e:
            print()
            print(e)
            return False
        print()
        return True

    def send_files(self, load_hosts=False):
        """Send every configured file to every accessible host.

        Args:
            load_hosts: Reload the host list from disk before sending.
        """
        if load_hosts:
            self.load_hosts()
        accessible_hosts = self._accesible_hosts
        if not accessible_hosts:
            # This used to call itself again when ``load_hosts`` was true,
            # which could never terminate: reloading cannot make a host
            # accessible.
            print("No accessible hosts available")
            return
        files = self._files_to_send
        for host in accessible_hosts:
            print(f"{host.hostname}:")
            if not files:
                print("No files to send")
                continue
            for file in files:
                if host.files_sent.get(file):
                    # Already delivered to this host.
                    continue
                host.files_sent[file] = False
                scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress)
                sent = self._send_file(scp_session=scp_session, file=file)
                print(
                    f'\t{file} -------------> {["No", "Yes"][sent]}')
                # Record the real outcome so a failed file is retried on the
                # next call instead of being marked as sent.
                host.files_sent[file] = sent
class MENU:
    """Empty class: it declares no attributes and no methods."""
def print_help():
    """Print each menu key on its own line, followed by a tab-indented description."""
    entries = (
        ("0", "Exits the script"),
        ("1", "Attempts to login using the loaded credentials"),
        ("2", "Attemps to send the files"),
        ("-1", "Prints help"),
        ("-2", "Reload files"),
    )
    for option, description in entries:
        print(f'{option}:\n\t{description}')
def menu(manager: MANAGER):
    """Run the interactive menu loop until the user exits.

    Reads one option per line from standard input and calls the matching
    action; unknown options are ignored and the menu is shown again.

    Args:
        manager: Object providing ``test_connections``, ``send_files`` and
            ``update_config``.
    """
    # Local import: ``sys.exit`` replaces the site-provided ``exit`` helper,
    # which is meant for the interactive interpreter.
    import sys

    # Built once; the original rebuilt this table on every iteration.
    actions = {
        "0": sys.exit,
        "1": manager.test_connections,
        "2": manager.send_files,
        "-1": print_help,
        "-2": manager.update_config,
    }
    msg = ("\nSelect 1 of the following options:\n"
           "1) Test connections\n"
           "2) Send files\n"
           "-1) Help\n"
           "-2) Reload files\n"
           "0) Exit")
    while True:
        print(msg)
        try:
            _input = input().strip()
        except EOFError:
            # Standard input was closed (e.g. piped input ran out): leave
            # the menu instead of crashing with a traceback.
            return
        option = actions.get(_input)
        if option is not None:
            option()
def main():
    """Parse the command line, then either print the version or start the menu."""
    cli = argparse.ArgumentParser(
        description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}')
    cli.add_argument('-v', '--version', action='store_true', help='Prints the version')
    cli.add_argument('-hl', '--hosts_lists', action='append',
                     help='File that contains a list of hosts (multiple usage allowed)')
    cli.add_argument('-f', '--file', action='append',
                     help='File to send through SSH (multiple usage allowed)')
    cli.add_argument('-c', '--credentials_file', action='append',
                     help='Json file that contains a list of users to use and their password ie: {"u1":"p1","u2":"p2"}')
    # Parsed but not read anywhere in this function.
    cli.add_argument('-thi', '--threading_instances', action='store',
                     help='Number of threading instances (NOT ENABLED)')
    options = cli.parse_args()

    # Guard clause: --version only prints and skips the interactive part.
    if options.version:
        print(f'Connection tester: v{version}')
        return

    settings = CONFIG(
        files_to_send=options.file,
        host_lists=options.hosts_lists,
        credentials_files=options.credentials_file,
    )
    menu(manager=MANAGER(config=settings))
# Start the CLI only when this file is executed directly, not when it is imported.
if __name__ == '__main__':
    main()