import sqlite3
import sys
import os
from event_system import event_system
def parse_pfic(pfic):
    """Split a PFIC string into a tuple usable as a sort key.

    The string is split on the separator used by this module and every
    segment made up only of decimal digits is converted to ``int``; all
    other segments (including empty ones) are kept as strings.

    Args:
        pfic: The PFIC identifier string.

    Returns:
        A tuple with one item per segment, each an ``int`` or a ``str``.
    """
    segments = pfic.split(' - ')
    # str.isdecimal() (rather than str.isdigit()) is the right guard for
    # int(): isdigit() is also true for characters such as superscript
    # digits, which int() rejects with ValueError.
    return tuple(
        int(segment) if segment.isdecimal() else segment
        for segment in segments
    )
class DBController:
    """SQLite data-access layer that answers requests from the event system.

    The constructor opens the runtime connection and registers one method per
    event name. ``init_database`` creates and seeds the schema in the on-disk
    database file and then copies that database into the runtime connection.
    All query methods run on the single shared ``self.cursor``.
    """

    def __init__(self):
        """Open the runtime connection and subscribe every event handler."""
        # NOTE(review): the database name below is padded with spaces. SQLite
        # documents only the exact name ':memory:' as an in-memory database,
        # so confirm that this spelling is intended.
        self.conn = sqlite3.connect(' :memory: ', check_same_thread=False)  # Use in-memory database for runtime
        self.cursor = self.conn.cursor()

        # (event name, handler) pairs, registered in this order.
        listeners = (
            (' get_pokemon_data ', self.load_pokemon_details),
            (' get_pokemon_list ', self.send_pokemon_list),
            (' get_home_storable ', self.get_home_storable),
            (' get_evolution_chain ', self.get_evolution_chain),
            (' get_evolution_parent ', self.get_evolution_parent),
            (' get_encounter_locations ', self.get_encounter_locations),
            (' get_game_generation ', self.get_game_generation),
            (' get_pokemon_with_encounters ', self.get_pokemon_with_encounters),
            (' refresh_in_memory_db ', self.refresh_in_memory_db),
            (' clear_encounters_for_pokemon ', self.clear_encounters_for_pokemon),
            (' get_game_id_for_name ', self.get_game_id_for_name),
            (' get_exclusive_encounter_sets ', self.get_exclusive_encounter_sets),
            (' get_exclusive_set_details ', self.get_exclusive_set_details),
            (' get_set_encounters ', self.get_set_encounters),
            (' delete_encounter_from_set ', self.delete_encounter_from_set),
            (' get_available_encounters ', self.get_available_encounters),
            (' add_encounter_to_set ', self.add_encounter_to_set),
            (' get_games_list ', self.get_games_list),
            (' add_new_exclusive_set ', self.add_new_exclusive_set),
        )
        for event_name, handler in listeners:
            event_system.add_listener(event_name, handler)

    def init_database(self):
        """Create/seed the on-disk schema and copy it into ``self.conn``.

        The on-disk connection is always closed, even when a step fails.
        """
        disk_conn = sqlite3.connect(' pokemon_forms.db ')
        try:
            disk_cursor = disk_conn.cursor()

            # Create tables in the file-based database. The games table comes
            # first because the mark seeding looks games up by name.
            table_builders = (
                self.create_games_table,
                self.create_pokemon_forms_table,
                self.create_pokemon_storage_table,
                self.create_evolution_chains_table,
                self.create_exclusive_encounter_groups_table,
                self.create_encounter_exclusive_group_table,
                self.create_encounters_table,
                self.create_mark_table,
            )
            for build_table in table_builders:
                build_table(disk_cursor)

            # Commit changes to the file-based database
            disk_conn.commit()

            # Copy the file-based database to the in-memory database
            disk_conn.backup(self.conn)
        finally:
            # Close the file-based database connection
            disk_conn.close()

    def create_pokemon_forms_table(self, cursor):
        """Create the ``pokemon_forms`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS pokemon_forms (
            PFIC TEXT PRIMARY KEY ,
            name TEXT NOT NULL ,
            form_name TEXT ,
            national_dex INTEGER NOT NULL ,
            generation INTEGER NOT NULL ,
            is_baby_form BOOLEAN
            )
            ''')

    def create_pokemon_storage_table(self, cursor):
        """Create the ``pokemon_storage`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS pokemon_storage (
            PFIC TEXT PRIMARY KEY ,
            storable_in_home BOOLEAN NOT NULL ,
            FOREIGN KEY ( PFIC ) REFERENCES pokemon_forms ( PFIC )
            )
            ''')

    def create_evolution_chains_table(self, cursor):
        """Create the ``evolution_chains`` link table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS evolution_chains (
            from_pfic TEXT ,
            to_pfic TEXT ,
            method TEXT ,
            PRIMARY KEY ( from_pfic , to_pfic ) ,
            FOREIGN KEY ( from_pfic ) REFERENCES pokemon_forms ( PFIC ) ,
            FOREIGN KEY ( to_pfic ) REFERENCES pokemon_forms ( PFIC )
            )
            ''')

    def create_exclusive_encounter_groups_table(self, cursor):
        """Create the ``exclusive_encounter_groups`` table if missing."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS exclusive_encounter_groups (
            id INTEGER PRIMARY KEY AUTOINCREMENT ,
            group_name TEXT NOT NULL ,
            description TEXT ,
            game_id INTEGER ,
            FOREIGN KEY ( game_id ) REFERENCES games ( id )
            )
            ''')

    def create_encounter_exclusive_group_table(self, cursor):
        """Create the encounter-to-group link table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS encounter_exclusive_groups (
            encounter_id INTEGER ,
            group_id INTEGER ,
            FOREIGN KEY ( encounter_id ) REFERENCES encounters ( id ) ,
            FOREIGN KEY ( group_id ) REFERENCES exclusive_encounter_groups ( id ) ,
            PRIMARY KEY ( encounter_id , group_id )
            )
            ''')

    def create_encounters_table(self, cursor):
        """Create the ``encounters`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS encounters (
            id INTEGER PRIMARY KEY AUTOINCREMENT ,
            pfic TEXT ,
            game_id INTEGER ,
            location TEXT ,
            day TEXT ,
            time TEXT ,
            dual_slot TEXT ,
            static_encounter BOOLEAN ,
            static_encounter_count INTEGER ,
            extra_text TEXT ,
            stars TEXT ,
            fishing BOOLEAN ,
            rods TEXT ,
            starter BOOLEAN ,
            FOREIGN KEY ( pfic ) REFERENCES pokemon_forms ( PFIC ) ,
            FOREIGN KEY ( game_id ) REFERENCES games ( id )
            )
            ''')

    def create_mark_table(self, cursor):
        """Create the mark tables and seed them when ``marks`` is empty.

        Each seeded mark is linked to every row of ``games`` whose name
        matches one of the mark's game names, so ``games`` must already be
        populated when this runs.
        """
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS marks (
            id INTEGER PRIMARY KEY ,
            name TEXT NOT NULL UNIQUE ,
            icon_path TEXT NOT NULL
            )
            ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mark_game_associations (
            mark_id INTEGER ,
            game_id INTEGER ,
            FOREIGN KEY ( mark_id ) REFERENCES marks ( id ) ,
            FOREIGN KEY ( game_id ) REFERENCES games ( id ) ,
            PRIMARY KEY ( mark_id , game_id )
            )
            ''')

        # (mark name, icon path, names of the games the mark belongs to)
        marks = [
            (" Game Boy ", " images/marks/GB_icon_HOME.png ", [" Red ", " Blue ", " Yellow ", " Gold ", " Silver ", " Crystal "]),
            (" Kalos ", " images/marks/Blue_pentagon_HOME.png ", [" X ", " Y ", " Omega Ruby ", " Alpha Sapphire "]),
            (" Alola ", " images/marks/Black_clover_HOME.png ", [" Sun ", " Moon ", " Ultra Sun ", " Ultra Moon "]),
            (" Let ' s Go ", " images/marks/Let ' s_Go_icon_HOME.png ", [" Let ' s Go Pikachu ", " Let ' s Go Eevee "]),
            (" Galar ", " images/marks/Galar_symbol_HOME.png ", [" Sword ", " Shield "]),
            (" Sinnoh ", " images/marks/BDSP_icon_HOME.png ", [" Brilliant Diamond ", " Shining Pearl "]),
            (" Hisui ", " images/marks/Arceus_mark_HOME.png ", [" Legends Arceus "]),
            (" Paldea ", " images/marks/Paldea_icon_HOME.png ", [" Scarlet ", " Violet "]),
        ]

        # Check if marks already exist
        cursor.execute(' SELECT COUNT(*) FROM marks ')
        marks_count = cursor.fetchone()[0]
        if marks_count != 0:
            return

        for mark_name, icon_path, game_names in marks:
            cursor.execute('''
                INSERT OR IGNORE INTO marks ( name , icon_path )
                VALUES ( ? , ? )
                ''', (mark_name, icon_path))
            # The table was empty and the seed names are distinct, so the
            # insert above always creates a row and lastrowid is its id.
            mark_id = cursor.lastrowid
            for game_name in game_names:
                cursor.execute('''
                    INSERT OR IGNORE INTO mark_game_associations ( mark_id , game_id )
                    SELECT ? , id FROM games WHERE name = ?
                    ''', (mark_id, game_name))

    def create_games_table(self, cursor):
        """Create and seed ``games`` and ``alternate_game_names``.

        Seeding is idempotent: a game whose name is already present is not
        inserted again, and its existing id is reused for its alternate
        names. Alternate names that already exist are ignored through the
        table's UNIQUE constraint.
        """
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS games (
            id INTEGER PRIMARY KEY ,
            name TEXT NOT NULL ,
            generation INTEGER NOT NULL
            )
            ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alternate_game_names (
            id INTEGER PRIMARY KEY ,
            game_id INTEGER NOT NULL ,
            alternate_name TEXT NOT NULL ,
            FOREIGN KEY ( game_id ) REFERENCES games ( id ) ,
            UNIQUE ( alternate_name COLLATE NOCASE )
            )
            ''')

        # (game name, generation, alternate names)
        games = [
            (" Red ", 1, [" Red Version "]),
            (" Blue ", 1, [" Blue Version "]),
            (" Yellow ", 1, [" Yellow Version "]),
            (" Gold ", 2, [" Gold Version "]),
            (" Silver ", 2, [" Silver Version "]),
            (" Crystal ", 2, [" Crystal Version "]),
            (" Ruby ", 3, [" Ruby Version "]),
            (" Sapphire ", 3, [" Sapphire Version "]),
            (" Emerald ", 3, [" Emerald Version "]),
            (" FireRed ", 3, [" Fire Red ", " Fire-Red "]),
            (" LeafGreen ", 3, [" Leaf Green ", " Leaf-Green "]),
            (" Diamond ", 4, [" Diamond Version "]),
            (" Pearl ", 4, [" Pearl Version "]),
            (" Platinum ", 4, [" Platinum Version "]),
            (" HeartGold ", 4, [" Heart Gold ", " Heart-Gold "]),
            (" SoulSilver ", 4, [" Soul Silver ", " Soul-Silver "]),
            (" Black ", 5, [" Black Version "]),
            (" White ", 5, [" White Version "]),
            (" Black 2 ", 5, [" Black Version 2 ", " Black-2 "]),
            (" White 2 ", 5, [" White Version 2 ", " White-2 "]),
            (" X ", 6, [" X Version "]),
            (" Y ", 6, [" Y Version "]),
            (" Omega Ruby ", 6, [" Omega Ruby Version ", " Omega-Ruby "]),
            (" Alpha Sapphire ", 6, [" Alpha Sapphire Version ", " Alpha-Sapphire "]),
            (" Sun ", 7, [" Sun Version "]),
            (" Moon ", 7, [" Moon Version "]),
            (" Ultra Sun ", 7, [" Ultra Sun Version ", " Ultra-Sun "]),
            (" Ultra Moon ", 7, [" Ultra Moon Version ", " Ultra-Moon "]),
            (" Let ' s Go Pikachu ", 7, [" Let ' s Go, Pikachu! ", " Lets Go Pikachu "]),
            (" Let ' s Go Eevee ", 7, [" Let ' s Go, Eevee! ", " Lets Go Eevee "]),
            (" Sword ", 8, [" Sword Version "]),
            (" Shield ", 8, [" Shield Version "]),
            (" Expansion Pass ", 8, [" Expansion Pass (Sword) ", " Expansion Pass (Shield) "]),
            (" Brilliant Diamond ", 8, [" Brilliant Diamond Version ", " Brilliant-Diamond "]),
            (" Shining Pearl ", 8, [" Shining Pearl Version ", " Shining-Pearl "]),
            (" Legends Arceus ", 8, [" Legends: Arceus ", " Legends-Arceus "]),
            (" Scarlet ", 9, [" Scarlet Version "]),
            (" Violet ", 9, [" Violet Version "]),
            (" The Teal Mask ", 9, [" The Teal Mask Version ", " The Teal Mask (Scarlet) ", " The Teal Mask (Violet) "]),
            (" The Hidden Treasure of Area Zero ", 9, [" The Hidden Treasure of Area Zero Version ", " The Hidden Treasure of Area Zero (Scarlet) ", " The Hidden Treasure of Area Zero (Violet) "]),
            (" Pokémon Home ", 98, [" Pokémon HOME "]),
            (" Pokémon Go ", 99, [" Pokémon GO "]),
        ]

        for game_name, generation, alternate_names in games:
            # games.name carries no UNIQUE constraint, so INSERT OR IGNORE
            # alone would add a fresh copy of every game on each call. Look
            # the game up first and reuse the oldest matching row.
            cursor.execute(
                'SELECT id FROM games WHERE name = ? ORDER BY id LIMIT 1',
                (game_name,),
            )
            existing = cursor.fetchone()
            if existing is None:
                cursor.execute('''
                    INSERT OR IGNORE INTO games ( name , generation )
                    VALUES ( ? , ? )
                    ''', (game_name, generation))
                game_id = cursor.lastrowid
            else:
                game_id = existing[0]

            # Insert alternate names
            for alt_name in alternate_names:
                cursor.execute('''
                    INSERT OR IGNORE INTO alternate_game_names ( game_id , alternate_name )
                    VALUES ( ? , ? )
                    ''', (game_id, alt_name))

    def load_pokemon_details(self, pfic):
        """Return the detail row for ``pfic``, or ``None`` if it is unknown.

        The row is ``(name, form_name, national_dex, generation,
        storable_in_home, is_baby_form)``; ``storable_in_home`` is ``None``
        when the form has no ``pokemon_storage`` row (LEFT JOIN).
        """
        self.cursor.execute('''
            SELECT pf . name , pf . form_name , pf . national_dex , pf . generation , ps . storable_in_home , pf . is_baby_form
            FROM pokemon_forms pf
            LEFT JOIN pokemon_storage ps ON pf . PFIC = ps . PFIC
            WHERE pf . PFIC = ?
            ''', (pfic,))
        return self.cursor.fetchone()

    def send_pokemon_list(self, data):
        """Return every ``(PFIC, name, form_name, national_dex)`` row.

        Rows are sorted by the parsed PFIC. ``data`` is not used.
        """
        # Fetch pokemon list from database
        self.cursor.execute('''
            SELECT pf . PFIC , pf . name , pf . form_name , pf . national_dex
            FROM pokemon_forms pf
            ''')
        pokemon_data = self.cursor.fetchall()

        # Sort the pokemon_data based on PFIC
        pokemon_data.sort(key=lambda row: parse_pfic(row[0]))
        return pokemon_data

    def get_home_storable(self, pfic):
        """Return the stored ``storable_in_home`` value for ``pfic``.

        Returns ``False`` when the form has no ``pokemon_storage`` row.
        """
        self.cursor.execute(' SELECT storable_in_home FROM pokemon_storage WHERE PFIC = ? ', (pfic,))
        row = self.cursor.fetchone()
        if not row:
            return False
        return row[0]

    def get_evolution_chain(self, pfic):
        """Collect every form connected to ``pfic`` through evolution links.

        Links are followed in both directions (previous and next evolutions)
        using an explicit stack, so the result order is depth-first.

        Args:
            pfic: PFIC of the form to start from.

        Returns:
            A list of ``(pfic, name, form_name, method)`` tuples. ``method``
            is the method of the link through which the form was reached and
            is ``None`` for the starting form. PFICs that have no
            ``pokemon_forms`` row are skipped, so an unknown starting PFIC
            yields an empty list.
        """
        visited = set()
        chain = []
        stack = [(pfic, None)]

        while stack:
            current_pfic, method = stack.pop()
            if current_pfic in visited:
                continue
            visited.add(current_pfic)

            self.cursor.execute(' SELECT name, form_name FROM pokemon_forms WHERE PFIC = ? ', (current_pfic,))
            form_row = self.cursor.fetchone()
            if form_row is None:
                # A link can point at a PFIC that has no pokemon_forms row;
                # skip it instead of failing to unpack None.
                continue
            name, form_name = form_row
            chain.append((current_pfic, name, form_name, method))

            # Get previous evolutions
            self.cursor.execute(' SELECT from_pfic, method FROM evolution_chains WHERE to_pfic = ? ', (current_pfic,))
            previous_links = self.cursor.fetchall()
            # Get next evolutions
            self.cursor.execute(' SELECT to_pfic, method FROM evolution_chains WHERE from_pfic = ? ', (current_pfic,))
            next_links = self.cursor.fetchall()

            for linked_pfic, link_method in previous_links + next_links:
                if linked_pfic not in visited:
                    stack.append((linked_pfic, link_method))

        return chain

    def get_evolution_parent(self, pfic):
        """Return one ``(from_pfic,)`` row that evolves into ``pfic``.

        Returns ``None`` when no such link exists.
        """
        self.cursor.execute(' SELECT from_pfic FROM evolution_chains WHERE to_pfic = ? ', (pfic,))
        return self.cursor.fetchone()

    def get_encounter_locations(self, pfic):
        """Return all encounter rows for ``pfic``, ordered by game, location.

        Each row is ``(game_name, location, day, time, dual_slot,
        static_encounter_count, static_encounter, extra_text, stars, rods,
        fishing)``.
        """
        self.cursor.execute('''
            SELECT g . name , e . location , e . day , e . time , e . dual_slot , e . static_encounter_count , e . static_encounter , e . extra_text , e . stars , e . rods , e . fishing
            FROM encounters e
            JOIN games g ON e . game_id = g . id
            WHERE e . pfic = ?
            ORDER BY g . name , e . location
            ''', (pfic,))
        return self.cursor.fetchall()

    def get_game_generation(self, game):
        """Return the generation of ``game``, or ``None`` if it is unknown.

        ``game`` is matched against both the primary and alternate names.
        """
        self.cursor.execute('''
            SELECT g . generation
            FROM games g
            LEFT JOIN alternate_game_names agn ON g . id = agn . game_id
            WHERE g . name = ? OR agn . alternate_name = ?
            LIMIT 1
            ''', (game, game))
        row = self.cursor.fetchone()
        if not row:
            return None
        return row[0]

    def get_game_id_for_name(self, game):
        """Return the id of ``game``, or ``None`` if it is unknown.

        The primary name is tried first, then the alternate names.
        """
        # First, try to find the game in the main games table
        self.cursor.execute('''
            SELECT id
            FROM games
            WHERE name = ?
            ''', (game,))
        row = self.cursor.fetchone()
        if row:
            return row[0]

        # If not found, check the alternate_game_names table
        self.cursor.execute('''
            SELECT game_id
            FROM alternate_game_names
            WHERE alternate_name = ?
            ''', (game,))
        row = self.cursor.fetchone()
        if not row:
            return None
        return row[0]

    def get_pokemon_with_encounters(self, data):
        """Return the set of PFICs that have an encounter row.

        ``data`` is not used.
        """
        self.cursor.execute('''
            SELECT DISTINCT pfic
            FROM encounters
            ''')
        return {row[0] for row in self.cursor.fetchall()}

    def refresh_in_memory_db(self, temp_conn):
        """Overwrite the runtime database with a backup of ``temp_conn``."""
        temp_conn.backup(self.conn)

    def clear_encounters_for_pokemon(self, pfic):
        """Delete every encounter row for ``pfic`` and commit."""
        self.cursor.execute(' DELETE FROM encounters WHERE pfic = ? ', (pfic,))
        self.conn.commit()

    def get_games_list(self, data):
        """Return all ``(id, name)`` rows from ``games``; ``data`` is unused."""
        self.cursor.execute(' SELECT id, name FROM games ')
        return self.cursor.fetchall()

    def get_exclusive_encounter_sets(self, data):
        """Return all ``(id, group_name)`` rows; ``data`` is unused."""
        self.cursor.execute(' SELECT id, group_name FROM exclusive_encounter_groups ')
        return self.cursor.fetchall()

    def get_exclusive_set_details(self, set_id):
        """Return ``(group_name, description, game_id)`` for ``set_id``.

        Returns ``None`` when the set does not exist.
        """
        self.cursor.execute(' SELECT group_name, description, game_id FROM exclusive_encounter_groups WHERE id = ? ', (set_id,))
        return self.cursor.fetchone()

    def get_set_encounters(self, set_id):
        """Return the encounters that belong to the exclusive set ``set_id``.

        Each row is ``(encounter_id, pfic, location, name, form_name)``.
        """
        self.cursor.execute('''
            SELECT e . id , e . pfic , e . location , pf . name , pf . form_name
            FROM encounters e
            JOIN encounter_exclusive_groups eeg ON e . id = eeg . encounter_id
            JOIN pokemon_forms pf ON e . pfic = pf . PFIC
            WHERE eeg . group_id = ?
            ''', (set_id,))
        return self.cursor.fetchall()

    def delete_encounter_from_set(self, set_id, encounter_id):
        """Remove ``encounter_id`` from the set ``set_id`` and commit.

        NOTE(review): this handler takes two positional arguments, while the
        sibling handlers that need several values unpack one ``data`` tuple;
        confirm how the event system dispatches to it.
        """
        self.cursor.execute(' DELETE FROM encounter_exclusive_groups WHERE group_id = ? AND encounter_id = ? ', (set_id, encounter_id))
        self.conn.commit()

    def add_encounter_to_set(self, data):
        """Link an encounter to a set and commit.

        Args:
            data: A ``(set_id, encounter_id)`` pair.
        """
        set_id, encounter_id = data
        self.cursor.execute(' INSERT INTO encounter_exclusive_groups (group_id, encounter_id) VALUES (?, ?) ', (set_id, encounter_id))
        self.conn.commit()

    def add_new_exclusive_set(self, data):
        """Insert a new exclusive encounter set and commit.

        Args:
            data: A ``(set_name, description, game_id)`` triple.
        """
        set_name, description, game_id = data
        self.cursor.execute(' INSERT INTO exclusive_encounter_groups (group_name, description, game_id) VALUES (?, ?, ?) ', (set_name, description, game_id))
        self.conn.commit()

    def get_available_encounters(self, data):
        """Return every encounter recorded for the game whose id is ``data``.

        Each row is ``(encounter_id, pfic, location, name, form_name)``.
        """
        game_id = data
        self.cursor.execute('''
            SELECT e . id , e . pfic , e . location , pf . name , pf . form_name
            FROM encounters e
            JOIN games g ON e . game_id = g . id
            JOIN pokemon_forms pf ON e . pfic = pf . PFIC
            WHERE g . id = ?
            ''', (game_id,))
        return self.cursor.fetchall()