"""Collect Pokémon forms from pokemondb.net sprite pages and store them in SQLite.

Every form is identified by a PFIC string built by ``format_pokemon_id``:
``{national_dex:04d}-{generation:02d}-{form_index:03d}-{gender_code}``.
"""

import logging
import os
import re
import sqlite3
import sys
import threading
import unicodedata
from dataclasses import dataclass, asdict
from queue import Queue
from threading import Thread
from typing import Dict, List, Optional, Tuple

import requests
from bs4 import BeautifulSoup

# The project-local imports below live one directory up from this file.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from DataGatherers.cache_manager import CacheManager
from db_controller import DBController

logger = logging.getLogger('ui_feedback')

# National-dex number range covered by each generation.
GENERATION_DEX_RANGES = {
    1: {"min": 1, "max": 151},
    2: {"min": 152, "max": 251},
    3: {"min": 252, "max": 386},
    4: {"min": 387, "max": 493},
    5: {"min": 494, "max": 649},
    6: {"min": 650, "max": 721},
    7: {"min": 722, "max": 809},
    8: {"min": 810, "max": 905},
    9: {"min": 906, "max": 1025},
}

# Characters rewritten when turning a display name into a pokemondb URL slug.
_URL_NAME_TRANSLATION = str.maketrans({
    "♀": "-f",
    "♂": "-m",
    "'": "",
    ".": "",
    "é": "e",
    ":": "",
    " ": "-",
})

# "{name} is a {type}(/{type2}) type Pokémon introduced in Generation {n}."
_BASE_GENERATION_RE = re.compile(
    r'^(.+?) is a (\w+)(?:/(\w+))? type Pokémon introduced in Generation (\d+)\.$'
)

# Sentences describing when a named form was introduced. Each pattern yields
# (form label, first game, optional second game).
_FORM_INTRO_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'a(?:n)? (\w+) Form(?:,)? introduced in (?:the )?([\w\s:]+)(?:\/([\w\s:]+))?',
        r'a(?:n)? (\w+) form(?:,)? available in the latest update to ([\w\s:]+)(?:& ([\w\s:]+))?',
        r'has (?:\w+) new (\w+) Form(?:s)?(?:,)? available in (?:the )?([\w\s:]+)(?:& ([\w\s:]+))?',
        r'a(?:n)? (\w+) form(?:,)? introduced in the Crown Tundra Expansion Pass to ([\w\s:]+)(?:& ([\w\s:]+))?',
    )
)

_GAME_GENERATION_QUERY = '''
    SELECT g.generation
    FROM games g
    LEFT JOIN alternate_game_names agn ON g.id = agn.game_id
    WHERE g.name = ? OR agn.alternate_name = ?
    LIMIT 1
'''

# Form-name keywords that mark a form as not storable in HOME.
_HOME_UNSTORABLE_KEYWORDS = ('mega', 'gigantamax')

_TAUROS_PALDEAN_BREEDS = ("Aqua Breed", "Blaze Breed", "Combat Breed")


@dataclass
class PokemonForm:
    """One form of a Pokémon as scraped from a sprite page."""

    id: str  # This will be our PFIC
    name: str
    form_name: Optional[str]
    sprite_url: str
    national_dex: int
    generation: int


def create_pokemon_db():
    """Create the ``pokemon_forms`` table if missing and return the open connection."""
    conn = sqlite3.connect('pokemon_forms.db')
    cursor = conn.cursor()
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS pokemon_forms (
        PFIC TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        form_name TEXT,
        national_dex INTEGER NOT NULL,
        generation INTEGER NOT NULL
    )
    ''')
    conn.commit()
    return conn


def create_pokemon_storage_db():
    """Create the ``pokemon_storage`` table if missing and return the open connection."""
    conn = sqlite3.connect('pokemon_forms.db')
    cursor = conn.cursor()
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS pokemon_storage (
        PFIC TEXT PRIMARY KEY,
        storable_in_home BOOLEAN NOT NULL,
        FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC)
    )
    ''')
    conn.commit()
    return conn


def initialize_db():
    """Ensure both tables exist, closing the connections used to create them."""
    for create_table in (create_pokemon_db, create_pokemon_storage_db):
        # The creators hand back an open connection; nothing here needs it.
        create_table().close()


def insert_pokemon_form(db_controller, pokemon_form):
    """Insert or replace the ``pokemon_forms`` row for ``pokemon_form``."""
    db_controller.execute_query_with_commit('''
        INSERT OR REPLACE INTO pokemon_forms
        (PFIC, name, form_name, national_dex, generation)
        VALUES (?, ?, ?, ?, ?)
    ''', (
        pokemon_form.id,
        pokemon_form.name,
        pokemon_form.form_name,
        pokemon_form.national_dex,
        pokemon_form.generation
    ))


