import sqlite3 import threading import json import os import networkx as nx from utility.data import main_line_games class DBController: def __init__(self, db_path=':memory:', max_connections=10): self.db_path = db_path self.lock = threading.Lock() self.conn = sqlite3.connect(db_path, check_same_thread=False) self.conn.row_factory = sqlite3.Row self.cursor = self.conn.cursor() self.graph = nx.DiGraph() self.init_database() def init_database(self): disk_conn = sqlite3.connect('pokemon_forms.db') disk_cursor = disk_conn.cursor() # Create tables in the file-based database self.create_pokemon_forms_table(disk_cursor) self.create_games_table(disk_cursor) self.create_encounters_table(disk_cursor) # Commit changes to the file-based database disk_conn.commit() # Copy the file-based database to the in-memory database disk_conn.backup(self.conn) # Close the file-based database connection disk_conn.close() if os.path.exists("pokemon_evolution_graph.json"): with open("pokemon_evolution_graph.json", "r") as f: data = json.load(f) self.graph = nx.node_link_graph(data) def save_changes(self): with self.lock: # Count the number of records before backup for verification self.cursor.execute('SELECT COUNT(*) FROM pokemon_forms') count = self.cursor.fetchone()[0] print(f"Records in memory before backup: {count}") # Back up the master connection to disk disk_conn = sqlite3.connect('pokemon_forms.db') with disk_conn: self.conn.backup(disk_conn) disk_conn.close() data = nx.node_link_data(self.graph) with open("pokemon_evolution_graph.json", "w") as f: json.dump(data, f) def close(self): self.save_changes() self.conn.close() def create_pokemon_forms_table(self, cursor): cursor.execute(''' CREATE TABLE IF NOT EXISTS pokemon_forms ( PFIC TEXT PRIMARY KEY, data JSON NOT NULL ) ''') def create_encounters_table(self, cursor): cursor.execute(''' CREATE TABLE IF NOT EXISTS encounters ( PFIC TEXT, game_id INTEGER NOT NULL, type TEXT NOT NULL, data JSON NOT NULL, FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC), FOREIGN KEY (game_id) REFERENCES games (id) ) ''') def create_games_table(self, cursor): cursor.execute(''' CREATE TABLE IF NOT EXISTS games ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, alt_names TEXT, generation INTEGER NOT NULL, data JSON ) ''') for game in main_line_games: name = game["Name"] alt_names = ", ".join(game["AltNames"]) # Convert list to comma-separated string generation = game["Generation"] cursor.execute(''' INSERT OR IGNORE INTO games (name, alt_names, generation) VALUES (?, ?, ?) ''', (name, alt_names, generation)) def add_pokemon_form(self, pfic, name, form_name, national_dex, generation, sprite_url, gender_relevant): data = { "name": name, "form_name": form_name, "national_dex": national_dex, "generation": generation, "sprite_url": sprite_url, "is_baby_form": False, "storable_in_home": False, "gender_relevant": gender_relevant } with self.lock: self.cursor.execute(''' INSERT OR REPLACE INTO pokemon_forms (PFIC, data) VALUES (?, ?) ''', (pfic, json.dumps(data))) self.conn.commit() print(f"Added: {pfic}, {name}") def craft_pokemon_json_query(self, fields_to_include, pfic = None): query = f"SELECT " extracts = [] for field in fields_to_include: if field == "pfic": extracts.append("PFIC as pfic") else: extracts.append(f"JSON_EXTRACT(data, '$.{field}') AS {field}") query = query + ", ".join(extracts) query = query + " FROM pokemon_forms" if pfic is not None: query = query + f" WHERE PFIC = '{pfic}'" return query def get_pokemon_details(self, pfic, fields = None): if fields == None: fields = [ "name", "form_name", "national_dex", "generation", "is_baby_form", "storable_in_home", "gender_relevant" ] query = self.craft_pokemon_json_query(fields, pfic) self.cursor.execute(query) results = self.cursor.fetchone() return dict(results) def get_pokemon_details_by_name(self, name, fields = None): if fields == None: fields = [ "pfic", "name", "form_name", "national_dex", "generation", "is_baby_form", "storable_in_home", "gender_relevant" ] query = self.craft_pokemon_json_query(fields) name = name.replace("'", "''") query += f" WHERE JSON_EXTRACT(data, '$.name') = '{name}'" self.cursor.execute(query) results = self.cursor.fetchall() return [dict(row) for row in results] def get_list_of_pokemon_forms(self): fields = [ "pfic", "name", "form_name", "national_dex", "generation", "is_baby_form", "storable_in_home", "gender_relevant" ] query = self.craft_pokemon_json_query(fields) self.cursor.execute(query) results = self.cursor.fetchall() return [dict(row) for row in results] def update_home_status(self, pfic, status): self.update_pokemon_field(pfic, "storable_in_home", status) pass def update_pokemon_field(self, pfic, field_name, new_value): # Fetch the existing record self.cursor.execute('SELECT data FROM pokemon_forms WHERE PFIC = ?', (pfic,)) result = self.cursor.fetchone() if result: # Load the JSON data and update the field data = json.loads(result[0]) data[field_name] = new_value # Update the record with the modified JSON updated_data_str = json.dumps(data) self.cursor.execute(''' UPDATE pokemon_forms SET data = ? WHERE PFIC = ? ''', (updated_data_str, pfic)) self.conn.commit() def update_evolution_graph(self, evolutions): for key in evolutions: value = evolutions[key] from_pfic = value["from_pfic"] to_pfic = value["to_pfic"] method = value["method"] # Add nodes if they do not already exist if not self.graph.has_node(from_pfic): self.graph.add_node(from_pfic) if not self.graph.has_node(to_pfic): self.graph.add_node(to_pfic) # Add the edge representing the evolution, with the method as an attribute self.graph.add_edge(from_pfic, to_pfic, method=method) def get_evolution_graph(self, pfic): if self.graph.has_node(pfic) == False: return [] return list(self.graph.successors(pfic)) def get_previous_evolution(self, pfic): if self.graph.has_node(pfic) == False: return None, None predecessor = next(self.graph.predecessors(pfic), None) if predecessor: method = self.graph[predecessor][pfic]["method"] return predecessor, method else: return None, None def get_evolution_paths(self, start_node): paths = [] if self.graph.has_node(start_node) == False: return paths # Define a recursive function to traverse the graph def traverse(current_node, current_path, is_root=False): if is_root: # Add the current node to the path as a tuple (node, None) current_path.append((current_node, None)) # Get successors of the current node successors = list(self.graph.successors(current_node)) if not successors: # If there are no successors, add the current path to paths list paths.append(current_path.copy()) else: # Traverse each successor and add edge metadata for successor in successors: method = self.graph[current_node][successor]["method"] # Add the successor node and method as a tuple (successor, method) current_path.append((successor, method)) # Recur for the successor traverse(successor, current_path) # Backtrack (remove the last node and edge metadata) current_path.pop() # Remove the initial node tuple when backtracking fully if is_root: current_path.pop() # Start traversal from the start_node traverse(start_node, [], True) return paths def get_full_evolution_paths(self, start_node): """ Get all evolution paths starting from a given node, including predecessors and successors. :param start_node: The starting node (e.g., a specific Pokemon form). :return: A dictionary containing predecessors and successors paths. """ full_paths = { "predecessors": [], "successors": [] } if self.graph.has_node(start_node) == False: return full_paths # Traverse predecessors def traverse_predecessors(current_node, current_path, is_root=False): #if not is_root: # Add the current node to the path #current_path.append(current_node) # Get predecessors of the current node predecessors = list(self.graph.predecessors(current_node)) if not predecessors: # If there are no predecessors, add the current path to the list full_paths["predecessors"].append(current_path.copy()) else: # Traverse each predecessor for predecessor in predecessors: method = self.graph[predecessor][current_node]["method"] # Add the edge metadata as a tuple (predecessor, method) current_path.append((predecessor, method)) # Recur for the predecessor traverse_predecessors(predecessor, current_path) # Backtrack (remove the last node and edge metadata) current_path.pop() #current_path.pop() # Traverse successors def traverse_successors(current_node, current_path, is_root=False): if is_root: # Add the current node to the path as a tuple (node, None) predecessor = next(self.graph.predecessors(current_node), None) if predecessor: method = self.graph[predecessor][current_node]["method"] current_path.append((current_node, method)) else: current_path.append((current_node, None)) # Get successors of the current node successors = list(self.graph.successors(current_node)) if not successors: # If there are no successors, add the current path to paths list full_paths["successors"].append(current_path.copy()) else: # Traverse each successor and add edge metadata for successor in successors: method = self.graph[current_node][successor]["method"] # Add the successor node and method as a tuple (successor, method) current_path.append((successor, method)) # Recur for the successor traverse_successors(successor, current_path) # Backtrack (remove the last node and edge metadata) current_path.pop() if is_root: # Remove the initial node tuple when backtracking fully current_path.pop() # Start traversal from the start_node for both predecessors and successors traverse_predecessors(start_node, [], True) traverse_successors(start_node, [], True) return full_paths def propagate_gender_relevance(self, gender_relevant_nodes): """ Propagate gender relevance through the evolution graph and update the SQLite database. :param db_path: Path to the SQLite database file. :param gender_relevant_nodes: A set of nodes that are initially marked as gender-relevant. """ # Traverse from each gender-relevant end node backward to propagate relevance for node in gender_relevant_nodes: # Use breadth-first search or depth-first search to traverse backward visited = set() stack = [node] while stack: current_node = stack.pop() if current_node not in visited: visited.add(current_node) # Update the gender_relevant flag in the database self.update_pokemon_field(current_node, "gender_relevant", True) # Add predecessors to the stack to keep traversing backward if self.graph.has_node(current_node): predecessors = list(self.graph.predecessors(current_node)) stack.extend(predecessors) self.save_changes() def get_gender_specific_evolutions(self): """ Get a list of nodes that have evolution methods indicating gender relevance (i.e., '(male)' or '(female)'). :return: A list of nodes involved in gender-specific evolutions. """ gender_specific_nodes = [] for from_node, to_node, edge_data in self.graph.edges(data=True): method = edge_data.get("method", "") if method and ("(male)" in method.lower() or "(female)" in method.lower()): # Add both nodes involved in this gender-specific evolution gender_specific_nodes.extend([from_node, to_node]) return list(set(gender_specific_nodes)) # Return unique nodes def get_gender_relevant_pokemon(self): self.cursor.execute(f"SELECT PFIC FROM pokemon_forms WHERE JSON_EXTRACT(data, '$.gender_relevant') = true") results = self.cursor.fetchall() return [row['PFIC'] for row in results] def get_game_id_by_name(self, name): self.cursor.execute(''' SELECT id, name, generation FROM games WHERE name LIKE ? OR alt_names LIKE ? ''', (f"%{name}%", f"%{name}%")) # Fetch and print the results result = self.cursor.fetchone() print(f"ID: {result[0]}, Name: {result[1]}, Generation: {result[2]}") return dict(result) def get_games_by_name(self, name): self.cursor.execute(''' SELECT id, name, generation FROM games WHERE name LIKE ? OR alt_names LIKE ? ''', (f"%{name}%", f"%{name}%")) # Fetch and print the results results = self.cursor.fetchall() return [dict(row) for row in results] def get_games_by_generation(self, generation): self.cursor.execute(''' SELECT id, name FROM games WHERE generation = ? ''', (generation,)) # Fetch and print the results results = self.cursor.fetchall() for row in results: print(f"ID: {row[0]}, Name: {row[1]}") return [dict(row) for row in results] def update_encounter_locations(self, data): for encounter in data: with self.lock: pfic = encounter["pfic"] game_id = encounter["game_id"]["id"] type = encounter["type"] data = encounter["data"] if "data" in encounter else None self.cursor.execute(''' INSERT OR REPLACE INTO encounters (PFIC, game_id, type, data) VALUES (?, ?, ?, ?) ''', (pfic, game_id, type, json.dumps(data))) self.conn.commit() print(f"Added: {pfic}") pass