From 53802615a1c7786c4be30d533f02e882b88294f1 Mon Sep 17 00:00:00 2001
From: OriolFilter
Date: Sun, 10 Apr 2022 04:24:09 +0200
Subject: [PATCH] Base version

---
 Example.py  |  14 +++
 Objects.py  |  44 ++++++++
 Scrapper.py | 310 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 Tools.py    |  67 ++++++++++++
 4 files changed, 435 insertions(+)
 create mode 100644 Example.py
 create mode 100644 Objects.py
 create mode 100644 Scrapper.py
 create mode 100644 Tools.py

diff --git a/Example.py b/Example.py
new file mode 100644
index 0000000..dcf9826
--- /dev/null
+++ b/Example.py
@@ -0,0 +1,14 @@
+from Scrapper import Khinsider
+
+x = Khinsider()
+
+search_list = [
+    "Pokemon",
+    "Digimon",
+]
+
+x.generate_index()
+for word in search_list:
+    for i in x.find_entry(keyword=word):
+        print(i.name)
+        x.download_album(i, "./tmp")
diff --git a/Objects.py b/Objects.py
new file mode 100644
index 0000000..c5e778c
--- /dev/null
+++ b/Objects.py
@@ -0,0 +1,44 @@
+## File used to store generic objects
+from dataclasses import dataclass
+from Tools import fix_url
+
+
+class ObjectTypeNotSupported(Exception):
+    """
+    Raised when the object given is not supported
+    """
+
+
+@dataclass
+class ENTRY:
+    """
+    name: name of the album
+    url: url of the album
+    source: id of the source page, used when consulting the database
+    date_added: date the entry was added to the source page
+    release_date: release date of the album
+    available: used when checking if sources from the database are still available
+    # TODO: file_list: list of urls of the files to download?
+    """
+    name: str = None
+    url: str = None
+    source: str = None
+    date_added: str = None
+    release_date: str = None
+    available: bool = False
+
+    def __post_init__(self):
+        self.name = " ".join(str(self.name).split())
+        self.url = fix_url(self.url)
+        self.source = str(self.source)
+        self.available = bool(self.available)
+        if self.date_added: self.date_added = str(self.date_added)  # Eventually change to a date object
+        if self.release_date: self.release_date = str(self.release_date)
+
+
+@dataclass
+class ALBUM(ENTRY):
+    number_of_tracks: int = None
+
+    def __post_init__(self):
+        super().__post_init__()
+        if self.number_of_tracks: self.number_of_tracks = int(self.number_of_tracks)
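+
+# A minimal usage sketch of the normalization above (values hypothetical):
+#   ALBUM(name=" Persona  5 ", url="https://example.com/a b", source="khinsider",
+#         number_of_tracks="12")
+# __post_init__ collapses the name's whitespace to "Persona 5", percent-encodes the
+# url path to "https://example.com/a%20b" and casts number_of_tracks to int.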
diff --git a/Scrapper.py b/Scrapper.py
new file mode 100644
index 0000000..220faec
--- /dev/null
+++ b/Scrapper.py
@@ -0,0 +1,310 @@
+'''
+Classes used to scrape pages and gather their information.
+'''
+import mimetypes
+import re
+import time as _time
+from abc import ABC
+from os.path import basename, abspath, normpath, join as joinpath
+from pathlib import Path
+from urllib.parse import unquote
+
+import requests
+from bs4 import BeautifulSoup
+
+from Objects import ALBUM, ENTRY, ObjectTypeNotSupported
+from Tools import progress_bar
+
+
+class __default(object):
+    _found_entries: list[ENTRY | ALBUM]
+    source = None
+    domain_url = None
+    supported_entries = [ENTRY, ALBUM]
+
+    def __init__(self):
+        self._found_entries = []
+
+    def generate_index(self, sleep=0, re_pattern=None, url=None):
+        """
+        Clears the current index and fills it with the entries found in the main index
+        :param url: Give an url to generate an index (unused)
+        :param sleep: used to avoid bombarding the page suddenly
+        :param re_pattern: used to filter the entries found.
+        """
+        self._found_entries = []
+        self.append_index(sleep=sleep, re_pattern=re_pattern, url=url)
+
+    def _append_index(self, sleep=0, re_pattern=None, url=None):
+        """
+        Finds entries and stores them in self._found_entries
+        :param url: Give an url to generate an index (unused)
+        :param sleep: used to avoid bombarding the page suddenly
+        :param re_pattern: used to filter the entries found.
+        """
+        raise NotImplementedError()
+
+    def append_index(self, sleep=0, re_pattern=None, url=None):
+        """
+        Appends the entries found in the main index to the current list
+        :param url: Give an url to generate an index (unused)
+        :param sleep: used to avoid bombarding the page suddenly
+        :param re_pattern: used to filter the entries found.
+        """
+        self._append_index(sleep=sleep, re_pattern=re_pattern, url=url)
+
+    def _scrap_index(self, url) -> list:
+        """
+        Returns a list of entries found in the index url given
+        :param url: Url used
+        :return: list of entries
+        """
+        raise NotImplementedError()
+
+    def _inspect_entry(self, obj: ENTRY):
+        raise NotImplementedError()
+
+    def inspect(self, obj: ENTRY) -> None:
+        """
+        Checks the type of the object given.
+        If valid: redirects the object given to _inspect_entry
+        If not valid: raises an exception.
+        """
+        if type(obj) not in self.supported_entries:
+            raise ObjectTypeNotSupported()
+        else:
+            self._inspect_entry(obj)
+
+    def _get_files(self, obj, images=True) -> list[str]:
+        """
+        Returns a list of file urls from the entry given
+        :param obj: entry to list the files of
+        :return: list of urls
+        """
+        raise NotImplementedError()
+
+    def get_files(self, obj, images=True) -> list[str]:
+        """
+        Checks the type of the object given.
+        If valid: redirects the object given to _get_files
+        If not valid: raises an exception.
+        """
+        if type(obj) not in self.supported_entries:
+            raise ObjectTypeNotSupported()
+        else:
+            return self._get_files(obj, images)
+
+    def download_album(self, album: ALBUM | ENTRY, deposit_folder: str):
+        print(f"\t> {album.name}")
+        # create folder(s)
+        album_folder = normpath(abspath(joinpath(deposit_folder, album.name.replace(":", "_"))))
+        # get files
+        files = self.get_files(album)
+        # TODO: skip entries flagged as not available
+        # download files
+        for url in files:
+            self.download_file(url=url, destination_folder=album_folder)
+        print()
+
+    @staticmethod
+    def download_file(url: str, destination_folder: str, file_name: str = None) -> None:
+        """Downloads the file and places it in the given destination"""
+        url = unquote(url)
+        file_name = file_name or basename(url)
+        mime, encoding = mimetypes.guess_type(url)
+        if mime:
+            Path(destination_folder).mkdir(parents=True, exist_ok=True)
+            _file_path: str = f'{destination_folder}/{file_name}'
+
+            with requests.get(url, stream=True) as response:
+                with open(abspath(normpath(_file_path)), 'wb') as f:
+                    total_length = response.headers.get('content-length')
+                    if total_length is None:  # no content length header
+                        f.write(response.content)
+                    else:
+                        progress = 0
+                        total_length = int(total_length)
+                        for chunk in response.iter_content(chunk_size=1024):
+                            progress += len(chunk)
+                            f.write(chunk)
+                            progress_bar(filename=file_name, total=total_length, current=progress)
+
+    @property
+    def found_entries(self) -> list[ENTRY | ALBUM]:
+        return self._found_entries
+
+    @found_entries.setter
+    def found_entries(self, x):
+        # validations?
+        if type(x) is not list:
+            x = [x]
+        self._found_entries = x
+
+    def find_entry(self, keyword) -> list[ENTRY | ALBUM]:
+        entry_list: list[ENTRY | ALBUM] = []
+
+        def __string_cleanup(text: str):
+            text = text.lower()
+            text = re.sub(r'[^\w]', '', text)
+            return text
+
+        for entry in self._found_entries:
+            if __string_cleanup(keyword) in __string_cleanup(entry.name):
+                entry_list.append(entry)
+        return entry_list
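+
+    # Example of the matching above (entry name hypothetical):
+    #   __string_cleanup("Zelda: Ocarina of Time") -> "zeldaocarinaoftime"
+    # so find_entry("ocarina-of-time") matches regardless of case, spacing or
+    # punctuation.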
+
+    def _download_found(self, deposit_folder: str):
+        raise NotImplementedError
+
+    def download_found(self, deposit_folder: str):
+        """
+        Download all found albums into the given folder
+        """
+        # A single leading underscore is used on purpose: a double underscore would
+        # be name-mangled per class, hiding any subclass override from this caller.
+        self._download_found(deposit_folder)
+
+
+class Khinsider(__default, ABC):
+    """
+    This page only returns Albums
+    """
+    source = 'khinsider'
+    domain = "khinsider.com"
+    supported_entries = [ENTRY, ALBUM]
+    __domain_url = f"https://downloads.{domain}"
+    __base_index_url = f"{__domain_url}/game-soundtracks/browse"
+
+    def _append_index(self, sleep=0, re_pattern=None, url=None):
+        if url:
+            self._scrap_index(url)
+            _time.sleep(sleep)
+        else:
+            for char in "#ABCDEFGHIJKLMNOPQRSTUVWXYZ":
+                self.append_index(sleep=sleep, re_pattern=re_pattern, url=f'{self.__base_index_url}/{char}')
+
+    def _scrap_index(self, url: str) -> None:
+        print(f"Scraping index from {url}")
+        rq = requests.get(url)
+        if rq.status_code != 200:
+            raise Exception(f'Failed to fetch index page: {url}')
+        else:
+            soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
+                name='p', attrs={'align': "left"})
+            for link in soup.findAll('a'):
+                if link.contents is None:
+                    pass
+                else:
+                    _json: dict = {
+                        'name': link.contents[0],
+                        'url': f'{self.__domain_url}{link.get("href")}',
+                        'source': self.source,
+                        'available': True
+                    }
+                    self._found_entries.append(ALBUM(**_json))
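+
+    # Each anchor on a browse page becomes one ALBUM entry, e.g. (name assumed):
+    #   ALBUM(name='Persona 5 Royal',
+    #         url='https://downloads.khinsider.com/game-soundtracks/album/persona-5-royal',
+    #         source='khinsider', available=True)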
+
+    def _album_from_url(self, url: str) -> ALBUM:
+        # e.g. https://downloads.khinsider.com/game-soundtracks/album/persona-5-royal
+        _json: dict = {
+            'name': "",
+            'url': url,
+            'source': self.source,
+            'available': False
+        }
+        rq = requests.get(url)
+        if rq.status_code != 200:
+            raise Exception('Album not available')
+        else:
+            soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
+                name='p', attrs={'align': "left"})
+            _json['name'] = [line.replace("Album name: ", "").strip() for line in soup.text.splitlines() if
+                             "Album name:".lower() in line.lower()][0]
+            _json['available'] = True
+        return ALBUM(**_json)
+
+    def _inspect_entry(self, obj) -> None:
+        if type(obj) is ALBUM:
+            obj: ALBUM
+
+            rq = requests.get(obj.url)
+            if rq.status_code != 200:
+                obj.available = False
+                raise Exception('Album not available')
+            else:
+                obj.date_added = None
+                soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
+                    name='p', attrs={'align': "left"})
+                obj.number_of_tracks = \
+                    [line.lower().replace("Number of Files: ".lower(), "").strip() for line in
+                     soup.text.splitlines() if "Number of Files:".lower() in line.lower()][0]
+                obj.date_added = \
+                    [line.lower().replace("Date added: ".lower(), "").strip() for line in
+                     soup.text.splitlines() if "Date added: ".lower() in line.lower()][0]
+                obj.available = True
+
+    def _get_files(self, obj, images=True) -> list[str]:
+        file_list: list[str] = []
+        if type(obj) is ALBUM:
+            obj: ALBUM
+
+            rq = requests.get(obj.url)
+            if rq.status_code != 200:
+                obj.available = False
+                raise Exception(f'Failed to fetch album page: {obj.url}')
+            else:
+                music_soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
+                    name='table', id='songlist').findAll('tr')
+
+                if images:
+                    image_soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').findNext(
+                        name="table").findAll(name="a")
+                    for img in image_soup:
+                        file_list.append(img.get('href'))
+
+                for element in music_soup:
+                    td = element.find(name='td', attrs={'class': 'clickable-row'})
+                    if td:
+                        _url = f"{self.__domain_url}{td.find('a').get('href')}"
+                        rq2 = requests.get(_url)
+                        if rq2.status_code != 200:
+                            raise Exception(f'Failed to fetch track page: {_url}')
+                        else:
+                            soup2 = [p.find('a') for p in
+                                     BeautifulSoup(rq2.text, 'html.parser').find(name='div', id='pageContent').findAll(
+                                         'p')[2:] if p.find(name='a')]
+                            for element2 in soup2:
+                                file_list.append(element2.get("href"))
+
+        return file_list
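+
+    # Note: listing an album costs one request per track: each songlist row links to
+    # a track page, and the downloadable file urls are read from that second page's
+    # <p> anchors.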
+
+    def _download_found(self, deposit_folder: str):
+        for entry in self.found_entries:
+            self.download_album(entry, deposit_folder)
diff --git a/Tools.py b/Tools.py
new file mode 100644
index 0000000..80ca0b1
--- /dev/null
+++ b/Tools.py
@@ -0,0 +1,67 @@
+from dataclasses import dataclass
+from os import get_terminal_size
+from urllib.parse import quote, urlparse, urlunparse
+
+from colored import fg, attr
+
+
+@dataclass
+class RGB:
+    r: int = 0
+    g: int = 0
+    b: int = 0
+
+
+def fix_url(url: str):
+    # Percent-encode only the path of the url, e.g. ".../a b.mp3" -> ".../a%20b.mp3"
+    parts = urlparse(str(url))
+    return urlunparse(parts._replace(path=quote(parts.path)))
+
+
+def progress_bar(filename: bytes | str, current: int, total: int) -> None:
+    _filename: str
+    if type(filename) is str:
+        _filename = filename
+    else:
+        _filename = filename.decode()
+
+    def return_diff_color(c1: RGB, c2: RGB, percent: int) -> RGB:
+        # Linear interpolation between c1 and c2: percent=0 -> c1, percent=100 -> c2
+        def return_diff(n1, n2, _percent=100) -> int:
+            if n1 > n2:
+                return n1 - int((n1 - n2) * (_percent / 100))
+            elif n1 < n2:
+                return n1 + int((n2 - n1) * (_percent / 100))
+            return n1
+
+        new_rgb = RGB(r=return_diff(c1.r, c2.r, percent), g=return_diff(c1.g, c2.g, percent),
+                      b=return_diff(c1.b, c2.b, percent))
+        return new_rgb
+
+    def color_to_hex(color: RGB) -> str:
+        def return_hex_number(n: int):
+            hnum = hex(int(n))
+            return f'{str(hnum).replace("0x", "").zfill(2)}'
+
+        r: str = return_hex_number(color.r)
+        g: str = return_hex_number(color.g)
+        b: str = return_hex_number(color.b)
+        return f"{r}{g}{b}"
+
+    base_color = RGB(r=68, g=121, b=84)
+    end_color = RGB(r=0, g=255, b=68)
+
+    loading_chars = "|/-\\"
+    try:
+        screen_size = get_terminal_size().columns
+    except Exception:
+        screen_size = 120
+    available_columns = int(screen_size / 100 * 50)
+
+    percent = int(float(current) / float(total) * 100)
+    _n = percent % len(loading_chars)
+
+    load_bar = "=" * int((available_columns / 100) * percent) + '=>'
+    space_filling = " " * max(0, available_columns - len(load_bar))
+
+    _color = f'#{color_to_hex(return_diff_color(base_color, end_color, percent))}'
+    print(
+        f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} '
+        f'{[" ", " ", ""][len(str(percent)) - 1]}'
+        f'[{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]',
+        end='\r')
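+
+# Rough rendering at 50% on a 120-column terminal (colors omitted, filename assumed):
+#   [-] ===============================> [50%]                             [track.mp3]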