You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

383 lines
15 KiB

import sqlite3
import sys
import os
from event_system import event_system
def parse_pfic(pfic):
    """Split a PFIC string on '-' and return its fields as a tuple.

    Fields made up only of digits are converted to ``int``; every other
    field is kept as the original string. The result is used as a sort key.
    """
    fields = []
    for token in pfic.split('-'):
        if token.isdigit():
            fields.append(int(token))
        else:
            fields.append(token)
    return tuple(fields)
class DBController:
    """Own the runtime SQLite database and answer event-system queries on it.

    Queries run against an in-memory database (``self.conn`` / ``self.cursor``).
    ``init_database`` creates the schema and seed rows in the on-disk file
    ``pokemon_forms.db`` and then copies that file into the in-memory database.
    """

    # Seed rows for ``games`` / ``alternate_game_names``:
    # (canonical name, generation, [alternate names]).
    _GAMES = [
        ("Red", 1, ["Red Version"]),
        ("Blue", 1, ["Blue Version"]),
        ("Yellow", 1, ["Yellow Version"]),
        ("Gold", 2, ["Gold Version"]),
        ("Silver", 2, ["Silver Version"]),
        ("Crystal", 2, ["Crystal Version"]),
        ("Ruby", 3, ["Ruby Version"]),
        ("Sapphire", 3, ["Sapphire Version"]),
        ("Emerald", 3, ["Emerald Version"]),
        ("FireRed", 3, ["Fire Red", "Fire-Red"]),
        ("LeafGreen", 3, ["Leaf Green", "Leaf-Green"]),
        ("Diamond", 4, ["Diamond Version"]),
        ("Pearl", 4, ["Pearl Version"]),
        ("Platinum", 4, ["Platinum Version"]),
        ("HeartGold", 4, ["Heart Gold", "Heart-Gold"]),
        ("SoulSilver", 4, ["Soul Silver", "Soul-Silver"]),
        ("Black", 5, ["Black Version"]),
        ("White", 5, ["White Version"]),
        ("Black 2", 5, ["Black Version 2", "Black-2"]),
        ("White 2", 5, ["White Version 2", "White-2"]),
        ("X", 6, ["X Version"]),
        ("Y", 6, ["Y Version"]),
        ("Omega Ruby", 6, ["Omega Ruby Version", "Omega-Ruby"]),
        ("Alpha Sapphire", 6, ["Alpha Sapphire Version", "Alpha-Sapphire"]),
        ("Sun", 7, ["Sun Version"]),
        ("Moon", 7, ["Moon Version"]),
        ("Ultra Sun", 7, ["Ultra Sun Version", "Ultra-Sun"]),
        ("Ultra Moon", 7, ["Ultra Moon Version", "Ultra-Moon"]),
        ("Let's Go Pikachu", 7, ["Let's Go, Pikachu!", "Lets Go Pikachu"]),
        ("Let's Go Eevee", 7, ["Let's Go, Eevee!", "Lets Go Eevee"]),
        ("Sword", 8, ["Sword Version"]),
        ("Shield", 8, ["Shield Version"]),
        ("Expansion Pass", 8, ["Expansion Pass (Sword)", "Expansion Pass (Shield)"]),
        ("Brilliant Diamond", 8, ["Brilliant Diamond Version", "Brilliant-Diamond"]),
        ("Shining Pearl", 8, ["Shining Pearl Version", "Shining-Pearl"]),
        ("Legends Arceus", 8, ["Legends: Arceus", "Legends-Arceus"]),
        ("Scarlet", 9, ["Scarlet Version"]),
        ("Violet", 9, ["Violet Version"]),
        ("The Teal Mask", 9, ["The Teal Mask Version", "The Teal Mask (Scarlet)", "The Teal Mask (Violet)"]),
        ("The Hidden Treasure of Area Zero", 9, ["The Hidden Treasure of Area Zero Version", "The Hidden Treasure of Area Zero (Scarlet)", "The Hidden Treasure of Area Zero (Violet)"]),
        ("Pokémon Home", 98, ["Pokémon HOME"]),
        ("Pokémon Go", 99, ["Pokémon GO"]),
    ]

    # Seed rows for ``marks`` / ``mark_game_associations``:
    # (mark name, icon path, [names of the games the mark is associated with]).
    _MARKS = [
        ("Game Boy", "images/marks/GB_icon_HOME.png", ["Red", "Blue", "Yellow", "Gold", "Silver", "Crystal"]),
        ("Kalos", "images/marks/Blue_pentagon_HOME.png", ["X", "Y", "Omega Ruby", "Alpha Sapphire"]),
        ("Alola", "images/marks/Black_clover_HOME.png", ["Sun", "Moon", "Ultra Sun", "Ultra Moon"]),
        ("Let's Go", "images/marks/Let's_Go_icon_HOME.png", ["Let's Go Pikachu", "Let's Go Eevee"]),
        ("Galar", "images/marks/Galar_symbol_HOME.png", ["Sword", "Shield"]),
        ("Sinnoh", "images/marks/BDSP_icon_HOME.png", ["Brilliant Diamond", "Shining Pearl"]),
        ("Hisui", "images/marks/Arceus_mark_HOME.png", ["Legends Arceus"]),
        ("Paldea", "images/marks/Paldea_icon_HOME.png", ["Scarlet", "Violet"]),
    ]

    def __init__(self):
        """Open the in-memory database and register the event listeners."""
        # Use in-memory database for runtime
        self.conn = sqlite3.connect(':memory:', check_same_thread=False)
        self.cursor = self.conn.cursor()

        # Registration order matches the original sequence of add_listener calls.
        listeners = (
            ('get_pokemon_data', self.load_pokemon_details),
            ('get_pokemon_list', self.send_pokemon_list),
            ('get_home_storable', self.get_home_storable),
            ('get_evolution_chain', self.get_evolution_chain),
            ('get_evolution_parent', self.get_evolution_parent),
            ('get_encounter_locations', self.get_encounter_locations),
            ('get_game_generation', self.get_game_generation),
            ('get_pokemon_with_encounters', self.get_pokemon_with_encounters),
            ('refresh_in_memory_db', self.refresh_in_memory_db),
            ('clear_encounters_for_pokemon', self.clear_encounters_for_pokemon),
            ('get_game_id_for_name', self.get_game_id_for_name),
        )
        for event_name, handler in listeners:
            event_system.add_listener(event_name, handler)

    def init_database(self):
        """Create schema and seed data in ``pokemon_forms.db``, then load it into memory.

        The on-disk connection is always closed, even if a step fails.
        """
        disk_conn = sqlite3.connect('pokemon_forms.db')
        try:
            disk_cursor = disk_conn.cursor()

            # Create tables in the file-based database. The games table goes
            # first because the mark seeding looks games up by name.
            # NOTE(review): create_encounter_exclusive_group_table is not
            # invoked here -- confirm whether that is intentional.
            self.create_games_table(disk_cursor)
            self.create_pokemon_forms_table(disk_cursor)
            self.create_pokemon_storage_table(disk_cursor)
            self.create_evolution_chains_table(disk_cursor)
            self.create_exclusive_encounter_groups_table(disk_cursor)
            self.create_encounters_table(disk_cursor)
            self.create_mark_table(disk_cursor)

            # Commit changes to the file-based database
            disk_conn.commit()

            # Copy the file-based database to the in-memory database
            disk_conn.backup(self.conn)
        finally:
            # Close the file-based database connection
            disk_conn.close()

    def create_pokemon_forms_table(self, cursor):
        """Create the ``pokemon_forms`` table (keyed by PFIC) if it is missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS pokemon_forms (
                PFIC TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                form_name TEXT,
                national_dex INTEGER NOT NULL,
                generation INTEGER NOT NULL,
                is_baby_form BOOLEAN
            )
        ''')

    def create_pokemon_storage_table(self, cursor):
        """Create the ``pokemon_storage`` table if it is missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS pokemon_storage (
                PFIC TEXT PRIMARY KEY,
                storable_in_home BOOLEAN NOT NULL,
                FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC)
            )
        ''')

    def create_evolution_chains_table(self, cursor):
        """Create the ``evolution_chains`` edge table if it is missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS evolution_chains (
                from_pfic TEXT,
                to_pfic TEXT,
                method TEXT,
                PRIMARY KEY (from_pfic, to_pfic),
                FOREIGN KEY (from_pfic) REFERENCES pokemon_forms (PFIC),
                FOREIGN KEY (to_pfic) REFERENCES pokemon_forms (PFIC)
            )
        ''')

    def create_exclusive_encounter_groups_table(self, cursor):
        """Create the ``exclusive_encounter_groups`` table if it is missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS exclusive_encounter_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                group_name TEXT NOT NULL,
                description TEXT,
                game_id INTEGER,
                FOREIGN KEY (game_id) REFERENCES games (id)
            )
        ''')

    def create_encounter_exclusive_group_table(self, cursor):
        """Create the ``encounter_exclusive_groups`` link table if it is missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS encounter_exclusive_groups (
                encounter_id INTEGER,
                group_id INTEGER,
                FOREIGN KEY (encounter_id) REFERENCES encounters (id),
                FOREIGN KEY (group_id) REFERENCES exclusive_encounter_groups (id),
                PRIMARY KEY (encounter_id, group_id)
            )
        ''')

    def create_encounters_table(self, cursor):
        """Create the ``encounters`` table if it is missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS encounters (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pfic TEXT,
                game_id INTEGER,
                location TEXT,
                day TEXT,
                time TEXT,
                dual_slot TEXT,
                static_encounter BOOLEAN,
                static_encounter_count INTEGER,
                extra_text TEXT,
                stars TEXT,
                fishing BOOLEAN,
                rods TEXT,
                starter BOOLEAN,
                FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
                FOREIGN KEY (game_id) REFERENCES games (id)
            )
        ''')

    def create_mark_table(self, cursor):
        """Create ``marks`` and ``mark_game_associations`` and seed them once.

        Seeding only happens while ``marks`` is empty. Associations are
        resolved by game name, so the ``games`` table must already exist and
        be populated when this runs.
        """
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS marks (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL UNIQUE,
                icon_path TEXT NOT NULL
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mark_game_associations (
                mark_id INTEGER,
                game_id INTEGER,
                FOREIGN KEY (mark_id) REFERENCES marks (id),
                FOREIGN KEY (game_id) REFERENCES games (id),
                PRIMARY KEY (mark_id, game_id)
            )
        ''')

        # Check if marks already exist; if so there is nothing to seed.
        cursor.execute('SELECT COUNT(*) FROM marks')
        if cursor.fetchone()[0] != 0:
            return

        for mark_name, icon_path, game_names in self._MARKS:
            cursor.execute('''
                INSERT OR IGNORE INTO marks (name, icon_path)
                VALUES (?, ?)
            ''', (mark_name, icon_path))
            # The table was empty and the seed names are distinct, so the
            # insert above always creates a row and lastrowid is its id.
            mark_id = cursor.lastrowid
            for game_name in game_names:
                cursor.execute('''
                    INSERT OR IGNORE INTO mark_game_associations (mark_id, game_id)
                    SELECT ?, id FROM games WHERE name = ?
                ''', (mark_id, game_name))

    def create_games_table(self, cursor):
        """Create ``games`` and ``alternate_game_names`` and seed them idempotently.

        ``games.name`` carries no UNIQUE constraint, so a plain
        ``INSERT OR IGNORE`` would add a duplicate row for every game each time
        this runs against an existing database. Each game is therefore looked
        up by name first and only inserted when absent; the id found (or newly
        created) is the one used for its alternate names.
        """
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS games (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                generation INTEGER NOT NULL
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alternate_game_names (
                id INTEGER PRIMARY KEY,
                game_id INTEGER NOT NULL,
                alternate_name TEXT NOT NULL,
                FOREIGN KEY (game_id) REFERENCES games (id),
                UNIQUE (alternate_name COLLATE NOCASE)
            )
        ''')

        for game_name, generation, alternate_names in self._GAMES:
            cursor.execute('SELECT id FROM games WHERE name = ?', (game_name,))
            existing = cursor.fetchone()
            if existing is None:
                cursor.execute('''
                    INSERT INTO games (name, generation)
                    VALUES (?, ?)
                ''', (game_name, generation))
                game_id = cursor.lastrowid
            else:
                game_id = existing[0]

            # Insert alternate names; already-present names are skipped by
            # the case-insensitive UNIQUE constraint.
            for alt_name in alternate_names:
                cursor.execute('''
                    INSERT OR IGNORE INTO alternate_game_names (game_id, alternate_name)
                    VALUES (?, ?)
                ''', (game_id, alt_name))

    def load_pokemon_details(self, pfic):
        """Return the detail row for ``pfic``, or ``None`` if it is unknown.

        The row is ``(name, form_name, national_dex, generation,
        storable_in_home, is_baby_form)``; ``storable_in_home`` is ``None``
        when there is no matching ``pokemon_storage`` row.
        """
        self.cursor.execute('''
            SELECT pf.name, pf.form_name, pf.national_dex, pf.generation, ps.storable_in_home, pf.is_baby_form
            FROM pokemon_forms pf
            LEFT JOIN pokemon_storage ps ON pf.PFIC = ps.PFIC
            WHERE pf.PFIC = ?
        ''', (pfic,))
        return self.cursor.fetchone()

    def send_pokemon_list(self, data):
        """Return all ``(PFIC, name, form_name, national_dex)`` rows sorted by PFIC.

        ``data`` is the event payload and is not used.
        """
        # Fetch pokemon list from database
        self.cursor.execute('''
            SELECT pf.PFIC, pf.name, pf.form_name, pf.national_dex
            FROM pokemon_forms pf
        ''')
        pokemon_data = self.cursor.fetchall()

        # Sort on the parsed PFIC so numeric fields compare as numbers.
        pokemon_data.sort(key=lambda row: parse_pfic(row[0]))
        return pokemon_data

    def get_home_storable(self, pfic):
        """Return the stored ``storable_in_home`` value, or ``False`` if no row exists."""
        self.cursor.execute('SELECT storable_in_home FROM pokemon_storage WHERE PFIC = ?', (pfic,))
        result = self.cursor.fetchone()
        return result[0] if result else False

    def get_evolution_chain(self, pfic):
        """Return every form connected to ``pfic`` through ``evolution_chains``.

        The graph is walked in both directions (pre-evolutions and
        evolutions) starting at ``pfic``. Each entry is
        ``(pfic, name, form_name, method)``, where ``method`` is the method of
        the edge through which the form was reached (``None`` for the start).

        A PFIC with no ``pokemon_forms`` row is skipped rather than raising, so
        an unknown starting PFIC yields an empty list.
        """
        visited = set()
        chain = []
        stack = [(pfic, None)]

        while stack:
            current_pfic, method = stack.pop()
            if current_pfic in visited:
                continue
            visited.add(current_pfic)

            self.cursor.execute('SELECT name, form_name FROM pokemon_forms WHERE PFIC = ?', (current_pfic,))
            form_row = self.cursor.fetchone()
            if form_row is None:
                # Dangling reference or unknown PFIC: nothing to report here.
                continue
            name, form_name = form_row
            chain.append((current_pfic, name, form_name, method))

            # Get previous evolutions
            self.cursor.execute('SELECT from_pfic, method FROM evolution_chains WHERE to_pfic = ?', (current_pfic,))
            stack.extend(
                (prev_pfic, prev_method)
                for prev_pfic, prev_method in self.cursor.fetchall()
                if prev_pfic not in visited
            )

            # Get next evolutions
            self.cursor.execute('SELECT to_pfic, method FROM evolution_chains WHERE from_pfic = ?', (current_pfic,))
            stack.extend(
                (next_pfic, next_method)
                for next_pfic, next_method in self.cursor.fetchall()
                if next_pfic not in visited
            )

        return chain

    def get_evolution_parent(self, pfic):
        """Return a one-element row ``(from_pfic,)`` for ``pfic``, or ``None``."""
        self.cursor.execute('SELECT from_pfic FROM evolution_chains WHERE to_pfic = ?', (pfic,))
        return self.cursor.fetchone()

    def get_encounter_locations(self, pfic):
        """Return the encounter rows for ``pfic`` ordered by game name and location.

        Each row is ``(game name, location, day, time, dual_slot,
        static_encounter_count, static_encounter, extra_text, stars, rods,
        fishing)``.
        """
        self.cursor.execute('''
            SELECT g.name, e.location, e.day, e.time, e.dual_slot, e.static_encounter_count, e.static_encounter, e.extra_text, e.stars, e.rods, e.fishing
            FROM encounters e
            JOIN games g ON e.game_id = g.id
            WHERE e.pfic = ?
            ORDER BY g.name, e.location
        ''', (pfic,))
        return self.cursor.fetchall()

    def get_game_generation(self, game):
        """Return the generation of ``game`` (canonical or alternate name), or ``None``."""
        self.cursor.execute('''
            SELECT g.generation
            FROM games g
            LEFT JOIN alternate_game_names agn ON g.id = agn.game_id
            WHERE g.name = ? OR agn.alternate_name = ?
            LIMIT 1
        ''', (game, game))
        result = self.cursor.fetchone()
        return result[0] if result else None

    def get_game_id_for_name(self, game):
        """Return the id of ``game``, trying canonical names first, then alternates.

        Returns ``None`` when neither table has a matching name.
        """
        # First, try to find the game in the main games table
        self.cursor.execute('''
            SELECT id
            FROM games
            WHERE name = ?
        ''', (game,))
        result = self.cursor.fetchone()
        if result:
            return result[0]

        # If not found, check the alternate_game_names table
        self.cursor.execute('''
            SELECT game_id
            FROM alternate_game_names
            WHERE alternate_name = ?
        ''', (game,))
        result = self.cursor.fetchone()
        return result[0] if result else None

    def get_pokemon_with_encounters(self, data):
        """Return the set of PFICs that have at least one encounter row.

        ``data`` is the event payload and is not used.
        """
        self.cursor.execute('''
            SELECT DISTINCT pfic
            FROM encounters
        ''')
        return {row[0] for row in self.cursor.fetchall()}

    def refresh_in_memory_db(self, temp_conn):
        """Replace the in-memory database contents with a backup of ``temp_conn``."""
        temp_conn.backup(self.conn)

    def clear_encounters_for_pokemon(self, pfic):
        """Delete every encounter row for ``pfic`` and commit the change."""
        self.cursor.execute('DELETE FROM encounters WHERE pfic = ?', (pfic,))
        self.conn.commit()