import sqlite3
import sys
import os
import threading
from queue import Queue
from concurrent . futures import ThreadPoolExecutor
from event_system import event_system
from db_classes import PokemonForm , EvolveEncounter
def parse_pfic ( pfic ) :
parts = pfic . split ( ' - ' )
return tuple ( int ( part ) if part . isdigit ( ) else part for part in parts )
class DBController :
def __init__ ( self , db_path = ' :memory: ' , max_connections = 10 ) :
self . db_path = db_path
self . connection_pool = Queue ( maxsize = max_connections )
self . lock = threading . Lock ( )
for _ in range ( max_connections ) :
conn = sqlite3 . connect ( db_path , check_same_thread = False )
self . connection_pool . put ( conn )
self . executor = ThreadPoolExecutor ( max_workers = max_connections )
self . conn = sqlite3 . connect ( db_path , check_same_thread = False )
self . cursor = self . conn . cursor ( )
event_system . add_listener ( ' get_pokemon_data ' , self . load_pokemon_details )
event_system . add_listener ( ' get_pokemon_list ' , self . send_pokemon_list )
event_system . add_listener ( ' get_home_storable ' , self . get_home_storable )
event_system . add_listener ( ' get_evolution_chain ' , self . get_evolution_chain )
event_system . add_listener ( ' get_evolution_parent ' , self . get_evolution_parent )
event_system . add_listener ( ' get_encounter_locations ' , self . get_encounter_locations )
event_system . add_listener ( ' get_game_generation ' , self . get_game_generation )
event_system . add_listener ( ' get_pokemon_with_encounters ' , self . get_pokemon_with_encounters )
event_system . add_listener ( ' refresh_in_memory_db ' , self . refresh_in_memory_db )
event_system . add_listener ( ' clear_encounters_for_pokemon ' , self . clear_encounters_for_pokemon )
event_system . add_listener ( ' get_game_id_for_name ' , self . get_game_id_for_name )
event_system . add_listener ( ' get_exclusive_encounter_sets ' , self . get_exclusive_encounter_sets )
event_system . add_listener ( ' get_exclusive_set_details ' , self . get_exclusive_set_details )
event_system . add_listener ( ' get_set_encounters ' , self . get_set_encounters )
event_system . add_listener ( ' delete_encounter_from_set ' , self . delete_encounter_from_set )
event_system . add_listener ( ' get_available_encounters ' , self . get_available_encounters )
event_system . add_listener ( ' add_encounter_to_set ' , self . add_encounter_to_set )
event_system . add_listener ( ' get_games_list ' , self . get_games_list )
event_system . add_listener ( ' add_new_exclusive_set ' , self . add_new_exclusive_set )
event_system . add_listener ( ' save_changes ' , self . save_changes )
event_system . add_listener ( ' export_database ' , self . export_database )
event_system . add_listener ( ' assign_mark_to_form ' , self . assign_mark_to_form )
event_system . add_listener ( ' get_mark_for_game ' , self . get_mark_for_game )
event_system . add_listener ( ' get_mark_for_game_name ' , self . get_mark_for_game_name )
event_system . add_listener ( ' get_mark_details ' , self . get_mark_details )
event_system . add_listener ( ' get_evolve_encounters ' , self . get_evolve_encounters )
event_system . add_listener ( ' get_game_by_id ' , self . get_game_by_id )
event_system . add_listener ( ' get_pokemon_form_by_pfic ' , self . get_pokemon_form_by_pfic )
event_system . add_listener ( ' find_default_form ' , self . find_default_form )
def init_database ( self ) :
disk_conn = sqlite3 . connect ( ' pokemon_forms.db ' )
disk_cursor = disk_conn . cursor ( )
# Create tables in the file-based database
self . create_games_table ( disk_cursor )
self . create_pokemon_forms_table ( disk_cursor )
self . create_pokemon_storage_table ( disk_cursor )
self . create_evolution_chains_table ( disk_cursor )
self . create_exclusive_encounter_groups_table ( disk_cursor )
self . create_encounter_exclusive_group_table ( disk_cursor )
self . create_encounters_table ( disk_cursor )
self . create_mark_table ( disk_cursor )
self . create_form_marks_table ( disk_cursor )
self . create_evolve_encounter_table ( disk_cursor )
# Commit changes to the file-based database
disk_conn . commit ( )
# Copy the file-based database to the in-memory database
disk_conn . backup ( self . conn )
temp_connections = [ ]
for _ in range ( self . connection_pool . qsize ( ) ) :
conn = self . connection_pool . get ( )
disk_conn . backup ( conn )
temp_connections . append ( conn )
# Put all connections back into the pool
for conn in temp_connections :
self . connection_pool . put ( conn )
# Close the file-based database connection
disk_conn . close ( )
def create_pokemon_forms_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS pokemon_forms (
PFIC TEXT PRIMARY KEY ,
name TEXT NOT NULL ,
form_name TEXT ,
national_dex INTEGER NOT NULL ,
generation INTEGER NOT NULL ,
is_baby_form BOOLEAN
)
''' )
def create_pokemon_storage_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS pokemon_storage (
PFIC TEXT PRIMARY KEY ,
storable_in_home BOOLEAN NOT NULL ,
FOREIGN KEY ( PFIC ) REFERENCES pokemon_forms ( PFIC )
)
''' )
def create_evolution_chains_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS evolution_chains (
from_pfic TEXT ,
to_pfic TEXT ,
method TEXT ,
PRIMARY KEY ( from_pfic , to_pfic ) ,
FOREIGN KEY ( from_pfic ) REFERENCES pokemon_forms ( PFIC ) ,
FOREIGN KEY ( to_pfic ) REFERENCES pokemon_forms ( PFIC )
)
''' )
def create_exclusive_encounter_groups_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS exclusive_encounter_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT ,
group_name TEXT NOT NULL ,
description TEXT ,
game_id INTEGER ,
FOREIGN KEY ( game_id ) REFERENCES games ( id )
)
''' )
def create_encounter_exclusive_group_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS encounter_exclusive_groups (
encounter_id INTEGER ,
group_id INTEGER ,
FOREIGN KEY ( encounter_id ) REFERENCES encounters ( id ) ,
FOREIGN KEY ( group_id ) REFERENCES exclusive_encounter_groups ( id ) ,
PRIMARY KEY ( encounter_id , group_id )
)
''' )
def create_encounters_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS encounters (
id INTEGER PRIMARY KEY AUTOINCREMENT ,
pfic TEXT ,
game_id INTEGER ,
location TEXT ,
day TEXT ,
time TEXT ,
dual_slot TEXT ,
static_encounter BOOLEAN ,
static_encounter_count INTEGER ,
extra_text TEXT ,
stars TEXT ,
fishing BOOLEAN ,
rods TEXT ,
starter BOOLEAN ,
FOREIGN KEY ( pfic ) REFERENCES pokemon_forms ( PFIC ) ,
FOREIGN KEY ( game_id ) REFERENCES games ( id )
)
''' )
def create_mark_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS marks (
id INTEGER PRIMARY KEY ,
name TEXT NOT NULL UNIQUE ,
icon_path TEXT NOT NULL
)
''' )
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS mark_game_associations (
mark_id INTEGER ,
game_id INTEGER ,
FOREIGN KEY ( mark_id ) REFERENCES marks ( id ) ,
FOREIGN KEY ( game_id ) REFERENCES games ( id ) ,
PRIMARY KEY ( mark_id , game_id )
)
''' )
marks = [
( " Game Boy " , " images/marks/GB_icon_HOME.png " , [ " Red " , " Blue " , " Yellow " , " Gold " , " Silver " , " Crystal " ] ) ,
( " Kalos " , " images/marks/Blue_pentagon_HOME.png " , [ " X " , " Y " , " Omega Ruby " , " Alpha Sapphire " ] ) ,
( " Alola " , " images/marks/Black_clover_HOME.png " , [ " Sun " , " Moon " , " Ultra Sun " , " Ultra Moon " ] ) ,
( " Let ' s Go " , " images/marks/Let ' s_Go_icon_HOME.png " , [ " Let ' s Go Pikachu " , " Let ' s Go Eevee " ] ) ,
( " Galar " , " images/marks/Galar_symbol_HOME.png " , [ " Sword " , " Shield " , " Expansion Pass " ] ) ,
( " Sinnoh " , " images/marks/BDSP_icon_HOME.png " , [ " Brilliant Diamond " , " Shining Pearl " ] ) ,
( " Hisui " , " images/marks/Arceus_mark_HOME.png " , [ " Legends Arceus " ] ) ,
( " Paldea " , " images/marks/Paldea_icon_HOME.png " , [ " Scarlet " , " Violet " , " The Teal Mask " , " The Hidden Treasure of Area Zero " ] ) ,
( " Markless " , " images/marks/Markless_icon_HOME.png " , [ " Ruby " , " Sapphire " , " Emerald " , " FireRed " , " LeafGreen " , " Diamond " , " Pearl " , " Platinum " , " HeartGold " , " SoulSilver " , " Black " , " White " , " Black 2 " , " White 2 " ] ) ,
]
# Check if marks already exist
cursor . execute ( ' SELECT COUNT(*) FROM marks ' )
marks_count = cursor . fetchone ( ) [ 0 ]
for mark in marks :
mark_id = None
if marks_count == 0 :
cursor . execute ( '''
INSERT OR IGNORE INTO marks ( name , icon_path )
VALUES ( ? , ? )
''' , (mark[0], mark[1]))
mark_id = cursor . lastrowid
if mark_id == None :
cursor . execute ( '''
SELECT id FROM marks WHERE name = ?
''' , (mark[0],))
mark_id = cursor . fetchone ( ) [ 0 ]
for game_name in mark [ 2 ] :
cursor . execute ( '''
INSERT OR IGNORE INTO mark_game_associations ( mark_id , game_id )
SELECT ? , id FROM games WHERE name = ?
''' , (mark_id, game_name))
def create_games_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS games (
id INTEGER PRIMARY KEY ,
name TEXT NOT NULL ,
generation INTEGER NOT NULL
)
''' )
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS alternate_game_names (
id INTEGER PRIMARY KEY ,
game_id INTEGER NOT NULL ,
alternate_name TEXT NOT NULL ,
FOREIGN KEY ( game_id ) REFERENCES games ( id ) ,
UNIQUE ( alternate_name COLLATE NOCASE )
)
''' )
games = [
( " Red " , 1 , [ " Red Version " ] ) ,
( " Blue " , 1 , [ " Blue Version " ] ) ,
( " Yellow " , 1 , [ " Yellow Version " ] ) ,
( " Gold " , 2 , [ " Gold Version " ] ) ,
( " Silver " , 2 , [ " Silver Version " ] ) ,
( " Crystal " , 2 , [ " Crystal Version " ] ) ,
( " Ruby " , 3 , [ " Ruby Version " ] ) ,
( " Sapphire " , 3 , [ " Sapphire Version " ] ) ,
( " Emerald " , 3 , [ " Emerald Version " ] ) ,
( " FireRed " , 3 , [ " Fire Red " , " Fire-Red " ] ) ,
( " LeafGreen " , 3 , [ " Leaf Green " , " Leaf-Green " ] ) ,
( " Diamond " , 4 , [ " Diamond Version " ] ) ,
( " Pearl " , 4 , [ " Pearl Version " ] ) ,
( " Platinum " , 4 , [ " Platinum Version " ] ) ,
( " HeartGold " , 4 , [ " Heart Gold " , " Heart-Gold " ] ) ,
( " SoulSilver " , 4 , [ " Soul Silver " , " Soul-Silver " ] ) ,
( " Black " , 5 , [ " Black Version " ] ) ,
( " White " , 5 , [ " White Version " ] ) ,
( " Black 2 " , 5 , [ " Black Version 2 " , " Black-2 " ] ) ,
( " White 2 " , 5 , [ " White Version 2 " , " White-2 " ] ) ,
( " X " , 6 , [ " X Version " ] ) ,
( " Y " , 6 , [ " Y Version " ] ) ,
( " Omega Ruby " , 6 , [ " Omega Ruby Version " , " Omega-Ruby " ] ) ,
( " Alpha Sapphire " , 6 , [ " Alpha Sapphire Version " , " Alpha-Sapphire " ] ) ,
( " Sun " , 7 , [ " Sun Version " ] ) ,
( " Moon " , 7 , [ " Moon Version " ] ) ,
( " Ultra Sun " , 7 , [ " Ultra Sun Version " , " Ultra-Sun " ] ) ,
( " Ultra Moon " , 7 , [ " Ultra Moon Version " , " Ultra-Moon " ] ) ,
( " Let ' s Go Pikachu " , 7 , [ " Let ' s Go, Pikachu! " , " Lets Go Pikachu " ] ) ,
( " Let ' s Go Eevee " , 7 , [ " Let ' s Go, Eevee! " , " Lets Go Eevee " ] ) ,
( " Sword " , 8 , [ " Sword Version " ] ) ,
( " Shield " , 8 , [ " Shield Version " ] ) ,
( " Expansion Pass " , 8 , [ " Expansion Pass (Sword) " , " Expansion Pass (Shield) " ] ) ,
( " Brilliant Diamond " , 8 , [ " Brilliant Diamond Version " , " Brilliant-Diamond " ] ) ,
( " Shining Pearl " , 8 , [ " Shining Pearl Version " , " Shining-Pearl " ] ) ,
( " Legends Arceus " , 8 , [ " Legends: Arceus " , " Legends-Arceus " ] ) ,
( " Scarlet " , 9 , [ " Scarlet Version " ] ) ,
( " Violet " , 9 , [ " Violet Version " ] ) ,
( " The Teal Mask " , 9 , [ " The Teal Mask Version " , " The Teal Mask (Scarlet) " , " The Teal Mask (Violet) " ] ) ,
( " The Hidden Treasure of Area Zero " , 9 , [ " The Hidden Treasure of Area Zero Version " , " The Hidden Treasure of Area Zero (Scarlet) " , " The Hidden Treasure of Area Zero (Violet) " ] ) ,
( " Pokémon Home " , 98 , [ " Pokémon HOME " ] ) ,
( " Pokémon Go " , 99 , [ " Pokémon GO " ] ) ,
]
# Check if marks already exist
cursor . execute ( ' SELECT COUNT(*) FROM games ' )
games_count = cursor . fetchone ( ) [ 0 ]
for game in games :
game_id = None
if games_count == 0 :
cursor . execute ( '''
INSERT OR IGNORE INTO games ( name , generation )
VALUES ( ? , ? )
''' , (game[0], game[1]))
game_id = cursor . lastrowid
if game_id == None :
cursor . execute ( '''
SELECT id FROM games WHERE name = ?
''' , (game[0],))
game_id = cursor . fetchone ( ) [ 0 ]
# Insert alternate names
for alt_name in game [ 2 ] :
cursor . execute ( '''
INSERT OR IGNORE INTO alternate_game_names ( game_id , alternate_name )
VALUES ( ? , ? )
''' , (game_id, alt_name))
def create_form_marks_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS form_marks (
pfic TEXT ,
mark_id INTEGER ,
FOREIGN KEY ( pfic ) REFERENCES pokemon_forms ( PFIC ) ,
FOREIGN KEY ( mark_id ) REFERENCES marks ( id ) ,
PRIMARY KEY ( pfic , mark_id )
)
''' )
def create_evolve_encounter_table ( self , cursor ) :
cursor . execute ( '''
CREATE TABLE IF NOT EXISTS evolve_encounters (
id INTEGER PRIMARY KEY AUTOINCREMENT ,
pfic TEXT ,
game_id INTEGER ,
day TEXT ,
time TEXT ,
from_pfic TEXT ,
FOREIGN KEY ( pfic ) REFERENCES pokemon_forms ( PFIC ) ,
FOREIGN KEY ( game_id ) REFERENCES games ( id ) ,
FOREIGN KEY ( from_pfic ) REFERENCES pokemon_forms ( PFIC )
)
''' )
def load_pokemon_details ( self , pfic ) :
self . cursor . execute ( '''
SELECT pf . name , pf . form_name , pf . national_dex , pf . generation , ps . storable_in_home , pf . is_baby_form
FROM pokemon_forms pf
LEFT JOIN pokemon_storage ps ON pf . PFIC = ps . PFIC
WHERE pf . PFIC = ?
''' , (pfic,))
return self . cursor . fetchone ( )
def send_pokemon_list ( self , data ) :
# Fetch pokemon list from database
self . cursor . execute ( '''
SELECT pf . PFIC , pf . name , pf . form_name , pf . national_dex
FROM pokemon_forms pf
''' )
pokemon_data = self . cursor . fetchall ( )
# Sort the pokemon_data based on PFIC
pokemon_data . sort ( key = lambda x : parse_pfic ( x [ 0 ] ) )
return pokemon_data
def get_home_storable ( self , pfic ) :
self . cursor . execute ( ' SELECT storable_in_home FROM pokemon_storage WHERE PFIC = ? ' , ( pfic , ) )
result = self . cursor . fetchone ( )
home_storable = result [ 0 ] if result else False
return home_storable
def get_evolution_chain ( self , pfic ) :
def follow_chain ( start_pfic , visited = None ) :
if visited is None :
visited = set ( )
chain = [ ]
stack = [ ( start_pfic , None ) ]
while stack :
current_pfic , method = stack . pop ( )
if current_pfic in visited :
continue
visited . add ( current_pfic )
self . cursor . execute ( ' SELECT name, form_name FROM pokemon_forms WHERE PFIC = ? ' , ( current_pfic , ) )
name , form_name = self . cursor . fetchone ( )
chain . append ( ( current_pfic , name , form_name , method ) )
# Get previous evolutions
self . cursor . execute ( ' SELECT from_pfic, method FROM evolution_chains WHERE to_pfic = ? ' , ( current_pfic , ) )
prev_evolutions = self . cursor . fetchall ( )
for prev_pfic , prev_method in prev_evolutions :
if prev_pfic not in visited :
stack . append ( ( prev_pfic , prev_method ) )
# Get next evolutions
self . cursor . execute ( ' SELECT to_pfic, method FROM evolution_chains WHERE from_pfic = ? ' , ( current_pfic , ) )
next_evolutions = self . cursor . fetchall ( )
for next_pfic , next_method in next_evolutions :
if next_pfic not in visited :
stack . append ( ( next_pfic , next_method ) )
return chain
return follow_chain ( pfic )
def get_evolution_parent ( self , pfic ) :
self . cursor . execute ( ' SELECT from_pfic FROM evolution_chains WHERE to_pfic = ? ' , ( pfic , ) )
return self . cursor . fetchone ( )
def get_encounter_locations ( self , pfic ) :
self . cursor . execute ( '''
SELECT g . name , e . location , e . day , e . time , e . dual_slot , e . static_encounter_count , e . static_encounter , e . extra_text , e . stars , e . rods , e . fishing
FROM encounters e
JOIN games g ON e . game_id = g . id
WHERE e . pfic = ?
ORDER BY g . name , e . location
''' , (pfic,))
return self . cursor . fetchall ( )
def get_game_generation ( self , game ) :
self . cursor . execute ( '''
SELECT g . generation
FROM games g
LEFT JOIN alternate_game_names agn ON g . id = agn . game_id
WHERE g . name = ? OR agn . alternate_name = ?
LIMIT 1
''' , (game, game))
result = self . cursor . fetchone ( )
return result [ 0 ] if result else None
def get_game_id_for_name ( self , game_name ) :
query = " SELECT id FROM games WHERE name = ? "
result = self . execute_query ( query , ( game_name , ) )
if result :
return result [ 0 ] [ 0 ]
# If not found in main table, check alternate_game_names
query = """
SELECT g . id
FROM games g
JOIN alternate_game_names agn ON g . id = agn . game_id
WHERE agn . alternate_name = ?
"""
result = self . execute_query ( query , ( game_name , ) )
return result [ 0 ] [ 0 ] if result else None
def get_pokemon_with_encounters ( self , data ) :
self . cursor . execute ( '''
SELECT DISTINCT pfic
FROM encounters
''' )
pokemon_with_encounters = set ( row [ 0 ] for row in self . cursor . fetchall ( ) )
return pokemon_with_encounters
def refresh_in_memory_db ( self , temp_conn ) :
temp_conn . backup ( self . conn )
def clear_encounters_for_pokemon ( self , pfic ) :
self . cursor . execute ( ' DELETE FROM encounters WHERE pfic = ? ' , ( pfic , ) )
self . conn . commit ( )
def get_games_list ( self , data ) :
self . cursor . execute ( ' SELECT id, name FROM games ' )
return self . cursor . fetchall ( )
def get_exclusive_encounter_sets ( self , data ) :
self . cursor . execute ( ' SELECT id, group_name FROM exclusive_encounter_groups ' )
return self . cursor . fetchall ( )
def get_exclusive_set_details ( self , set_id ) :
self . cursor . execute ( ' SELECT group_name, description, game_id FROM exclusive_encounter_groups WHERE id = ? ' , ( set_id , ) )
return self . cursor . fetchone ( )
def get_set_encounters ( self , set_id ) :
self . cursor . execute ( '''
SELECT e . id , e . pfic , e . location , pf . name , pf . form_name
FROM encounters e
JOIN encounter_exclusive_groups eeg ON e . id = eeg . encounter_id
JOIN pokemon_forms pf ON e . pfic = pf . PFIC
WHERE eeg . group_id = ?
''' , (set_id,))
return self . cursor . fetchall ( )
def delete_encounter_from_set ( self , set_id , encounter_id ) :
self . cursor . execute ( ' DELETE FROM encounter_exclusive_groups WHERE group_id = ? AND encounter_id = ? ' , ( set_id , encounter_id ) )
self . conn . commit ( )
def add_encounter_to_set ( self , data ) :
set_id , encounter_id = data
self . cursor . execute ( ' INSERT INTO encounter_exclusive_groups (group_id, encounter_id) VALUES (?, ?) ' , ( set_id , encounter_id ) )
self . conn . commit ( )
def add_new_exclusive_set ( self , data ) :
set_name , description , game_id = data
self . cursor . execute ( ' INSERT INTO exclusive_encounter_groups (group_name, description, game_id) VALUES (?, ?, ?) ' , ( set_name , description , game_id ) )
self . conn . commit ( )
def get_available_encounters ( self , data ) :
game_id = data
self . cursor . execute ( '''
SELECT e . id , e . pfic , e . location , pf . name , pf . form_name
FROM encounters e
JOIN games g ON e . game_id = g . id
JOIN pokemon_forms pf ON e . pfic = pf . PFIC
WHERE g . id = ?
''' , (game_id,))
return self . cursor . fetchall ( )
def save_changes ( self , data ) :
disk_conn = sqlite3 . connect ( ' pokemon_forms.db ' )
self . conn . backup ( disk_conn )
disk_conn . close ( )
def export_database ( self , data ) :
export_conn = sqlite3 . connect ( ' pokemon_forms_production.db ' )
self . conn . backup ( export_conn )
export_conn . close ( )
def assign_mark_to_form ( self , data ) :
pfic , mark_id = data
self . cursor . execute ( '''
INSERT OR REPLACE INTO form_marks ( pfic , mark_id )
VALUES ( ? , ? )
''' , (pfic, mark_id))
self . conn . commit ( )
def get_mark_for_game ( self , data ) :
game_id = data
self . cursor . execute ( ' SELECT mark_id FROM mark_game_associations WHERE game_id = ? ' , ( game_id , ) )
result = self . cursor . fetchone ( )
return result [ 0 ] if result else None
def get_mark_for_game_name ( self , data ) :
game_name = data
self . cursor . execute ( '''
SELECT m . id
FROM marks m
JOIN mark_game_associations mga ON m . id = mga . mark_id
JOIN games g ON mga . game_id = g . id
WHERE g . name = ?
''' , (game_name,))
result = self . cursor . fetchone ( )
return result [ 0 ] if result else None
def get_mark_details ( self , data ) :
mark_id = data
self . cursor . execute ( ' SELECT name, icon_path FROM marks WHERE id = ? ' , ( mark_id , ) )
return self . cursor . fetchone ( )
def get_evolve_encounters ( self , data ) :
pfic = data
self . cursor . execute ( ' SELECT * FROM evolve_encounters WHERE pfic = ? ' , ( pfic , ) )
evolve_encounters = self . cursor . fetchall ( )
return [ EvolveEncounter ( encounter [ 1 ] , encounter [ 2 ] , encounter [ 3 ] , encounter [ 4 ] , encounter [ 5 ] ) for encounter in evolve_encounters ]
def get_game_by_id ( self , data ) :
game_id = data
self . cursor . execute ( ' SELECT * FROM games WHERE id = ? ' , ( game_id , ) )
result = self . cursor . fetchone ( )
return result
def get_pokemon_form_by_pfic ( self , data ) - > PokemonForm :
pfic = data
self . cursor . execute ( '''
SELECT pf . name , pf . form_name , pf . national_dex , pf . generation , ps . storable_in_home , pf . is_baby_form
FROM pokemon_forms pf
LEFT JOIN pokemon_storage ps ON pf . PFIC = ps . PFIC
WHERE pf . PFIC = ?
''' , (pfic,))
result = self . cursor . fetchone ( )
return PokemonForm ( pfic , result [ 0 ] , result [ 1 ] , result [ 2 ] , result [ 3 ] , result [ 5 ] , result [ 4 ] )
def find_default_form ( self , data ) :
search_pfic = data
self . cursor . execute ( '''
SELECT pf . PFIC , pf . name , pf . form_name , pf . national_dex , pf . generation , ps . storable_in_home , pf . is_baby_form
FROM pokemon_forms pf
LEFT JOIN pokemon_storage ps ON pf . PFIC = ps . PFIC
WHERE pf . PFIC LIKE ?
''' , (search_pfic + " % " ,))
results = self . cursor . fetchall ( )
return [ PokemonForm ( result [ 0 ] , result [ 1 ] , result [ 2 ] , result [ 3 ] , result [ 4 ] , result [ 6 ] , result [ 5 ] ) for result in results ]
def get_connection ( self ) :
return self . connection_pool . get ( )
def release_connection ( self , conn ) :
self . connection_pool . put ( conn )
def execute_query_with_commit ( self , query , params = None ) :
conn = self . get_connection ( )
try :
with conn :
cursor = conn . cursor ( )
if params :
cursor . execute ( query , params )
else :
cursor . execute ( query )
conn . commit ( )
return cursor . fetchall ( )
finally :
self . release_connection ( conn )
def execute_query ( self , query , params = None ) :
conn = self . get_connection ( )
try :
with conn :
cursor = conn . cursor ( )
if params :
cursor . execute ( query , params )
else :
cursor . execute ( query )
return cursor . fetchall ( )
finally :
self . release_connection ( conn )
def close ( self ) :
while not self . connection_pool . empty ( ) :
conn = self . connection_pool . get ( )
conn . close ( )
self . executor . shutdown ( )
def build_query_string ( self , table_name , criteria ) :
conditions = [ ]
values = [ ]
query = f " SELECT * FROM { table_name } WHERE "
for column , value in criteria . items ( ) :
if value is None :
conditions . append ( f " ( { column } IS NULL OR { column } = ' ' ) " )
elif value == " " :
conditions . append ( f " { column } = ' ' " )
else :
conditions . append ( f " { column } = ? " )
values . append ( value )
return query + " AND " . join ( conditions ) , values