11 changed files with 529 additions and 237 deletions
@ -0,0 +1,38 @@ |
|||
from typing import List, Set |
|||
import networkx as nx |
|||
|
|||
class GameWorld: |
|||
def __init__(self, graph: nx.Graph, start: str, end: str, goals: Set[str], must_visit: Set[str], initial_conditions: frozenset, towns_and_cities: Set[str]): |
|||
self.graph = graph |
|||
self.start = start |
|||
self.end = end |
|||
self.goals = goals |
|||
self.must_visit = must_visit |
|||
self.initial_conditions = initial_conditions |
|||
self.towns_and_cities = towns_and_cities |
|||
|
|||
class RouteStage: |
|||
def __init__(self, name: str, graph: nx.Graph, start: str, end: str, goals: Set[str], must_visit: Set[str], towns_and_cities: Set[str]): |
|||
self.name: str = name |
|||
self.graph: nx.Graph = graph # A function or class that returns a subgraph for this stage |
|||
self.start: str = start |
|||
self.end: str = end |
|||
self.goals: Set[str] = goals |
|||
self.must_visit: Set[str] = must_visit |
|||
self.towns_and_cities: Set[str] = towns_and_cities |
|||
|
|||
def create_world(self, previous_conditions=None): |
|||
# Combine initial_conditions with previous_conditions if needed |
|||
# previous_conditions are those acquired from the last stage. |
|||
#final_goals = self.goals.union(previous_conditions.get('badges', set())) if previous_conditions else self.goals |
|||
# Construct a GameWorld for this stage |
|||
world = GameWorld( |
|||
graph=self.graph, |
|||
start=self.start, |
|||
end=self.end, |
|||
goals=self.goals, |
|||
must_visit=self.must_visit, |
|||
initial_conditions=previous_conditions.get('conditions', frozenset()) if previous_conditions else frozenset(), |
|||
towns_and_cities = self.towns_and_cities |
|||
) |
|||
return world |
|||
@ -1,154 +1,20 @@ |
|||
import heapq |
|||
import networkx as nx |
|||
from typing import List, Set |
|||
import networkx as nx |
|||
|
|||
FLY_OUT_OF_BATTLE = 'Fly out of battle' |
|||
|
|||
class State: |
|||
def __init__(self, location, conditions, cost, path, visited_required_nodes): |
|||
self.location = location |
|||
self.conditions = conditions # A frozenset of conditions |
|||
self.cost = cost |
|||
self.path = path # List of locations visited in order |
|||
self.visited_required_nodes = visited_required_nodes # A frozenset of required nodes visited |
|||
|
|||
def __lt__(self, other): |
|||
return self.cost < other.cost # For priority queue |
|||
|
|||
def heuristic(state, goal_conditions, required_nodes): |
|||
# Since we don't have actual distances, we can use the number of badges remaining as the heuristic |
|||
remaining_conditions = goal_conditions - state.conditions |
|||
remaining_nodes = required_nodes - state.visited_required_nodes |
|||
return len(remaining_conditions) + len(remaining_nodes) |
|||
|
|||
def is_goal_state(state, goal_location, goals, required_nodes): |
|||
return ( |
|||
state.location == goal_location and |
|||
goals.issubset(state.conditions) and |
|||
required_nodes.issubset(state.visited_required_nodes) |
|||
) |
|||
from routes.game_world import RouteStage |
|||
|
|||
class PokemonGameDesc: |
|||
def __init__(self): |
|||
self.game_name: str = "" |
|||
self.towns_and_cities: Set[str] = set() |
|||
self.badges: Set[str] = set() |
|||
self.items: Set[str] = set() |
|||
self.hms: Set[str] = set() |
|||
self.starting_town: str |
|||
self.end_goal: str |
|||
self.flying_badge: str |
|||
self.additional_goals: List[str] = [] |
|||
self.one_way_routes: List[str] = [] |
|||
self.must_visit: Set[str] = set() |
|||
self.graph: nx.Graph = nx.Graph() |
|||
self.games_covered: List[str] = [] |
|||
self.file_name: str = "" |
|||
|
|||
def astar_search(self): |
|||
from collections import deque |
|||
|
|||
self.goals = set(self.badges + self.additional_goals) |
|||
|
|||
# Priority queue for open states |
|||
open_list = [] |
|||
heapq.heappush(open_list, (0, State( |
|||
location=self.starting_town, |
|||
conditions=frozenset(), # Start with no conditions |
|||
cost=0, |
|||
path=[self.starting_town], |
|||
visited_required_nodes=frozenset([self.starting_town]) if self.starting_town in self.must_visit else frozenset() |
|||
))) |
|||
|
|||
# Closed set to keep track of visited states |
|||
closed_set = {} |
|||
|
|||
while open_list: |
|||
_, current_state = heapq.heappop(open_list) |
|||
|
|||
# Check if we've reached the goal location with all required conditions |
|||
if is_goal_state(current_state, self.end_goal, self.goals, self.must_visit): |
|||
return current_state.path, current_state.cost, current_state.conditions |
|||
|
|||
# Check if we've already visited this state with equal or better conditions |
|||
state_key = (current_state.location, current_state.conditions, current_state.visited_required_nodes) |
|||
if state_key in closed_set and closed_set[state_key] <= current_state.cost: |
|||
continue # Skip this state |
|||
|
|||
closed_set[state_key] = current_state.cost |
|||
|
|||
# Expand neighbors via normal moves |
|||
for neighbor in self.graph.neighbors(current_state.location): |
|||
edge_data = self.graph.get_edge_data(current_state.location, neighbor) |
|||
edge_condition = edge_data.get('condition', []) |
|||
|
|||
if edge_condition is None: |
|||
edge_requires = set() |
|||
else: |
|||
edge_requires = set(edge_condition) |
|||
|
|||
# Check if we have the required conditions to traverse this edge |
|||
if not edge_requires.issubset(current_state.conditions): |
|||
continue # Can't traverse this edge |
|||
|
|||
# Update conditions based on grants at the neighbor node |
|||
neighbor_data = self.graph.nodes[neighbor] |
|||
new_conditions = set(current_state.conditions) |
|||
|
|||
# Check if the neighbor grants any conditions |
|||
grants = neighbor_data.get('grants_conditions', []) |
|||
for grant in grants: |
|||
required_for_grant = set(grant.get('required_conditions', [])) |
|||
if required_for_grant.issubset(new_conditions): |
|||
# We can acquire the condition |
|||
new_conditions.add(grant['condition']) |
|||
|
|||
# Update visited required nodes |
|||
new_visited_required_nodes = set(current_state.visited_required_nodes) |
|||
if neighbor in self.must_visit: |
|||
new_visited_required_nodes.add(neighbor) |
|||
|
|||
new_state = State( |
|||
location=neighbor, |
|||
conditions=frozenset(new_conditions), |
|||
cost=current_state.cost + 1, # Assuming uniform cost; adjust if needed |
|||
path=current_state.path + [neighbor], |
|||
visited_required_nodes=frozenset(new_visited_required_nodes) |
|||
) |
|||
|
|||
estimated_total_cost = new_state.cost + heuristic(new_state, self.goals, self.must_visit) |
|||
|
|||
heapq.heappush(open_list, (estimated_total_cost, new_state)) |
|||
|
|||
# Expand neighbors via FLY if applicable |
|||
if FLY_OUT_OF_BATTLE in current_state.conditions and current_state.location in self.towns_and_cities: |
|||
for fly_target in self.towns_and_cities: |
|||
if fly_target != current_state.location and fly_target in current_state.path: |
|||
# You can fly to this location |
|||
new_conditions = set(current_state.conditions) |
|||
neighbor_data = self.graph.nodes[fly_target] |
|||
grants = neighbor_data.get('grants_conditions', []) |
|||
for grant in grants: |
|||
required_for_grant = set(grant.get('required_conditions', [])) |
|||
if required_for_grant.issubset(new_conditions): |
|||
new_conditions.add(grant['condition']) |
|||
|
|||
# Update visited required nodes |
|||
new_visited_required_nodes = set(current_state.visited_required_nodes) |
|||
if fly_target in self.must_visit: |
|||
new_visited_required_nodes.add(fly_target) |
|||
|
|||
fly_state = State( |
|||
location=fly_target, |
|||
conditions=frozenset(new_conditions), |
|||
cost=current_state.cost + 1, # Adjust cost if flying is different |
|||
path=current_state.path + [fly_target], |
|||
visited_required_nodes=frozenset(new_visited_required_nodes) |
|||
) |
|||
estimated_total_cost = fly_state.cost + heuristic(fly_state, self.goals, self.must_visit) |
|||
heapq.heappush(open_list, (estimated_total_cost, fly_state)) |
|||
|
|||
return None # No path found |
|||
|
|||
self.stages: List[RouteStage] = [] |
|||
|
|||
__all__ = ["PokemonGameDesc"] |
|||
|
|||
@ -0,0 +1,37 @@ |
|||
import json |
|||
from typing import Set |
|||
|
|||
def determine_must_visit_locations(plan_json: dict, db_conn, game_name: str) -> Set[str]: |
|||
# Extract needed Pokemon, find their encounter locations via db, return that set. |
|||
# No route logic here, just data derivation. |
|||
|
|||
target = None |
|||
for game in plan_json: |
|||
if game["game_name"] == game_name: |
|||
target = game |
|||
break |
|||
|
|||
if not target: |
|||
return set() |
|||
|
|||
# gather up all the pokemon needed for say crystal and find all the encounter routes/locations |
|||
needed_locations = set() |
|||
|
|||
game_info = db_conn.get_game_id_by_name(game_name) |
|||
|
|||
for key in target["pokemon"]: |
|||
catch_stats = target["pokemon"][key] |
|||
rep = catch_stats["representative"] |
|||
rand = db_conn.get_encounters(rep, "random") |
|||
static = db_conn.get_encounters(rep, "static") |
|||
encounters = [] |
|||
encounters.extend(rand) |
|||
encounters.extend(static) |
|||
|
|||
for encounter in encounters: |
|||
if encounter["game_id"] != game_info["id"]: |
|||
continue |
|||
|
|||
encounter_data = json.loads(encounter["data"]) |
|||
needed_locations.add(encounter_data["location"].replace("*", "")) |
|||
return needed_locations |
|||
@ -0,0 +1,184 @@ |
|||
|
|||
from typing import List, Set |
|||
from routes.game_world import GameWorld |
|||
import heapq |
|||
import networkx as nx |
|||
|
|||
class RoutePlan: |
|||
def __init__(self, path: List[str], cost: int, conditions: Set[str]): |
|||
self.path = path |
|||
self.cost = cost |
|||
self.conditions = conditions |
|||
|
|||
FLY_OUT_OF_BATTLE = 'Fly out of battle' |
|||
|
|||
class State: |
|||
def __init__(self, location, conditions, cost, path, visited_required_nodes): |
|||
self.location = location |
|||
self.conditions = conditions # A frozenset of conditions |
|||
self.cost = cost |
|||
self.path = path # List of locations visited in order |
|||
self.visited_required_nodes = visited_required_nodes # A frozenset of required nodes visited |
|||
|
|||
def __lt__(self, other): |
|||
return self.cost < other.cost # For priority queue |
|||
|
|||
class RoutePlanner: |
|||
def __init__(self, world: GameWorld): |
|||
self.world: GameWorld = world |
|||
|
|||
def heuristic(self, state, goal_conditions, required_nodes): |
|||
# Since we don't have actual distances, we can use the number of badges remaining as the heuristic |
|||
remaining_conditions = goal_conditions - state.conditions |
|||
remaining_nodes = required_nodes - state.visited_required_nodes |
|||
return len(remaining_conditions) + len(remaining_nodes) |
|||
|
|||
def heuristic2(self, state, goal_conditions, end_goal, required_nodes, distances): |
|||
remaining_conditions = goal_conditions - state.conditions |
|||
remaining_nodes = required_nodes - state.visited_required_nodes |
|||
|
|||
# Find the shortest distance from current_state.location to any required node + eventually to the goal |
|||
# As a simple first step: take the minimum distance from the current node to any required node or the goal. |
|||
node_candidates = list(remaining_nodes) + [end_goal] |
|||
min_dist = float('inf') |
|||
for candidate in node_candidates: |
|||
d = distances.get((state.location, candidate), float('inf')) |
|||
if d < min_dist: |
|||
min_dist = d |
|||
|
|||
# If no must-visit nodes remain, just consider distance to the goal |
|||
if not remaining_nodes: |
|||
min_dist = distances.get((state.location, end_goal), float('inf')) |
|||
|
|||
# Combine with remaining conditions count as before |
|||
return len(remaining_conditions) + len(remaining_nodes) + (min_dist if min_dist != float('inf') else 0) |
|||
|
|||
|
|||
def is_goal_state(self, state, goal_location, goals, required_nodes): |
|||
return ( |
|||
state.location == goal_location and |
|||
goals.issubset(state.conditions) and |
|||
required_nodes.issubset(state.visited_required_nodes) |
|||
) |
|||
|
|||
def compute_shortest_path(self, graph, key_nodes): |
|||
distances = {} # distances[(u,v)] = shortest distance from u to v ignoring conditions |
|||
|
|||
for node in key_nodes: |
|||
dist_from_node = nx.single_source_shortest_path_length(graph, node) |
|||
for other in key_nodes: |
|||
distances[(node, other)] = dist_from_node.get(other, float('inf')) |
|||
|
|||
return distances |
|||
|
|||
def astar_search(self) -> RoutePlan: |
|||
from collections import deque |
|||
|
|||
self.goals = set(self.world.goals) |
|||
|
|||
key_nodes = [self.world.start, self.world.end] + list(self.world.towns_and_cities) |
|||
if len(self.world.must_visit) > 0: |
|||
key_nodes += list(self.world.must_visit) |
|||
distances = self.compute_shortest_path(self.world.graph, key_nodes) |
|||
|
|||
# Priority queue for open states |
|||
open_list = [] |
|||
heapq.heappush(open_list, (0, State( |
|||
location=self.world.start, |
|||
conditions=self.world.initial_conditions, # Start with no conditions |
|||
cost=0, |
|||
path=[self.world.start], |
|||
visited_required_nodes=frozenset([self.world.start]) if self.world.start in self.world.must_visit else frozenset() |
|||
))) |
|||
|
|||
# Closed set to keep track of visited states |
|||
closed_set = {} |
|||
|
|||
while open_list: |
|||
_, current_state = heapq.heappop(open_list) |
|||
|
|||
# Check if we've reached the goal location with all required conditions |
|||
if self.is_goal_state(current_state, self.world.end, self.goals, self.world.must_visit): |
|||
return RoutePlan(current_state.path, current_state.cost, current_state.conditions) |
|||
|
|||
# Check if we've already visited this state with equal or better conditions |
|||
state_key = (current_state.location, current_state.conditions, current_state.visited_required_nodes) |
|||
if state_key in closed_set and closed_set[state_key] <= current_state.cost: |
|||
continue # Skip this state |
|||
|
|||
closed_set[state_key] = current_state.cost |
|||
|
|||
# Expand neighbors via normal moves |
|||
for neighbor in self.world.graph.neighbors(current_state.location): |
|||
edge_data = self.world.graph.get_edge_data(current_state.location, neighbor) |
|||
edge_condition = edge_data.get('condition', []) |
|||
|
|||
if edge_condition is None: |
|||
edge_requires = set() |
|||
else: |
|||
edge_requires = set(edge_condition) |
|||
|
|||
# Check if we have the required conditions to traverse this edge |
|||
if not edge_requires.issubset(current_state.conditions): |
|||
continue # Can't traverse this edge |
|||
|
|||
# Update conditions based on grants at the neighbor node |
|||
neighbor_data = self.world.graph.nodes[neighbor] |
|||
new_conditions = set(current_state.conditions) |
|||
|
|||
# Check if the neighbor grants any conditions |
|||
grants = neighbor_data.get('grants_conditions', []) |
|||
for grant in grants: |
|||
required_for_grant = set(grant.get('required_conditions', [])) |
|||
if required_for_grant.issubset(new_conditions): |
|||
# We can acquire the condition |
|||
new_conditions.add(grant['condition']) |
|||
|
|||
# Update visited required nodes |
|||
new_visited_required_nodes = set(current_state.visited_required_nodes) |
|||
if neighbor in self.world.must_visit: |
|||
new_visited_required_nodes.add(neighbor) |
|||
|
|||
new_state = State( |
|||
location=neighbor, |
|||
conditions=frozenset(new_conditions), |
|||
cost=current_state.cost + 1, # Assuming uniform cost; adjust if needed |
|||
path=current_state.path + [neighbor], |
|||
visited_required_nodes=frozenset(new_visited_required_nodes) |
|||
) |
|||
|
|||
#estimated_total_cost = new_state.cost + self.heuristic(new_state, self.goals, self.world.must_visit) |
|||
estimated_total_cost = new_state.cost + self.heuristic2(new_state, self.goals, self.world.end, self.world.must_visit, distances) |
|||
|
|||
heapq.heappush(open_list, (estimated_total_cost, new_state)) |
|||
|
|||
# Expand neighbors via FLY if applicable |
|||
if FLY_OUT_OF_BATTLE in current_state.conditions and current_state.location in self.world.towns_and_cities: |
|||
for fly_target in self.world.towns_and_cities: |
|||
if fly_target != current_state.location and fly_target in current_state.path: |
|||
# You can fly to this location |
|||
new_conditions = set(current_state.conditions) |
|||
neighbor_data = self.world.graph.nodes[fly_target] |
|||
grants = neighbor_data.get('grants_conditions', []) |
|||
for grant in grants: |
|||
required_for_grant = set(grant.get('required_conditions', [])) |
|||
if required_for_grant.issubset(new_conditions): |
|||
new_conditions.add(grant['condition']) |
|||
|
|||
# Update visited required nodes |
|||
new_visited_required_nodes = set(current_state.visited_required_nodes) |
|||
if fly_target in self.world.must_visit: |
|||
new_visited_required_nodes.add(fly_target) |
|||
|
|||
fly_state = State( |
|||
location=fly_target, |
|||
conditions=frozenset(new_conditions), |
|||
cost=current_state.cost + 1, # Adjust cost if flying is different |
|||
path=current_state.path + [fly_target], |
|||
visited_required_nodes=frozenset(new_visited_required_nodes) |
|||
) |
|||
#estimated_total_cost = fly_state.cost + self.heuristic(fly_state, self.goals, self.world.must_visit) |
|||
estimated_total_cost = fly_state.cost + self.heuristic2(fly_state, self.goals, self.world.end, self.world.must_visit, distances) |
|||
heapq.heappush(open_list, (estimated_total_cost, fly_state)) |
|||
|
|||
return None # No path found |
|||
@ -0,0 +1,78 @@ |
|||
import json |
|||
from PyQt6.QtCore import QObject, pyqtSignal, QRunnable |
|||
|
|||
from db import db |
|||
from routes.game_world import GameWorld |
|||
from routes.pokemon_game_desc import PokemonGameDesc |
|||
from routes.requirements import determine_must_visit_locations |
|||
from routes.route_planner import RoutePlanner |
|||
|
|||
class SolveRouteWorkerSignals(QObject): |
|||
finished = pyqtSignal(list) |
|||
|
|||
class SolveRouteWorker(QRunnable): |
|||
def __init__(self, initial_data: PokemonGameDesc): |
|||
super().__init__() |
|||
self.signals = SolveRouteWorkerSignals() |
|||
self.initial_data = initial_data |
|||
|
|||
def run(self): |
|||
try: |
|||
stage_results = [] |
|||
current_conditions = { |
|||
'badges': set(), |
|||
'conditions': frozenset() |
|||
} |
|||
|
|||
for i, stage in enumerate(self.initial_data.stages): |
|||
if i == 0: |
|||
world = stage.create_world() |
|||
else: |
|||
prev_result = stage_results[-1] |
|||
current_conditions['conditions'] = prev_result.conditions |
|||
#current_conditions['badges'] = prev_result.conditions.intersection(all_badges_set) # if badges are a subset of conditions |
|||
stage.start = prev_result.path[-1] # end of last path is start of this stage |
|||
world = stage.create_world(previous_conditions=current_conditions) |
|||
|
|||
# First route calculation |
|||
planner = RoutePlanner(world) |
|||
initial_plan = planner.astar_search() |
|||
|
|||
# Load plan.json and determine if more visits are needed |
|||
with open("./temp/plan.json", 'r', encoding='utf-8') as f: |
|||
plan_json = json.load(f) |
|||
|
|||
needed_locations = determine_must_visit_locations(plan_json, db, self.initial_data.game_name) |
|||
missing_locations = needed_locations - set(initial_plan.path) |
|||
|
|||
# compare those to all the ones we visit in the generated route |
|||
|
|||
node_list = list(world.graph.nodes()) |
|||
|
|||
# add missing ones to the has_visited list and re-gen the problem, then run it again |
|||
nodes_to_visit = [] |
|||
for loc in missing_locations: |
|||
if loc in node_list: |
|||
nodes_to_visit.append(loc) |
|||
|
|||
print("Additional Nodes to Visit:", nodes_to_visit) |
|||
|
|||
result = None |
|||
if nodes_to_visit: |
|||
stage.must_visit = stage.must_visit.union(nodes_to_visit) |
|||
re_visisted = stage.create_world() |
|||
planner = RoutePlanner(re_visisted) |
|||
distances = planner.compute_shortest_path(re_visisted.graph, stage.must_visit) |
|||
result = planner.astar_search() |
|||
else: |
|||
result = initial_plan |
|||
|
|||
if result: |
|||
print(result.path) |
|||
|
|||
stage_results.append(result) |
|||
|
|||
self.signals.finished.emit(stage_results) |
|||
|
|||
except Exception as e: |
|||
print(f"Error: {e}") |
|||
Loading…
Reference in new issue