import json import random import aiosqlite import asyncio from typing import Dict, List, Optional from .database import Database from .battle_engine import BattleEngine class GameEngine: def __init__(self, database: Database): self.database = database self.battle_engine = BattleEngine(database) self.pet_species = {} self.locations = {} self.moves = {} self.type_chart = {} self.weather_patterns = {} self.weather_task = None self.shutdown_event = asyncio.Event() async def load_game_data(self): await self.load_pet_species() await self.load_locations() await self.load_moves() await self.load_type_chart() await self.load_achievements() await self.database.initialize_items() await self.database.initialize_gyms() await self.init_weather_system() await self.battle_engine.load_battle_data() async def load_pet_species(self): try: with open("config/pets.json", "r") as f: species_data = json.load(f) async with aiosqlite.connect(self.database.db_path) as db: for species in species_data: await db.execute(""" INSERT OR IGNORE INTO pet_species (name, type1, type2, base_hp, base_attack, base_defense, base_speed, evolution_level, rarity) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( species["name"], species["type1"], species.get("type2"), species["base_hp"], species["base_attack"], species["base_defense"], species["base_speed"], species.get("evolution_level"), species.get("rarity", 1) )) await db.commit() except FileNotFoundError: await self.create_default_species() async def create_default_species(self): default_species = [ { "name": "Flamey", "type1": "Fire", "base_hp": 45, "base_attack": 52, "base_defense": 43, "base_speed": 65, "rarity": 1 }, { "name": "Aqua", "type1": "Water", "base_hp": 44, "base_attack": 48, "base_defense": 65, "base_speed": 43, "rarity": 1 }, { "name": "Leafy", "type1": "Grass", "base_hp": 45, "base_attack": 49, "base_defense": 49, "base_speed": 45, "rarity": 1 }, { "name": "Sparky", "type1": "Electric", "base_hp": 35, "base_attack": 55, "base_defense": 40, "base_speed": 90, "rarity": 2 }, { "name": "Rocky", "type1": "Rock", "base_hp": 40, "base_attack": 80, "base_defense": 100, "base_speed": 25, "rarity": 2 } ] async with aiosqlite.connect(self.database.db_path) as db: for species in default_species: await db.execute(""" INSERT OR IGNORE INTO pet_species (name, type1, type2, base_hp, base_attack, base_defense, base_speed, rarity) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( species["name"], species["type1"], species.get("type2"), species["base_hp"], species["base_attack"], species["base_defense"], species["base_speed"], species["rarity"] )) await db.commit() async def load_locations(self): try: with open("config/locations.json", "r") as f: locations_data = json.load(f) except FileNotFoundError: locations_data = self.get_default_locations() async with aiosqlite.connect(self.database.db_path) as db: for location in locations_data: await db.execute(""" INSERT OR IGNORE INTO locations (name, description, level_min, level_max) VALUES (?, ?, ?, ?) """, (location["name"], location["description"], location["level_min"], location["level_max"])) cursor = await db.execute( "SELECT id FROM locations WHERE name = ?", (location["name"],) ) location_row = await cursor.fetchone() location_id = location_row[0] for spawn in location["spawns"]: species_cursor = await db.execute( "SELECT id FROM pet_species WHERE name = ?", (spawn["species"],) ) species_id = await species_cursor.fetchone() if species_id: await db.execute(""" INSERT OR IGNORE INTO location_spawns (location_id, species_id, spawn_rate, min_level, max_level) VALUES (?, ?, ?, ?, ?) """, (location_id, species_id[0], spawn["spawn_rate"], spawn["min_level"], spawn["max_level"])) await db.commit() def get_default_locations(self): return [ { "name": "Starter Woods", "description": "A peaceful forest perfect for beginners", "level_min": 1, "level_max": 5, "spawns": [ {"species": "Leafy", "spawn_rate": 0.4, "min_level": 1, "max_level": 3}, {"species": "Flamey", "spawn_rate": 0.3, "min_level": 1, "max_level": 3}, {"species": "Aqua", "spawn_rate": 0.3, "min_level": 1, "max_level": 3} ] }, { "name": "Electric Canyon", "description": "A charged valley crackling with energy", "level_min": 3, "level_max": 8, "spawns": [ {"species": "Sparky", "spawn_rate": 0.6, "min_level": 3, "max_level": 6}, {"species": "Rocky", "spawn_rate": 0.4, "min_level": 4, "max_level": 7} ] } ] async def load_moves(self): pass async def load_type_chart(self): pass async def give_starter_pet(self, player_id: int) -> Dict: starters = ["Flamey", "Aqua", "Leafy"] chosen_starter = random.choice(starters) async with aiosqlite.connect(self.database.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM pet_species WHERE name = ?", (chosen_starter,) ) species = await cursor.fetchone() pet_data = self.generate_pet_stats(dict(species), level=5) cursor = await db.execute(""" INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp, attack, defense, speed, is_active) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (player_id, species["id"], pet_data["level"], 0, pet_data["hp"], pet_data["hp"], pet_data["attack"], pet_data["defense"], pet_data["speed"], True)) await db.commit() return {"species_name": chosen_starter, **pet_data} def generate_pet_stats(self, species: Dict, level: int = 1) -> Dict: iv_bonus = random.randint(0, 31) hp = int((2 * species["base_hp"] + iv_bonus) * level / 100) + level + 10 attack = int((2 * species["base_attack"] + iv_bonus) * level / 100) + 5 defense = int((2 * species["base_defense"] + iv_bonus) * level / 100) + 5 speed = int((2 * species["base_speed"] + iv_bonus) * level / 100) + 5 return { "level": level, "hp": hp, "attack": attack, "defense": defense, "speed": speed } async def attempt_catch(self, player_id: int, location_name: str) -> str: async with aiosqlite.connect(self.database.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM locations WHERE name = ?", (location_name,) ) location = await cursor.fetchone() if not location: return f"Location '{location_name}' not found!" cursor = await db.execute(""" SELECT ls.*, ps.name as species_name, ps.* FROM location_spawns ls JOIN pet_species ps ON ls.species_id = ps.id WHERE ls.location_id = ? """, (location["id"],)) spawns = await cursor.fetchall() if not spawns: return f"No pets found in {location_name}!" if random.random() > 0.7: return f"You searched {location_name} but found nothing..." chosen_spawn = random.choices( spawns, weights=[spawn["spawn_rate"] for spawn in spawns] )[0] pet_level = random.randint(chosen_spawn["min_level"], chosen_spawn["max_level"]) pet_stats = self.generate_pet_stats(dict(chosen_spawn), pet_level) catch_rate = 0.5 + (0.3 / chosen_spawn["rarity"]) if random.random() < catch_rate: cursor = await db.execute(""" INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp, attack, defense, speed, is_active) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (player_id, chosen_spawn["species_id"], pet_level, 0, pet_stats["hp"], pet_stats["hp"], pet_stats["attack"], pet_stats["defense"], pet_stats["speed"], False)) await db.commit() return f"Caught a level {pet_level} {chosen_spawn['species_name']}!" else: return f"A wild {chosen_spawn['species_name']} appeared but escaped!" async def get_location_spawns(self, location_name: str) -> List[Dict]: async with aiosqlite.connect(self.database.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM locations WHERE name = ?", (location_name,) ) location = await cursor.fetchone() if not location: return [] cursor = await db.execute(""" SELECT ps.name, ps.type1, ps.type2, ls.spawn_rate FROM location_spawns ls JOIN pet_species ps ON ls.species_id = ps.id WHERE ls.location_id = ? """, (location["id"],)) spawns = await cursor.fetchall() return [dict(spawn) for spawn in spawns] async def explore_location(self, player_id: int) -> Dict: """Explore the player's current location and return encounter results""" async with aiosqlite.connect(self.database.db_path) as db: db.row_factory = aiosqlite.Row # Get player's current location location = await self.database.get_player_location(player_id) if not location: return {"type": "error", "message": "You are not in a valid location!"} # Check for item discovery first (30% chance) item_result = await self.check_item_discovery(player_id, location) if item_result: return item_result # Get spawns for current location cursor = await db.execute(""" SELECT ls.*, ps.name as species_name, ps.* FROM location_spawns ls JOIN pet_species ps ON ls.species_id = ps.id WHERE ls.location_id = ? """, (location["id"],)) spawns = await cursor.fetchall() if not spawns: return {"type": "empty", "message": f"You explore {location['name']} but find nothing interesting..."} # Apply weather modifiers to spawns modified_spawns = await self.get_weather_modified_spawns(location["id"], spawns) # Random encounter chance (50% chance of finding a pet after item check) if random.random() > 0.5: return {"type": "empty", "message": f"You explore {location['name']} but find nothing this time..."} # Choose random spawn with weather-modified rates chosen_spawn = random.choices( modified_spawns, weights=[spawn["spawn_rate"] for spawn in modified_spawns] )[0] pet_level = random.randint(chosen_spawn["min_level"], chosen_spawn["max_level"]) pet_stats = self.generate_pet_stats(dict(chosen_spawn), pet_level) return { "type": "encounter", "location": location["name"], "pet": { "species_id": chosen_spawn["species_id"], "species_name": chosen_spawn["species_name"], "level": pet_level, "type1": chosen_spawn["type1"], "type2": chosen_spawn["type2"], "stats": pet_stats, # Additional battle-ready stats "hp": pet_stats["hp"], "max_hp": pet_stats["hp"], "attack": pet_stats["attack"], "defense": pet_stats["defense"], "speed": pet_stats["speed"] } } async def check_item_discovery(self, player_id: int, location: Dict) -> Optional[Dict]: """Check if player finds an item during exploration""" import json # Load items config try: with open("config/items.json", "r") as f: items_data = json.load(f) except FileNotFoundError: return None # Get all possible items for this location available_items = [] location_name = location["name"].lower().replace(" ", "_") for category_items in items_data.values(): if isinstance(category_items, list): for item in category_items: if "locations" in item: item_locations = item["locations"] if "all" in item_locations or location_name in item_locations: available_items.append(item) if not available_items: return None # Calculate total spawn rates for this location total_rate = sum(item.get("spawn_rate", 0.1) for item in available_items) # 30% base chance of finding an item if random.random() > 0.3: return None # Choose item based on spawn rates chosen_item = random.choices( available_items, weights=[item.get("spawn_rate", 0.1) for item in available_items] )[0] # Add item to player's inventory success = await self.database.add_item_to_inventory(player_id, chosen_item["name"]) if success: # Get rarity info for display rarity_info = items_data.get("rarity_info", {}).get(chosen_item["rarity"], {}) symbol = rarity_info.get("symbol", "○") return { "type": "item_found", "location": location["name"], "item": chosen_item, "symbol": symbol, "message": f"🎒 You found a {symbol} {chosen_item['name']} ({chosen_item['rarity']})! {chosen_item['description']}" } return None async def attempt_catch_current_location(self, player_id: int, target_pet: Dict) -> str: """Attempt to catch a pet during exploration""" catch_rate = 0.5 + (0.3 / target_pet.get("rarity", 1)) if random.random() < catch_rate: # Successfully caught the pet async with aiosqlite.connect(self.database.db_path) as db: cursor = await db.execute(""" INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp, attack, defense, speed, is_active) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (player_id, target_pet["species_id"], target_pet["level"], 0, target_pet["stats"]["hp"], target_pet["stats"]["hp"], target_pet["stats"]["attack"], target_pet["stats"]["defense"], target_pet["stats"]["speed"], False)) await db.commit() return f"Success! You caught the Level {target_pet['level']} {target_pet['species_name']}!" else: return f"The wild {target_pet['species_name']} escaped!" async def load_achievements(self): """Load achievements from config and populate database""" try: with open("config/achievements.json", "r") as f: achievements_data = json.load(f) async with aiosqlite.connect(self.database.db_path) as db: for achievement in achievements_data: # Get location ID if specified location_id = None if achievement.get("unlock_location"): cursor = await db.execute( "SELECT id FROM locations WHERE name = ?", (achievement["unlock_location"],) ) location_row = await cursor.fetchone() if location_row: location_id = location_row[0] # Insert or update achievement await db.execute(""" INSERT OR REPLACE INTO achievements (name, description, requirement_type, requirement_data, unlock_location_id) VALUES (?, ?, ?, ?, ?) """, ( achievement["name"], achievement["description"], achievement["requirement_type"], achievement["requirement_data"], location_id )) await db.commit() except FileNotFoundError: print("No achievements.json found, skipping achievement loading") async def init_weather_system(self): """Initialize random weather for all locations""" try: with open("config/weather_patterns.json", "r") as f: self.weather_patterns = json.load(f) # Set initial weather for all locations await self.update_all_weather() # Start background weather update task await self.start_weather_system() except FileNotFoundError: print("No weather_patterns.json found, skipping weather system") self.weather_patterns = {"weather_types": {}, "location_weather_chances": {}} async def update_all_weather(self): """Update weather for all locations""" import datetime async with aiosqlite.connect(self.database.db_path) as db: # Get all locations cursor = await db.execute("SELECT * FROM locations") locations = await cursor.fetchall() for location in locations: location_name = location[1] # name is second column # Get possible weather for this location possible_weather = self.weather_patterns.get("location_weather_chances", {}).get(location_name, ["Calm"]) # Choose random weather weather_type = random.choice(possible_weather) weather_config = self.weather_patterns.get("weather_types", {}).get(weather_type, { "spawn_modifier": 1.0, "affected_types": [], "duration_minutes": [90, 180] }) # Calculate end time with random duration duration_range = weather_config.get("duration_minutes", [90, 180]) duration_minutes = random.randint(duration_range[0], duration_range[1]) end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes) # Clear old weather and set new await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location[0],)) await db.execute(""" INSERT INTO location_weather (location_id, weather_type, active_until, spawn_modifier, affected_types) VALUES (?, ?, ?, ?, ?) """, ( location[0], weather_type, end_time.isoformat(), weather_config.get("spawn_modifier", 1.0), ",".join(weather_config.get("affected_types", [])) )) await db.commit() async def check_and_award_achievements(self, player_id: int, action_type: str, data: str = ""): """Check for new achievements after player actions""" return await self.database.check_player_achievements(player_id, action_type, data) async def get_weather_modified_spawns(self, location_id: int, spawns: list) -> list: """Apply weather modifiers to spawn rates""" weather = await self.database.get_location_weather(location_id) if not weather or not weather.get("affected_types"): return spawns # No weather effects affected_types = weather["affected_types"].split(",") if weather["affected_types"] else [] modifier = weather.get("spawn_modifier", 1.0) # Apply weather modifier to matching pet types modified_spawns = [] for spawn in spawns: spawn_dict = dict(spawn) pet_types = [spawn_dict.get("type1")] if spawn_dict.get("type2"): pet_types.append(spawn_dict["type2"]) # Check if this pet type is affected by weather if any(pet_type in affected_types for pet_type in pet_types): spawn_dict["spawn_rate"] *= modifier modified_spawns.append(spawn_dict) return modified_spawns async def start_weather_system(self): """Start the background weather update task""" if self.weather_task is None or self.weather_task.done(): print("🌤️ Starting weather update background task...") self.weather_task = asyncio.create_task(self._weather_update_loop()) async def stop_weather_system(self): """Stop the background weather update task""" print("🌤️ Stopping weather update background task...") self.shutdown_event.set() if self.weather_task and not self.weather_task.done(): self.weather_task.cancel() try: await self.weather_task except asyncio.CancelledError: pass async def _weather_update_loop(self): """Background task that checks and updates expired weather""" try: while not self.shutdown_event.is_set(): try: # Check every 5 minutes for expired weather await asyncio.sleep(300) # 5 minutes if self.shutdown_event.is_set(): break # Check for locations with expired weather await self._check_and_update_expired_weather() except asyncio.CancelledError: break except Exception as e: print(f"Error in weather update loop: {e}") # Continue the loop even if there's an error await asyncio.sleep(60) # Wait a minute before retrying except asyncio.CancelledError: print("Weather update task cancelled") async def _check_and_update_expired_weather(self): """Check for expired weather and update it""" try: async with aiosqlite.connect(self.database.db_path) as db: # Find locations with expired weather cursor = await db.execute(""" SELECT l.id, l.name FROM locations l WHERE l.id NOT IN ( SELECT location_id FROM location_weather WHERE active_until > datetime('now') ) """) expired_locations = await cursor.fetchall() if expired_locations: print(f"🌤️ Updating weather for {len(expired_locations)} locations with expired weather") for location in expired_locations: location_id = location[0] location_name = location[1] # Get possible weather for this location possible_weather = self.weather_patterns.get("location_weather_chances", {}).get(location_name, ["Calm"]) # Choose random weather weather_type = random.choice(possible_weather) weather_config = self.weather_patterns.get("weather_types", {}).get(weather_type, { "spawn_modifier": 1.0, "affected_types": [], "duration_minutes": [90, 180] }) # Calculate end time with random duration duration_range = weather_config.get("duration_minutes", [90, 180]) duration_minutes = random.randint(duration_range[0], duration_range[1]) import datetime end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes) # Clear old weather and set new await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location_id,)) await db.execute(""" INSERT INTO location_weather (location_id, weather_type, active_until, spawn_modifier, affected_types) VALUES (?, ?, ?, ?, ?) """, ( location_id, weather_type, end_time.isoformat(), weather_config.get("spawn_modifier", 1.0), ",".join(weather_config.get("affected_types", [])) )) print(f" 🌤️ {location_name}: New {weather_type} weather for {duration_minutes} minutes") await db.commit() except Exception as e: print(f"Error checking expired weather: {e}") async def shutdown(self): """Gracefully shutdown the game engine""" print("🔄 Shutting down game engine...") await self.stop_weather_system()