def insert_pokemon_storage(db_controller, pfic: str, storable_in_home: bool):
    """Insert or replace the ``pokemon_storage`` row for ``pfic``."""
    db_controller.execute_query_with_commit('''
        INSERT OR REPLACE INTO pokemon_storage
        (PFIC, storable_in_home)
        VALUES (?, ?)
    ''', (pfic, storable_in_home))


class PokemonDatabase:
    """In-memory collection of scraped forms, grouped by national dex number."""

    def __init__(self):
        # Keyed by the integer national dex number passed to add_pokemon.
        self.pokemon: Dict[int, List[PokemonForm]] = {}
        self._lock = threading.Lock()

    def add_pokemon(self, national_dex: int, name: str, region_code: int, form_index: int, gender_code: int, form_name: Optional[str], sprite_url: str):
        """Build a ``PokemonForm`` from the given parts and append it under ``national_dex``."""
        pokemon_id = format_pokemon_id(national_dex, region_code, form_index, gender_code)
        pokemon_form = PokemonForm(
            id=pokemon_id,
            name=name,
            form_name=form_name,
            sprite_url=sprite_url,
            national_dex=national_dex,
            generation=region_code,
        )
        with self._lock:
            self.pokemon.setdefault(national_dex, []).append(pokemon_form)

    def get_pokemon(self, national_dex: Optional[int] = None, region_code: Optional[int] = None,
                    form_index: Optional[int] = None, gender_code: Optional[int] = None) -> List[PokemonForm]:
        """Return every stored form whose ID parts equal all the non-``None`` filters."""
        # Snapshot under the lock so a concurrent add_pokemon cannot resize
        # the dict while it is being iterated.
        with self._lock:
            snapshot = [form for dex_forms in self.pokemon.values() for form in dex_forms]

        wanted = (national_dex, region_code, form_index, gender_code)
        results = []
        for form in snapshot:
            parts = form.id.split('-')
            if all(want is None or int(part) == want for want, part in zip(wanted, parts)):
                results.append(form)
        return results

    def get_pokemon_by_id(self, pokemon_id: str) -> Optional[PokemonForm]:
        """Return the stored form whose ID equals ``pokemon_id``, or ``None``."""
        national_dex = int(pokemon_id.split('-')[0])
        with self._lock:
            candidates = list(self.pokemon.get(national_dex, ()))
        return next((form for form in candidates if form.id == pokemon_id), None)


def format_pokemon_id(national_dex: int, region_code: int, form_index: int, gender_code: int) -> str:
    """Return the PFIC string: zero-padded dex, region and form numbers plus the gender code."""
    return f"{national_dex:04d}-{region_code:02d}-{form_index:03d}-{gender_code}"


def get_pokemon_sprites_page(cache: CacheManager):
    """Fetch the sprite index page through ``cache``."""
    url = "https://pokemondb.net/sprites"
    return cache.fetch_url(url)


def get_pokemon_sprites_page_data(cache: CacheManager, pokemon_name: str):
    """Fetch the sprite page for ``pokemon_name`` (a URL slug) through ``cache``."""
    url = f"https://pokemondb.net/sprites/{pokemon_name}"
    return cache.fetch_url(url)


def get_pokemon_dex_page(cache: CacheManager, pokemon_name: str):
    """Fetch the pokedex page for ``pokemon_name`` (a URL slug) through ``cache``."""
    url = f"https://pokemondb.net/pokedex/{pokemon_name}"
    return cache.fetch_url(url)


def remove_accents(input_str):
    """Return ``input_str`` NFKD-normalised with all combining characters dropped."""
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    return "".join(c for c in nfkd_form if not unicodedata.combining(c))


def _normalize_form_label(label):
    """Lower-case a form label and smooth over the spelling variants compare_forms tolerates."""
    text = label.lower()
    for old, new in (("forme", ""), ("form", ""), ("é", "e")):
        text = text.replace(old, new)
    text = text.strip()
    for old, new in (("deputante", "debutante"), ("p'au", "pa'u"), ("blood moon", "bloodmoon")):
        text = text.replace(old, new)
    return text


def compare_forms(a, b):
    """Return True when two form labels are equal, exactly or after normalisation.

    Returns False if either label is ``None``.
    """
    if a is None or b is None:
        return False
    if a == b:
        return True
    return _normalize_form_label(a) == _normalize_form_label(b)


