import mimetypes import time as _time from abc import ABC import re from urllib.request import unquote import requests from bs4 import BeautifulSoup as BeautifulSoup from Objects import ALBUM, ENTRY, OjectTypeNotSupported from pathlib import Path from os.path import basename, abspath, normpath, join as joinpath from Tools import progress_bar ''' Classes used to scrap pages and gather their information. ''' class __default(object): def __init__(self): self._found_entries = [] _found_entries: [ENTRY, ALBUM] source = None domain_url = None # supported_entries = [ENTRY, ALBUM] supported_entries = [ENTRY, ALBUM] # def _generate_index(self, sleep=0, re_pattern=None, url=None): # """ # Finds entries and stores them in self._found_entries # :param url: Give an url to generate an index (unused) # :param sleep: used to avoid bombarding the page suddenly # :param re_pattern: used to filter by the entries found. # """ # raise NotImplementedError() def generate_index(self, sleep=0, re_pattern=None, url=None): """ Gets a list of entries generated from the main index :param url: Give an url to generate an index (unused) :param sleep: used to avoid bombarding the page suddenly :param re_pattern: used to filter by the entries found. :return: list of Album items """ self._found_entries = [] self.append_index(sleep=sleep, re_pattern=re_pattern, url=url) def _append_index(self, sleep=0, re_pattern=None, url=None): """ Finds entries and stores them in self._found_entries :param url: Give an url to generate an index (unused) :param sleep: used to avoid bombarding the page suddenly :param re_pattern: used to filter by the entries found. """ raise NotImplementedError() def append_index(self, sleep=0, re_pattern=None, url=None): """ Gets a list of entries generated from the main index :param url: Give an url to generate an index (unused) :param sleep: used to avoid bombarding the page suddenly :param re_pattern: used to filter by the entries found. :return: list of Album items """ self._append_index(sleep=sleep, re_pattern=re_pattern, url=url) # def generate_music_index(self, sleep=0, re_pattern=None, url=None) -> list[ALBUM]: # """ # Gets a list of entries generated from the main index # :param sleep: used to avoid bombarding the page suddenly # :param re_pattern: used to filter by the entries found. # :return: list of Album items # """ # raise NotImplementedError() def __scrap_index(self, url) -> list: """ Returns a list of elements found in the index url given :param url: Url used :return: list of """ raise NotImplementedError() def _inspect_entry(self, obj: ENTRY): raise NotImplementedError() def inspect(self, obj: ENTRY) -> None: """ Checks the type of the object given. If valid: redirects the object given to __inspect_entry If not valid: Raises an Exception. """ if type(obj) not in self.supported_entries: raise OjectTypeNotSupported() else: self._inspect_entry(obj) def _get_files(self, obj) -> list[str]: """ Returns a list of files from the url given :param url: :return: list """ raise NotImplementedError() def get_files(self, obj, images=True) -> list[str]: """ Checks the type of the object given. If valid: redirects the object given to _return_files If not valid: Raises an Exception. """ if type(obj) not in self.supported_entries: raise OjectTypeNotSupported() else: return self._get_files(obj) def download_album(self, album: ALBUM | ENTRY, deposit_folder: str): print(f"\t> {album.name}") # create folder(s) album_folder = normpath(abspath(joinpath(deposit_folder, album.name.replace(":", "_")))) # get files files = self.get_files(album) # skip not available # download files for url in files: self.download_file(url=url, destination_folder=album_folder) print() @staticmethod def download_file(url: str, destination_folder: str, file_name: str = None) -> None: url = unquote(url) file_name = file_name or basename(url) """Downloads the file and places it to the given destination""" mime, encoding = mimetypes.guess_type(url) if mime: Path(destination_folder).mkdir(parents=True, exist_ok=True) _file_path: str = f'{destination_folder}/{file_name}' with requests.get(url, stream=True) as response: with open(abspath(normpath(_file_path)), 'wb') as f: total_length = response.headers.get('content-length') if total_length is None: # no content length header f.write(response.content) else: progress = 0 total_length = int(total_length) for chunk in response.iter_content(chunk_size=1024): progress += len(chunk) f.write(chunk) progress_bar(filename=file_name, total=total_length, current=progress) @property def found_entries(self) -> [ENTRY, ALBUM]: return self._found_entries @found_entries.setter def found_entries(self, x): # validations? if type(x) is not list: x = [x] self._found_entries = x def find_entry(self, keyword) -> [ENTRY, ALBUM]: entry_list: [ENTRY, ALBUM] = [] def __string__cleanup(text: str): text = text.lower() re.sub(r'[^\w]', '', text) return text for entry in self._found_entries: # find? if __string__cleanup(keyword) in __string__cleanup(entry.name): entry_list.append(entry) return entry_list def __download_found(self): raise NotImplementedError def download_found(self): """ Download all found albums """ self.__download_found() class Khinsider(__default, ABC): """ This page only returns Albums """ source = 'khinsider' domain = "khinsider.com" supported_entries = [ENTRY, ALBUM] __domain_url = f"https://downloads.{domain}" __base_index_url = f"{__domain_url}/game-soundtracks/browse" def _append_index(self, sleep=0, re_pattern=None, url=None): if url: self.__scrap_index(url) _time.sleep(sleep) else: for char in "#ABCDEFGHIJKLMNOPQRSTUVWXYZ": self.append_index(sleep=sleep, re_pattern=re_pattern, url=f'{self.__base_index_url}/{char}') def __scrap_index(self, url: str) -> None: print(f"Scrapping index from {url}") rq = requests.get(url) if rq.status_code != 200: raise Exception('F') else: soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(name='p', attrs={'align': "left"}) for link in soup.findAll('a'): if link.contents is None: pass else: _json: dict = { 'name': link.contents[0], 'url': f'{self.__domain_url}{link.get("href")}', 'source': self.source, 'available': True } self._found_entries.append(ALBUM(**_json)) def _album_from_url(self, url: str) -> ALBUM: # https://downloads.khinsider.com/game-soundtracks/album/persona-5-royal _json: dict = { 'name': "", 'url': url, 'source': self.source, 'available': False } rq = requests.get(url) if rq.status_code != 200: available = False raise Exception('Album not available') else: available = True soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(name='p', attrs={ 'align': "left"}) name = any([line.lower().replace("Album name: ".lower(), "").strip() for line in soup.text.splitlines() if "Number of Files:".lower() in line.lower()]) _json['name'] = name _json['available'] = available return ALBUM(**_json) def _inspect_entry(self, obj) -> None: if type(obj) is ALBUM: obj: ALBUM rq = requests.get(obj.url) if rq.status_code != 200: obj.available = False raise Exception('Album not available') else: obj.date_added = None soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(name='p', attrs={ 'align': "left"}) obj.number_of_tracks = \ [line.lower().replace("Number of Files: ".lower(), "").strip() for line in soup.text.splitlines() if "Number of Files:".lower() in line.lower()][0] obj.date_added = \ [line.lower().replace("Date added: ".lower(), "").strip() for line in soup.text.splitlines() if "Date added: ".lower() in line.lower()][0] obj.available = True def _get_files(self, obj, get_images=True) -> list[str]: file_list: [ENTRY, ALBUM, ] = [] if type(obj) is ALBUM: obj: ALBUM rq = requests.get(obj.url) if rq.status_code != 200: obj.available = False raise Exception('F') else: obj.date_added = None music_soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find( name='table', id='songlist').findAll('tr') if get_images: image_soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').findNext( name="table").findAll(name="a") for img in image_soup: _url = img.get('href') file_list.append(_url) for element in music_soup: td = element.find(name='td', attrs={'class': 'clickable-row'}) if td: _url = f"{self.__domain_url}{td.find('a').get('href')}" rq2 = requests.get(_url) if rq2.status_code != 200: raise Exception('F') else: soup2 = [p.find('a') for p in BeautifulSoup(rq2.text, 'html.parser').find(name='div', id='pageContent').findAll( 'p')[2:] if p.find(name='a')] for element2 in soup2: file_list.append(element2.get("href")) return file_list def __download_found(self): for entry in self.found_entries: self.download_album(entry)