feature-new-db-implementation #1
Merged
Quildra
merged 21 commits from feature-new-db-implementation into master 1 year ago
15 changed files with 2773 additions and 101 deletions
@ -1,2 +1,8 @@ |
|||
# OriginDex-DataManager |
|||
|
|||
## Notes: |
|||
- Look into generating evolutions by just looking backwards from the current subject. |
|||
- Builbasaur sees nothing, it doesn't evolve from anything |
|||
- Ivysaur sees Bulbasaur |
|||
- Venusaur sees Ivysaur |
|||
- This should help deal with lines that have branching evolutions determines by gender, like Gallade and shouldn't impact the other less complex evolutions. |
|||
@ -0,0 +1,507 @@ |
|||
import sqlite3 |
|||
import threading |
|||
import json |
|||
import os |
|||
import networkx as nx |
|||
|
|||
from utility.data import main_line_games |
|||
|
|||
class DBController: |
|||
def __init__(self, db_path=':memory:', max_connections=10): |
|||
self.db_path = db_path |
|||
self.lock = threading.Lock() |
|||
self.conn = sqlite3.connect(db_path, check_same_thread=False) |
|||
self.conn.row_factory = sqlite3.Row |
|||
self.cursor = self.conn.cursor() |
|||
self.graph = nx.DiGraph() |
|||
self.init_database() |
|||
|
|||
def init_database(self): |
|||
disk_conn = sqlite3.connect('pokemon_forms.db') |
|||
disk_cursor = disk_conn.cursor() |
|||
|
|||
# Create tables in the file-based database |
|||
self.create_pokemon_forms_table(disk_cursor) |
|||
self.create_games_table(disk_cursor) |
|||
self.create_encounters_table(disk_cursor) |
|||
|
|||
# Commit changes to the file-based database |
|||
disk_conn.commit() |
|||
|
|||
# Copy the file-based database to the in-memory database |
|||
disk_conn.backup(self.conn) |
|||
|
|||
# Close the file-based database connection |
|||
disk_conn.close() |
|||
|
|||
if os.path.exists("pokemon_evolution_graph.json"): |
|||
with open("pokemon_evolution_graph.json", "r") as f: |
|||
data = json.load(f) |
|||
self.graph = nx.node_link_graph(data) |
|||
|
|||
def save_changes(self): |
|||
with self.lock: |
|||
# Count the number of records before backup for verification |
|||
self.cursor.execute('SELECT COUNT(*) FROM pokemon_forms') |
|||
count = self.cursor.fetchone()[0] |
|||
print(f"Records in memory before backup: {count}") |
|||
|
|||
# Back up the master connection to disk |
|||
disk_conn = sqlite3.connect('pokemon_forms.db') |
|||
with disk_conn: |
|||
self.conn.backup(disk_conn) |
|||
disk_conn.close() |
|||
|
|||
data = nx.node_link_data(self.graph) |
|||
with open("pokemon_evolution_graph.json", "w") as f: |
|||
json.dump(data, f) |
|||
|
|||
def close(self): |
|||
self.save_changes() |
|||
self.conn.close() |
|||
|
|||
def create_pokemon_forms_table(self, cursor): |
|||
cursor.execute(''' |
|||
CREATE TABLE IF NOT EXISTS pokemon_forms ( |
|||
PFIC TEXT PRIMARY KEY, |
|||
data JSON NOT NULL |
|||
) |
|||
''') |
|||
|
|||
def create_encounters_table(self, cursor): |
|||
cursor.execute(''' |
|||
CREATE TABLE IF NOT EXISTS encounters ( |
|||
PFIC TEXT, |
|||
game_id INTEGER NOT NULL, |
|||
type TEXT NOT NULL, |
|||
data JSON NOT NULL, |
|||
FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC), |
|||
FOREIGN KEY (game_id) REFERENCES games (id) |
|||
) |
|||
''') |
|||
|
|||
def create_games_table(self, cursor): |
|||
cursor.execute(''' |
|||
CREATE TABLE IF NOT EXISTS games ( |
|||
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|||
name TEXT NOT NULL UNIQUE, |
|||
alt_names TEXT, |
|||
generation INTEGER NOT NULL, |
|||
mark TEXT NOT NULL, |
|||
data JSON |
|||
) |
|||
''') |
|||
|
|||
for game in main_line_games: |
|||
name = game["Name"] |
|||
alt_names = ", ".join(game["AltNames"]) # Convert list to comma-separated string |
|||
generation = game["Generation"] |
|||
mark = game["Mark"] |
|||
|
|||
cursor.execute(''' |
|||
INSERT OR IGNORE INTO games (name, alt_names, generation, mark) |
|||
VALUES (?, ?, ?, ?) |
|||
''', (name, alt_names, generation, mark)) |
|||
|
|||
def add_pokemon_form(self, pfic, name, form_name, national_dex, generation, sprite_url, gender_relevant): |
|||
data = { |
|||
"name": name, |
|||
"form_name": form_name, |
|||
"national_dex": national_dex, |
|||
"generation": generation, |
|||
"sprite_url": sprite_url, |
|||
"is_baby_form": False, |
|||
"storable_in_home": False, |
|||
"gender_relevant": gender_relevant |
|||
} |
|||
|
|||
with self.lock: |
|||
self.cursor.execute(''' |
|||
INSERT OR REPLACE INTO pokemon_forms (PFIC, data) VALUES (?, ?) |
|||
''', (pfic, json.dumps(data))) |
|||
self.conn.commit() |
|||
print(f"Added: {pfic}, {name}") |
|||
|
|||
def craft_pokemon_json_query(self, fields_to_include, pfic = None): |
|||
query = f"SELECT " |
|||
extracts = [] |
|||
for field in fields_to_include: |
|||
if field == "pfic": |
|||
extracts.append("PFIC as pfic") |
|||
else: |
|||
extracts.append(f"JSON_EXTRACT(data, '$.{field}') AS {field}") |
|||
query = query + ", ".join(extracts) |
|||
query = query + " FROM pokemon_forms" |
|||
|
|||
if pfic is not None: |
|||
query = query + f" WHERE PFIC = '{pfic}'" |
|||
|
|||
return query |
|||
|
|||
def get_pokemon_details(self, pfic, fields = None): |
|||
if fields == None: |
|||
fields = [ |
|||
"name", |
|||
"form_name", |
|||
"national_dex", |
|||
"generation", |
|||
"is_baby_form", |
|||
"storable_in_home", |
|||
"gender_relevant" |
|||
] |
|||
query = self.craft_pokemon_json_query(fields, pfic) |
|||
self.cursor.execute(query) |
|||
results = self.cursor.fetchone() |
|||
return dict(results) |
|||
|
|||
def get_pokemon_details_by_name(self, name, fields = None): |
|||
if fields == None: |
|||
fields = [ |
|||
"pfic", |
|||
"name", |
|||
"form_name", |
|||
"national_dex", |
|||
"generation", |
|||
"is_baby_form", |
|||
"storable_in_home", |
|||
"gender_relevant" |
|||
] |
|||
query = self.craft_pokemon_json_query(fields) |
|||
name = name.replace("'", "''") |
|||
query += f" WHERE JSON_EXTRACT(data, '$.name') = '{name}'" |
|||
self.cursor.execute(query) |
|||
results = self.cursor.fetchall() |
|||
return [dict(row) for row in results] |
|||
|
|||
def get_list_of_pokemon_forms(self): |
|||
fields = [ |
|||
"pfic", |
|||
"name", |
|||
"form_name", |
|||
"national_dex", |
|||
"generation", |
|||
"is_baby_form", |
|||
"storable_in_home", |
|||
"gender_relevant" |
|||
] |
|||
|
|||
query = self.craft_pokemon_json_query(fields) |
|||
self.cursor.execute(query) |
|||
results = self.cursor.fetchall() |
|||
|
|||
return [dict(row) for row in results] |
|||
|
|||
def update_home_status(self, pfic, status): |
|||
self.update_pokemon_field(pfic, "storable_in_home", status) |
|||
pass |
|||
|
|||
def update_mark(self, pfic, data): |
|||
self.update_pokemon_field(pfic, "mark", data) |
|||
pass |
|||
|
|||
def update_pokemon_field(self, pfic, field_name, new_value): |
|||
# Fetch the existing record |
|||
self.cursor.execute('SELECT data FROM pokemon_forms WHERE PFIC = ?', (pfic,)) |
|||
result = self.cursor.fetchone() |
|||
|
|||
if result: |
|||
# Load the JSON data and update the field |
|||
data = json.loads(result[0]) |
|||
data[field_name] = new_value |
|||
|
|||
# Update the record with the modified JSON |
|||
updated_data_str = json.dumps(data) |
|||
self.cursor.execute(''' |
|||
UPDATE pokemon_forms |
|||
SET data = ? |
|||
WHERE PFIC = ? |
|||
''', (updated_data_str, pfic)) |
|||
self.conn.commit() |
|||
|
|||
def update_evolution_graph(self, evolutions): |
|||
for key in evolutions: |
|||
value = evolutions[key] |
|||
from_pfic = value["from_pfic"] |
|||
to_pfic = value["to_pfic"] |
|||
method = value["method"] |
|||
|
|||
# Add nodes if they do not already exist |
|||
if not self.graph.has_node(from_pfic): |
|||
self.graph.add_node(from_pfic) |
|||
|
|||
if not self.graph.has_node(to_pfic): |
|||
self.graph.add_node(to_pfic) |
|||
|
|||
# Add the edge representing the evolution, with the method as an attribute |
|||
self.graph.add_edge(from_pfic, to_pfic, method=method) |
|||
|
|||
def get_evolution_graph(self, pfic): |
|||
if self.graph.has_node(pfic) == False: |
|||
return [] |
|||
|
|||
return list(self.graph.successors(pfic)) |
|||
|
|||
def get_previous_evolution(self, pfic): |
|||
if self.graph.has_node(pfic) == False: |
|||
return None, None |
|||
|
|||
predecessor = next(self.graph.predecessors(pfic), None) |
|||
|
|||
if predecessor: |
|||
method = self.graph[predecessor][pfic]["method"] |
|||
return predecessor, method |
|||
else: |
|||
return None, None |
|||
|
|||
def get_evolution_paths(self, start_node): |
|||
paths = [] |
|||
|
|||
if self.graph.has_node(start_node) == False: |
|||
return paths |
|||
|
|||
# Define a recursive function to traverse the graph |
|||
def traverse(current_node, current_path, is_root=False): |
|||
if is_root: |
|||
# Add the current node to the path as a tuple (node, None) |
|||
current_path.append((current_node, None)) |
|||
|
|||
# Get successors of the current node |
|||
successors = list(self.graph.successors(current_node)) |
|||
|
|||
if not successors: |
|||
# If there are no successors, add the current path to paths list |
|||
paths.append(current_path.copy()) |
|||
else: |
|||
# Traverse each successor and add edge metadata |
|||
for successor in successors: |
|||
method = self.graph[current_node][successor]["method"] |
|||
# Add the successor node and method as a tuple (successor, method) |
|||
current_path.append((successor, method)) |
|||
|
|||
# Recur for the successor |
|||
traverse(successor, current_path) |
|||
|
|||
# Backtrack (remove the last node and edge metadata) |
|||
current_path.pop() |
|||
|
|||
# Remove the initial node tuple when backtracking fully |
|||
if is_root: |
|||
current_path.pop() |
|||
|
|||
# Start traversal from the start_node |
|||
traverse(start_node, [], True) |
|||
|
|||
return paths |
|||
|
|||
def get_full_evolution_paths(self, start_node): |
|||
""" |
|||
Get all evolution paths starting from a given node, including predecessors and successors. |
|||
:param start_node: The starting node (e.g., a specific Pokemon form). |
|||
:return: A dictionary containing predecessors and successors paths. |
|||
""" |
|||
full_paths = { |
|||
"predecessors": [], |
|||
"successors": [] |
|||
} |
|||
|
|||
if self.graph.has_node(start_node) == False: |
|||
return full_paths |
|||
|
|||
# Traverse predecessors |
|||
def traverse_predecessors(current_node, current_path, is_root=False): |
|||
#if not is_root: |
|||
# Add the current node to the path |
|||
#current_path.append(current_node) |
|||
|
|||
# Get predecessors of the current node |
|||
predecessors = list(self.graph.predecessors(current_node)) |
|||
|
|||
if not predecessors: |
|||
# If there are no predecessors, add the current path to the list |
|||
full_paths["predecessors"].append(current_path.copy()) |
|||
else: |
|||
# Traverse each predecessor |
|||
for predecessor in predecessors: |
|||
method = self.graph[predecessor][current_node]["method"] |
|||
# Add the edge metadata as a tuple (predecessor, method) |
|||
current_path.append((predecessor, method)) |
|||
|
|||
# Recur for the predecessor |
|||
traverse_predecessors(predecessor, current_path) |
|||
|
|||
# Backtrack (remove the last node and edge metadata) |
|||
current_path.pop() |
|||
#current_path.pop() |
|||
|
|||
# Traverse successors |
|||
def traverse_successors(current_node, current_path, is_root=False): |
|||
if is_root: |
|||
# Add the current node to the path as a tuple (node, None) |
|||
predecessor = next(self.graph.predecessors(current_node), None) |
|||
if predecessor: |
|||
method = self.graph[predecessor][current_node]["method"] |
|||
current_path.append((current_node, method)) |
|||
else: |
|||
current_path.append((current_node, None)) |
|||
|
|||
# Get successors of the current node |
|||
successors = list(self.graph.successors(current_node)) |
|||
|
|||
if not successors: |
|||
# If there are no successors, add the current path to paths list |
|||
full_paths["successors"].append(current_path.copy()) |
|||
else: |
|||
# Traverse each successor and add edge metadata |
|||
for successor in successors: |
|||
method = self.graph[current_node][successor]["method"] |
|||
# Add the successor node and method as a tuple (successor, method) |
|||
current_path.append((successor, method)) |
|||
|
|||
# Recur for the successor |
|||
traverse_successors(successor, current_path) |
|||
|
|||
# Backtrack (remove the last node and edge metadata) |
|||
current_path.pop() |
|||
|
|||
if is_root: |
|||
# Remove the initial node tuple when backtracking fully |
|||
current_path.pop() |
|||
|
|||
# Start traversal from the start_node for both predecessors and successors |
|||
traverse_predecessors(start_node, [], True) |
|||
traverse_successors(start_node, [], True) |
|||
|
|||
return full_paths |
|||
|
|||
def propagate_gender_relevance(self, gender_relevant_nodes): |
|||
""" |
|||
Propagate gender relevance through the evolution graph and update the SQLite database. |
|||
:param db_path: Path to the SQLite database file. |
|||
:param gender_relevant_nodes: A set of nodes that are initially marked as gender-relevant. |
|||
""" |
|||
|
|||
# Traverse from each gender-relevant end node backward to propagate relevance |
|||
for node in gender_relevant_nodes: |
|||
# Use breadth-first search or depth-first search to traverse backward |
|||
visited = set() |
|||
stack = [node] |
|||
|
|||
while stack: |
|||
current_node = stack.pop() |
|||
if current_node not in visited: |
|||
visited.add(current_node) |
|||
|
|||
# Update the gender_relevant flag in the database |
|||
self.update_pokemon_field(current_node, "gender_relevant", True) |
|||
|
|||
# Add predecessors to the stack to keep traversing backward |
|||
if self.graph.has_node(current_node): |
|||
predecessors = list(self.graph.predecessors(current_node)) |
|||
stack.extend(predecessors) |
|||
|
|||
self.save_changes() |
|||
|
|||
def get_gender_specific_evolutions(self): |
|||
""" |
|||
Get a list of nodes that have evolution methods indicating gender relevance (i.e., '(male)' or '(female)'). |
|||
:return: A list of nodes involved in gender-specific evolutions. |
|||
""" |
|||
gender_specific_nodes = [] |
|||
|
|||
for from_node, to_node, edge_data in self.graph.edges(data=True): |
|||
method = edge_data.get("method", "") |
|||
if method and ("(male)" in method.lower() or "(female)" in method.lower()): |
|||
# Add both nodes involved in this gender-specific evolution |
|||
gender_specific_nodes.extend([from_node, to_node]) |
|||
|
|||
return list(set(gender_specific_nodes)) # Return unique nodes |
|||
|
|||
def get_gender_relevant_pokemon(self): |
|||
self.cursor.execute(f"SELECT PFIC FROM pokemon_forms WHERE JSON_EXTRACT(data, '$.gender_relevant') = true") |
|||
results = self.cursor.fetchall() |
|||
return [row['PFIC'] for row in results] |
|||
|
|||
def get_game_id_by_name(self, name): |
|||
# First try: exact match against the `name` column |
|||
self.cursor.execute(''' |
|||
SELECT id, name, generation FROM games |
|||
WHERE name = ? |
|||
''', (name,)) |
|||
|
|||
# Fetch the result |
|||
result = self.cursor.fetchone() |
|||
|
|||
# If no exact match found, try matching using `LIKE` with both `name` and `alt_names` |
|||
if not result: |
|||
self.cursor.execute(''' |
|||
SELECT id, name, generation FROM games |
|||
WHERE name LIKE ? OR alt_names LIKE ? |
|||
''', (f"%{name}%", f"%{name}%")) |
|||
|
|||
# Fetch the result from the second query |
|||
result = self.cursor.fetchone() |
|||
|
|||
print(f"ID: {result[0]}, Name: {result[1]}, Generation: {result[2]}") |
|||
|
|||
return dict(result) |
|||
|
|||
def get_games_by_name(self, name): |
|||
self.cursor.execute(''' |
|||
SELECT id, name, generation FROM games |
|||
WHERE name LIKE ? OR alt_names LIKE ? |
|||
''', (f"%{name}%", f"%{name}%")) |
|||
|
|||
# Fetch and print the results |
|||
results = self.cursor.fetchall() |
|||
return [dict(row) for row in results] |
|||
|
|||
def get_game_by_id(self, id): |
|||
self.cursor.execute(''' |
|||
SELECT * FROM games |
|||
WHERE id = ? |
|||
''', (id,)) |
|||
|
|||
# Fetch and print the results |
|||
result = self.cursor.fetchone() |
|||
return dict(result) |
|||
|
|||
def get_games_by_generation(self, generation): |
|||
self.cursor.execute(''' |
|||
SELECT id, name FROM games |
|||
WHERE generation = ? |
|||
''', (generation,)) |
|||
|
|||
# Fetch and print the results |
|||
results = self.cursor.fetchall() |
|||
for row in results: |
|||
print(f"ID: {row[0]}, Name: {row[1]}") |
|||
|
|||
return [dict(row) for row in results] |
|||
|
|||
def update_encounter_locations(self, data): |
|||
for encounter in data: |
|||
with self.lock: |
|||
pfic = encounter["pfic"] |
|||
game_id = encounter["game_id"]["id"] |
|||
type = encounter["type"] |
|||
data = encounter["data"] if "data" in encounter else None |
|||
self.cursor.execute(''' |
|||
INSERT OR REPLACE INTO encounters (PFIC, game_id, type, data) VALUES (?, ?, ?, ?) |
|||
''', (pfic, game_id, type, json.dumps(data))) |
|||
self.conn.commit() |
|||
print(f"Added: {pfic}") |
|||
pass |
|||
|
|||
def get_encounters(self, pfic, type=None): |
|||
query = ''' |
|||
SELECT * FROM encounters |
|||
WHERE PFIC = ? |
|||
''' |
|||
if type: |
|||
query += f"AND type = '{type}'" |
|||
|
|||
self.cursor.execute(query, (pfic,)) |
|||
|
|||
# Fetch and print the results |
|||
results = self.cursor.fetchall() |
|||
return [dict(row) for row in results] |
|||
@ -0,0 +1,2 @@ |
|||
from database.db_controller import DBController |
|||
db = DBController() |
|||
@ -0,0 +1,204 @@ |
|||
from PyQt6.QtCore import QObject, pyqtSignal, QRunnable |
|||
import json |
|||
|
|||
from cache import cache |
|||
from db import db |
|||
|
|||
from utility.functions import get_display_name, get_shiftable_forms |
|||
|
|||
class CalculateOriginMarkWorkerSignals(QObject): |
|||
finished = pyqtSignal(dict) |
|||
|
|||
class CalculateOriginMarkWorker(QRunnable): |
|||
def __init__(self): |
|||
super().__init__() |
|||
self.signals = CalculateOriginMarkWorkerSignals() |
|||
self.marks = {} |
|||
|
|||
def run(self): |
|||
try: |
|||
gathered_data = self.calculate_marks() |
|||
self.signals.finished.emit(gathered_data) |
|||
except Exception as e: |
|||
print(f"Error gathering Pokémon home storage status: {e}") |
|||
|
|||
def calculate_marks(self): |
|||
all_pokemon_forms = db.get_list_of_pokemon_forms() |
|||
for form_entry in all_pokemon_forms: |
|||
if form_entry["storable_in_home"] == False: |
|||
continue |
|||
|
|||
print(f"Determining mark for {get_display_name(form_entry)}") |
|||
|
|||
target_generation = form_entry["generation"] |
|||
pfic = form_entry["pfic"] |
|||
|
|||
#Rule 1 |
|||
# 1. If a pokemon form has a previous evolution from within the same generation, |
|||
# use the mark of the previous evolution. This should be recursive within the same generation. |
|||
print("Checking Rule 1") |
|||
chain = db.get_full_evolution_paths(pfic) |
|||
if chain and (len(chain["predecessors"]) > 0 or len(chain["successors"]) > 0): |
|||
base_form_in_generation = None |
|||
last_pfic = pfic |
|||
current_pfic = pfic |
|||
while True: |
|||
current_pfic, _ = db.get_previous_evolution(current_pfic) |
|||
if current_pfic == None: |
|||
base_form_in_generation = last_pfic |
|||
break |
|||
chain_pokemon_data = db.get_pokemon_details(current_pfic) |
|||
if chain_pokemon_data["generation"] == target_generation: |
|||
base_form_in_generation = current_pfic |
|||
else: |
|||
base_form_in_generation = last_pfic |
|||
break |
|||
last_pfic = current_pfic |
|||
|
|||
if base_form_in_generation and base_form_in_generation != pfic: |
|||
print(f"Base form in generation for {get_display_name(form_entry)} is {base_form_in_generation}") |
|||
mark_id = self.determine_origin_mark(base_form_in_generation, target_generation) |
|||
if mark_id != None: |
|||
self.marks[pfic] = mark_id |
|||
continue |
|||
elif base_form_in_generation == pfic: |
|||
mark_id = self.determine_origin_mark(pfic, target_generation) |
|||
if mark_id != None: |
|||
self.marks[pfic] = mark_id |
|||
continue |
|||
|
|||
#Rule 2 |
|||
# If a pokemon form has no previous evolution from within the same generation, |
|||
# look at the encounters of the pokemon form from this generation and use the mark of the earliest |
|||
# game you can encounter that form in from that generation |
|||
print("Checking Rule 2") |
|||
mark_id = self.determine_origin_mark(pfic, target_generation) |
|||
if mark_id != None: |
|||
self.marks[pfic] = mark_id |
|||
continue |
|||
|
|||
#Rule 3 |
|||
# If there are no encounters for the pokemon form from this generation, |
|||
# look to see if a previous evolution has an encounter from this generation, and use the mark of the earliest |
|||
# game from this generation that the previous evolution is encounterable in. |
|||
print("Checking Rule 3") |
|||
mark_id = self.test_evolve_encounters(pfic, target_generation) |
|||
if mark_id != None: |
|||
self.marks[pfic] = mark_id |
|||
continue |
|||
|
|||
#Rule 3b |
|||
# Check to see if this is a sub-form pokemon, and if so, use the mark of the base form. |
|||
random_encounters = db.get_encounters(pfic, "random") |
|||
static_encounters = db.get_encounters(pfic, "static") |
|||
encounters = [] |
|||
encounters.extend(random_encounters) |
|||
encounters.extend(static_encounters) |
|||
count = 0 |
|||
if encounters: |
|||
for encounter in encounters: |
|||
game_info = db.get_game_by_id(encounter["game_id"]) |
|||
if game_info["generation"] == target_generation: |
|||
count += 1 |
|||
if count == 0: |
|||
|
|||
shiftable_forms = get_shiftable_forms(pfic) |
|||
if len(shiftable_forms) > 0: |
|||
form_found = False |
|||
for shiftable_form in shiftable_forms: |
|||
mark_id = self.determine_origin_mark(shiftable_form["to_pfic"], target_generation) |
|||
if mark_id != None: |
|||
self.marks[pfic] = mark_id |
|||
form_found = True |
|||
break |
|||
if form_found: |
|||
continue |
|||
|
|||
#Rule 4 |
|||
# If there are no encounters for the pokemon form or its evolution line from this generation, |
|||
# use the mark of the earliest game of the generation is marked as being introducted in. |
|||
if encounters: |
|||
earliest_game = 100 |
|||
for encounter in encounters: |
|||
game_info = db.get_game_by_id(encounter["game_id"]) |
|||
if game_info["id"] <= earliest_game: |
|||
earliest_game = game_info["id"] |
|||
if earliest_game < 100: |
|||
form_info = db.get_pokemon_details(pfic) |
|||
mark_id = game_info["mark"] |
|||
if mark_id == None: |
|||
print(f"No mark found for {get_display_name(form_info)}") |
|||
pass |
|||
else: |
|||
print(f"Mark for {get_display_name(form_info)} is {mark_id}") |
|||
self.marks[pfic] = mark_id |
|||
continue |
|||
|
|||
event_encounters = db.get_encounters(pfic, "event") |
|||
if event_encounters: |
|||
earliest_game = 100 |
|||
for encounter in event_encounters: |
|||
game_info = game_info = db.get_game_by_id(encounter["game_id"]) |
|||
if game_info["id"] <= earliest_game: |
|||
earliest_game = game_info["id"] |
|||
if earliest_game < 100: |
|||
form_info = db.get_pokemon_details(pfic) |
|||
mark_id = game_info["mark"] |
|||
if mark_id == None: |
|||
print(f"No mark found for {get_display_name(form_info)}") |
|||
else: |
|||
print(f"Mark for {get_display_name(form_info)} is {mark_id}") |
|||
self.marks[pfic] = mark_id |
|||
continue |
|||
|
|||
return self.marks |
|||
|
|||
def determine_origin_mark(self, pfic, target_generation): |
|||
shiftable_forms = get_shiftable_forms(pfic) |
|||
if len(shiftable_forms) > 0: |
|||
for shiftable_form in shiftable_forms: |
|||
mark_id = self.determine_origin_mark(shiftable_form["to_pfic"], target_generation) |
|||
return mark_id |
|||
encounters = db.get_encounters(pfic) |
|||
if encounters: |
|||
generation_encounters = [] |
|||
for encounter in encounters: |
|||
game_info = db.get_game_by_id(encounter["game_id"]) |
|||
encounter["game"] = game_info |
|||
if encounter["game"]["generation"] == target_generation: |
|||
generation_encounters.append(encounter) |
|||
if len(generation_encounters) > 0: |
|||
generation_encounters = sorted(generation_encounters, key=lambda x: x["game"]["generation"]) |
|||
form_info = db.get_pokemon_details(pfic) |
|||
game_info = generation_encounters[0]["game"] |
|||
mark_id = game_info["mark"] |
|||
if mark_id == None: |
|||
#self.logger.info(f"No mark found for {form_info[0]} {form_info[1]}") |
|||
print(f"No mark found for {get_display_name(form_info)}") |
|||
else: |
|||
#self.logger.info(f"Mark for {form_info[0]} {form_info[1]} is {mark_id}") |
|||
print(f"Mark for {get_display_name(form_info)} is {mark_id}") |
|||
return mark_id |
|||
return None |
|||
|
|||
def test_evolve_encounters(self, pfic, target_generation): |
|||
evolve_encounters = db.get_encounters(pfic, "evolve") |
|||
if evolve_encounters: |
|||
available_encounters = [] |
|||
for encounter in evolve_encounters: |
|||
game_info = db.get_game_by_id(encounter["game_id"]) |
|||
if game_info["generation"] == target_generation: |
|||
available_encounters.append(encounter) |
|||
|
|||
if len(available_encounters) > 0: |
|||
available_encounters = sorted(available_encounters, key=lambda x: x.game_id) |
|||
data = json.loads(available_encounters[0]["data"]) |
|||
mark_id = self.determine_origin_mark(data["from_pfic"], target_generation) |
|||
if mark_id != None: |
|||
return mark_id |
|||
|
|||
mark_id = self.test_evolve_encounters(data["from_pfic"], target_generation) |
|||
if mark_id != None: |
|||
return mark_id |
|||
|
|||
return None |
|||
@ -0,0 +1,707 @@ |
|||
from PyQt6.QtCore import QObject, pyqtSignal, QRunnable |
|||
from bs4 import BeautifulSoup, NavigableString |
|||
from pattern.en import singularize |
|||
from fuzzywuzzy import fuzz |
|||
import re |
|||
|
|||
from cache import cache |
|||
from db import db |
|||
|
|||
from utility.data import default_forms, regional_descriptors, days, times, rods |
|||
from utility.functions import is_mainline_game, compare_pokemon_forms, find_match_in_string_array, extract_bracketed_text |
|||
from utility.pokemon_word_ninja import PokemonWordNinja |
|||
|
|||
class GatherEncountersWorkerSignals(QObject): |
|||
finished = pyqtSignal(list) |
|||
|
|||
class GatherEncountersWorker(QRunnable): |
|||
def __init__(self): |
|||
super().__init__() |
|||
self.signals = GatherEncountersWorkerSignals() |
|||
self.default_forms_set = set(default_forms) |
|||
self.splitter = PokemonWordNinja() |
|||
self.encounters_to_ignore = [ |
|||
"trade", |
|||
"time capsule", |
|||
"unobtainable", |
|||
"tradeversion", |
|||
"poké transfer", |
|||
"friend safari", |
|||
"unavailable", |
|||
"pokémon home", |
|||
"union circle", |
|||
"pokémon bank", |
|||
"pal park", |
|||
"transfer from dream radar", |
|||
"global link event", |
|||
"pokémon channel", |
|||
"pokémon colosseum bonus disc" |
|||
] |
|||
self.encounters = [] |
|||
|
|||
def run(self): |
|||
try: |
|||
gathered_data = self.gather_encounter_data() |
|||
self.signals.finished.emit(gathered_data) |
|||
except Exception as e: |
|||
print(f"Error gathering Pokémon forms: {e}") |
|||
|
|||
def gather_encounter_data(self): |
|||
all_pokemon_forms = db.get_list_of_pokemon_forms() |
|||
|
|||
for form_entry in all_pokemon_forms: |
|||
form = form_entry["form_name"] |
|||
name = form_entry["name"] |
|||
pfic = form_entry["pfic"] |
|||
|
|||
print(f'Processing {name}') |
|||
self.splitter.add_custom_word(name) |
|||
|
|||
if form and name in form: |
|||
form = form.replace(name, "").strip() |
|||
|
|||
if form and form.startswith("Female"): |
|||
form = form.replace("Female", "").strip() |
|||
|
|||
if form and form.startswith("Male"): |
|||
form = form.replace("Male", "").strip() |
|||
|
|||
if form and form in default_forms: |
|||
form = None |
|||
|
|||
if name == "Unown" and (form != "!" and form != "?"): |
|||
form = None |
|||
|
|||
if name == "Tauros" and form == "Combat Breed": |
|||
form = "Paldean Form" |
|||
|
|||
if name == "Alcremie": |
|||
form = None |
|||
|
|||
if name == "Minior": |
|||
form = None |
|||
|
|||
if name.lower() == "ho-oh": |
|||
name = "Ho-Oh" |
|||
|
|||
if form == "": |
|||
form = None |
|||
|
|||
search_form = form |
|||
|
|||
encounter_data = self.get_locations_from_bulbapedia(name, search_form) |
|||
if encounter_data == None: |
|||
continue |
|||
|
|||
for encounter in encounter_data: |
|||
if len(encounter_data[encounter]) == 0: |
|||
break |
|||
|
|||
for location in encounter_data[encounter]: |
|||
if location == "": |
|||
continue |
|||
test_location = location["location"].strip().lower() |
|||
test_location_text = BeautifulSoup(test_location, 'html.parser').get_text().lower() |
|||
if "evolve" in test_location_text: |
|||
remaining, details = self.extract_additional_information(location["tag"]) |
|||
evolve_info = self.extract_evolve_information(remaining, form_entry["form_name"]) |
|||
|
|||
if evolve_info: |
|||
#logger.info(f"Evolve Info: {evolve_info}") |
|||
self.save_evolve_encounter(pfic, encounter, details["days"], details["times"], evolve_info["evolve_from"]) |
|||
elif "event" in test_location_text: |
|||
#logger.info(f"Event: {location['location']}") |
|||
self.save_event_encounter(pfic, encounter) |
|||
else: |
|||
remaining, details = self.extract_additional_information(location["tag"]) |
|||
routes, remaining = self.extract_routes(remaining) |
|||
#logger.info(f"Routes: {routes}") |
|||
#logger.info(f"Remaining: {remaining.strip()}") |
|||
#logger.info(f"Details: {details}") |
|||
|
|||
if len(details["times"]) > 0: |
|||
#logger.info("Stupid Data") |
|||
pass |
|||
|
|||
for route in routes: |
|||
route_name = f"Route {route}" |
|||
self.save_encounter(pfic, encounter, route_name, details["days"], details["times"], details["dual_slot"], details["static_encounter"], details["static_encounter_count"], details["extra_text"], details["stars"], details["Rods"], details["Fishing"], details["starter"] ) |
|||
|
|||
if remaining != "": |
|||
remaining_locations = remaining.replace(" and ", ",").split(",") |
|||
for remaining_location in remaining_locations: |
|||
if remaining_location.strip() == "": |
|||
continue |
|||
|
|||
ignore_location = False |
|||
for ignore in self.encounters_to_ignore: |
|||
if ignore in remaining_location.lower(): |
|||
ignore_location = True |
|||
break |
|||
|
|||
if ignore_location: |
|||
continue |
|||
|
|||
self.save_encounter(pfic, encounter, remaining_location.strip(), details["days"], details["times"], details["dual_slot"], details["static_encounter"], details["static_encounter_count"], details["extra_text"], details["stars"], details["Rods"], details["Fishing"], details["starter"] ) |
|||
|
|||
return self.encounters |
|||
|
|||
|
|||
def get_locations_from_bulbapedia(self, pokemon_name, form, force_refresh = False): |
|||
url = f"https://bulbapedia.bulbagarden.net/wiki/{pokemon_name}_(Pokémon)" |
|||
page_data = cache.fetch_url(url) |
|||
if not page_data: |
|||
return None |
|||
|
|||
cache_key = f'locations_{url}_data' |
|||
|
|||
if force_refresh: |
|||
cache.purge(cache_key) |
|||
|
|||
cached_entry = cache.get(cache_key) |
|||
if cached_entry != None: |
|||
return cached_entry |
|||
|
|||
soup = BeautifulSoup(page_data, 'html.parser') |
|||
if not soup: |
|||
return None |
|||
|
|||
# Try different methods to find the locations table |
|||
locations_table = None |
|||
possible_headers = ['Game locations', 'In side games', 'In spin-off games'] |
|||
|
|||
for header in possible_headers: |
|||
span = soup.find('span', id=header.replace(' ', '_')) |
|||
if span: |
|||
locations_table = span.find_next('table', class_='roundy') |
|||
if locations_table: |
|||
break |
|||
|
|||
if not locations_table: |
|||
print(f"Warning: Couldn't find locations table for {pokemon_name}") |
|||
return None |
|||
|
|||
raw_game_locations = {} |
|||
|
|||
generation_tbody = locations_table.find('tbody', recursive=False) |
|||
generation_rows = generation_tbody.find_all('tr', recursive=False) |
|||
for generation_row in generation_rows: |
|||
random_nested_td = generation_row.find('td', recursive=False) |
|||
if not random_nested_td: |
|||
continue |
|||
random_nested_table = random_nested_td.find('table', recursive=False) |
|||
if not random_nested_table: |
|||
continue |
|||
random_nested_tbody = random_nested_table.find('tbody', recursive=False) |
|||
random_nested_rows = random_nested_tbody.find_all('tr', recursive=False) |
|||
|
|||
for nested_row in random_nested_rows: |
|||
if 'Generation' in nested_row.get_text(strip=True): |
|||
continue |
|||
|
|||
games_container_td = nested_row.find('td', recursive=False) |
|||
if not games_container_td: |
|||
continue |
|||
games_container_table = games_container_td.find('table', recursive=False) |
|||
if not games_container_table: |
|||
continue |
|||
games_container_tbody = games_container_table.find('tbody', recursive=False) |
|||
games_container_rows = games_container_tbody.find_all('tr', recursive=False) |
|||
for games_container_row in games_container_rows: |
|||
games = games_container_row.find_all('th') |
|||
for game in games: |
|||
raw_game = game.get_text(strip=True) |
|||
if is_mainline_game(raw_game) == None: |
|||
continue |
|||
locations_container_td = games_container_row.find('td', recursive=False) |
|||
if not locations_container_td: |
|||
continue |
|||
locations_container_table = locations_container_td.find('table', recursive=False) |
|||
if not locations_container_table: |
|||
continue |
|||
locations_container_tbody = locations_container_table.find('tbody', recursive=False) |
|||
locations = locations_container_tbody.find_all('td') |
|||
for location in locations: |
|||
groups = self.split_td_contents(location) |
|||
for group in groups: |
|||
if raw_game not in raw_game_locations: |
|||
raw_game_locations[raw_game] = [] |
|||
raw_game_locations[raw_game].append(group) |
|||
|
|||
# Process events |
|||
events_section = soup.find('span', id='In_events') |
|||
event_tables = self.process_event_tables(events_section) if events_section else {} |
|||
|
|||
game_locations = {} |
|||
for raw_game, raw_locations in raw_game_locations.items(): |
|||
encounters = self.process_game_locations(raw_game, raw_locations, form) |
|||
if encounters and len(encounters) > 0: |
|||
game_locations[raw_game] = encounters |
|||
|
|||
# Process event tables |
|||
for variant in event_tables: |
|||
if (variant == pokemon_name and form is None) or (form and form in variant): |
|||
self.process_event_table(event_tables[variant], game_locations) |
|||
|
|||
cache.set(cache_key, game_locations) |
|||
return game_locations |
|||
|
|||
def split_td_contents(self, td): |
|||
groups = [] |
|||
current_group = [] |
|||
for content in td.contents: |
|||
if isinstance(content, NavigableString): |
|||
text = content.strip() |
|||
if text: |
|||
current_group.append(content) |
|||
elif content.name == 'br': |
|||
if current_group: |
|||
groups.append(''.join(str(item) for item in current_group)) |
|||
current_group = [] |
|||
else: |
|||
current_group.append(content) |
|||
if current_group: |
|||
groups.append(''.join(str(item) for item in current_group)) |
|||
return groups |
|||
|
|||
def process_game_locations(self, raw_game, raw_locations, form): |
|||
locations = [] |
|||
|
|||
for raw_location in raw_locations: |
|||
raw_text = raw_location |
|||
forms = self.parse_form_information(raw_location) |
|||
if form is None: |
|||
if len(forms) > 0: |
|||
for form_info in forms: |
|||
main_form = form_info["main_form"] |
|||
if default_forms and main_form and main_form in self.default_forms_set: |
|||
main_form = None |
|||
|
|||
if main_form and (main_form != "All Forms" and main_form != "Kantonian Form" and main_form != "All Sizes"): |
|||
continue |
|||
|
|||
locations.append({"location": raw_text, "tag": raw_location}) |
|||
else: |
|||
locations.append({"location": raw_text, "tag": raw_location}) |
|||
elif len(forms) > 0: |
|||
for form_info in forms: |
|||
if self.form_matches(form_info, form, default_forms): |
|||
locations.append({"location": raw_text, "tag": raw_location}) |
|||
else: |
|||
form_info = {"main_form": None, "sub_form": None, "region": None} |
|||
if self.form_matches(form_info, form, default_forms): |
|||
locations.append({"location": raw_text, "tag": raw_location}) |
|||
|
|||
return locations if locations else None |
|||
|
|||
def process_event_tables(self, events_section): |
|||
event_tables = {} |
|||
if events_section: |
|||
next_element = events_section.parent.find_next_sibling() |
|||
while next_element and next_element.name != 'h3': |
|||
if next_element.name == 'h5': |
|||
variant = next_element.text.strip() |
|||
table = next_element.find_next_sibling('table', class_='roundy') |
|||
if table: |
|||
event_tables[variant] = table |
|||
next_element = next_element.find_next_sibling() |
|||
return event_tables |
|||
|
|||
def parse_form_information(self, html_content): |
|||
soup = BeautifulSoup(html_content, 'html.parser') |
|||
|
|||
#TODO: This wont work for lines that have several small blocks in one line. |
|||
#TODO: Adjust this to handle more than one small block, see Basculin for example |
|||
small_tag = soup.find('small') |
|||
|
|||
forms = [] |
|||
# Form info is in bold inside a small tag. |
|||
if small_tag: |
|||
bold_tags = small_tag.find_all('b') |
|||
for bold_tag in bold_tags: |
|||
form_text = bold_tag.get_text(strip=True) |
|||
|
|||
# Remove parentheses |
|||
form_text = form_text.strip('()') |
|||
|
|||
if "/" in form_text: |
|||
last_word = singularize(form_text.split()[-1]) |
|||
form_text = form_text.replace(last_word, "").strip() |
|||
parts = form_text.split('/') |
|||
for part in parts: |
|||
main_form = part.strip() + " " + last_word |
|||
info = { |
|||
"main_form": main_form, |
|||
"sub_form": None |
|||
} |
|||
forms.append(info) |
|||
continue |
|||
|
|||
# Split the text into main form and breed (if present) |
|||
parts = form_text.split('(') |
|||
main_form = parts[0].strip() |
|||
|
|||
# "Factor"s are not actual forms, they are properties of the pokemon you can encoutner. |
|||
if main_form and "factor" in main_form.lower(): |
|||
continue |
|||
|
|||
breed = parts[1].strip(')') if len(parts) > 1 else None |
|||
|
|||
info = { |
|||
"main_form": main_form, |
|||
"sub_form": breed |
|||
} |
|||
|
|||
for region in regional_descriptors: |
|||
if region in main_form.lower(): |
|||
info["region"] = region |
|||
break |
|||
|
|||
forms.append(info) |
|||
else: #..... Gimmighoul |
|||
headings = soup.find_all('b') |
|||
if len(headings) > 0: |
|||
for heading in headings: |
|||
if heading.parent.name == 'sup': |
|||
continue |
|||
if "form" not in heading.get_text(strip=True).lower(): |
|||
continue |
|||
main_form = heading.get_text(strip=True) |
|||
info = { |
|||
"main_form": main_form, |
|||
"sub_form": None |
|||
} |
|||
|
|||
for region in regional_descriptors: |
|||
if region in main_form.lower(): |
|||
info["region"] = region |
|||
break |
|||
|
|||
forms.append(info) |
|||
|
|||
return forms |
|||
|
|||
def form_matches(self, form_info, form, default_forms): |
|||
main_form = form_info["main_form"] |
|||
sub_form = form_info["sub_form"] |
|||
try: |
|||
region = form_info['region'] if 'region' in form_info else None |
|||
except KeyError: |
|||
region = None |
|||
|
|||
if default_forms and main_form and main_form in default_forms: |
|||
main_form = None |
|||
|
|||
if form.lower() in ["spring form", "summer form", "autumn form", "winter form"] and main_form == None: |
|||
return True |
|||
|
|||
if main_form is None: |
|||
return False |
|||
|
|||
if main_form in ["All Forms", "All Sizes"]: |
|||
return True |
|||
|
|||
if region == None and main_form in ["Kantonian Form"]: |
|||
return True |
|||
|
|||
main_form_match = compare_pokemon_forms(form, main_form) or fuzz.partial_ratio(form.lower(), main_form.lower()) >= 95 |
|||
sub_form_match = compare_pokemon_forms(form, sub_form) or (sub_form and fuzz.partial_ratio(form.lower(), sub_form.lower()) >= 95) |
|||
|
|||
if not main_form_match and not sub_form_match and region: |
|||
region_match = compare_pokemon_forms(form, region) or fuzz.partial_ratio(form.lower(), region.lower()) >= 95 |
|||
return region_match |
|||
|
|||
return main_form_match or sub_form_match |
|||
|
|||
def extract_routes(self, s): |
|||
# Find all route numbers, including those after "and" or separated by commas |
|||
route_pattern = r'Routes?\s+((?:\d+(?:,?\s+(?:and\s+)?)?)+)' |
|||
route_match = re.search(route_pattern, s, re.IGNORECASE) |
|||
|
|||
if route_match: |
|||
# Extract all numbers from the matched group |
|||
numbers = re.findall(r'\d+', route_match.group(1)) |
|||
|
|||
# Remove the extracted part from the original string |
|||
remaining = s[:route_match.start()] + s[route_match.end():].lstrip(', ') |
|||
|
|||
return numbers, remaining |
|||
else: |
|||
return [], s |
|||
|
|||
def extract_additional_information(self, s): |
|||
details = {} |
|||
details["days"] = [] |
|||
details["times"] = [] |
|||
details["dual_slot"] = None |
|||
details["static_encounter_count"] = 0 |
|||
details["static_encounter"] = False |
|||
details["starter"] = False |
|||
details["extra_text"] = [] |
|||
details["stars"] = [] |
|||
details["Fishing"] = False |
|||
details["Rods"] = [] |
|||
|
|||
if s is None: |
|||
return "", details |
|||
|
|||
soup = BeautifulSoup(s, 'html.parser') |
|||
full_text = soup.get_text() |
|||
sup_tags = soup.find_all('sup') |
|||
sup_text = [] |
|||
|
|||
if "first partner" in full_text.lower(): |
|||
details["starter"] = True |
|||
|
|||
for sup_tag in sup_tags: |
|||
text = sup_tag.get_text(strip=True) |
|||
|
|||
if find_match_in_string_array(text, days): |
|||
details["days"].append(text) |
|||
sup_text.append(text) |
|||
|
|||
if find_match_in_string_array(text, times): |
|||
details["times"].append(text) |
|||
sup_text.append(text) |
|||
|
|||
bracket_text = extract_bracketed_text(full_text) |
|||
|
|||
for text in bracket_text: |
|||
text = text.strip() |
|||
text_lower = text.lower() |
|||
|
|||
game = is_mainline_game(text_lower) |
|||
if game != None: |
|||
details["dual_slot"] = game["Name"] |
|||
text = re.sub(game["Name"], '', text_lower, flags=re.IGNORECASE) |
|||
|
|||
match = find_match_in_string_array(text_lower, days) |
|||
if match: |
|||
details["days"].append(match) |
|||
text = re.sub(match, '', text_lower, flags=re.IGNORECASE) |
|||
|
|||
match = find_match_in_string_array(text_lower, times) |
|||
if match: |
|||
details["times"].append(match) |
|||
text = re.sub(match, '', text_lower, flags=re.IGNORECASE) |
|||
|
|||
if "only one" in text_lower: |
|||
details["static_encounter_count"] = 1 |
|||
details["static_encounter"] = True |
|||
text = re.sub(r'only one', '', text_lower, flags=re.IGNORECASE) |
|||
elif "only two" in text_lower: |
|||
details["static_encounter_count"] = 2 |
|||
details["static_encounter"] = True |
|||
text = re.sub(r'only two', '', text_lower, flags=re.IGNORECASE) |
|||
|
|||
if "rod" in text_lower: |
|||
match = find_match_in_string_array(text_lower, rods) |
|||
if match: |
|||
details["Fishing"] = True |
|||
details["Rods"].append(match) |
|||
text = re.sub(match, '', text_lower, flags=re.IGNORECASE) |
|||
|
|||
if "★" in text: |
|||
star_parts = re.findall(r'\d★,*', text) |
|||
for part in star_parts: |
|||
details["stars"].append(part.replace(',', '').strip()) |
|||
text = re.sub(r'\d★,*', '', text) |
|||
|
|||
if text.strip() != "": |
|||
details["extra_text"].append(text.strip()) |
|||
sup_text.append(text.strip()) |
|||
|
|||
if len(sup_text) > 0: |
|||
for text in sup_text: |
|||
full_text = full_text.replace(text, "") |
|||
|
|||
if len(bracket_text) > 0: |
|||
for text in bracket_text: |
|||
full_text = full_text.replace(text, "") |
|||
full_text = full_text.replace('(', "").replace(')', "") |
|||
|
|||
return full_text.strip(), details |
|||
else: |
|||
return full_text, details |
|||
|
|||
def extract_evolve_information(self, s: str, search_form): |
|||
details = {} |
|||
if s is None or s == "": |
|||
return details |
|||
|
|||
s = s.replace("Evolve", "") |
|||
|
|||
parts = s.split(" ") |
|||
|
|||
if len(parts) >= 1: |
|||
target_pokemon = parts[0].strip() |
|||
|
|||
form = None |
|||
if "♀" in target_pokemon: |
|||
target_pokemon = target_pokemon.replace("♀", "").strip() |
|||
form = "Female" |
|||
|
|||
if "♂" in target_pokemon: |
|||
target_pokemon = target_pokemon.replace("♂", "").strip() |
|||
form = "Male" |
|||
|
|||
results = db.get_pokemon_details_by_name(target_pokemon) |
|||
|
|||
if results: |
|||
for result in results: |
|||
if compare_pokemon_forms(result["form_name"], form): |
|||
details["evolve_from"] = result["pfic"] |
|||
|
|||
if results and "evolve_from" not in details: |
|||
for result in results: |
|||
if compare_pokemon_forms(result["form_name"], search_form if search_form != form else "Male"): |
|||
details["evolve_from"] = result["pfic"] |
|||
|
|||
return details |
|||
|
|||
def save_evolve_encounter(self, pfic, game, days, times, from_pfic): |
|||
game_id = db.get_game_id_by_name(game) |
|||
|
|||
encounter = { |
|||
"pfic": pfic, |
|||
"game_id": game_id, |
|||
"type": "evolve", |
|||
"data": { |
|||
"day": None, |
|||
"time": None, |
|||
"from_pfic": from_pfic, |
|||
} |
|||
} |
|||
|
|||
if len(days) > 0: |
|||
for day in days: |
|||
encounter["data"]["day"] = day |
|||
encounter["data"]["time"] = None |
|||
self.encounters.append(encounter) |
|||
|
|||
elif len(times) > 0: |
|||
for time in times: |
|||
encounter["data"]["day"] = None |
|||
encounter["data"]["time"] = time |
|||
self.encounters.append(encounter) |
|||
else: |
|||
encounter["data"]["day"] = None |
|||
encounter["data"]["time"] = None |
|||
self.encounters.append(encounter) |
|||
|
|||
def save_event_encounter(self, pfic, game): |
|||
game_id = db.get_game_id_by_name(game) |
|||
|
|||
encounter = { |
|||
"pfic": pfic, |
|||
"game_id": game_id, |
|||
"type": "event" |
|||
} |
|||
|
|||
self.encounters.append(encounter) |
|||
|
|||
def save_encounter(self, pfic, game, location, days, times, dual_slot, static_encounter, static_encounter_count, extra_text, stars, rods, fishing, starter): |
|||
game_id = db.get_game_id_by_name(game) |
|||
extra_text_str = ' '.join(extra_text) if extra_text else None |
|||
stars_str = ','.join(sorted(stars)) if stars else None |
|||
rods_str = ','.join(sorted(rods)) if rods else None |
|||
|
|||
encounter_type = "random" |
|||
|
|||
if starter: |
|||
encounter_type = "starter" |
|||
|
|||
if static_encounter: |
|||
encounter_type = "static" |
|||
|
|||
encounter = { |
|||
"pfic": pfic, |
|||
"game_id": game_id, |
|||
"type": encounter_type, |
|||
"data": { |
|||
"location": location, |
|||
"day": None, |
|||
"time": None, |
|||
"dual_slot": dual_slot, |
|||
"extra_text": extra_text_str, |
|||
"stars": stars_str, |
|||
"rods": rods_str, |
|||
"fishing": fishing |
|||
} |
|||
} |
|||
|
|||
if static_encounter: |
|||
encounter["data"]["static_encounter_count"] = static_encounter_count |
|||
|
|||
if len(days) > 0: |
|||
for day in days: |
|||
encounter["data"]["day"] = day |
|||
encounter["data"]["time"] = None |
|||
self.encounters.append(encounter) |
|||
|
|||
elif len(times) > 0: |
|||
for time in times: |
|||
encounter["data"]["day"] = None |
|||
encounter["data"]["time"] = time |
|||
self.encounters.append(encounter) |
|||
|
|||
else: |
|||
encounter["data"]["day"] = None |
|||
encounter["data"]["time"] = None |
|||
self.encounters.append(encounter) |
|||
|
|||
def process_event_tables(self, events_section): |
|||
event_tables = {} |
|||
if events_section: |
|||
next_element = events_section.parent.find_next_sibling() |
|||
while next_element and next_element.name != 'h3': |
|||
if next_element.name == 'h5': |
|||
variant = next_element.text.strip() |
|||
table = next_element.find_next_sibling('table', class_='roundy') |
|||
if table: |
|||
event_tables[variant] = table |
|||
next_element = next_element.find_next_sibling() |
|||
return event_tables |
|||
|
|||
def process_event_table(self, table, game_locations): |
|||
for row in table.find_all('tr')[1:]: # Skip header row |
|||
cells = row.find_all('td') |
|||
if len(cells) >= 6: # Ensure all required columns are present |
|||
# Extract game names as a list |
|||
game_links = cells[0].find_all('a') |
|||
individual_games = [] |
|||
|
|||
for link in game_links: |
|||
# Replace specific known prefixes |
|||
game_name = link['title'].replace("Pokémon ", "").replace("Versions", "").replace(" Version", "").replace(" (Japanese)", "") |
|||
|
|||
# Split on " and ", which is used for combined games |
|||
parsed_names = game_name.split(" and ") |
|||
|
|||
# Add the parsed names to the list |
|||
individual_games.extend(parsed_names) |
|||
|
|||
# Print extracted game names for debugging |
|||
print(f"Extracted game names from row: {individual_games}") |
|||
|
|||
# Filter games to include only those in all_games |
|||
matching_games = [] |
|||
|
|||
for game in individual_games: |
|||
match = is_mainline_game(game) |
|||
if match: |
|||
matching_games.append(game) |
|||
|
|||
# Print matching games for debugging |
|||
print(f"Matching games after filtering: {matching_games}") |
|||
|
|||
if matching_games: |
|||
location = cells[2].text.strip() |
|||
distribution_period = cells[5].text.strip() |
|||
for game in matching_games: |
|||
if game not in game_locations: |
|||
game_locations[game] = [] |
|||
game_locations[game].append({ |
|||
"location": f"Event: {location}", |
|||
"tag": str(cells[2]) |
|||
}) |
|||
@ -0,0 +1,406 @@ |
|||
from typing import Optional |
|||
from PyQt6.QtCore import QObject, pyqtSignal, QRunnable |
|||
from bs4 import BeautifulSoup, Tag |
|||
from fuzzywuzzy import fuzz |
|||
from number_parser import parse_ordinal |
|||
from cache import cache |
|||
from db import db |
|||
|
|||
import re |
|||
|
|||
from utility.functions import get_form_name, get_display_name, parse_pfic |
|||
from utility.data import non_evolution_forms, alcremie_forms |
|||
|
|||
class GatherEvolutionsWorkerSignals(QObject): |
|||
finished = pyqtSignal(dict) |
|||
|
|||
class GatherEvolutions(QRunnable): |
|||
def __init__(self): |
|||
super().__init__() |
|||
self.signals = GatherEvolutionsWorkerSignals() |
|||
self.base_url = "https://bulbapedia.bulbagarden.net/wiki/" |
|||
|
|||
self.evolution_methods = set() |
|||
|
|||
def run(self): |
|||
try: |
|||
gathered_data = self.gather_evolution_data() |
|||
self.signals.finished.emit(gathered_data) |
|||
except Exception as e: |
|||
print(f"Error gathering Pokémon home storage status: {e}") |
|||
|
|||
def gather_evolution_data(self, force_refresh = False): |
|||
all_pokemon_forms = db.get_list_of_pokemon_forms() |
|||
evolutions = {} |
|||
|
|||
for pokemon_form in all_pokemon_forms: |
|||
print(f"Processing {get_display_name(pokemon_form)}'s evolutions") |
|||
pokemon_name = pokemon_form["name"] |
|||
form = get_form_name(pokemon_form) |
|||
|
|||
if pokemon_form["form_name"] and any(s in pokemon_form["form_name"] for s in non_evolution_forms): |
|||
continue |
|||
|
|||
cache_record_name = f"chain_{pokemon_name}_{form}" |
|||
if force_refresh: |
|||
cache.purge(cache_record_name) |
|||
|
|||
cached_entry = cache.get(cache_record_name) |
|||
if cached_entry != None: |
|||
evolutions = evolutions | cached_entry |
|||
continue |
|||
|
|||
#form = get_form_name(pokemon_form, not pokemon_form["gender_relevant"]) |
|||
search_form = form |
|||
if search_form and pokemon_name in search_form: |
|||
search_form = search_form.replace(pokemon_name, "").strip() |
|||
|
|||
gender = None |
|||
if search_form and "male" in search_form.lower(): |
|||
gender = search_form |
|||
search_form = None |
|||
|
|||
if pokemon_name == "Flabébé": |
|||
# Bulbapedia doesn't detail out Flabébé's evolution chain fully. as its exactly the same for each form, but the coloured form remains constant |
|||
# through the evolution line, Red->Red->Red, Yellow->Yellow->Yellow etc. |
|||
search_form = None |
|||
|
|||
url = f"https://bulbapedia.bulbagarden.net/wiki/{pokemon_name}_(Pokémon)" |
|||
page_data = cache.fetch_url(url) |
|||
if not page_data: |
|||
continue |
|||
|
|||
soup = BeautifulSoup(page_data, 'html.parser') |
|||
evolution_section = soup.find('span', id='Evolution_data') |
|||
if not evolution_section: |
|||
continue |
|||
|
|||
evolution_table = None |
|||
evolution_table = evolution_section.parent.find_next('table') |
|||
if form: |
|||
form_without_form = form.replace('Form', '').replace('form', '').strip() |
|||
for tag in evolution_section.parent.find_next_siblings(): |
|||
if tag.name == 'h4' and form_without_form in tag.get_text(strip=True): |
|||
evolution_table = tag.find_next('table') |
|||
break |
|||
if tag.name == 'h3': |
|||
break |
|||
if not evolution_table: |
|||
continue |
|||
|
|||
evolution_tree = None |
|||
if pokemon_name == "Eevee": |
|||
evolution_tree = self.parse_eevee_evolution_chain(evolution_table, pokemon_form) |
|||
else: |
|||
evolution_tree = self.parse_evolution_chain(evolution_table, pokemon_form) |
|||
|
|||
if evolution_tree["pokemon"] == "Milcery": |
|||
evolution_tree["evolves_to"] = [] |
|||
for alcremie_form in alcremie_forms: |
|||
node = { |
|||
"pokemon": "Alcremie", |
|||
"form": alcremie_form, |
|||
"requirement": None, |
|||
"method": "Complicated", |
|||
"evolves_to": [], |
|||
"stage": 1 |
|||
} |
|||
evolution_tree["evolves_to"].append(node) |
|||
|
|||
cacheable_container = {} |
|||
if evolution_tree: |
|||
self.traverse_and_store(evolution_tree, cacheable_container, gender) |
|||
|
|||
cache.set(cache_record_name, cacheable_container) |
|||
evolutions = evolutions | cacheable_container |
|||
|
|||
print(self.evolution_methods) |
|||
return evolutions |
|||
|
|||
def traverse_and_store(self, node, evolutions, gender): |
|||
"""Helper function to traverse evolution tree and store evolutions.""" |
|||
from_pfic = self.get_pokemon_form_by_name(node["pokemon"], node["form"], gender=gender) |
|||
if not from_pfic: |
|||
return |
|||
|
|||
for next_stage in node["evolves_to"]: |
|||
to_pfic = self.get_pokemon_form_by_name(next_stage["pokemon"], next_stage["form"], gender=gender) |
|||
if to_pfic: |
|||
composite_key = f"{from_pfic}->{to_pfic}" |
|||
evolution_info = { |
|||
"from_pfic": from_pfic, |
|||
"to_pfic": to_pfic, |
|||
"method": next_stage["method"] |
|||
} |
|||
evolutions[composite_key] = (evolution_info) |
|||
self.traverse_and_store(next_stage, evolutions, gender) |
|||
|
|||
def parse_evolution_chain(self, table, pokemon_form, force_refresh = False): |
|||
cache_record_name = f"evo_{pokemon_form['pfic']}" |
|||
if force_refresh: |
|||
cache.purge(cache_record_name) |
|||
|
|||
cached_entry = cache.get(cache_record_name) |
|||
if cached_entry is not None: |
|||
return cached_entry |
|||
|
|||
form = get_form_name(pokemon_form, not pokemon_form["gender_relevant"]) |
|||
|
|||
tbody = table.find('tbody', recursive=False) |
|||
if not tbody: |
|||
return None |
|||
|
|||
rows = tbody.find_all('tr', recursive=False) |
|||
main_row = rows[0] |
|||
branch_rows = rows[1:] |
|||
|
|||
def create_stage(td): |
|||
pokemon_name = self.extract_pokemon_name(td) |
|||
evolution_form = self.extract_evolution_form(td, pokemon_name) |
|||
stage = self.extract_stage_form(td).replace("Evolution", "").replace("evolution", "").strip() |
|||
numberical_stage = -1 |
|||
if stage == "Unevolved" or stage == "Baby form": |
|||
numberical_stage = 0 |
|||
elif stage == "Castoff": |
|||
numberical_stage = 1 |
|||
else: |
|||
numberical_stage = parse_ordinal(stage) |
|||
return { |
|||
"pokemon": pokemon_name, |
|||
"form": evolution_form, |
|||
"requirement": None, |
|||
"method": None, |
|||
"evolves_to": [], |
|||
"stage": numberical_stage |
|||
} |
|||
|
|||
# Parse main evolution chain |
|||
pending_method = None |
|||
pending_method_form = None |
|||
root = None |
|||
current_stage = None |
|||
|
|||
for td in main_row.find_all('td', recursive=False): |
|||
if td.find('table'): |
|||
new_stage = create_stage(td) |
|||
new_stage["method"] = pending_method |
|||
new_stage["requirement"] = pending_method_form |
|||
pending_method = None |
|||
if root is None: |
|||
root = new_stage # Assign the root node |
|||
if current_stage: |
|||
current_stage["evolves_to"].append(new_stage) |
|||
current_stage = new_stage |
|||
else: |
|||
pending_method, pending_method_form = self.extract_evolution_method(td) |
|||
|
|||
# Parse branching evolutions |
|||
for row in branch_rows: |
|||
branch_method = None |
|||
pending_method_form = None |
|||
branch_stage = None |
|||
|
|||
for td in row.find_all('td', recursive=False): |
|||
if td.find('table'): |
|||
new_stage = create_stage(td) |
|||
new_stage["method"] = branch_method |
|||
new_stage["requirement"] = pending_method_form |
|||
branch_method = None |
|||
|
|||
if branch_stage: |
|||
branch_stage["evolves_to"].append(new_stage) |
|||
else: |
|||
# Find which main chain Pokémon this branch evolves from |
|||
attached = False |
|||
for main_stage in self.find_stages(root): |
|||
if self.should_attach_branch(main_stage, new_stage): |
|||
main_stage["evolves_to"].append(new_stage) |
|||
attached = True |
|||
break |
|||
|
|||
if not attached: |
|||
print(f"Warning: Could not find a suitable attachment point for branch {new_stage['pokemon']}") |
|||
branch_stage = new_stage |
|||
else: |
|||
branch_method, pending_method_form = self.extract_evolution_method(td) |
|||
|
|||
cache.set(cache_record_name, root) |
|||
return root |
|||
|
|||
def should_attach_branch(self, main_stage, branch_stage): |
|||
# Ensure the main_stage is a valid node to attach a branch |
|||
if main_stage["stage"] == branch_stage["stage"] - 1: |
|||
return True |
|||
# You can add more logic to determine if branch_stage should connect to main_stage |
|||
# For instance, check if they are forms of the same evolution or based on other criteria |
|||
return False |
|||
|
|||
def find_stages(self, node): |
|||
"""Helper function to find all stages in the evolution chain recursively.""" |
|||
stages = [node] |
|||
for stage in node["evolves_to"]: |
|||
stages.extend(self.find_stages(stage)) |
|||
return stages |
|||
|
|||
def extract_pokemon_name(self, td: Tag) -> Optional[str]: |
|||
name_tag = self.find_name_tag(td) |
|||
if name_tag: |
|||
return name_tag.get_text(strip=True) |
|||
return None |
|||
|
|||
def find_name_tag(self, td: Tag) -> Optional[Tag]: |
|||
table = td.find('table') |
|||
name_tag = table.find('a', class_='selflink') |
|||
if name_tag: |
|||
return name_tag |
|||
name_tag = table.find('a', title=True, class_=lambda x: x != 'image') |
|||
return name_tag |
|||
|
|||
def extract_stage_form(self, td: Tag) -> Optional[str]: |
|||
stage_tag = td.find('table').find('small') |
|||
if stage_tag: |
|||
return stage_tag.get_text(strip=True) |
|||
return None |
|||
|
|||
def extract_evolution_form(self, td: Tag, name: str) -> Optional[str]: |
|||
name_tag = self.find_name_tag(td) |
|||
if name_tag: |
|||
name_row = name_tag.parent |
|||
small_tags = name_row.find_all('small') |
|||
if len(small_tags) > 1: |
|||
return small_tags[0].get_text(strip=True) |
|||
return None |
|||
|
|||
def extract_evolution_method(self, td: Tag) -> str: |
|||
# Extract evolution method from the TD |
|||
text = td.get_text() |
|||
form = None |
|||
if text and "(male)" in text.lower(): |
|||
form = "male" |
|||
elif text and "(female)" in text.lower(): |
|||
form = "female" |
|||
|
|||
return td.get_text(strip=True), form |
|||
|
|||
def parse_eevee_evolution_chain(self, table, pokemon_form): |
|||
tbody = table.find('tbody', recursive=False) |
|||
if not tbody: |
|||
return [] |
|||
|
|||
def create_stage(td): |
|||
pokemon_name = self.extract_pokemon_name(td) |
|||
stage = self.extract_stage_form(td) |
|||
return { |
|||
"pokemon": pokemon_name, |
|||
"form": None, |
|||
"method": None, |
|||
"evolves_to": [] |
|||
} |
|||
|
|||
rows = tbody.find_all('tr', recursive=False) |
|||
eevee_row = rows[1] |
|||
method_row = rows[2] |
|||
eeveelutions_row = rows[3] |
|||
|
|||
eevee_td = eevee_row.find('td', recursive=False) |
|||
eevee_stage = create_stage(eevee_td) |
|||
#pokemon_name, stage = self.parse_pokemon_subtable(eevee_td) |
|||
#eevee_stage = { |
|||
# "pokemon":pokemon_name, |
|||
# "method": None, |
|||
# "stage": stage, |
|||
# "form": None, |
|||
# "next_stage": None, |
|||
# "previous_stage": None, |
|||
# "branches": [], |
|||
# "pfic": pokemon_form["pfic"] |
|||
#} |
|||
|
|||
methods = [] |
|||
for method in method_row.find_all('td', recursive=False): |
|||
methods.append(self.extract_evolution_method(method)) |
|||
|
|||
eeveelutions = [] |
|||
index = 0 |
|||
for eeveelution in eeveelutions_row.find_all('td', recursive=False): |
|||
#pokemon_name, stage = self.parse_pokemon_subtable(eeveelution) |
|||
#eeveelution_stage = { |
|||
# "pokemon":pokemon_name, |
|||
# "method": methods[index], |
|||
# "stage": stage, |
|||
# "form": None, |
|||
# "next_stage": None, |
|||
# "previous_stage": None, |
|||
# "branches": [], |
|||
# "pfic": pokemon_form["pfic"] |
|||
#} |
|||
eeveelution_stage = create_stage(eeveelution) |
|||
#eeveelution_stage["previous_stage"] = eevee_stage # Set the back link to Eevee |
|||
eeveelutions.append(eeveelution_stage) |
|||
index += 1 |
|||
|
|||
eevee_stage["evolves_to"] = eeveelutions # Set the branches directly, not as a nested list |
|||
|
|||
return eevee_stage |
|||
|
|||
def parse_pokemon_subtable(self, td): |
|||
if td.find('table'): |
|||
# This TD contains Pokemon information |
|||
pokemon_name = self.extract_pokemon_name(td) |
|||
stage = self.extract_stage_form(td) |
|||
return pokemon_name, stage |
|||
return None, None |
|||
|
|||
def get_pokemon_form_by_name(self, name: str, form: Optional[str] = None, threshold: int = 80, gender: Optional[str] = None): |
|||
fields = [ |
|||
"pfic", |
|||
"name", |
|||
"form_name" |
|||
] |
|||
results = db.get_pokemon_details_by_name(name, fields) |
|||
#results = db_controller.execute_query('SELECT PFIC, name, form_name FROM pokemon_forms WHERE name = ?', (name,)) |
|||
|
|||
if not results: |
|||
return None |
|||
|
|||
results.sort(key=lambda x: parse_pfic(x["pfic"])) |
|||
|
|||
if form is None and gender is None: |
|||
if len(results) > 1: |
|||
if results[0]["form_name"] == None: |
|||
return results[0]["pfic"] |
|||
else: |
|||
return self.get_pokemon_form_by_name(name, "Male", threshold=100, gender=gender) |
|||
else: |
|||
return results[0]["pfic"] # Return the PFIC of the first result if no form is specified |
|||
|
|||
if gender: |
|||
gendered_form = self.get_pokemon_form_by_name(name, gender, threshold=100) |
|||
if gendered_form: |
|||
return gendered_form |
|||
|
|||
stripped_form = self.strip_pokemon_name(name, form) |
|||
|
|||
for entry in results: |
|||
stripped_db_form = self.strip_pokemon_name(entry["name"], entry["form_name"]) |
|||
if self.fuzzy_match_form(stripped_form, stripped_db_form, threshold): |
|||
return entry["pfic"] |
|||
|
|||
# Some times we get a form for a pokemon that doesn't really have one. |
|||
#if len(results) > 1 and form != None and gender and threshold != 100: |
|||
# return results[0]["pfic"] |
|||
|
|||
return None |
|||
|
|||
def strip_pokemon_name(self, pokemon_name: str, form_name: str) -> str: |
|||
if form_name: |
|||
form_name = form_name.replace("Form", "").strip() |
|||
form_name = re.sub(f'{re.escape(pokemon_name)}\\s*', '', form_name, flags=re.IGNORECASE).strip() |
|||
form_name = form_name.replace(" ", " ") |
|||
return form_name |
|||
return form_name |
|||
|
|||
def fuzzy_match_form(self, form1: str, form2: str, threshold: int = 80) -> bool: |
|||
if form1 is None or form2 is None: |
|||
return form1 == form2 |
|||
return fuzz.ratio(form1.lower(), form2.lower()) >= threshold |
|||
@ -0,0 +1,169 @@ |
|||
from PyQt6.QtCore import QObject, pyqtSignal, QRunnable |
|||
from bs4 import BeautifulSoup |
|||
from cache import cache |
|||
|
|||
from utility.data import regions, default_forms |
|||
from utility.functions import get_objects_by_number, compare_pokemon_forms |
|||
from db import db |
|||
|
|||
class GatherHomeStorageStatusWorkerSignals(QObject): |
|||
finished = pyqtSignal(list) |
|||
|
|||
class GatherHomeStorageStatus(QRunnable): |
|||
def __init__(self): |
|||
super().__init__() |
|||
self.signals = GatherHomeStorageStatusWorkerSignals() |
|||
self.base_url = "https://www.serebii.net/pokemonhome/" |
|||
|
|||
def run(self): |
|||
try: |
|||
gathered_data = self.gather_home_storage_data() |
|||
self.signals.finished.emit(gathered_data) |
|||
except Exception as e: |
|||
print(f"Error gathering Pokémon home storage status: {e}") |
|||
|
|||
def gather_home_storage_data(self): |
|||
all_pokemon_forms = db.get_list_of_pokemon_forms() |
|||
pfics_that_can_go_to_home = [] |
|||
pokemon_by_national_dex = {} |
|||
|
|||
for region in regions: |
|||
pokemon_list = self.scrape_region_for_pokemon(region) |
|||
for pokemon in pokemon_list: |
|||
national_dex = int(pokemon['number']) |
|||
if national_dex not in pokemon_by_national_dex: |
|||
pokemon_by_national_dex[national_dex] = [] |
|||
pokemon_by_national_dex[national_dex].append(pokemon) |
|||
|
|||
default_forms_set = set(default_forms) |
|||
|
|||
for pokemon_form in all_pokemon_forms: |
|||
storable_in_home = False |
|||
name = pokemon_form["name"] |
|||
national_dex = pokemon_form["national_dex"] |
|||
working_form = pokemon_form["form_name"] |
|||
|
|||
if national_dex not in pokemon_by_national_dex: |
|||
continue |
|||
|
|||
if working_form and name in working_form: |
|||
working_form = working_form.replace(name, "").strip() |
|||
|
|||
if working_form: |
|||
working_form = working_form.replace("Female", "").replace("female", "").strip() |
|||
working_form = working_form.replace("Male", "").replace("male", "").strip() |
|||
|
|||
# serebii doesn't list gender in the table so we have to assume based on form name. |
|||
if working_form and ("male" in working_form.lower() or "working_form" in working_form.lower()): |
|||
working_form = None |
|||
|
|||
if name == "Unown" and (working_form not in ["!", "?"]): |
|||
working_form = None |
|||
|
|||
if name == "Tauros" and working_form == "Combat Breed": |
|||
working_form = "Paldean Form" |
|||
|
|||
# serebii just gave up on Alcremie. It has 36 uniquie forms all storable in home. |
|||
if name == "Alcremie": |
|||
working_form = None |
|||
|
|||
if working_form == "": |
|||
working_form = None |
|||
|
|||
for pokemon in pokemon_by_national_dex[national_dex]: |
|||
if working_form: |
|||
parts = pokemon['name'].split(" ") |
|||
if len(parts) > 1 and parts[0] == working_form: |
|||
storable_in_home = True |
|||
break |
|||
|
|||
brackets = self.extract_bracketed_text(pokemon['name']) |
|||
if brackets: |
|||
for bracket in brackets: |
|||
if name in bracket: |
|||
bracket = bracket.replace(name, "").strip() |
|||
if compare_pokemon_forms(working_form, bracket): |
|||
storable_in_home = True |
|||
break |
|||
|
|||
if not storable_in_home and working_form in default_forms_set: |
|||
working_form = None |
|||
|
|||
if working_form == None and name.lower() in pokemon['name'].lower(): |
|||
storable_in_home = True |
|||
break |
|||
|
|||
if storable_in_home: |
|||
pfics_that_can_go_to_home.append(pokemon_form["pfic"]) |
|||
|
|||
return pfics_that_can_go_to_home |
|||
|
|||
|
|||
def scrape_region_for_pokemon(self, region): |
|||
cached_entry = cache.get(f"home_{region}") |
|||
if cached_entry is not None: |
|||
return cached_entry |
|||
|
|||
url = f"{self.base_url}{region.lower()}pokemon.shtml" |
|||
response = cache.fetch_url(url) |
|||
if not response: |
|||
return [] |
|||
|
|||
soup = BeautifulSoup(response, 'html.parser') |
|||
table = soup.find('table', class_='dextable') |
|||
if table is None: |
|||
return [] |
|||
|
|||
pokemon_list = [] |
|||
|
|||
rows = table.find_all('tr')[2:] # Skip the header row and the game intro row |
|||
for row in rows: |
|||
cells = row.find_all('td') |
|||
if len(cells) <= 5: # Ensure we have enough cells to check depositability. if only 5 then its not depositable in any game. |
|||
continue |
|||
|
|||
number = cells[0].text.strip().lstrip('#') |
|||
name = cells[2].text.strip() |
|||
|
|||
# Get the image URL |
|||
img_url = cells[1].find('img')['src'] |
|||
full_img_url = f"https://www.serebii.net{img_url}" |
|||
|
|||
pokemon_list.append({ |
|||
'number': number, |
|||
'name': name, |
|||
'image_url': full_img_url |
|||
}) |
|||
|
|||
cache.set(f"home_{region}", pokemon_list) |
|||
|
|||
return pokemon_list |
|||
|
|||
def extract_bracketed_text(self, string): |
|||
results = [] |
|||
stack = [] |
|||
start_index = -1 |
|||
|
|||
for i, char in enumerate(string): |
|||
if char == '(': |
|||
if not stack: |
|||
start_index = i |
|||
stack.append(i) |
|||
elif char == ')': |
|||
if stack: |
|||
stack.pop() |
|||
if not stack: |
|||
results.append(string[start_index + 1:i]) |
|||
start_index = -1 |
|||
else: |
|||
#logger.warning(f"Warning: Unmatched closing parenthesis at position {i}") |
|||
pass |
|||
|
|||
# Handle any remaining unclosed brackets |
|||
if stack: |
|||
#logger.warning(f"Warning: {len(stack)} unmatched opening parentheses") |
|||
for unmatched_start in stack: |
|||
results.append(string[unmatched_start + 1:]) |
|||
|
|||
return results |
|||
|
|||
@ -0,0 +1,57 @@ |
|||
import wordninja |
|||
import re |
|||
from typing import List |
|||
from utility.data import POKEMON_PROPER_NOUNS |
|||
|
|||
class PokemonWordNinja: |
|||
def __init__(self, custom_word_list: List[str] = None): |
|||
custom_words = POKEMON_PROPER_NOUNS |
|||
if custom_word_list: |
|||
custom_words = custom_words | set(custom_word_list) |
|||
|
|||
self.custom_words = [] |
|||
self.placeholder_map = {} |
|||
self.word_to_placeholder_map = {} |
|||
if custom_words: |
|||
# Store custom words with original capitalization, sorted by length |
|||
self.custom_words = sorted(custom_words, key=len, reverse=True) |
|||
for word in self.custom_words: |
|||
# Generate a unique placeholder |
|||
placeholder = f"__PLACEHOLDER_{hash(word)}__" |
|||
self.placeholder_map[placeholder] = word |
|||
self.word_to_placeholder_map[word] = placeholder |
|||
|
|||
def add_custom_word(self, word: str): |
|||
words = self.custom_words |
|||
words.append(word) |
|||
self.custom_words = sorted(words, key=len, reverse=True) |
|||
placeholder = f"__PLACEHOLDER_{hash(word)}__" |
|||
self.placeholder_map[placeholder] = word |
|||
self.word_to_placeholder_map[word] = placeholder |
|||
|
|||
def split(self, text: str) -> str: |
|||
working_text = text |
|||
working_text = working_text.replace("-", " ") |
|||
|
|||
# First handle exact custom words to preserve capitalization |
|||
for word in self.custom_words: |
|||
# Use word boundaries to make sure we only match full words |
|||
pattern = re.compile(r'\b' + re.escape(word) + r'\b', re.IGNORECASE) |
|||
if pattern.search(working_text): |
|||
placeholder = self.word_to_placeholder_map[word] |
|||
working_text = pattern.sub(placeholder, working_text) |
|||
|
|||
# Clean up spaces |
|||
working_text = ' '.join(working_text.split()) |
|||
|
|||
# For remaining text, use wordninja |
|||
parts = [] |
|||
for part in working_text.split(): |
|||
if part in self.placeholder_map: |
|||
# Replace placeholder with the original word |
|||
parts.append(self.placeholder_map[part]) |
|||
else: |
|||
split_parts = wordninja.split(part) |
|||
parts.extend(split_parts) |
|||
|
|||
return ' '.join(parts) |
|||
Loading…
Reference in new issue