def download_image(url, filename, timeout=30):
    """Download ``url`` into ``filename``.

    Args:
        url: Address of the image.
        filename: Destination path, written in binary mode.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        True if the response had status 200 and the file was written,
        False on any other status or on a request error.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Also covers unusable URLs such as the "missing" placeholder.
        logger.warning("Failed to download %s: %s", url, exc)
        return False
    if response.status_code != 200:
        return False
    with open(filename, 'wb') as f:
        f.write(response.content)
    return True


def worker(queue: Queue, db: PokemonDatabase, pokemon_generations: dict, db_controller: DBController, cache: CacheManager, progress_callback=None):
    """Process ``(index, mon)`` tasks from ``queue`` until a ``None`` task arrives.

    An exception raised while processing a task is logged and the loop
    moves on to the next task.
    """
    while True:
        # Fetch outside the try block so task_done() is only ever called
        # for a task that was actually received.
        task = queue.get()
        try:
            if task is None:  # Poison pill to stop worker
                break
            index, mon = task
            process_single_pokemon(index + 1, mon, db, pokemon_generations, db_controller, cache, progress_callback)
        except Exception as e:
            logger.error(f"Error processing pokemon: {e}")
        finally:
            queue.task_done()


def _default_generation(national_dex_index, pokemon_generations):
    """Return the first generation whose dex range contains the index, else 1."""
    for gen, dex_range in pokemon_generations.items():
        if dex_range["min"] <= national_dex_index <= dex_range["max"]:
            return gen
    return 1


def _find_generation_8_rows(sprites_soup):
    """Return the body rows of the table following the 'Generation 8' heading, or []."""
    header = sprites_soup.find('h2', string='Generation 8')
    if header is None:
        return []
    table = header.find_next('table')
    if table is None:
        return []
    tbody = table.find('tbody')
    if tbody is None:
        return []
    return tbody.find_all('tr')


def _extract_form_name(sprite):
    """Return the joined text of the sprite card's <small> tags, or None if it has none."""
    smalls = sprite.find_all('small')
    if not smalls:
        return None
    return " ".join(small.get_text(strip=True) for small in smalls).strip()


def _load_dex_info(cache, pokemon_url_name, pokemon_name) -> Tuple[Optional[int], Optional[str]]:
    """Return (base generation, accent-stripped follow-up paragraph) from the dex page.

    Either element is None when it cannot be found or parsed.
    """
    dex_page_data = get_pokemon_dex_page(cache, pokemon_url_name)
    if not dex_page_data:
        return None, None

    dex_soup = BeautifulSoup(dex_page_data, 'html.parser')
    # Find the heading that carries the Pokémon name.
    dex_header = dex_soup.find('h1', string=pokemon_name)
    if dex_header is None:
        return None, None

    # The first <p> after the heading states the generation; the <p> after
    # that one is searched for form-introduction sentences.
    generation_tag = dex_header.find_next('p')
    if generation_tag is None:
        return None, None

    base_generation = None
    match = _BASE_GENERATION_RE.match(generation_tag.get_text())
    if match:
        base_generation = int(match.group(4))

    next_tag = generation_tag.find_next('p')
    extra_text = remove_accents(next_tag.get_text()) if next_tag is not None else None
    return base_generation, extra_text


def _lookup_form_generation(db_controller, extra_text, test_form):
    """Return the generation of the game that introduced ``test_form``, or None."""
    for pattern in _FORM_INTRO_PATTERNS:
        for regional, game1, _game2 in pattern.findall(extra_text):
            if not compare_forms(test_form, regional):
                continue
            # extra_text has had its accents stripped, hence "Pokemon".
            target_game = game1.replace("Pokemon", "").strip()
            result = db_controller.execute_query(_GAME_GENERATION_QUERY, (target_game, target_game))
            if result:
                return result[0][0]
    return None


