- Implemented comprehensive navigation system with hover dropdowns - Added navigation to all webserver pages for consistent user experience - Enhanced page templates with unified styling and active page highlighting - Improved accessibility and discoverability of all bot features through web interface 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
664 lines
No EOL
28 KiB
Python
664 lines
No EOL
28 KiB
Python
import json
|
|
import random
|
|
import aiosqlite
|
|
import asyncio
|
|
from typing import Dict, List, Optional
|
|
from .database import Database
|
|
from .battle_engine import BattleEngine
|
|
|
|
class GameEngine:
|
|
def __init__(self, database: Database):
|
|
self.database = database
|
|
self.battle_engine = BattleEngine(database)
|
|
self.pet_species = {}
|
|
self.locations = {}
|
|
self.moves = {}
|
|
self.type_chart = {}
|
|
self.weather_patterns = {}
|
|
self.weather_task = None
|
|
self.shutdown_event = asyncio.Event()
|
|
|
|
async def load_game_data(self):
|
|
await self.load_pet_species()
|
|
await self.load_locations()
|
|
await self.load_moves()
|
|
await self.load_type_chart()
|
|
await self.load_achievements()
|
|
await self.database.initialize_items()
|
|
await self.database.initialize_gyms()
|
|
await self.init_weather_system()
|
|
await self.battle_engine.load_battle_data()
|
|
|
|
async def load_pet_species(self):
|
|
try:
|
|
with open("config/pets.json", "r") as f:
|
|
species_data = json.load(f)
|
|
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
for species in species_data:
|
|
await db.execute("""
|
|
INSERT OR IGNORE INTO pet_species
|
|
(name, type1, type2, base_hp, base_attack, base_defense,
|
|
base_speed, evolution_level, rarity)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
species["name"], species["type1"], species.get("type2"),
|
|
species["base_hp"], species["base_attack"], species["base_defense"],
|
|
species["base_speed"], species.get("evolution_level"),
|
|
species.get("rarity", 1)
|
|
))
|
|
await db.commit()
|
|
|
|
except FileNotFoundError:
|
|
await self.create_default_species()
|
|
|
|
async def create_default_species(self):
|
|
default_species = [
|
|
{
|
|
"name": "Flamey",
|
|
"type1": "Fire",
|
|
"base_hp": 45,
|
|
"base_attack": 52,
|
|
"base_defense": 43,
|
|
"base_speed": 65,
|
|
"rarity": 1
|
|
},
|
|
{
|
|
"name": "Aqua",
|
|
"type1": "Water",
|
|
"base_hp": 44,
|
|
"base_attack": 48,
|
|
"base_defense": 65,
|
|
"base_speed": 43,
|
|
"rarity": 1
|
|
},
|
|
{
|
|
"name": "Leafy",
|
|
"type1": "Grass",
|
|
"base_hp": 45,
|
|
"base_attack": 49,
|
|
"base_defense": 49,
|
|
"base_speed": 45,
|
|
"rarity": 1
|
|
},
|
|
{
|
|
"name": "Sparky",
|
|
"type1": "Electric",
|
|
"base_hp": 35,
|
|
"base_attack": 55,
|
|
"base_defense": 40,
|
|
"base_speed": 90,
|
|
"rarity": 2
|
|
},
|
|
{
|
|
"name": "Rocky",
|
|
"type1": "Rock",
|
|
"base_hp": 40,
|
|
"base_attack": 80,
|
|
"base_defense": 100,
|
|
"base_speed": 25,
|
|
"rarity": 2
|
|
}
|
|
]
|
|
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
for species in default_species:
|
|
await db.execute("""
|
|
INSERT OR IGNORE INTO pet_species
|
|
(name, type1, type2, base_hp, base_attack, base_defense, base_speed, rarity)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
species["name"], species["type1"], species.get("type2"),
|
|
species["base_hp"], species["base_attack"], species["base_defense"],
|
|
species["base_speed"], species["rarity"]
|
|
))
|
|
await db.commit()
|
|
|
|
async def load_locations(self):
|
|
try:
|
|
with open("config/locations.json", "r") as f:
|
|
locations_data = json.load(f)
|
|
except FileNotFoundError:
|
|
locations_data = self.get_default_locations()
|
|
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
for location in locations_data:
|
|
await db.execute("""
|
|
INSERT OR IGNORE INTO locations (name, description, level_min, level_max)
|
|
VALUES (?, ?, ?, ?)
|
|
""", (location["name"], location["description"],
|
|
location["level_min"], location["level_max"]))
|
|
|
|
cursor = await db.execute(
|
|
"SELECT id FROM locations WHERE name = ?", (location["name"],)
|
|
)
|
|
location_row = await cursor.fetchone()
|
|
location_id = location_row[0]
|
|
|
|
for spawn in location["spawns"]:
|
|
species_cursor = await db.execute(
|
|
"SELECT id FROM pet_species WHERE name = ?", (spawn["species"],)
|
|
)
|
|
species_id = await species_cursor.fetchone()
|
|
|
|
if species_id:
|
|
await db.execute("""
|
|
INSERT OR IGNORE INTO location_spawns
|
|
(location_id, species_id, spawn_rate, min_level, max_level)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (location_id, species_id[0], spawn["spawn_rate"],
|
|
spawn["min_level"], spawn["max_level"]))
|
|
await db.commit()
|
|
|
|
def get_default_locations(self):
|
|
return [
|
|
{
|
|
"name": "Starter Woods",
|
|
"description": "A peaceful forest perfect for beginners",
|
|
"level_min": 1,
|
|
"level_max": 5,
|
|
"spawns": [
|
|
{"species": "Leafy", "spawn_rate": 0.4, "min_level": 1, "max_level": 3},
|
|
{"species": "Flamey", "spawn_rate": 0.3, "min_level": 1, "max_level": 3},
|
|
{"species": "Aqua", "spawn_rate": 0.3, "min_level": 1, "max_level": 3}
|
|
]
|
|
},
|
|
{
|
|
"name": "Electric Canyon",
|
|
"description": "A charged valley crackling with energy",
|
|
"level_min": 3,
|
|
"level_max": 8,
|
|
"spawns": [
|
|
{"species": "Sparky", "spawn_rate": 0.6, "min_level": 3, "max_level": 6},
|
|
{"species": "Rocky", "spawn_rate": 0.4, "min_level": 4, "max_level": 7}
|
|
]
|
|
}
|
|
]
|
|
|
|
async def load_moves(self):
|
|
pass
|
|
|
|
async def load_type_chart(self):
|
|
pass
|
|
|
|
async def give_starter_pet(self, player_id: int) -> Dict:
|
|
starters = ["Flamey", "Aqua", "Leafy"]
|
|
chosen_starter = random.choice(starters)
|
|
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
db.row_factory = aiosqlite.Row
|
|
cursor = await db.execute(
|
|
"SELECT * FROM pet_species WHERE name = ?", (chosen_starter,)
|
|
)
|
|
species = await cursor.fetchone()
|
|
|
|
pet_data = self.generate_pet_stats(dict(species), level=5)
|
|
|
|
cursor = await db.execute("""
|
|
INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp,
|
|
attack, defense, speed, is_active, team_order)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (player_id, species["id"], pet_data["level"], 0,
|
|
pet_data["hp"], pet_data["hp"], pet_data["attack"],
|
|
pet_data["defense"], pet_data["speed"], True, 1))
|
|
|
|
await db.commit()
|
|
|
|
return {"species_name": chosen_starter, **pet_data}
|
|
|
|
def generate_pet_stats(self, species: Dict, level: int = 1) -> Dict:
|
|
iv_bonus = random.randint(0, 31)
|
|
|
|
hp = int((2 * species["base_hp"] + iv_bonus) * level / 100) + level + 10
|
|
attack = int((2 * species["base_attack"] + iv_bonus) * level / 100) + 5
|
|
defense = int((2 * species["base_defense"] + iv_bonus) * level / 100) + 5
|
|
speed = int((2 * species["base_speed"] + iv_bonus) * level / 100) + 5
|
|
|
|
return {
|
|
"level": level,
|
|
"hp": hp,
|
|
"attack": attack,
|
|
"defense": defense,
|
|
"speed": speed
|
|
}
|
|
|
|
async def attempt_catch(self, player_id: int, location_name: str) -> str:
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
db.row_factory = aiosqlite.Row
|
|
cursor = await db.execute(
|
|
"SELECT * FROM locations WHERE name = ?", (location_name,)
|
|
)
|
|
location = await cursor.fetchone()
|
|
|
|
if not location:
|
|
return f"Location '{location_name}' not found!"
|
|
|
|
cursor = await db.execute("""
|
|
SELECT ls.*, ps.name as species_name, ps.*
|
|
FROM location_spawns ls
|
|
JOIN pet_species ps ON ls.species_id = ps.id
|
|
WHERE ls.location_id = ?
|
|
""", (location["id"],))
|
|
spawns = await cursor.fetchall()
|
|
|
|
if not spawns:
|
|
return f"No pets found in {location_name}!"
|
|
|
|
if random.random() > 0.7:
|
|
return f"You searched {location_name} but found nothing..."
|
|
|
|
chosen_spawn = random.choices(
|
|
spawns,
|
|
weights=[spawn["spawn_rate"] for spawn in spawns]
|
|
)[0]
|
|
|
|
pet_level = random.randint(chosen_spawn["min_level"], chosen_spawn["max_level"])
|
|
pet_stats = self.generate_pet_stats(dict(chosen_spawn), pet_level)
|
|
|
|
catch_rate = 0.5 + (0.3 / chosen_spawn["rarity"])
|
|
if random.random() < catch_rate:
|
|
cursor = await db.execute("""
|
|
INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp,
|
|
attack, defense, speed, is_active)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (player_id, chosen_spawn["species_id"], pet_level, 0,
|
|
pet_stats["hp"], pet_stats["hp"], pet_stats["attack"],
|
|
pet_stats["defense"], pet_stats["speed"], False))
|
|
|
|
await db.commit()
|
|
return f"Caught a level {pet_level} {chosen_spawn['species_name']}!"
|
|
else:
|
|
return f"A wild {chosen_spawn['species_name']} appeared but escaped!"
|
|
|
|
async def get_location_spawns(self, location_name: str) -> List[Dict]:
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
db.row_factory = aiosqlite.Row
|
|
cursor = await db.execute(
|
|
"SELECT * FROM locations WHERE name = ?", (location_name,)
|
|
)
|
|
location = await cursor.fetchone()
|
|
|
|
if not location:
|
|
return []
|
|
|
|
cursor = await db.execute("""
|
|
SELECT ps.name, ps.type1, ps.type2, ls.spawn_rate
|
|
FROM location_spawns ls
|
|
JOIN pet_species ps ON ls.species_id = ps.id
|
|
WHERE ls.location_id = ?
|
|
""", (location["id"],))
|
|
spawns = await cursor.fetchall()
|
|
|
|
return [dict(spawn) for spawn in spawns]
|
|
|
|
async def explore_location(self, player_id: int) -> Dict:
|
|
"""Explore the player's current location and return encounter results"""
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
db.row_factory = aiosqlite.Row
|
|
|
|
# Get player's current location
|
|
location = await self.database.get_player_location(player_id)
|
|
if not location:
|
|
return {"type": "error", "message": "You are not in a valid location!"}
|
|
|
|
# Check for item discovery first (30% chance)
|
|
item_result = await self.check_item_discovery(player_id, location)
|
|
if item_result:
|
|
return item_result
|
|
|
|
# Get spawns for current location
|
|
cursor = await db.execute("""
|
|
SELECT ls.*, ps.name as species_name, ps.*
|
|
FROM location_spawns ls
|
|
JOIN pet_species ps ON ls.species_id = ps.id
|
|
WHERE ls.location_id = ?
|
|
""", (location["id"],))
|
|
spawns = await cursor.fetchall()
|
|
|
|
if not spawns:
|
|
return {"type": "empty", "message": f"You explore {location['name']} but find nothing interesting..."}
|
|
|
|
# Apply weather modifiers to spawns
|
|
modified_spawns = await self.get_weather_modified_spawns(location["id"], spawns)
|
|
|
|
# Random encounter chance (50% chance of finding a pet after item check)
|
|
if random.random() > 0.5:
|
|
return {"type": "empty", "message": f"You explore {location['name']} but find nothing this time..."}
|
|
|
|
# Choose random spawn with weather-modified rates
|
|
chosen_spawn = random.choices(
|
|
modified_spawns,
|
|
weights=[spawn["spawn_rate"] for spawn in modified_spawns]
|
|
)[0]
|
|
|
|
pet_level = random.randint(chosen_spawn["min_level"], chosen_spawn["max_level"])
|
|
|
|
pet_stats = self.generate_pet_stats(dict(chosen_spawn), pet_level)
|
|
|
|
return {
|
|
"type": "encounter",
|
|
"location": location["name"],
|
|
"pet": {
|
|
"species_id": chosen_spawn["species_id"],
|
|
"species_name": chosen_spawn["species_name"],
|
|
"level": pet_level,
|
|
"type1": chosen_spawn["type1"],
|
|
"type2": chosen_spawn["type2"],
|
|
"stats": pet_stats,
|
|
# Additional battle-ready stats
|
|
"hp": pet_stats["hp"],
|
|
"max_hp": pet_stats["hp"],
|
|
"attack": pet_stats["attack"],
|
|
"defense": pet_stats["defense"],
|
|
"speed": pet_stats["speed"]
|
|
}
|
|
}
|
|
|
|
async def check_item_discovery(self, player_id: int, location: Dict) -> Optional[Dict]:
|
|
"""Check if player finds an item during exploration"""
|
|
import json
|
|
|
|
# Load items config
|
|
try:
|
|
with open("config/items.json", "r") as f:
|
|
items_data = json.load(f)
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
# Get all possible items for this location
|
|
available_items = []
|
|
location_name = location["name"].lower().replace(" ", "_")
|
|
|
|
for category_items in items_data.values():
|
|
if isinstance(category_items, list):
|
|
for item in category_items:
|
|
if "locations" in item:
|
|
item_locations = item["locations"]
|
|
if "all" in item_locations or location_name in item_locations:
|
|
available_items.append(item)
|
|
|
|
if not available_items:
|
|
return None
|
|
|
|
# Calculate total spawn rates for this location
|
|
total_rate = sum(item.get("spawn_rate", 0.1) for item in available_items)
|
|
|
|
# 30% base chance of finding an item
|
|
if random.random() > 0.3:
|
|
return None
|
|
|
|
# Choose item based on spawn rates
|
|
chosen_item = random.choices(
|
|
available_items,
|
|
weights=[item.get("spawn_rate", 0.1) for item in available_items]
|
|
)[0]
|
|
|
|
# Add item to player's inventory
|
|
success = await self.database.add_item_to_inventory(player_id, chosen_item["name"])
|
|
|
|
if success:
|
|
# Get rarity info for display
|
|
rarity_info = items_data.get("rarity_info", {}).get(chosen_item["rarity"], {})
|
|
symbol = rarity_info.get("symbol", "○")
|
|
|
|
return {
|
|
"type": "item_found",
|
|
"location": location["name"],
|
|
"item": chosen_item,
|
|
"symbol": symbol,
|
|
"message": f"🎒 You found a {symbol} {chosen_item['name']} ({chosen_item['rarity']})! {chosen_item['description']}"
|
|
}
|
|
|
|
return None
|
|
|
|
async def attempt_catch_current_location(self, player_id: int, target_pet: Dict) -> str:
|
|
"""Attempt to catch a pet during exploration"""
|
|
catch_rate = 0.5 + (0.3 / target_pet.get("rarity", 1))
|
|
|
|
if random.random() < catch_rate:
|
|
# Successfully caught the pet
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
cursor = await db.execute("""
|
|
INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp,
|
|
attack, defense, speed, is_active)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (player_id, target_pet["species_id"], target_pet["level"], 0,
|
|
target_pet["stats"]["hp"], target_pet["stats"]["hp"],
|
|
target_pet["stats"]["attack"], target_pet["stats"]["defense"],
|
|
target_pet["stats"]["speed"], False))
|
|
|
|
await db.commit()
|
|
return f"Success! You caught the Level {target_pet['level']} {target_pet['species_name']}!"
|
|
else:
|
|
return f"The wild {target_pet['species_name']} escaped!"
|
|
|
|
async def load_achievements(self):
|
|
"""Load achievements from config and populate database"""
|
|
try:
|
|
with open("config/achievements.json", "r") as f:
|
|
achievements_data = json.load(f)
|
|
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
for achievement in achievements_data:
|
|
# Get location ID if specified
|
|
location_id = None
|
|
if achievement.get("unlock_location"):
|
|
cursor = await db.execute(
|
|
"SELECT id FROM locations WHERE name = ?",
|
|
(achievement["unlock_location"],)
|
|
)
|
|
location_row = await cursor.fetchone()
|
|
if location_row:
|
|
location_id = location_row[0]
|
|
|
|
# Insert or update achievement
|
|
await db.execute("""
|
|
INSERT OR REPLACE INTO achievements
|
|
(name, description, requirement_type, requirement_data, unlock_location_id)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (
|
|
achievement["name"], achievement["description"],
|
|
achievement["requirement_type"], achievement["requirement_data"],
|
|
location_id
|
|
))
|
|
|
|
await db.commit()
|
|
|
|
except FileNotFoundError:
|
|
print("No achievements.json found, skipping achievement loading")
|
|
|
|
async def init_weather_system(self):
|
|
"""Initialize random weather for all locations"""
|
|
try:
|
|
with open("config/weather_patterns.json", "r") as f:
|
|
self.weather_patterns = json.load(f)
|
|
|
|
# Set initial weather for all locations
|
|
await self.update_all_weather()
|
|
|
|
# Start background weather update task
|
|
await self.start_weather_system()
|
|
|
|
except FileNotFoundError:
|
|
print("No weather_patterns.json found, skipping weather system")
|
|
self.weather_patterns = {"weather_types": {}, "location_weather_chances": {}}
|
|
|
|
async def update_all_weather(self):
|
|
"""Update weather for all locations"""
|
|
import datetime
|
|
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
# Get all locations
|
|
cursor = await db.execute("SELECT * FROM locations")
|
|
locations = await cursor.fetchall()
|
|
|
|
for location in locations:
|
|
location_name = location[1] # name is second column
|
|
|
|
# Get possible weather for this location
|
|
possible_weather = self.weather_patterns.get("location_weather_chances", {}).get(location_name, ["Calm"])
|
|
|
|
# Choose random weather
|
|
weather_type = random.choice(possible_weather)
|
|
weather_config = self.weather_patterns.get("weather_types", {}).get(weather_type, {
|
|
"spawn_modifier": 1.0,
|
|
"affected_types": [],
|
|
"duration_minutes": [90, 180]
|
|
})
|
|
|
|
# Calculate end time with random duration
|
|
duration_range = weather_config.get("duration_minutes", [90, 180])
|
|
duration_minutes = random.randint(duration_range[0], duration_range[1])
|
|
end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes)
|
|
|
|
# Clear old weather and set new
|
|
await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location[0],))
|
|
await db.execute("""
|
|
INSERT INTO location_weather
|
|
(location_id, weather_type, active_until, spawn_modifier, affected_types)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (
|
|
location[0], weather_type, end_time.isoformat(),
|
|
weather_config.get("spawn_modifier", 1.0),
|
|
",".join(weather_config.get("affected_types", []))
|
|
))
|
|
|
|
await db.commit()
|
|
|
|
async def check_and_award_achievements(self, player_id: int, action_type: str, data: str = ""):
|
|
"""Check for new achievements after player actions"""
|
|
return await self.database.check_player_achievements(player_id, action_type, data)
|
|
|
|
async def check_all_achievements(self, player_id: int):
|
|
"""Check and award ALL possible achievements for a player"""
|
|
return await self.database.check_all_achievements(player_id)
|
|
|
|
async def get_weather_modified_spawns(self, location_id: int, spawns: list) -> list:
|
|
"""Apply weather modifiers to spawn rates"""
|
|
weather = await self.database.get_location_weather(location_id)
|
|
|
|
if not weather or not weather.get("affected_types"):
|
|
return spawns # No weather effects
|
|
|
|
affected_types = weather["affected_types"].split(",") if weather["affected_types"] else []
|
|
modifier = weather.get("spawn_modifier", 1.0)
|
|
|
|
# Apply weather modifier to matching pet types
|
|
modified_spawns = []
|
|
for spawn in spawns:
|
|
spawn_dict = dict(spawn)
|
|
pet_types = [spawn_dict.get("type1")]
|
|
if spawn_dict.get("type2"):
|
|
pet_types.append(spawn_dict["type2"])
|
|
|
|
# Check if this pet type is affected by weather
|
|
if any(pet_type in affected_types for pet_type in pet_types):
|
|
spawn_dict["spawn_rate"] *= modifier
|
|
|
|
modified_spawns.append(spawn_dict)
|
|
|
|
return modified_spawns
|
|
|
|
async def start_weather_system(self):
|
|
"""Start the background weather update task"""
|
|
if self.weather_task is None or self.weather_task.done():
|
|
print("🌤️ Starting weather update background task...")
|
|
self.weather_task = asyncio.create_task(self._weather_update_loop())
|
|
|
|
async def stop_weather_system(self):
|
|
"""Stop the background weather update task"""
|
|
print("🌤️ Stopping weather update background task...")
|
|
self.shutdown_event.set()
|
|
if self.weather_task and not self.weather_task.done():
|
|
self.weather_task.cancel()
|
|
try:
|
|
await self.weather_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
async def _weather_update_loop(self):
|
|
"""Background task that checks and updates expired weather"""
|
|
try:
|
|
while not self.shutdown_event.is_set():
|
|
try:
|
|
# Check every 5 minutes for expired weather
|
|
await asyncio.sleep(300) # 5 minutes
|
|
|
|
if self.shutdown_event.is_set():
|
|
break
|
|
|
|
# Check for locations with expired weather
|
|
await self._check_and_update_expired_weather()
|
|
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
print(f"Error in weather update loop: {e}")
|
|
# Continue the loop even if there's an error
|
|
await asyncio.sleep(60) # Wait a minute before retrying
|
|
|
|
except asyncio.CancelledError:
|
|
print("Weather update task cancelled")
|
|
|
|
async def _check_and_update_expired_weather(self):
|
|
"""Check for expired weather and update it"""
|
|
try:
|
|
async with aiosqlite.connect(self.database.db_path) as db:
|
|
# Find locations with expired weather
|
|
cursor = await db.execute("""
|
|
SELECT l.id, l.name
|
|
FROM locations l
|
|
WHERE l.id NOT IN (
|
|
SELECT location_id FROM location_weather
|
|
WHERE active_until > datetime('now')
|
|
)
|
|
""")
|
|
expired_locations = await cursor.fetchall()
|
|
|
|
if expired_locations:
|
|
print(f"🌤️ Updating weather for {len(expired_locations)} locations with expired weather")
|
|
|
|
for location in expired_locations:
|
|
location_id = location[0]
|
|
location_name = location[1]
|
|
|
|
# Get possible weather for this location
|
|
possible_weather = self.weather_patterns.get("location_weather_chances", {}).get(location_name, ["Calm"])
|
|
|
|
# Choose random weather
|
|
weather_type = random.choice(possible_weather)
|
|
weather_config = self.weather_patterns.get("weather_types", {}).get(weather_type, {
|
|
"spawn_modifier": 1.0,
|
|
"affected_types": [],
|
|
"duration_minutes": [90, 180]
|
|
})
|
|
|
|
# Calculate end time with random duration
|
|
duration_range = weather_config.get("duration_minutes", [90, 180])
|
|
duration_minutes = random.randint(duration_range[0], duration_range[1])
|
|
import datetime
|
|
end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes)
|
|
|
|
# Clear old weather and set new
|
|
await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location_id,))
|
|
await db.execute("""
|
|
INSERT INTO location_weather
|
|
(location_id, weather_type, active_until, spawn_modifier, affected_types)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (
|
|
location_id, weather_type, end_time.isoformat(),
|
|
weather_config.get("spawn_modifier", 1.0),
|
|
",".join(weather_config.get("affected_types", []))
|
|
))
|
|
|
|
print(f" 🌤️ {location_name}: New {weather_type} weather for {duration_minutes} minutes")
|
|
|
|
await db.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error checking expired weather: {e}")
|
|
|
|
async def shutdown(self):
|
|
"""Gracefully shutdown the game engine"""
|
|
print("🔄 Shutting down game engine...")
|
|
await self.stop_weather_system() |