You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
630 lines
25 KiB
630 lines
25 KiB
import sqlite3
import sys
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from queue import Queue

from event_system import event_system
from db_classes import PokemonForm, EvolveEncounter


def parse_pfic(pfic):
    """Split a dash-separated PFIC string into a tuple usable as a sort key.

    Each ``-``-separated part that consists only of decimal digits is
    converted to ``int``; every other part is kept as the original string.

    ``str.isdecimal`` is used rather than ``str.isdigit`` because ``isdigit``
    also accepts characters such as superscript digits that ``int()`` cannot
    parse, which would raise ``ValueError``.
    """
    parts = pfic.split('-')
    return tuple(int(part) if part.isdecimal() else part for part in parts)


class DBController:
    """SQLite access layer whose methods are registered as event listeners.

    The controller owns one primary connection (``self.conn`` with a shared
    ``self.cursor``) used by most handlers, plus a queue of pooled
    connections used by ``execute_query`` / ``execute_query_with_commit``.
    ``init_database`` and ``refresh_in_memory_db`` copy a source database
    into the primary connection and into every pooled connection that is
    idle at that moment.

    NOTE(review): with the default ``':memory:'`` path each connection is
    presumably its own private database, so writes made through
    ``self.conn`` would not be visible to pooled connections until the next
    refresh -- confirm against the callers.
    """

    # File the working database is loaded from and saved to.
    _DISK_DB_PATH = 'pokemon_forms.db'
    # File written by export_database.
    _EXPORT_DB_PATH = 'pokemon_forms_production.db'

    def __init__(self, db_path=':memory:', max_connections=10):
        """Open the primary and pooled connections and register listeners.

        Args:
            db_path: Path handed to ``sqlite3.connect`` for every connection.
            max_connections: Size of the connection pool and of the thread
                pool executor.
        """
        self.db_path = db_path
        self.connection_pool = Queue(maxsize=max_connections)
        self.lock = threading.Lock()

        for _ in range(max_connections):
            pooled = sqlite3.connect(db_path, check_same_thread=False)
            self.connection_pool.put(pooled)

        self.executor = ThreadPoolExecutor(max_workers=max_connections)

        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.cursor = self.conn.cursor()

        self._register_listeners()

    def _register_listeners(self):
        """Subscribe each handler method to its event name on ``event_system``."""
        handlers = (
            ('get_pokemon_data', self.load_pokemon_details),
            ('get_pokemon_list', self.send_pokemon_list),
            ('get_home_storable', self.get_home_storable),
            ('get_evolution_chain', self.get_evolution_chain),
            ('get_evolution_parent', self.get_evolution_parent),
            ('get_encounter_locations', self.get_encounter_locations),
            ('get_game_generation', self.get_game_generation),
            ('get_pokemon_with_encounters', self.get_pokemon_with_encounters),
            ('refresh_in_memory_db', self.refresh_in_memory_db),
            ('clear_encounters_for_pokemon', self.clear_encounters_for_pokemon),
            ('get_game_id_for_name', self.get_game_id_for_name),
            ('get_exclusive_encounter_sets', self.get_exclusive_encounter_sets),
            ('get_exclusive_set_details', self.get_exclusive_set_details),
            ('get_set_encounters', self.get_set_encounters),
            ('delete_encounter_from_set', self.delete_encounter_from_set),
            ('get_available_encounters', self.get_available_encounters),
            ('add_encounter_to_set', self.add_encounter_to_set),
            ('get_games_list', self.get_games_list),
            ('add_new_exclusive_set', self.add_new_exclusive_set),
            ('save_changes', self.save_changes),
            ('export_database', self.export_database),
            ('assign_mark_to_form', self.assign_mark_to_form),
            ('get_mark_for_game', self.get_mark_for_game),
            ('get_mark_for_game_name', self.get_mark_for_game_name),
            ('get_mark_details', self.get_mark_details),
            ('get_evolve_encounters', self.get_evolve_encounters),
            ('get_game_by_id', self.get_game_by_id),
            ('get_pokemon_form_by_pfic', self.get_pokemon_form_by_pfic),
            ('find_default_form', self.find_default_form),
        )
        for event_name, handler in handlers:
            event_system.add_listener(event_name, handler)

    # ------------------------------------------------------------------
    # Database initialisation and schema
    # ------------------------------------------------------------------

    def init_database(self):
        """Create/seed the on-disk database and load it into every connection.

        All tables are created (if missing) in ``pokemon_forms.db``, the
        changes are committed, and the file is then copied into
        ``self.conn`` and into each idle pooled connection. The disk
        connection is closed even if one of the steps raises.
        """
        with closing(sqlite3.connect(self._DISK_DB_PATH)) as disk_conn:
            disk_cursor = disk_conn.cursor()

            # games must exist before create_mark_table resolves game names.
            self.create_games_table(disk_cursor)
            self.create_pokemon_forms_table(disk_cursor)
            self.create_pokemon_storage_table(disk_cursor)
            self.create_evolution_chains_table(disk_cursor)
            self.create_exclusive_encounter_groups_table(disk_cursor)
            self.create_encounter_exclusive_group_table(disk_cursor)
            self.create_encounters_table(disk_cursor)
            self.create_mark_table(disk_cursor)
            self.create_form_marks_table(disk_cursor)
            self.create_evolve_encounter_table(disk_cursor)

            disk_conn.commit()
            self._load_from(disk_conn)

    def _load_from(self, source_conn):
        """Copy ``source_conn`` into ``self.conn`` and every idle pooled connection.

        Pooled connections are always put back into the queue, even when a
        backup fails part-way through.
        """
        source_conn.backup(self.conn)

        borrowed = []
        try:
            for _ in range(self.connection_pool.qsize()):
                pooled = self.connection_pool.get()
                borrowed.append(pooled)
                source_conn.backup(pooled)
        finally:
            for pooled in borrowed:
                self.connection_pool.put(pooled)

    @staticmethod
    def _lookup_or_insert(cursor, select_sql, select_params, insert_sql, insert_params):
        """Return the id found by ``select_sql``, inserting the row if absent."""
        cursor.execute(select_sql, select_params)
        row = cursor.fetchone()
        if row is not None:
            return row[0]
        cursor.execute(insert_sql, insert_params)
        return cursor.lastrowid

    def create_pokemon_forms_table(self, cursor):
        """Create the ``pokemon_forms`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS pokemon_forms (
                PFIC TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                form_name TEXT,
                national_dex INTEGER NOT NULL,
                generation INTEGER NOT NULL,
                is_baby_form BOOLEAN
            )
        ''')

    def create_pokemon_storage_table(self, cursor):
        """Create the ``pokemon_storage`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS pokemon_storage (
                PFIC TEXT PRIMARY KEY,
                storable_in_home BOOLEAN NOT NULL,
                FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC)
            )
        ''')

    def create_evolution_chains_table(self, cursor):
        """Create the ``evolution_chains`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS evolution_chains (
                from_pfic TEXT,
                to_pfic TEXT,
                method TEXT,
                PRIMARY KEY (from_pfic, to_pfic),
                FOREIGN KEY (from_pfic) REFERENCES pokemon_forms (PFIC),
                FOREIGN KEY (to_pfic) REFERENCES pokemon_forms (PFIC)
            )
        ''')

    def create_exclusive_encounter_groups_table(self, cursor):
        """Create the ``exclusive_encounter_groups`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS exclusive_encounter_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                group_name TEXT NOT NULL,
                description TEXT,
                game_id INTEGER,
                FOREIGN KEY (game_id) REFERENCES games (id)
            )
        ''')

    def create_encounter_exclusive_group_table(self, cursor):
        """Create the ``encounter_exclusive_groups`` link table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS encounter_exclusive_groups (
                encounter_id INTEGER,
                group_id INTEGER,
                FOREIGN KEY (encounter_id) REFERENCES encounters (id),
                FOREIGN KEY (group_id) REFERENCES exclusive_encounter_groups (id),
                PRIMARY KEY (encounter_id, group_id)
            )
        ''')

    def create_encounters_table(self, cursor):
        """Create the ``encounters`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS encounters (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pfic TEXT,
                game_id INTEGER,
                location TEXT,
                day TEXT,
                time TEXT,
                dual_slot TEXT,
                static_encounter BOOLEAN,
                static_encounter_count INTEGER,
                extra_text TEXT,
                stars TEXT,
                fishing BOOLEAN,
                rods TEXT,
                starter BOOLEAN,
                FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
                FOREIGN KEY (game_id) REFERENCES games (id)
            )
        ''')

    def create_mark_table(self, cursor):
        """Create and seed ``marks`` and ``mark_game_associations``.

        Each seed mark is looked up by name and inserted only when missing,
        so the method is safe to run against an empty, a fully seeded, or a
        partially seeded table. Associations are resolved by game name, so
        the ``games`` table must already be populated.
        """
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS marks (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL UNIQUE,
                icon_path TEXT NOT NULL
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mark_game_associations (
                mark_id INTEGER,
                game_id INTEGER,
                FOREIGN KEY (mark_id) REFERENCES marks (id),
                FOREIGN KEY (game_id) REFERENCES games (id),
                PRIMARY KEY (mark_id, game_id)
            )
        ''')

        # (mark name, icon path, names of the games that grant the mark)
        marks = [
            ("Game Boy", "images/marks/GB_icon_HOME.png", ["Red", "Blue", "Yellow", "Gold", "Silver", "Crystal"]),
            ("Kalos", "images/marks/Blue_pentagon_HOME.png", ["X", "Y", "Omega Ruby", "Alpha Sapphire"]),
            ("Alola", "images/marks/Black_clover_HOME.png", ["Sun", "Moon", "Ultra Sun", "Ultra Moon"]),
            ("Let's Go", "images/marks/Let's_Go_icon_HOME.png", ["Let's Go Pikachu", "Let's Go Eevee"]),
            ("Galar", "images/marks/Galar_symbol_HOME.png", ["Sword", "Shield", "Expansion Pass"]),
            ("Sinnoh", "images/marks/BDSP_icon_HOME.png", ["Brilliant Diamond", "Shining Pearl"]),
            ("Hisui", "images/marks/Arceus_mark_HOME.png", ["Legends Arceus"]),
            ("Paldea", "images/marks/Paldea_icon_HOME.png", ["Scarlet", "Violet", "The Teal Mask", "The Hidden Treasure of Area Zero"]),
            ("Markless", "images/marks/Markless_icon_HOME.png", ["Ruby", "Sapphire", "Emerald", "FireRed", "LeafGreen", "Diamond", "Pearl", "Platinum", "HeartGold", "SoulSilver", "Black", "White", "Black 2", "White 2"]),
        ]

        for mark_name, icon_path, game_names in marks:
            mark_id = self._lookup_or_insert(
                cursor,
                'SELECT id FROM marks WHERE name = ?',
                (mark_name,),
                'INSERT INTO marks (name, icon_path) VALUES (?, ?)',
                (mark_name, icon_path),
            )

            for game_name in game_names:
                # The SELECT yields no rows for an unknown game name, so
                # nothing is inserted in that case.
                cursor.execute('''
                    INSERT OR IGNORE INTO mark_game_associations (mark_id, game_id)
                    SELECT ?, id FROM games WHERE name = ?
                ''', (mark_id, game_name))

    def create_games_table(self, cursor):
        """Create and seed ``games`` and ``alternate_game_names``.

        Each seed game is looked up by name and inserted only when missing,
        so the method is safe to run against an empty, a fully seeded, or a
        partially seeded table. Alternate names are inserted with
        ``INSERT OR IGNORE`` and are unique case-insensitively.
        """
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS games (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                generation INTEGER NOT NULL
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alternate_game_names (
                id INTEGER PRIMARY KEY,
                game_id INTEGER NOT NULL,
                alternate_name TEXT NOT NULL,
                FOREIGN KEY (game_id) REFERENCES games (id),
                UNIQUE (alternate_name COLLATE NOCASE)
            )
        ''')

        # (game name, generation, alternate names)
        games = [
            ("Red", 1, ["Red Version"]),
            ("Blue", 1, ["Blue Version"]),
            ("Yellow", 1, ["Yellow Version"]),
            ("Gold", 2, ["Gold Version"]),
            ("Silver", 2, ["Silver Version"]),
            ("Crystal", 2, ["Crystal Version"]),
            ("Ruby", 3, ["Ruby Version"]),
            ("Sapphire", 3, ["Sapphire Version"]),
            ("Emerald", 3, ["Emerald Version"]),
            ("FireRed", 3, ["Fire Red", "Fire-Red"]),
            ("LeafGreen", 3, ["Leaf Green", "Leaf-Green"]),
            ("Diamond", 4, ["Diamond Version"]),
            ("Pearl", 4, ["Pearl Version"]),
            ("Platinum", 4, ["Platinum Version"]),
            ("HeartGold", 4, ["Heart Gold", "Heart-Gold"]),
            ("SoulSilver", 4, ["Soul Silver", "Soul-Silver"]),
            ("Black", 5, ["Black Version"]),
            ("White", 5, ["White Version"]),
            ("Black 2", 5, ["Black Version 2", "Black-2"]),
            ("White 2", 5, ["White Version 2", "White-2"]),
            ("X", 6, ["X Version"]),
            ("Y", 6, ["Y Version"]),
            ("Omega Ruby", 6, ["Omega Ruby Version", "Omega-Ruby"]),
            ("Alpha Sapphire", 6, ["Alpha Sapphire Version", "Alpha-Sapphire"]),
            ("Sun", 7, ["Sun Version"]),
            ("Moon", 7, ["Moon Version"]),
            ("Ultra Sun", 7, ["Ultra Sun Version", "Ultra-Sun"]),
            ("Ultra Moon", 7, ["Ultra Moon Version", "Ultra-Moon"]),
            ("Let's Go Pikachu", 7, ["Let's Go, Pikachu!", "Lets Go Pikachu"]),
            ("Let's Go Eevee", 7, ["Let's Go, Eevee!", "Lets Go Eevee"]),
            ("Sword", 8, ["Sword Version"]),
            ("Shield", 8, ["Shield Version"]),
            ("Expansion Pass", 8, ["Expansion Pass (Sword)", "Expansion Pass (Shield)"]),
            ("Brilliant Diamond", 8, ["Brilliant Diamond Version", "Brilliant-Diamond"]),
            ("Shining Pearl", 8, ["Shining Pearl Version", "Shining-Pearl"]),
            ("Legends Arceus", 8, ["Legends: Arceus", "Legends-Arceus"]),
            ("Scarlet", 9, ["Scarlet Version"]),
            ("Violet", 9, ["Violet Version"]),
            ("The Teal Mask", 9, ["The Teal Mask Version", "The Teal Mask (Scarlet)", "The Teal Mask (Violet)"]),
            ("The Hidden Treasure of Area Zero", 9, ["The Hidden Treasure of Area Zero Version", "The Hidden Treasure of Area Zero (Scarlet)", "The Hidden Treasure of Area Zero (Violet)"]),

            ("Pokémon Home", 98, ["Pokémon HOME"]),
            ("Pokémon Go", 99, ["Pokémon GO"]),
        ]

        for game_name, generation, alternate_names in games:
            game_id = self._lookup_or_insert(
                cursor,
                'SELECT id FROM games WHERE name = ?',
                (game_name,),
                'INSERT INTO games (name, generation) VALUES (?, ?)',
                (game_name, generation),
            )

            for alt_name in alternate_names:
                cursor.execute('''
                    INSERT OR IGNORE INTO alternate_game_names (game_id, alternate_name)
                    VALUES (?, ?)
                ''', (game_id, alt_name))

    def create_form_marks_table(self, cursor):
        """Create the ``form_marks`` link table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS form_marks (
                pfic TEXT,
                mark_id INTEGER,
                FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
                FOREIGN KEY (mark_id) REFERENCES marks (id),
                PRIMARY KEY (pfic, mark_id)
            )
        ''')

    def create_evolve_encounter_table(self, cursor):
        """Create the ``evolve_encounters`` table if it does not exist."""
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS evolve_encounters (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pfic TEXT,
                game_id INTEGER,
                day TEXT,
                time TEXT,
                from_pfic TEXT,
                FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
                FOREIGN KEY (game_id) REFERENCES games (id),
                FOREIGN KEY (from_pfic) REFERENCES pokemon_forms (PFIC)
            )
        ''')

    # ------------------------------------------------------------------
    # Event handlers (all run on the shared self.cursor)
    # ------------------------------------------------------------------

    def load_pokemon_details(self, pfic):
        """Return one row describing the form ``pfic``, or ``None`` if unknown.

        The row is ``(name, form_name, national_dex, generation,
        storable_in_home, is_baby_form)``; ``storable_in_home`` is ``None``
        when the form has no ``pokemon_storage`` row.
        """
        self.cursor.execute('''
            SELECT pf.name, pf.form_name, pf.national_dex, pf.generation, ps.storable_in_home, pf.is_baby_form
            FROM pokemon_forms pf
            LEFT JOIN pokemon_storage ps ON pf.PFIC = ps.PFIC
            WHERE pf.PFIC = ?
        ''', (pfic,))
        return self.cursor.fetchone()

    def send_pokemon_list(self, data):
        """Return all ``(PFIC, name, form_name, national_dex)`` rows sorted by PFIC.

        ``data`` is unused. Rows are ordered by ``parse_pfic`` applied to
        the PFIC column.
        """
        self.cursor.execute('''
            SELECT pf.PFIC, pf.name, pf.form_name, pf.national_dex
            FROM pokemon_forms pf
        ''')
        pokemon_data = self.cursor.fetchall()
        pokemon_data.sort(key=lambda row: parse_pfic(row[0]))
        return pokemon_data

    def get_home_storable(self, pfic):
        """Return the stored ``storable_in_home`` value, or ``False`` if no row exists."""
        self.cursor.execute('SELECT storable_in_home FROM pokemon_storage WHERE PFIC = ?', (pfic,))
        row = self.cursor.fetchone()
        return row[0] if row else False

    def get_evolution_chain(self, pfic):
        """Return every form connected to ``pfic`` through ``evolution_chains``.

        Links are followed in both directions with a depth-first walk. Each
        entry is ``(pfic, name, form_name, method)`` where ``method`` is the
        method of the link through which the form was reached (``None`` for
        the starting form). PFICs that have no ``pokemon_forms`` row are
        skipped, so an unknown starting PFIC yields an empty list.
        """
        visited = set()
        chain = []
        stack = [(pfic, None)]

        while stack:
            current_pfic, method = stack.pop()
            if current_pfic in visited:
                continue
            visited.add(current_pfic)

            self.cursor.execute('SELECT name, form_name FROM pokemon_forms WHERE PFIC = ?', (current_pfic,))
            form_row = self.cursor.fetchone()
            if form_row is None:
                # Unknown form (bad input or dangling link): nothing to report.
                continue
            name, form_name = form_row
            chain.append((current_pfic, name, form_name, method))

            # Previous evolutions are pushed first, next evolutions second.
            self.cursor.execute('SELECT from_pfic, method FROM evolution_chains WHERE to_pfic = ?', (current_pfic,))
            for prev_pfic, prev_method in self.cursor.fetchall():
                if prev_pfic not in visited:
                    stack.append((prev_pfic, prev_method))

            self.cursor.execute('SELECT to_pfic, method FROM evolution_chains WHERE from_pfic = ?', (current_pfic,))
            for next_pfic, next_method in self.cursor.fetchall():
                if next_pfic not in visited:
                    stack.append((next_pfic, next_method))

        return chain

    def get_evolution_parent(self, pfic):
        """Return a ``(from_pfic,)`` row for one form that evolves into ``pfic``, or ``None``."""
        self.cursor.execute('SELECT from_pfic FROM evolution_chains WHERE to_pfic = ?', (pfic,))
        return self.cursor.fetchone()

    def get_encounter_locations(self, pfic):
        """Return all encounter rows for ``pfic`` ordered by game name and location."""
        self.cursor.execute('''
            SELECT g.name, e.location, e.day, e.time, e.dual_slot, e.static_encounter_count, e.static_encounter, e.extra_text, e.stars, e.rods, e.fishing
            FROM encounters e
            JOIN games g ON e.game_id = g.id
            WHERE e.pfic = ?
            ORDER BY g.name, e.location
        ''', (pfic,))
        return self.cursor.fetchall()

    def get_game_generation(self, game):
        """Return the generation of ``game`` (primary or alternate name), or ``None``."""
        self.cursor.execute('''
            SELECT g.generation
            FROM games g
            LEFT JOIN alternate_game_names agn ON g.id = agn.game_id
            WHERE g.name = ? OR agn.alternate_name = ?
            LIMIT 1
        ''', (game, game))
        row = self.cursor.fetchone()
        return row[0] if row else None

    def get_game_id_for_name(self, game_name):
        """Return the id of ``game_name``, or ``None`` if it is unknown.

        The primary name is tried first, then ``alternate_game_names``.
        Both lookups run on a pooled connection via ``execute_query``.
        """
        rows = self.execute_query("SELECT id FROM games WHERE name = ?", (game_name,))
        if rows:
            return rows[0][0]

        rows = self.execute_query("""
            SELECT g.id
            FROM games g
            JOIN alternate_game_names agn ON g.id = agn.game_id
            WHERE agn.alternate_name = ?
        """, (game_name,))
        return rows[0][0] if rows else None

    def get_pokemon_with_encounters(self, data):
        """Return the set of PFICs that have at least one encounter row (``data`` is unused)."""
        self.cursor.execute('''
            SELECT DISTINCT pfic
            FROM encounters
        ''')
        return {row[0] for row in self.cursor.fetchall()}

    def refresh_in_memory_db(self, temp_conn):
        """Replace the controller's data with the contents of ``temp_conn``.

        Both ``self.conn`` and every idle pooled connection are refreshed,
        matching what ``init_database`` does, so queries that go through
        the pool see the same data as queries on ``self.cursor``.
        """
        self._load_from(temp_conn)

    def clear_encounters_for_pokemon(self, pfic):
        """Delete every encounter row for ``pfic`` and commit."""
        self.cursor.execute('DELETE FROM encounters WHERE pfic = ?', (pfic,))
        self.conn.commit()

    def get_games_list(self, data):
        """Return all ``(id, name)`` rows from ``games`` (``data`` is unused)."""
        self.cursor.execute('SELECT id, name FROM games')
        return self.cursor.fetchall()

    def get_exclusive_encounter_sets(self, data):
        """Return all ``(id, group_name)`` rows of exclusive encounter groups (``data`` is unused)."""
        self.cursor.execute('SELECT id, group_name FROM exclusive_encounter_groups')
        return self.cursor.fetchall()

    def get_exclusive_set_details(self, set_id):
        """Return ``(group_name, description, game_id)`` for ``set_id``, or ``None``."""
        self.cursor.execute('SELECT group_name, description, game_id FROM exclusive_encounter_groups WHERE id = ?', (set_id,))
        return self.cursor.fetchone()

    def get_set_encounters(self, set_id):
        """Return ``(id, pfic, location, name, form_name)`` rows for the encounters in ``set_id``."""
        self.cursor.execute('''
            SELECT e.id, e.pfic, e.location, pf.name, pf.form_name
            FROM encounters e
            JOIN encounter_exclusive_groups eeg ON e.id = eeg.encounter_id
            JOIN pokemon_forms pf ON e.pfic = pf.PFIC
            WHERE eeg.group_id = ?
        ''', (set_id,))
        return self.cursor.fetchall()

    def delete_encounter_from_set(self, set_id, encounter_id=None):
        """Remove one encounter from an exclusive set and commit.

        Accepts either two positional arguments or, like
        ``add_encounter_to_set``, a single ``(set_id, encounter_id)``
        tuple/list as the first argument.

        NOTE(review): the packed form is supported on the assumption that
        the event system passes a single payload to listeners -- confirm.
        """
        if encounter_id is None and isinstance(set_id, (tuple, list)):
            set_id, encounter_id = set_id
        self.cursor.execute(
            'DELETE FROM encounter_exclusive_groups WHERE group_id = ? AND encounter_id = ?',
            (set_id, encounter_id),
        )
        self.conn.commit()

    def add_encounter_to_set(self, data):
        """Link an encounter to an exclusive set; ``data`` is ``(set_id, encounter_id)``."""
        set_id, encounter_id = data
        self.cursor.execute(
            'INSERT INTO encounter_exclusive_groups (group_id, encounter_id) VALUES (?, ?)',
            (set_id, encounter_id),
        )
        self.conn.commit()

    def add_new_exclusive_set(self, data):
        """Create an exclusive set; ``data`` is ``(set_name, description, game_id)``."""
        set_name, description, game_id = data
        self.cursor.execute(
            'INSERT INTO exclusive_encounter_groups (group_name, description, game_id) VALUES (?, ?, ?)',
            (set_name, description, game_id),
        )
        self.conn.commit()

    def get_available_encounters(self, data):
        """Return ``(id, pfic, location, name, form_name)`` rows for game id ``data``."""
        game_id = data
        self.cursor.execute('''
            SELECT e.id, e.pfic, e.location, pf.name, pf.form_name
            FROM encounters e
            JOIN games g ON e.game_id = g.id
            JOIN pokemon_forms pf ON e.pfic = pf.PFIC
            WHERE g.id = ?
        ''', (game_id,))
        return self.cursor.fetchall()

    def _backup_to(self, path):
        """Copy ``self.conn`` into the database file at ``path``, always closing it."""
        with closing(sqlite3.connect(path)) as target_conn:
            self.conn.backup(target_conn)

    def save_changes(self, data):
        """Write the current contents of ``self.conn`` to ``pokemon_forms.db`` (``data`` is unused)."""
        self._backup_to(self._DISK_DB_PATH)

    def export_database(self, data):
        """Write the current contents of ``self.conn`` to ``pokemon_forms_production.db`` (``data`` is unused)."""
        self._backup_to(self._EXPORT_DB_PATH)

    def assign_mark_to_form(self, data):
        """Insert or replace a ``form_marks`` row; ``data`` is ``(pfic, mark_id)``."""
        pfic, mark_id = data
        self.cursor.execute('''
            INSERT OR REPLACE INTO form_marks (pfic, mark_id)
            VALUES (?, ?)
        ''', (pfic, mark_id))
        self.conn.commit()

    def get_mark_for_game(self, data):
        """Return a mark id associated with game id ``data``, or ``None``."""
        game_id = data
        self.cursor.execute('SELECT mark_id FROM mark_game_associations WHERE game_id = ?', (game_id,))
        row = self.cursor.fetchone()
        return row[0] if row else None

    def get_mark_for_game_name(self, data):
        """Return a mark id associated with the game named ``data``, or ``None``.

        Only the primary ``games.name`` is matched, not alternate names.
        """
        game_name = data
        self.cursor.execute('''
            SELECT m.id
            FROM marks m
            JOIN mark_game_associations mga ON m.id = mga.mark_id
            JOIN games g ON mga.game_id = g.id
            WHERE g.name = ?
        ''', (game_name,))
        row = self.cursor.fetchone()
        return row[0] if row else None

    def get_mark_details(self, data):
        """Return ``(name, icon_path)`` for mark id ``data``, or ``None``."""
        mark_id = data
        self.cursor.execute('SELECT name, icon_path FROM marks WHERE id = ?', (mark_id,))
        return self.cursor.fetchone()

    def get_evolve_encounters(self, data):
        """Return an ``EvolveEncounter`` for each evolve-encounter row of PFIC ``data``.

        Columns are selected by name and passed positionally as
        ``(pfic, game_id, day, time, from_pfic)``.
        """
        pfic = data
        self.cursor.execute(
            'SELECT pfic, game_id, day, time, from_pfic FROM evolve_encounters WHERE pfic = ?',
            (pfic,),
        )
        return [EvolveEncounter(*row) for row in self.cursor.fetchall()]

    def get_game_by_id(self, data):
        """Return the full ``games`` row for id ``data``, or ``None``."""
        game_id = data
        self.cursor.execute('SELECT * FROM games WHERE id = ?', (game_id,))
        return self.cursor.fetchone()

    def get_pokemon_form_by_pfic(self, data) -> "PokemonForm | None":
        """Return a ``PokemonForm`` for PFIC ``data``, or ``None`` if it is unknown.

        The constructor receives ``(pfic, name, form_name, national_dex,
        generation, is_baby_form, storable_in_home)`` in that order.
        """
        pfic = data
        row = self.load_pokemon_details(pfic)
        if row is None:
            return None
        name, form_name, national_dex, generation, storable_in_home, is_baby_form = row
        return PokemonForm(pfic, name, form_name, national_dex, generation, is_baby_form, storable_in_home)

    def find_default_form(self, data):
        """Return a ``PokemonForm`` for every form whose PFIC starts with ``data``.

        The prefix is matched with SQL ``LIKE``. The constructor receives
        the same argument order as in ``get_pokemon_form_by_pfic``.
        """
        search_pfic = data
        self.cursor.execute('''
            SELECT pf.PFIC, pf.name, pf.form_name, pf.national_dex, pf.generation, ps.storable_in_home, pf.is_baby_form
            FROM pokemon_forms pf
            LEFT JOIN pokemon_storage ps ON pf.PFIC = ps.PFIC
            WHERE pf.PFIC LIKE ?
        ''', (search_pfic + "%",))
        return [
            PokemonForm(row[0], row[1], row[2], row[3], row[4], row[6], row[5])
            for row in self.cursor.fetchall()
        ]

    # ------------------------------------------------------------------
    # Connection pool
    # ------------------------------------------------------------------

    def get_connection(self):
        """Take a connection from the pool, blocking until one is available."""
        return self.connection_pool.get()

    def release_connection(self, conn):
        """Return ``conn`` to the pool."""
        self.connection_pool.put(conn)

    def _run_on_pooled_connection(self, query, params=None):
        """Execute ``query`` on a pooled connection and return all rows.

        The connection is used as a context manager, so the transaction is
        committed on success and rolled back if the statement raises. The
        connection is returned to the pool in every case.
        """
        conn = self.get_connection()
        try:
            with conn:
                cursor = conn.cursor()
                cursor.execute(query, params or ())
                return cursor.fetchall()
        finally:
            self.release_connection(conn)

    def execute_query_with_commit(self, query, params=None):
        """Run ``query`` on a pooled connection, commit, and return the fetched rows."""
        return self._run_on_pooled_connection(query, params)

    def execute_query(self, query, params=None):
        """Run ``query`` on a pooled connection and return the fetched rows."""
        return self._run_on_pooled_connection(query, params)

    def close(self):
        """Close every idle pooled connection, the primary connection, and the executor."""
        while not self.connection_pool.empty():
            self.connection_pool.get().close()
        self.conn.close()
        self.executor.shutdown()