def process_single_pokemon(national_dex_index, mon, db, pokemon_generations, db_controller, cache, progress_callback):
    """Scrape every non-shiny 'Home' sprite of one Pokémon and store each as a form.

    Args:
        national_dex_index: 1-based national dex number.
        mon: Parsed element whose text is the Pokémon's display name.
        db: ``PokemonDatabase`` that receives each form.
        pokemon_generations: Mapping of generation to ``{"min", "max"}`` dex range.
        db_controller: Used for the game-generation lookup and the row inserts.
        cache: Cache used for all page fetches.
        progress_callback: Optional callable given a progress message.
    """
    default_generation = _default_generation(national_dex_index, pokemon_generations)

    pokemon_name = mon.get_text(strip=True)
    logger.info(pokemon_name)
    if progress_callback:
        progress_callback(f"Processing {pokemon_name}")

    pokemon_url_name = pokemon_name.translate(_URL_NAME_TRANSLATION)

    sprites_page_data = get_pokemon_sprites_page_data(cache, pokemon_url_name)
    if not sprites_page_data:
        return
    sprites_soup = BeautifulSoup(sprites_page_data, 'html.parser')

    # Loaded on first use and shared by every sprite of this Pokémon.
    dex_info = None

    for row in _find_generation_8_rows(sprites_soup):
        if 'Home' not in row.get_text(strip=True):
            continue

        form = 0
        for sprite in row.find_all('span', class_='sprites-table-card'):
            sprite_img = sprite.find('img')
            sprite_url = (sprite_img.get('src') if sprite_img is not None else None) or "missing"
            if "shiny" in sprite_url:
                continue

            form_name = _extract_form_name(sprite)
            logger.info(f'{sprite_url}, {form_name}')

            gender = 0
            if form_name is not None:
                if form_name.startswith("Male"):
                    gender = 1
                elif form_name.startswith("Female"):
                    gender = 2
                else:
                    # Only named, non-gendered sprites advance the form index.
                    form += 1

            if dex_info is None:
                dex_info = _load_dex_info(cache, pokemon_url_name, pokemon_name)
            base_generation, extra_text = dex_info

            # Start every sprite from the base generation so a regional form's
            # generation never leaks into the sprites that follow it.
            generation = base_generation if base_generation is not None else default_generation

            if form_name is not None and extra_text is not None:
                test_form = form_name.replace(pokemon_name, "").replace("Male", "").replace("Female", "").strip()
                if pokemon_name == "Tauros" and form_name in _TAUROS_PALDEAN_BREEDS:
                    test_form = "Paldean"
                form_generation = _lookup_form_generation(db_controller, extra_text, test_form)
                if form_generation is not None:
                    generation = form_generation

            pokemon_form = PokemonForm(
                id=format_pokemon_id(national_dex_index, generation, form, gender),
                name=pokemon_name,
                form_name=form_name,
                sprite_url=sprite_url,
                national_dex=national_dex_index,
                generation=generation
            )
            db.add_pokemon(
                national_dex_index,
                pokemon_name,
                generation,
                form,
                gender,
                form_name,
                sprite_url
            )
            insert_pokemon_form(db_controller, pokemon_form)

            storable_in_home = not form_name or not any(
                keyword in form_name.lower() for keyword in _HOME_UNSTORABLE_KEYWORDS
            )
            insert_pokemon_storage(db_controller, pokemon_form.id, storable_in_home)


def retrieve_all_pokemon_forms(cache: CacheManager, progress_callback=None):
    """Scrape all Pokémon forms, store them, and download any missing sprite images.

    Args:
        cache: Cache used for all page fetches.
        progress_callback: Optional callable given a progress message per Pokémon.

    Returns:
        None. Returns early if the sprite index page cannot be fetched.
    """
    db = PokemonDatabase()
    db_controller = DBController('pokemon_forms.db', max_connections=20)
    try:
        page_data = get_pokemon_sprites_page(cache)
        if not page_data:
            return None

        soup = BeautifulSoup(page_data, 'html.parser')
        pokemon_cards = soup.find_all('a', class_='infocard')

        # Create a queue and workers
        num_workers = 1  # Adjust based on your needs
        task_queue = Queue()
        workers = []
        for _ in range(num_workers):
            worker_thread = Thread(
                target=worker,
                args=(task_queue, db, GENERATION_DEX_RANGES, db_controller, cache, progress_callback),
                daemon=True,
            )
            worker_thread.start()
            workers.append(worker_thread)

        for index, mon in enumerate(pokemon_cards):
            task_queue.put((index, mon))

        # Add poison pills to stop workers
        for _ in range(num_workers):
            task_queue.put(None)

        task_queue.join()
        for worker_thread in workers:
            worker_thread.join()

        logger.info(f"Total Pokémon forms: {sum(len(forms) for forms in db.pokemon.values())}")
        logger.info(f"Pokémon with multiple forms: {sum(1 for forms in db.pokemon.values() if len(forms) > 1)}")

        os.makedirs('images-new', exist_ok=True)

        for forms in db.pokemon.values():
            for form in forms:
                filename = f"images-new/{form.id}.png"
                if os.path.exists(filename):
                    logger.info(f"Image for {form.id} already exists, skipping download")
                elif download_image(form.sprite_url, filename):
                    logger.info(f"Downloaded image for {form.id}")
                else:
                    logger.warning(f"Could not download image for {form.id}")
    finally:
        # Close the controller on every exit path, including the early return.
        db_controller.close()


if __name__ == "__main__":
    cache = CacheManager()
    try:
        retrieve_all_pokemon_forms(cache)
    finally:
        cache.close()