You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

694 lines
28 KiB

import sqlite3
import sys
import os
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from event_system import event_system
from db_classes import PokemonForm, EvolveEncounter
def parse_pfic(pfic):
parts = pfic.split('-')
return tuple(int(part) if part.isdigit() else part for part in parts)
class DBController:
def __init__(self, db_path=':memory:', max_connections=10):
self.db_path = db_path
self.connection_pool = Queue(maxsize=max_connections)
self.lock = threading.Lock()
for _ in range(max_connections):
conn = sqlite3.connect(db_path, check_same_thread=False)
self.connection_pool.put(conn)
self.executor = ThreadPoolExecutor(max_workers=max_connections)
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.cursor = self.conn.cursor()
event_system.add_listener('get_pokemon_data', self.load_pokemon_details)
event_system.add_listener('get_pokemon_list', self.send_pokemon_list)
event_system.add_listener('get_home_storable', self.get_home_storable)
event_system.add_listener('get_evolution_chain', self.get_evolution_chain)
event_system.add_listener('get_evolution_parent', self.get_evolution_parent)
event_system.add_listener('get_encounter_locations', self.get_encounter_locations)
event_system.add_listener('get_game_generation', self.get_game_generation)
event_system.add_listener('get_pokemon_with_encounters', self.get_pokemon_with_encounters)
event_system.add_listener('refresh_in_memory_db', self.refresh_in_memory_db)
event_system.add_listener('clear_encounters_for_pokemon', self.clear_encounters_for_pokemon)
event_system.add_listener('get_game_id_for_name', self.get_game_id_for_name)
event_system.add_listener('get_exclusive_encounter_sets', self.get_exclusive_encounter_sets)
event_system.add_listener('get_exclusive_set_details', self.get_exclusive_set_details)
event_system.add_listener('get_set_encounters', self.get_set_encounters)
event_system.add_listener('delete_encounter_from_set', self.delete_encounter_from_set)
event_system.add_listener('get_available_encounters', self.get_available_encounters)
event_system.add_listener('add_encounter_to_set', self.add_encounter_to_set)
event_system.add_listener('get_games_list', self.get_games_list)
event_system.add_listener('add_new_exclusive_set', self.add_new_exclusive_set)
event_system.add_listener('save_changes', self.save_changes)
event_system.add_listener('export_database', self.export_database)
event_system.add_listener('assign_mark_to_form', self.assign_mark_to_form)
event_system.add_listener('get_mark_for_game', self.get_mark_for_game)
event_system.add_listener('get_mark_for_game_name', self.get_mark_for_game_name)
event_system.add_listener('get_mark_details', self.get_mark_details)
event_system.add_listener('get_evolve_encounters', self.get_evolve_encounters)
event_system.add_listener('get_game_by_id', self.get_game_by_id)
event_system.add_listener('get_pokemon_form_by_pfic', self.get_pokemon_form_by_pfic)
event_system.add_listener('find_default_form', self.find_default_form)
event_system.add_listener('get_shiftable_forms', self.get_shiftable_forms)
event_system.add_listener('assign_shiftable_form', self.assign_shiftable_form)
event_system.add_listener('get_event_encounters', self.get_event_encounters)
def init_database(self):
disk_conn = sqlite3.connect('pokemon_forms.db')
disk_cursor = disk_conn.cursor()
# Create tables in the file-based database
self.create_games_table(disk_cursor)
self.create_pokemon_forms_table(disk_cursor)
self.create_pokemon_storage_table(disk_cursor)
self.create_evolution_chains_table(disk_cursor)
self.create_exclusive_encounter_groups_table(disk_cursor)
self.create_encounter_exclusive_group_table(disk_cursor)
self.create_encounters_table(disk_cursor)
self.create_mark_table(disk_cursor)
self.create_form_marks_table(disk_cursor)
self.create_evolve_encounter_table(disk_cursor)
self.create_event_encounter_table(disk_cursor)
self.create_shiftable_forms_table(disk_cursor)
# Commit changes to the file-based database
disk_conn.commit()
# Copy the file-based database to the in-memory database
disk_conn.backup(self.conn)
temp_connections = []
for _ in range(self.connection_pool.qsize()):
conn = self.connection_pool.get()
disk_conn.backup(conn)
temp_connections.append(conn)
# Put all connections back into the pool
for conn in temp_connections:
self.connection_pool.put(conn)
# Close the file-based database connection
disk_conn.close()
def create_pokemon_forms_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS pokemon_forms (
PFIC TEXT PRIMARY KEY,
name TEXT NOT NULL,
form_name TEXT,
national_dex INTEGER NOT NULL,
generation INTEGER NOT NULL,
is_baby_form BOOLEAN
)
''')
def create_pokemon_storage_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS pokemon_storage (
PFIC TEXT PRIMARY KEY,
storable_in_home BOOLEAN NOT NULL,
FOREIGN KEY (PFIC) REFERENCES pokemon_forms (PFIC)
)
''')
def create_evolution_chains_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS evolution_chains (
from_pfic TEXT,
to_pfic TEXT,
method TEXT,
PRIMARY KEY (from_pfic, to_pfic),
FOREIGN KEY (from_pfic) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (to_pfic) REFERENCES pokemon_forms (PFIC)
)
''')
def create_exclusive_encounter_groups_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS exclusive_encounter_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_name TEXT NOT NULL,
description TEXT,
game_id INTEGER,
FOREIGN KEY (game_id) REFERENCES games (id)
)
''')
def create_encounter_exclusive_group_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS encounter_exclusive_groups (
encounter_id INTEGER,
group_id INTEGER,
FOREIGN KEY (encounter_id) REFERENCES encounters (id),
FOREIGN KEY (group_id) REFERENCES exclusive_encounter_groups (id),
PRIMARY KEY (encounter_id, group_id)
)
''')
def create_encounters_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS encounters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pfic TEXT,
game_id INTEGER,
location TEXT,
day TEXT,
time TEXT,
dual_slot TEXT,
static_encounter BOOLEAN,
static_encounter_count INTEGER,
extra_text TEXT,
stars TEXT,
fishing BOOLEAN,
rods TEXT,
starter BOOLEAN,
FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (game_id) REFERENCES games (id)
)
''')
def create_mark_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS marks (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
icon_path TEXT NOT NULL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS mark_game_associations (
mark_id INTEGER,
game_id INTEGER,
FOREIGN KEY (mark_id) REFERENCES marks (id),
FOREIGN KEY (game_id) REFERENCES games (id),
PRIMARY KEY (mark_id, game_id)
)
''')
marks = [
("Game Boy", "images/marks/GB_icon_HOME.png", ["Red", "Blue", "Yellow", "Gold", "Silver", "Crystal"]),
("Kalos", "images/marks/Blue_pentagon_HOME.png", ["X", "Y", "Omega Ruby", "Alpha Sapphire"]),
("Alola", "images/marks/Black_clover_HOME.png", ["Sun", "Moon", "Ultra Sun", "Ultra Moon"]),
("Let's Go", "images/marks/Let's_Go_icon_HOME.png", ["Let's Go Pikachu", "Let's Go Eevee"]),
("Galar", "images/marks/Galar_symbol_HOME.png", ["Sword", "Shield", "Expansion Pass"]),
("Sinnoh", "images/marks/BDSP_icon_HOME.png", ["Brilliant Diamond", "Shining Pearl"]),
("Hisui", "images/marks/Arceus_mark_HOME.png", ["Legends Arceus"]),
("Paldea", "images/marks/Paldea_icon_HOME.png", ["Scarlet", "Violet", "The Teal Mask", "The Hidden Treasure of Area Zero"]),
("Markless", "images/marks/Markless_icon_HOME.png", ["Ruby", "Sapphire", "Emerald", "FireRed", "LeafGreen", "Diamond", "Pearl", "Platinum", "HeartGold", "SoulSilver", "Black", "White", "Black 2", "White 2"]),
]
# Check if marks already exist
cursor.execute('SELECT COUNT(*) FROM marks')
marks_count = cursor.fetchone()[0]
for mark in marks:
mark_id = None
if marks_count == 0:
cursor.execute('''
INSERT OR IGNORE INTO marks (name, icon_path)
VALUES (?, ?)
''', (mark[0], mark[1]))
mark_id = cursor.lastrowid
if mark_id == None:
cursor.execute('''
SELECT id FROM marks WHERE name = ?
''', (mark[0],))
mark_id = cursor.fetchone()[0]
for game_name in mark[2]:
cursor.execute('''
INSERT OR IGNORE INTO mark_game_associations (mark_id, game_id)
SELECT ?, id FROM games WHERE name = ?
''', (mark_id, game_name))
def create_games_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS games (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
generation INTEGER NOT NULL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS alternate_game_names (
id INTEGER PRIMARY KEY,
game_id INTEGER NOT NULL,
alternate_name TEXT NOT NULL,
FOREIGN KEY (game_id) REFERENCES games (id),
UNIQUE (alternate_name COLLATE NOCASE)
)
''')
games = [
("Red", 1, ["Red Version"]),
("Blue", 1, ["Blue Version"]),
("Yellow", 1, ["Yellow Version"]),
("Gold", 2, ["Gold Version"]),
("Silver", 2, ["Silver Version"]),
("Crystal", 2, ["Crystal Version"]),
("Ruby", 3, ["Ruby Version"]),
("Sapphire", 3, ["Sapphire Version"]),
("Emerald", 3, ["Emerald Version"]),
("FireRed", 3, ["Fire Red", "Fire-Red"]),
("LeafGreen", 3, ["Leaf Green", "Leaf-Green"]),
("Diamond", 4, ["Diamond Version"]),
("Pearl", 4, ["Pearl Version"]),
("Platinum", 4, ["Platinum Version"]),
("HeartGold", 4, ["Heart Gold", "Heart-Gold"]),
("SoulSilver", 4, ["Soul Silver", "Soul-Silver"]),
("Black", 5, ["Black Version"]),
("White", 5, ["White Version"]),
("Black 2", 5, ["Black Version 2", "Black-2"]),
("White 2", 5, ["White Version 2", "White-2"]),
("X", 6, ["X Version"]),
("Y", 6, ["Y Version"]),
("Omega Ruby", 6, ["Omega Ruby Version", "Omega-Ruby"]),
("Alpha Sapphire", 6, ["Alpha Sapphire Version", "Alpha-Sapphire"]),
("Sun", 7, ["Sun Version"]),
("Moon", 7, ["Moon Version"]),
("Ultra Sun", 7, ["Ultra Sun Version", "Ultra-Sun"]),
("Ultra Moon", 7, ["Ultra Moon Version", "Ultra-Moon"]),
("Let's Go Pikachu", 7, ["Let's Go, Pikachu!", "Lets Go Pikachu"]),
("Let's Go Eevee", 7, ["Let's Go, Eevee!", "Lets Go Eevee"]),
("Sword", 8, ["Sword Version"]),
("Shield", 8, ["Shield Version"]),
("Expansion Pass", 8, ["Expansion Pass (Sword)", "Expansion Pass (Shield)"]),
("Brilliant Diamond", 8, ["Brilliant Diamond Version", "Brilliant-Diamond"]),
("Shining Pearl", 8, ["Shining Pearl Version", "Shining-Pearl"]),
("Legends Arceus", 8, ["Legends: Arceus", "Legends-Arceus"]),
("Scarlet", 9, ["Scarlet Version"]),
("Violet", 9, ["Violet Version"]),
("The Teal Mask", 9, ["The Teal Mask Version", "The Teal Mask (Scarlet)", "The Teal Mask (Violet)"]),
("The Hidden Treasure of Area Zero", 9, ["The Hidden Treasure of Area Zero Version", "The Hidden Treasure of Area Zero (Scarlet)", "The Hidden Treasure of Area Zero (Violet)"]),
("Pokémon Home", 98, ["Pokémon HOME"]),
("Pokémon Go", 99, ["Pokémon GO"]),
]
# Check if marks already exist
cursor.execute('SELECT COUNT(*) FROM games')
games_count = cursor.fetchone()[0]
for game in games:
game_id = None
if games_count == 0:
cursor.execute('''
INSERT OR IGNORE INTO games (name, generation)
VALUES (?, ?)
''', (game[0], game[1]))
game_id = cursor.lastrowid
if game_id == None:
cursor.execute('''
SELECT id FROM games WHERE name = ?
''', (game[0],))
game_id = cursor.fetchone()[0]
# Insert alternate names
for alt_name in game[2]:
cursor.execute('''
INSERT OR IGNORE INTO alternate_game_names (game_id, alternate_name)
VALUES (?, ?)
''', (game_id, alt_name))
def create_form_marks_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS form_marks (
pfic TEXT,
mark_id INTEGER,
FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (mark_id) REFERENCES marks (id),
PRIMARY KEY (pfic, mark_id)
)
''')
def create_evolve_encounter_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS evolve_encounters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pfic TEXT,
game_id INTEGER,
day TEXT,
time TEXT,
from_pfic TEXT,
FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (game_id) REFERENCES games (id),
FOREIGN KEY (from_pfic) REFERENCES pokemon_forms (PFIC)
)
''')
def create_event_encounter_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS event_encounters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pfic TEXT,
game_id INTEGER,
day TEXT,
time TEXT,
FOREIGN KEY (pfic) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (game_id) REFERENCES games (id)
)
''')
def create_shiftable_forms_table(self, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS shiftable_forms (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_pfic TEXT,
to_pfic TEXT,
FOREIGN KEY (from_pfic) REFERENCES pokemon_forms (PFIC),
FOREIGN KEY (to_pfic) REFERENCES pokemon_forms (PFIC),
UNIQUE (from_pfic, to_pfic)
)
''')
def load_pokemon_details(self, pfic):
self.cursor.execute('''
SELECT pf.name, pf.form_name, pf.national_dex, pf.generation, ps.storable_in_home, pf.is_baby_form
FROM pokemon_forms pf
LEFT JOIN pokemon_storage ps ON pf.PFIC = ps.PFIC
WHERE pf.PFIC = ?
''', (pfic,))
return self.cursor.fetchone()
def send_pokemon_list(self, data):
# Fetch pokemon list from database
self.cursor.execute('''
SELECT pf.PFIC, pf.name, pf.form_name, pf.national_dex
FROM pokemon_forms pf
''')
pokemon_data = self.cursor.fetchall()
# Sort the pokemon_data based on PFIC
pokemon_data.sort(key=lambda x: parse_pfic(x[0]))
return pokemon_data
def get_home_storable(self, pfic):
self.cursor.execute('SELECT storable_in_home FROM pokemon_storage WHERE PFIC = ?', (pfic,))
result = self.cursor.fetchone()
home_storable = result[0] if result else False
return home_storable
def get_evolution_chain(self, pfic):
def follow_chain(start_pfic, visited=None):
if visited is None:
visited = set()
chain = []
stack = [(start_pfic, None)]
while stack:
current_pfic, method = stack.pop()
if current_pfic in visited:
continue
visited.add(current_pfic)
self.cursor.execute('SELECT name, form_name FROM pokemon_forms WHERE PFIC = ?', (current_pfic,))
name, form_name = self.cursor.fetchone()
chain.append((current_pfic, name, form_name, method))
# Get previous evolutions
self.cursor.execute('SELECT from_pfic, method FROM evolution_chains WHERE to_pfic = ?', (current_pfic,))
prev_evolutions = self.cursor.fetchall()
for prev_pfic, prev_method in prev_evolutions:
if prev_pfic not in visited:
stack.append((prev_pfic, prev_method))
# Get next evolutions
self.cursor.execute('SELECT to_pfic, method FROM evolution_chains WHERE from_pfic = ?', (current_pfic,))
next_evolutions = self.cursor.fetchall()
for next_pfic, next_method in next_evolutions:
if next_pfic not in visited:
stack.append((next_pfic, next_method))
return chain
return follow_chain(pfic)
def get_evolution_parent(self, pfic):
self.cursor.execute('SELECT from_pfic FROM evolution_chains WHERE to_pfic = ?', (pfic,))
return self.cursor.fetchone()
def get_encounter_locations(self, pfic):
self.cursor.execute('''
SELECT g.name, e.location, e.day, e.time, e.dual_slot, e.static_encounter_count, e.static_encounter, e.extra_text, e.stars, e.rods, e.fishing
FROM encounters e
JOIN games g ON e.game_id = g.id
WHERE e.pfic = ?
ORDER BY g.name, e.location
''', (pfic,))
return self.cursor.fetchall()
def get_game_generation(self, game):
self.cursor.execute('''
SELECT g.generation
FROM games g
LEFT JOIN alternate_game_names agn ON g.id = agn.game_id
WHERE g.name = ? OR agn.alternate_name = ?
LIMIT 1
''', (game, game))
result = self.cursor.fetchone()
return result[0] if result else None
def get_game_id_for_name(self, game_name):
query = "SELECT id FROM games WHERE name = ?"
result = self.execute_query(query, (game_name,))
if result:
return result[0][0]
# If not found in main table, check alternate_game_names
query = """
SELECT g.id
FROM games g
JOIN alternate_game_names agn ON g.id = agn.game_id
WHERE agn.alternate_name = ?
"""
result = self.execute_query(query, (game_name,))
return result[0][0] if result else None
def get_pokemon_with_encounters(self, data):
self.cursor.execute('''
SELECT DISTINCT pfic
FROM encounters
''')
pokemon_with_encounters = set(row[0] for row in self.cursor.fetchall())
return pokemon_with_encounters
def refresh_in_memory_db(self, temp_conn):
temp_conn.backup(self.conn)
def clear_encounters_for_pokemon(self, pfic):
self.cursor.execute('DELETE FROM encounters WHERE pfic = ?', (pfic,))
self.conn.commit()
def get_games_list(self, data):
self.cursor.execute('SELECT id, name FROM games')
return self.cursor.fetchall()
def get_exclusive_encounter_sets(self, data):
self.cursor.execute('SELECT id, group_name FROM exclusive_encounter_groups')
return self.cursor.fetchall()
def get_exclusive_set_details(self, set_id):
self.cursor.execute('SELECT group_name, description, game_id FROM exclusive_encounter_groups WHERE id = ?', (set_id,))
return self.cursor.fetchone()
def get_set_encounters(self, set_id):
self.cursor.execute('''
SELECT e.id, e.pfic, e.location, pf.name, pf.form_name
FROM encounters e
JOIN encounter_exclusive_groups eeg ON e.id = eeg.encounter_id
JOIN pokemon_forms pf ON e.pfic = pf.PFIC
WHERE eeg.group_id = ?
''', (set_id,))
return self.cursor.fetchall()
def delete_encounter_from_set(self, set_id, encounter_id):
self.cursor.execute('DELETE FROM encounter_exclusive_groups WHERE group_id = ? AND encounter_id = ?', (set_id, encounter_id))
self.conn.commit()
def add_encounter_to_set(self, data):
set_id, encounter_id = data
self.cursor.execute('INSERT INTO encounter_exclusive_groups (group_id, encounter_id) VALUES (?, ?)', (set_id, encounter_id))
self.conn.commit()
def add_new_exclusive_set(self, data):
set_name, description, game_id = data
self.cursor.execute('INSERT INTO exclusive_encounter_groups (group_name, description, game_id) VALUES (?, ?, ?)', (set_name, description, game_id))
self.conn.commit()
def get_available_encounters(self, data):
game_id = data
self.cursor.execute('''
SELECT e.id, e.pfic, e.location, pf.name, pf.form_name
FROM encounters e
JOIN games g ON e.game_id = g.id
JOIN pokemon_forms pf ON e.pfic = pf.PFIC
WHERE g.id = ?
''', (game_id,))
return self.cursor.fetchall()
def save_changes(self, data):
disk_conn = sqlite3.connect('pokemon_forms.db')
self.conn.backup(disk_conn)
disk_conn.close()
def export_database(self, data):
export_conn = sqlite3.connect('pokemon_forms_production.db')
self.conn.backup(export_conn)
export_conn.close()
def assign_mark_to_form(self, data):
pfic, mark_id = data
self.cursor.execute('''
INSERT OR REPLACE INTO form_marks (pfic, mark_id)
VALUES (?, ?)
''', (pfic, mark_id))
self.conn.commit()
def get_mark_for_game(self, data):
game_id = data
self.cursor.execute('SELECT mark_id FROM mark_game_associations WHERE game_id = ?', (game_id,))
result = self.cursor.fetchone()
return result[0] if result else None
def get_mark_for_game_name(self, data):
game_name = data
self.cursor.execute('''
SELECT m.id
FROM marks m
JOIN mark_game_associations mga ON m.id = mga.mark_id
JOIN games g ON mga.game_id = g.id
WHERE g.name = ?
''', (game_name,))
result = self.cursor.fetchone()
return result[0] if result else None
def get_mark_details(self, data):
mark_id = data
self.cursor.execute('SELECT name, icon_path FROM marks WHERE id = ?', (mark_id,))
return self.cursor.fetchone()
def get_evolve_encounters(self, data):
pfic = data
self.cursor.execute('SELECT * FROM evolve_encounters WHERE pfic = ?', (pfic,))
evolve_encounters = self.cursor.fetchall()
return [EvolveEncounter(encounter[1], encounter[2], encounter[3], encounter[4], encounter[5]) for encounter in evolve_encounters]
def get_game_by_id(self, data):
game_id = data
self.cursor.execute('SELECT * FROM games WHERE id = ?', (game_id,))
result = self.cursor.fetchone()
return result
def get_pokemon_form_by_pfic(self, data) -> PokemonForm:
pfic = data
self.cursor.execute('''
SELECT pf.name, pf.form_name, pf.national_dex, pf.generation, ps.storable_in_home, pf.is_baby_form
FROM pokemon_forms pf
LEFT JOIN pokemon_storage ps ON pf.PFIC = ps.PFIC
WHERE pf.PFIC = ?
''', (pfic,))
result = self.cursor.fetchone()
return PokemonForm(pfic, result[0], result[1], result[2], result[3], result[5], result[4])
def find_default_form(self, data):
search_pfic = data
self.cursor.execute('''
SELECT pf.PFIC, pf.name, pf.form_name, pf.national_dex, pf.generation, ps.storable_in_home, pf.is_baby_form
FROM pokemon_forms pf
LEFT JOIN pokemon_storage ps ON pf.PFIC = ps.PFIC
WHERE pf.PFIC LIKE ?
''', (search_pfic + "%",))
results = self.cursor.fetchall()
return [PokemonForm(result[0], result[1], result[2], result[3], result[4], result[6], result[5]) for result in results]
def get_shiftable_forms(self, data):
pfic = data
self.cursor.execute('SELECT * FROM shiftable_forms WHERE from_pfic = ?', (pfic,))
return self.cursor.fetchall()
def assign_shiftable_form(self, data):
from_pfic, to_pfic = data
self.cursor.execute('''
INSERT OR IGNORE INTO shiftable_forms (from_pfic, to_pfic)
VALUES (?, ?)
''', (from_pfic, to_pfic))
self.conn.commit()
def get_event_encounters(self, data):
pfic = data
self.cursor.execute('SELECT * FROM event_encounters WHERE pfic = ?', (pfic,))
return self.cursor.fetchall()
def get_connection(self):
return self.connection_pool.get()
def release_connection(self, conn):
self.connection_pool.put(conn)
def execute_query_with_commit(self, query, params=None):
conn = self.get_connection()
try:
with conn:
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
conn.commit()
return cursor.fetchall()
finally:
self.release_connection(conn)
def execute_query(self, query, params=None):
conn = self.get_connection()
try:
with conn:
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor.fetchall()
finally:
self.release_connection(conn)
def close(self):
while not self.connection_pool.empty():
conn = self.connection_pool.get()
conn.close()
self.executor.shutdown()
def build_query_string(self, table_name, criteria):
conditions = []
values = []
query = f"SELECT * FROM {table_name} WHERE "
for column, value in criteria.items():
if value is None:
conditions.append(f"({column} IS NULL OR {column} = '')")
elif value == "":
conditions.append(f"{column} = ''")
else:
conditions.append(f"{column} = ?")
values.append(value)
return query + " AND ".join(conditions), values