You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

485 lines
17 KiB

import sqlite3
import threading
import json
import os
import networkx as nx
from utility.data import main_line_games
class DBController:
def __init__(self, db_path=':memory:', max_connections=10):
self.db_path = db_path
self.lock = threading.Lock()
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
self.graph = nx.DiGraph()
self.init_database()
def init_database(self):
disk_conn = sqlite3.connect('pokemon_forms.db')
disk_cursor = disk_conn.cursor()
# Create tables in the file-based database
self.create_pokemon_forms_table(disk_cursor)
self.create_games_table(disk_cursor)
self.create_encounters_table(disk_cursor)
# Commit changes to the file-based database
disk_conn.commit()
# Copy the file-based database to the in-memory database
disk_conn.backup(self.conn)
# Close the file-based database connection
disk_conn.close()
if os.path.exists("pokemon_evolution_graph.json"):
with open("pokemon_evolution_graph.json", "r") as f:
data = json.load(f)
self.graph = nx.node_link_graph(data)
def save_changes(self):
with self.lock:
# Count the number of records before backup for verification
self.cursor.execute('SELECT COUNT(*) FROM pokemon_forms')
count = self.cursor.fetchone()[0]
print(f"Records in memory before backup: {count}")
# Back up the master connection to disk
disk_conn = sqlite3.connect('pokemon_forms.db')
with disk_conn:
self.conn.backup(disk_conn)
disk_conn.close()
data = nx.node_link_data(self.graph)
with open("pokemon_evolution_graph.json", "w") as f:
json.dump(data, f)
def close(self):
self.save_changes()
self.conn.close()
def create_pokemon_forms_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS pokemon_forms (
PFIC TEXT PRIMARY KEY,
data JSON NOT NULL
)
''')
def create_encounters_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS encounters (
PFIC TEXT,
game_id INTEGER NOT NULL,
type TEXT NOT NULL,
data JSON NOT NULL,
FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (game_id) REFERENCES games (id)
)
''')
def create_games_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS games (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
alt_names TEXT,
generation INTEGER NOT NULL,
data JSON
)
''')
for game in main_line_games:
name = game["Name"]
alt_names = ", ".join(game["AltNames"]) # Convert list to comma-separated string
generation = game["Generation"]
cursor.execute('''
INSERT OR IGNORE INTO games (name, alt_names, generation)
VALUES (?, ?, ?)
''', (name, alt_names, generation))
def add_pokemon_form(self, pfic, name, form_name, national_dex, generation, sprite_url, gender_relevant):
data = {
"name": name,
"form_name": form_name,
"national_dex": national_dex,
"generation": generation,
"sprite_url": sprite_url,
"is_baby_form": False,
"storable_in_home": False,
"gender_relevant": gender_relevant
}
with self.lock:
self.cursor.execute('''
INSERT OR REPLACE INTO pokemon_forms (PFIC, data) VALUES (?, ?)
''', (pfic, json.dumps(data)))
self.conn.commit()
print(f"Added: {pfic}, {name}")
def craft_pokemon_json_query(self, fields_to_include, pfic = None):
query = f"SELECT "
extracts = []
for field in fields_to_include:
if field == "pfic":
extracts.append("PFIC as pfic")
else:
extracts.append(f"JSON_EXTRACT(data, '$.{field}') AS {field}")
query = query + ", ".join(extracts)
query = query + " FROM pokemon_forms"
if pfic is not None:
query = query + f" WHERE PFIC = '{pfic}'"
return query
def get_pokemon_details(self, pfic, fields = None):
if fields == None:
fields = [
"name",
"form_name",
"national_dex",
"generation",
"is_baby_form",
"storable_in_home",
"gender_relevant"
]
query = self.craft_pokemon_json_query(fields, pfic)
self.cursor.execute(query)
results = self.cursor.fetchone()
return dict(results)
def get_pokemon_details_by_name(self, name, fields = None):
if fields == None:
fields = [
"pfic",
"name",
"form_name",
"national_dex",
"generation",
"is_baby_form",
"storable_in_home",
"gender_relevant"
]
query = self.craft_pokemon_json_query(fields)
name = name.replace("'", "''")
query += f" WHERE JSON_EXTRACT(data, '$.name') = '{name}'"
self.cursor.execute(query)
results = self.cursor.fetchall()
return [dict(row) for row in results]
def get_list_of_pokemon_forms(self):
fields = [
"pfic",
"name",
"form_name",
"national_dex",
"generation",
"is_baby_form",
"storable_in_home",
"gender_relevant"
]
query = self.craft_pokemon_json_query(fields)
self.cursor.execute(query)
results = self.cursor.fetchall()
return [dict(row) for row in results]
def update_home_status(self, pfic, status):
self.update_pokemon_field(pfic, "storable_in_home", status)
pass
def update_pokemon_field(self, pfic, field_name, new_value):
# Fetch the existing record
self.cursor.execute('SELECT data FROM pokemon_forms WHERE PFIC = ?', (pfic,))
result = self.cursor.fetchone()
if result:
# Load the JSON data and update the field
data = json.loads(result[0])
data[field_name] = new_value
# Update the record with the modified JSON
updated_data_str = json.dumps(data)
self.cursor.execute('''
UPDATE pokemon_forms
SET data = ?
WHERE PFIC = ?
''', (updated_data_str, pfic))
self.conn.commit()
def update_evolution_graph(self, evolutions):
for key in evolutions:
value = evolutions[key]
from_pfic = value["from_pfic"]
to_pfic = value["to_pfic"]
method = value["method"]
# Add nodes if they do not already exist
if not self.graph.has_node(from_pfic):
self.graph.add_node(from_pfic)
if not self.graph.has_node(to_pfic):
self.graph.add_node(to_pfic)
# Add the edge representing the evolution, with the method as an attribute
self.graph.add_edge(from_pfic, to_pfic, method=method)
def get_evolution_graph(self, pfic):
if self.graph.has_node(pfic) == False:
return []
return list(self.graph.successors(pfic))
def get_previous_evolution(self, pfic):
if self.graph.has_node(pfic) == False:
return None, None
predecessor = next(self.graph.predecessors(pfic), None)
if predecessor:
method = self.graph[predecessor][pfic]["method"]
return predecessor, method
else:
return None, None
def get_evolution_paths(self, start_node):
paths = []
if self.graph.has_node(start_node) == False:
return paths
# Define a recursive function to traverse the graph
def traverse(current_node, current_path, is_root=False):
if is_root:
# Add the current node to the path as a tuple (node, None)
current_path.append((current_node, None))
# Get successors of the current node
successors = list(self.graph.successors(current_node))
if not successors:
# If there are no successors, add the current path to paths list
paths.append(current_path.copy())
else:
# Traverse each successor and add edge metadata
for successor in successors:
method = self.graph[current_node][successor]["method"]
# Add the successor node and method as a tuple (successor, method)
current_path.append((successor, method))
# Recur for the successor
traverse(successor, current_path)
# Backtrack (remove the last node and edge metadata)
current_path.pop()
# Remove the initial node tuple when backtracking fully
if is_root:
current_path.pop()
# Start traversal from the start_node
traverse(start_node, [], True)
return paths
def get_full_evolution_paths(self, start_node):
"""
Get all evolution paths starting from a given node, including predecessors and successors.
:param start_node: The starting node (e.g., a specific Pokemon form).
:return: A dictionary containing predecessors and successors paths.
"""
full_paths = {
"predecessors": [],
"successors": []
}
if self.graph.has_node(start_node) == False:
return full_paths
# Traverse predecessors
def traverse_predecessors(current_node, current_path, is_root=False):
#if not is_root:
# Add the current node to the path
#current_path.append(current_node)
# Get predecessors of the current node
predecessors = list(self.graph.predecessors(current_node))
if not predecessors:
# If there are no predecessors, add the current path to the list
full_paths["predecessors"].append(current_path.copy())
else:
# Traverse each predecessor
for predecessor in predecessors:
method = self.graph[predecessor][current_node]["method"]
# Add the edge metadata as a tuple (predecessor, method)
current_path.append((predecessor, method))
# Recur for the predecessor
traverse_predecessors(predecessor, current_path)
# Backtrack (remove the last node and edge metadata)
current_path.pop()
#current_path.pop()
# Traverse successors
def traverse_successors(current_node, current_path, is_root=False):
if is_root:
# Add the current node to the path as a tuple (node, None)
predecessor = next(self.graph.predecessors(current_node), None)
if predecessor:
method = self.graph[predecessor][current_node]["method"]
current_path.append((current_node, method))
else:
current_path.append((current_node, None))
# Get successors of the current node
successors = list(self.graph.successors(current_node))
if not successors:
# If there are no successors, add the current path to paths list
full_paths["successors"].append(current_path.copy())
else:
# Traverse each successor and add edge metadata
for successor in successors:
method = self.graph[current_node][successor]["method"]
# Add the successor node and method as a tuple (successor, method)
current_path.append((successor, method))
# Recur for the successor
traverse_successors(successor, current_path)
# Backtrack (remove the last node and edge metadata)
current_path.pop()
if is_root:
# Remove the initial node tuple when backtracking fully
current_path.pop()
# Start traversal from the start_node for both predecessors and successors
traverse_predecessors(start_node, [], True)
traverse_successors(start_node, [], True)
return full_paths
def propagate_gender_relevance(self, gender_relevant_nodes):
"""
Propagate gender relevance through the evolution graph and update the SQLite database.
:param db_path: Path to the SQLite database file.
:param gender_relevant_nodes: A set of nodes that are initially marked as gender-relevant.
"""
# Traverse from each gender-relevant end node backward to propagate relevance
for node in gender_relevant_nodes:
# Use breadth-first search or depth-first search to traverse backward
visited = set()
stack = [node]
while stack:
current_node = stack.pop()
if current_node not in visited:
visited.add(current_node)
# Update the gender_relevant flag in the database
self.update_pokemon_field(current_node, "gender_relevant", True)
# Add predecessors to the stack to keep traversing backward
if self.graph.has_node(current_node):
predecessors = list(self.graph.predecessors(current_node))
stack.extend(predecessors)
self.save_changes()
def get_gender_specific_evolutions(self):
"""
Get a list of nodes that have evolution methods indicating gender relevance (i.e., '(male)' or '(female)').
:return: A list of nodes involved in gender-specific evolutions.
"""
gender_specific_nodes = []
for from_node, to_node, edge_data in self.graph.edges(data=True):
method = edge_data.get("method", "")
if method and ("(male)" in method.lower() or "(female)" in method.lower()):
# Add both nodes involved in this gender-specific evolution
gender_specific_nodes.extend([from_node, to_node])
return list(set(gender_specific_nodes)) # Return unique nodes
def get_gender_relevant_pokemon(self):
self.cursor.execute(f"SELECT PFIC FROM pokemon_forms WHERE JSON_EXTRACT(data, '$.gender_relevant') = true")
results = self.cursor.fetchall()
return [row['PFIC'] for row in results]
def get_game_id_by_name(self, name):
self.cursor.execute('''
SELECT id, name, generation FROM games
WHERE name LIKE ? OR alt_names LIKE ?
''', (f"%{name}%", f"%{name}%"))
# Fetch and print the results
result = self.cursor.fetchone()
print(f"ID: {result[0]}, Name: {result[1]}, Generation: {result[2]}")
return dict(result)
def get_games_by_name(self, name):
self.cursor.execute('''
SELECT id, name, generation FROM games
WHERE name LIKE ? OR alt_names LIKE ?
''', (f"%{name}%", f"%{name}%"))
# Fetch and print the results
results = self.cursor.fetchall()
return [dict(row) for row in results]
def get_game_by_id(self, id):
self.cursor.execute('''
SELECT * FROM games
WHERE id = ?
''', (id))
# Fetch and print the results
result = self.cursor.fetchone()
return dict(result)
def get_games_by_generation(self, generation):
self.cursor.execute('''
SELECT id, name FROM games
WHERE generation = ?
''', (generation,))
# Fetch and print the results
results = self.cursor.fetchall()
for row in results:
print(f"ID: {row[0]}, Name: {row[1]}")
return [dict(row) for row in results]
def update_encounter_locations(self, data):
for encounter in data:
with self.lock:
pfic = encounter["pfic"]
game_id = encounter["game_id"]["id"]
type = encounter["type"]
data = encounter["data"] if "data" in encounter else None
self.cursor.execute('''
INSERT OR REPLACE INTO encounters (PFIC, game_id, type, data) VALUES (?, ?, ?, ?)
''', (pfic, game_id, type, json.dumps(data)))
self.conn.commit()
print(f"Added: {pfic}")
pass
def get_encounters(self, pfic):
self.cursor.execute('''
SELECT * FROM encounters
WHERE PFIC = ?
''', (pfic))
# Fetch and print the results
results = self.cursor.fetchall()
return [dict(row) for row in results]