Petbot/src/game_engine.py
megaproxy e17705dc63 Implement comprehensive team management and fix critical bugs
Team Management Features:
- Added 6 new IRC commands: !teamlist, !activeteam, !teamname, !teamswap, !heal, !verifyteamswap
- !teamlist shows teams with pet names in format "Team name - pet1 - pet2 - pet3"
- !teamname redirects to web interface for secure PIN-based renaming
- !teamswap enables team switching with PIN verification via IRC
- !activeteam displays current team with health status indicators
- !heal command with 1-hour cooldown for pet health restoration

Critical Bug Fixes:
- Fixed \!teamlist SQL binding error - handled new team data format correctly
- Fixed \!wild command duplicates - now shows unique species types only
- Removed all debug print statements and implemented proper logging
- Fixed data format inconsistencies in team management system

Production Improvements:
- Added logging infrastructure to BaseModule and core components
- Converted 45+ print statements to professional logging calls
- Database query optimization with DISTINCT for spawn deduplication
- Enhanced error handling and user feedback messages

Cross-platform Integration:
- Seamless sync between IRC commands and web interface
- PIN authentication leverages existing secure infrastructure
- Team operations maintain consistency across all interfaces

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-01 15:53:26 +00:00

836 lines
No EOL
38 KiB
Python

import json
import random
import aiosqlite
import asyncio
import logging
from typing import Dict, List, Optional
from .database import Database
from .battle_engine import BattleEngine
class GameEngine:
def __init__(self, database: Database):
    """Create an engine bound to *database* with empty game-data caches.

    Args:
        database: Project ``Database`` wrapper. It is stored on the
            instance and also passed to the ``BattleEngine`` constructor.
    """
    self.logger = logging.getLogger(__name__)
    self.database = database
    self.battle_engine = BattleEngine(database)

    # In-memory game data; every cache starts out as its own empty dict.
    self.pet_species, self.locations, self.moves = {}, {}, {}
    self.type_chart, self.weather_patterns = {}, {}

    # Background task handles; they stay None until the matching
    # start_*_system() method creates the task.
    self.weather_task = self.pet_recovery_task = None
    # Polled by both background loops as their stop signal.
    self.shutdown_event = asyncio.Event()
async def load_game_data(self):
    """Run every startup loader once, strictly one after another."""
    # Order matters: load_locations() looks up species ids, so the species
    # table has to be populated before it runs.
    startup_steps = (
        self.load_pet_species,
        self.load_locations,
        self.load_moves,
        self.load_type_chart,
        self.load_achievements,
        self.database.initialize_items,
        self.database.initialize_gyms,
        self.init_weather_system,
        self.battle_engine.load_battle_data,
        self.start_pet_recovery_system,
    )
    for step in startup_steps:
        await step()
async def load_pet_species(self):
    """Seed the ``pet_species`` table from ``config/pets.json``.

    Rows are inserted only when the table is empty, so ids of species that
    already exist are never disturbed. When the config file does not exist,
    ``create_default_species()`` is used instead.
    """
    try:
        # JSON is UTF-8 by specification and species entries may carry emoji,
        # so do not depend on the platform's locale encoding.
        with open("config/pets.json", "r", encoding="utf-8") as f:
            species_data = json.load(f)
    except FileNotFoundError:
        # Only a missing config file triggers the fallback; database errors
        # below are no longer inside this try and propagate to the caller.
        await self.create_default_species()
        return

    async with aiosqlite.connect(self.database.db_path) as db:
        # Check if species already exist to avoid re-inserting and changing IDs
        cursor = await db.execute("SELECT COUNT(*) FROM pet_species")
        existing_count = (await cursor.fetchone())[0]
        if existing_count != 0:
            self.logger.info(f"✅ Found {existing_count} existing pet species - skipping reload to preserve IDs")
            return

        for species in species_data:
            await db.execute("""
                INSERT OR IGNORE INTO pet_species
                (name, type1, type2, base_hp, base_attack, base_defense,
                base_speed, evolution_level, rarity, emoji)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                species["name"], species["type1"], species.get("type2"),
                species["base_hp"], species["base_attack"], species["base_defense"],
                species["base_speed"], species.get("evolution_level"),
                species.get("rarity", 1), species.get("emoji", "🐾")
            ))
        await db.commit()
        self.logger.info(f"✅ Loaded {len(species_data)} pet species into database")
async def create_default_species(self):
    """Insert the five built-in fallback species and commit.

    Uses ``INSERT OR IGNORE`` for every row and commits once after the
    last insert.
    """
    # (name, type1, base_hp, base_attack, base_defense, base_speed, rarity)
    builtin_rows = (
        ("Flamey", "Fire", 45, 52, 43, 65, 1),
        ("Aqua", "Water", 44, 48, 65, 43, 1),
        ("Leafy", "Grass", 45, 49, 49, 45, 1),
        ("Sparky", "Electric", 35, 55, 40, 90, 2),
        ("Rocky", "Rock", 40, 80, 100, 25, 2),
    )
    insert_sql = """
        INSERT OR IGNORE INTO pet_species
        (name, type1, type2, base_hp, base_attack, base_defense, base_speed, rarity)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
    async with aiosqlite.connect(self.database.db_path) as conn:
        for name, type1, hp, attack, defense, speed, rarity in builtin_rows:
            # None of the built-ins has a secondary type, so type2 is NULL.
            await conn.execute(
                insert_sql,
                (name, type1, None, hp, attack, defense, speed, rarity),
            )
        await conn.commit()
async def load_locations(self):
    """Load locations and their spawn tables into the database.

    Reads ``config/locations.json``; when that file does not exist the
    built-in list from ``get_default_locations()`` is used. Every location
    is inserted with ``INSERT OR IGNORE``, then each of its spawns is linked
    to the matching ``pet_species`` row. Spawns that name a species missing
    from the database are skipped with a warning. A single commit is issued
    after all locations have been processed.
    """
    try:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open("config/locations.json", "r", encoding="utf-8") as f:
            locations_data = json.load(f)
    except FileNotFoundError:
        locations_data = self.get_default_locations()

    async with aiosqlite.connect(self.database.db_path) as db:
        for location in locations_data:
            await db.execute("""
                INSERT OR IGNORE INTO locations (name, description, level_min, level_max)
                VALUES (?, ?, ?, ?)
            """, (location["name"], location["description"],
                  location["level_min"], location["level_max"]))

            # Read the id back: the row may have existed before this run.
            cursor = await db.execute(
                "SELECT id FROM locations WHERE name = ?", (location["name"],)
            )
            location_row = await cursor.fetchone()
            location_id = location_row[0]

            # A location without a "spawns" list is treated as having none.
            for spawn in location.get("spawns", []):
                species_cursor = await db.execute(
                    "SELECT id FROM pet_species WHERE name = ?", (spawn["species"],)
                )
                species_row = await species_cursor.fetchone()
                if species_row is None:
                    self.logger.warning(
                        "Skipping spawn of unknown species %r in location %r",
                        spawn["species"], location["name"],
                    )
                    continue

                # NOTE(review): INSERT OR IGNORE only prevents duplicate spawn
                # rows on restart if location_spawns has a matching UNIQUE
                # constraint - confirm against the schema.
                await db.execute("""
                    INSERT OR IGNORE INTO location_spawns
                    (location_id, species_id, spawn_rate, min_level, max_level)
                    VALUES (?, ?, ?, ?, ?)
                """, (location_id, species_row[0], spawn["spawn_rate"],
                      spawn["min_level"], spawn["max_level"]))
        await db.commit()
def get_default_locations(self):
    """Return the two built-in fallback locations with their spawn tables."""

    def spawn(species, spawn_rate, min_level, max_level):
        # One spawn-table entry in the same shape the JSON config uses.
        return {
            "species": species,
            "spawn_rate": spawn_rate,
            "min_level": min_level,
            "max_level": max_level,
        }

    starter_woods = {
        "name": "Starter Woods",
        "description": "A peaceful forest perfect for beginners",
        "level_min": 1,
        "level_max": 5,
        "spawns": [
            spawn("Leafy", 0.4, 1, 3),
            spawn("Flamey", 0.3, 1, 3),
            spawn("Aqua", 0.3, 1, 3),
        ],
    }
    electric_canyon = {
        "name": "Electric Canyon",
        "description": "A charged valley crackling with energy",
        "level_min": 3,
        "level_max": 8,
        "spawns": [
            spawn("Sparky", 0.6, 3, 6),
            spawn("Rocky", 0.4, 4, 7),
        ],
    }
    return [starter_woods, electric_canyon]
async def load_moves(self):
    """Placeholder: move loading is not implemented, so this does nothing."""
    return None
async def load_type_chart(self):
    """Placeholder: type-chart loading is not implemented, so this does nothing."""
    return None
async def give_starter_pet(self, player_id: int) -> Dict:
    """Give *player_id* a random level-5 starter pet and store it.

    One of "Flamey", "Aqua" or "Leafy" is picked at random, its stats are
    rolled with ``generate_pet_stats`` and the pet is inserted as the
    player's active pet in team slot 1.

    Args:
        player_id: Id of the owning player; also stored as original trainer.

    Returns:
        The generated stats dict plus a ``"species_name"`` key.

    Raises:
        ValueError: If the chosen starter has no row in ``pet_species``
            (previously this surfaced as an opaque ``TypeError`` from
            ``dict(None)``).
    """
    starters = ["Flamey", "Aqua", "Leafy"]
    chosen_starter = random.choice(starters)

    async with aiosqlite.connect(self.database.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM pet_species WHERE name = ?", (chosen_starter,)
        )
        species = await cursor.fetchone()
        if species is None:
            raise ValueError(
                f"Starter species '{chosen_starter}' is missing from pet_species"
            )

        pet_data = self.generate_pet_stats(dict(species), level=5)
        await db.execute("""
            INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp,
            attack, defense, speed, is_active, team_order,
            iv_hp, iv_attack, iv_defense, iv_speed, original_trainer_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (player_id, species["id"], pet_data["level"], 0,
              pet_data["hp"], pet_data["max_hp"], pet_data["attack"],
              pet_data["defense"], pet_data["speed"], True, 1,
              pet_data["iv_hp"], pet_data["iv_attack"], pet_data["iv_defense"],
              pet_data["iv_speed"], player_id))
        await db.commit()

    return {"species_name": chosen_starter, **pet_data}
def generate_pet_stats(self, species: Dict, level: int = 1) -> Dict:
    """Roll IVs and derive battle stats for one pet.

    Args:
        species: Mapping with ``base_hp``, ``base_attack``, ``base_defense``
            and ``base_speed`` entries.
        level: Pet level used to scale the stats.

    Returns:
        Dict with ``level``, ``hp``, ``max_hp``, ``attack``, ``defense``,
        ``speed`` and the four rolled ``iv_*`` values.
    """
    stat_names = ("hp", "attack", "defense", "speed")
    # One IV (0-31) per stat, rolled in hp/attack/defense/speed order.
    ivs = {stat: random.randint(0, 31) for stat in stat_names}

    def scaled(stat):
        # Level-scaled part of the formula shared by all four stats.
        return int((2 * species[f"base_{stat}"] + ivs[stat]) * level / 100)

    # HP adds level + 10 on top of the scaled part; a new pet is at full health.
    full_hp = scaled("hp") + level + 10
    rolled = {
        "level": level,
        "hp": full_hp,
        "max_hp": full_hp,
    }
    # The other stats add a flat 5.
    for stat in stat_names[1:]:
        rolled[stat] = scaled(stat) + 5
    # IVs are returned too so callers can persist them.
    for stat in stat_names:
        rolled[f"iv_{stat}"] = ivs[stat]
    return rolled
async def attempt_catch(self, player_id: int, location_name: str) -> str:
    """Search *location_name* for a wild pet and try to catch it.

    Args:
        player_id: Player who receives the pet on success.
        location_name: Name of the location to search.

    Returns:
        A user-facing message describing the outcome (unknown location,
        no spawns, nothing found, caught, or escaped).
    """
    async with aiosqlite.connect(self.database.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM locations WHERE name = ?", (location_name,)
        )
        location = await cursor.fetchone()
        if not location:
            return f"Location '{location_name}' not found!"

        cursor = await db.execute("""
            SELECT ls.*, ps.name as species_name, ps.*
            FROM location_spawns ls
            JOIN pet_species ps ON ls.species_id = ps.id
            WHERE ls.location_id = ?
        """, (location["id"],))
        spawns = await cursor.fetchall()
        if not spawns:
            return f"No pets found in {location_name}!"

        # 30% of searches turn up nothing at all.
        if random.random() > 0.7:
            return f"You searched {location_name} but found nothing..."

        chosen_spawn = random.choices(
            spawns,
            weights=[spawn["spawn_rate"] for spawn in spawns]
        )[0]
        pet_level = random.randint(chosen_spawn["min_level"], chosen_spawn["max_level"])
        pet_stats = self.generate_pet_stats(dict(chosen_spawn), pet_level)

        # A rarity of 0 or NULL would otherwise raise ZeroDivisionError /
        # TypeError; treat it as the most common rarity.
        rarity = chosen_spawn["rarity"] or 1
        catch_rate = 0.5 + (0.3 / rarity)
        if random.random() >= catch_rate:
            return f"A wild {chosen_spawn['species_name']} appeared but escaped!"

        await db.execute("""
            INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp,
            attack, defense, speed, is_active,
            iv_hp, iv_attack, iv_defense, iv_speed, original_trainer_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (player_id, chosen_spawn["species_id"], pet_level, 0,
              pet_stats["hp"], pet_stats["max_hp"], pet_stats["attack"],
              pet_stats["defense"], pet_stats["speed"], False,
              pet_stats["iv_hp"], pet_stats["iv_attack"], pet_stats["iv_defense"],
              pet_stats["iv_speed"], player_id))
        await db.commit()
        return f"Caught a level {pet_level} {chosen_spawn['species_name']}!"
async def get_location_spawns(self, location_name: str) -> List[Dict]:
    """List the distinct species that can spawn in *location_name*.

    Returns:
        One dict per species with ``name``, ``type1``, ``type2`` and
        ``spawn_rate`` (the smallest rate among that species' spawn rows),
        or an empty list when the location name is unknown.
    """
    spawn_query = """
        SELECT DISTINCT ps.name, ps.type1, ps.type2, MIN(ls.spawn_rate) as spawn_rate
        FROM location_spawns ls
        JOIN pet_species ps ON ls.species_id = ps.id
        WHERE ls.location_id = ?
        GROUP BY ps.id, ps.name, ps.type1, ps.type2
    """
    async with aiosqlite.connect(self.database.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        lookup = await conn.execute(
            "SELECT * FROM locations WHERE name = ?", (location_name,)
        )
        place = await lookup.fetchone()
        if not place:
            return []
        result = await conn.execute(spawn_query, (place["id"],))
        rows = await result.fetchall()
        # Convert Row objects to plain dicts for callers.
        return [dict(row) for row in rows]
async def explore_location(self, player_id: int) -> Dict:
    """Explore the player's current location and return the encounter result.

    Args:
        player_id: Player doing the exploring.

    Returns:
        A dict whose ``"type"`` is one of:

        * ``"error"`` - the player has no valid location;
        * ``"item_found"`` - whatever ``check_item_discovery`` returned;
        * ``"empty"`` - nothing was found;
        * ``"encounter"`` - a wild pet, described under ``"pet"``. The pet
          payload now also carries the species ``"rarity"`` so that
          ``attempt_catch_current_location`` can apply the rarity-based
          catch rate instead of always falling back to rarity 1.
    """
    async with aiosqlite.connect(self.database.db_path) as db:
        db.row_factory = aiosqlite.Row

        # Get player's current location
        location = await self.database.get_player_location(player_id)
        if not location:
            return {"type": "error", "message": "You are not in a valid location!"}

        # Item discovery is checked first; a found item ends the exploration.
        item_result = await self.check_item_discovery(player_id, location)
        if item_result:
            return item_result

        # Get spawns for current location
        cursor = await db.execute("""
            SELECT ls.*, ps.name as species_name, ps.*
            FROM location_spawns ls
            JOIN pet_species ps ON ls.species_id = ps.id
            WHERE ls.location_id = ?
        """, (location["id"],))
        spawns = await cursor.fetchall()
        if not spawns:
            return {"type": "empty", "message": f"You explore {location['name']} but find nothing interesting..."}

        # Apply weather modifiers to spawns
        modified_spawns = await self.get_weather_modified_spawns(location["id"], spawns)

        nothing_this_time = {
            "type": "empty",
            "message": f"You explore {location['name']} but find nothing this time...",
        }
        # Random encounter chance (50% chance of finding a pet after item check)
        if random.random() > 0.5:
            return nothing_this_time

        weights = [spawn["spawn_rate"] for spawn in modified_spawns]
        # random.choices() raises ValueError when the weights sum to zero,
        # e.g. when a weather modifier of 0 suppresses every spawn here.
        if sum(weights) <= 0:
            return nothing_this_time

        # Choose random spawn with weather-modified rates
        chosen_spawn = random.choices(modified_spawns, weights=weights)[0]
        pet_level = random.randint(chosen_spawn["min_level"], chosen_spawn["max_level"])
        pet_stats = self.generate_pet_stats(dict(chosen_spawn), pet_level)

        return {
            "type": "encounter",
            "location": location["name"],
            "pet": {
                "species_id": chosen_spawn["species_id"],
                "species_name": chosen_spawn["species_name"],
                "level": pet_level,
                "type1": chosen_spawn["type1"],
                "type2": chosen_spawn["type2"],
                "rarity": chosen_spawn["rarity"],
                "stats": pet_stats,
                # Additional battle-ready stats
                "hp": pet_stats["hp"],
                "max_hp": pet_stats["hp"],
                "attack": pet_stats["attack"],
                "defense": pet_stats["defense"],
                "speed": pet_stats["speed"]
            }
        }
async def check_item_discovery(self, player_id: int, location: Dict) -> Optional[Dict]:
    """Check if the player finds an item during exploration.

    Items come from ``config/items.json``. An item is a candidate when its
    ``"locations"`` list contains ``"all"`` or the location's name lowercased
    with spaces replaced by underscores. Items whose effective spawn rate
    (``spawn_rate`` times the global multiplier) is not positive are dropped,
    so a multiplier of 0 disables item finds instead of crashing
    ``random.choices``.

    Args:
        player_id: Player who receives the item.
        location: Mapping with at least a ``"name"`` entry.

    Returns:
        An ``"item_found"`` result dict, or ``None`` when the config file is
        missing, no item is eligible, the 30% roll fails, or the inventory
        update reports failure.
    """
    try:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open("config/items.json", "r", encoding="utf-8") as f:
            items_data = json.load(f)
    except FileNotFoundError:
        return None

    # Get global spawn multiplier from config
    # NOTE(review): scaling every weight by the same factor does not change
    # the relative odds in random.choices(), so this multiplier only matters
    # when it is <= 0. Confirm whether it was meant to scale the 30% base
    # chance instead.
    global_multiplier = items_data.get("_config", {}).get("global_spawn_multiplier", 1.0)
    location_name = location["name"].lower().replace(" ", "_")

    # Collect every item that can appear in this location.
    available_items = []
    for category_items in items_data.values():
        if not isinstance(category_items, list):
            continue  # skips non-list sections such as "_config"
        for item in category_items:
            if not isinstance(item, dict) or "locations" not in item:
                continue
            item_locations = item["locations"]
            if "all" not in item_locations and location_name not in item_locations:
                continue
            effective_rate = item.get("spawn_rate", 0.1) * global_multiplier
            if effective_rate <= 0:
                continue
            # Copy so the parsed config entry itself is left untouched.
            available_items.append({**item, "effective_spawn_rate": effective_rate})

    if not available_items:
        return None

    # 30% base chance of finding an item
    if random.random() > 0.3:
        return None

    # Choose item based on effective spawn rates
    chosen_item = random.choices(
        available_items,
        weights=[item["effective_spawn_rate"] for item in available_items]
    )[0]

    # Add item to player's inventory
    success = await self.database.add_item_to_inventory(player_id, chosen_item["name"])
    if not success:
        return None

    # Get rarity info for display
    rarity_info = items_data.get("rarity_info", {}).get(chosen_item["rarity"], {})
    symbol = rarity_info.get("symbol", "")
    return {
        "type": "item_found",
        "location": location["name"],
        "item": chosen_item,
        "symbol": symbol,
        "message": f"🎒 You found a {symbol} {chosen_item['name']} ({chosen_item['rarity']})! {chosen_item['description']}"
    }
async def attempt_catch_current_location(self, player_id: int, target_pet: Dict) -> str:
    """Attempt to catch a pet encountered during exploration.

    Args:
        player_id: Player who receives the pet on success.
        target_pet: Encounter payload with ``species_id``, ``species_name``,
            ``level``, a ``stats`` dict (hp, max_hp, attack, defense, speed
            and the four ``iv_*`` values) and optionally ``rarity``.

    Returns:
        A user-facing success or escape message.
    """
    # A missing, None or 0 rarity falls back to 1; 0 and None would
    # otherwise raise ZeroDivisionError / TypeError.
    rarity = target_pet.get("rarity") or 1
    catch_rate = 0.5 + (0.3 / rarity)
    if random.random() >= catch_rate:
        return f"The wild {target_pet['species_name']} escaped!"

    # Successfully caught the pet: store it as an inactive pet.
    stats = target_pet["stats"]
    async with aiosqlite.connect(self.database.db_path) as db:
        await db.execute("""
            INSERT INTO pets (player_id, species_id, level, experience, hp, max_hp,
            attack, defense, speed, is_active,
            iv_hp, iv_attack, iv_defense, iv_speed, original_trainer_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (player_id, target_pet["species_id"], target_pet["level"], 0,
              stats["hp"], stats["max_hp"],
              stats["attack"], stats["defense"],
              stats["speed"], False,
              stats["iv_hp"], stats["iv_attack"],
              stats["iv_defense"], stats["iv_speed"], player_id))
        await db.commit()
    return f"Success! You caught the Level {target_pet['level']} {target_pet['species_name']}!"
async def load_achievements(self):
    """Load achievements from ``config/achievements.json`` into the database.

    Each achievement's optional ``"unlock_location"`` name is resolved to a
    location id (left as NULL when the location is unknown). All rows are
    written with ``INSERT OR REPLACE`` and committed once at the end. A
    missing config file is logged as a warning and otherwise ignored.
    """
    try:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open("config/achievements.json", "r", encoding="utf-8") as f:
            achievements_data = json.load(f)
    except FileNotFoundError:
        self.logger.warning("No achievements.json found, skipping achievement loading")
        return

    async with aiosqlite.connect(self.database.db_path) as db:
        for achievement in achievements_data:
            # Get location ID if specified
            location_id = None
            if achievement.get("unlock_location"):
                cursor = await db.execute(
                    "SELECT id FROM locations WHERE name = ?",
                    (achievement["unlock_location"],)
                )
                location_row = await cursor.fetchone()
                if location_row:
                    location_id = location_row[0]

            # Insert or update achievement
            # NOTE(review): INSERT OR REPLACE deletes a conflicting row before
            # inserting, which can assign a new rowid - confirm that nothing
            # references achievement ids that must survive restarts.
            await db.execute("""
                INSERT OR REPLACE INTO achievements
                (name, description, requirement_type, requirement_data, unlock_location_id)
                VALUES (?, ?, ?, ?, ?)
            """, (
                achievement["name"], achievement["description"],
                achievement["requirement_type"], achievement["requirement_data"],
                location_id
            ))
        await db.commit()
async def init_weather_system(self):
    """Load weather patterns, set initial weather and start the updater.

    Reads ``config/weather_patterns.json`` into ``self.weather_patterns``,
    then calls ``update_all_weather()`` and ``start_weather_system()``.
    When the file does not exist a warning is logged, empty patterns are
    stored and neither of those two calls is made.
    """
    try:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open("config/weather_patterns.json", "r", encoding="utf-8") as f:
            self.weather_patterns = json.load(f)
    except FileNotFoundError:
        self.logger.warning("No weather_patterns.json found, skipping weather system")
        self.weather_patterns = {"weather_types": {}, "location_weather_chances": {}}
        return

    # Kept outside the try so an unrelated FileNotFoundError raised further
    # down is not mistaken for a missing weather config.
    # Set initial weather for all locations
    await self.update_all_weather()
    # Start background weather update task
    await self.start_weather_system()
async def update_all_weather(self):
    """Assign freshly rolled weather to every location.

    For each location a weather type is picked at random from
    ``weather_patterns["location_weather_chances"][<location name>]``
    (falling back to ``["Calm"]`` when the entry is missing or empty), any
    existing ``location_weather`` row is deleted and a new one is inserted
    with a random duration. All changes are committed together at the end.
    """
    import datetime

    location_chances = self.weather_patterns.get("location_weather_chances", {})
    weather_types = self.weather_patterns.get("weather_types", {})
    # Used for weather types that have no entry under "weather_types".
    fallback_config = {
        "spawn_modifier": 1.0,
        "affected_types": [],
        "duration_minutes": [90, 180]
    }

    async with aiosqlite.connect(self.database.db_path) as db:
        # Name the columns instead of SELECT * so positional unpacking does
        # not depend on the table's column order.
        cursor = await db.execute("SELECT id, name FROM locations")
        locations = await cursor.fetchall()

        for location_id, location_name in locations:
            # An empty list would make random.choice() raise IndexError.
            possible_weather = location_chances.get(location_name) or ["Calm"]
            weather_type = random.choice(possible_weather)
            weather_config = weather_types.get(weather_type, fallback_config)

            # Calculate end time with random duration
            duration_range = weather_config.get("duration_minutes", [90, 180])
            duration_minutes = random.randint(duration_range[0], duration_range[1])
            # Naive local time, stored in ISO-8601 form.
            end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes)

            # Clear old weather and set new
            await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location_id,))
            await db.execute("""
                INSERT INTO location_weather
                (location_id, weather_type, active_until, spawn_modifier, affected_types)
                VALUES (?, ?, ?, ?, ?)
            """, (
                location_id, weather_type, end_time.isoformat(),
                weather_config.get("spawn_modifier", 1.0),
                ",".join(weather_config.get("affected_types", []))
            ))
        await db.commit()
async def check_and_award_achievements(self, player_id: int, action_type: str, data: str = ""):
    """Forward to ``database.check_player_achievements`` and return its result unchanged."""
    checker = self.database.check_player_achievements
    outcome = await checker(player_id, action_type, data)
    return outcome
async def check_all_achievements(self, player_id: int):
    """Forward to ``database.check_all_achievements`` and return its result unchanged."""
    checker = self.database.check_all_achievements
    outcome = await checker(player_id)
    return outcome
async def get_weather_modified_spawns(self, location_id: int, spawns: list) -> list:
    """Scale spawn rates of pets whose type is affected by the current weather.

    Args:
        location_id: Location whose weather is looked up.
        spawns: Spawn rows (anything ``dict()`` accepts) carrying ``type1``,
            ``type2`` and ``spawn_rate``.

    Returns:
        *spawns* itself when there is no weather or no affected types;
        otherwise a new list of dict copies in which matching entries have
        ``spawn_rate`` multiplied by the weather's ``spawn_modifier``.
    """
    weather = await self.database.get_location_weather(location_id)
    affected_csv = weather.get("affected_types") if weather else None
    if not affected_csv:
        return spawns  # No weather effects

    # affected_types is stored as a comma-separated string.
    boosted_types = affected_csv.split(",")
    rate_factor = weather.get("spawn_modifier", 1.0)

    adjusted = []
    for row in spawns:
        entry = dict(row)
        own_types = {entry.get("type1")}
        if entry.get("type2"):
            own_types.add(entry["type2"])
        # Scale the rate when any of the pet's types is affected.
        if not own_types.isdisjoint(boosted_types):
            entry["spawn_rate"] *= rate_factor
        adjusted.append(entry)
    return adjusted
async def start_weather_system(self):
    """Launch the weather update loop unless one is already running."""
    current = self.weather_task
    if current is not None and not current.done():
        return  # a live task exists; keep it
    self.logger.info("🌤️ Starting weather update background task...")
    self.weather_task = asyncio.create_task(self._weather_update_loop())
async def stop_weather_system(self):
    """Signal shutdown, then cancel the weather task and wait for it to end."""
    self.logger.info("🌤️ Stopping weather update background task...")
    self.shutdown_event.set()

    task = self.weather_task
    if not task or task.done():
        return  # nothing is running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected result of the cancel() above.
        pass
async def _weather_update_loop(self):
    """Background task that periodically refreshes expired weather.

    Runs until ``self.shutdown_event`` is set or the task is cancelled.
    Each pass sleeps 5 minutes, re-checks the shutdown flag and then calls
    ``_check_and_update_expired_weather()``.
    """
    try:
        while not self.shutdown_event.is_set():
            try:
                # Check every 5 minutes for expired weather
                await asyncio.sleep(300)  # 5 minutes
                # The flag may have been set while this task was sleeping.
                if self.shutdown_event.is_set():
                    break
                # Check for locations with expired weather
                await self._check_and_update_expired_weather()
            except asyncio.CancelledError:
                # Cancellation during the sleep or the check ends the loop;
                # the coroutine then returns normally.
                break
            except Exception as e:
                self.logger.error(f"Error in weather update loop: {e}")
                # Continue the loop even if there's an error
                await asyncio.sleep(60)  # Wait a minute before retrying
    except asyncio.CancelledError:
        # Reached when cancellation arrives outside the inner try body,
        # e.g. during the 60-second back-off sleep in the handler above.
        self.logger.info("Weather update task cancelled")
async def _check_and_update_expired_weather(self):
    """Replace weather that has expired and announce each change.

    A location is considered expired when it has no ``location_weather`` row
    whose ``active_until`` lies in the future. For each such location new
    weather is rolled, stored, and - after the commit - announced through
    ``announce_weather_change``. Any exception is logged and swallowed so
    the calling background loop keeps running.

    Two defects of the earlier query are fixed here:

    * ``active_until`` is written with ``datetime.now().isoformat()``
      (local time, ``T`` separator) but was compared as text against
      ``datetime('now')`` (UTC, space separator). Because ``'T'`` sorts
      after ``' '``, weather ending on the current date never compared as
      expired. Both sides are now normalised with SQLite's ``datetime()``
      and compared in local time.
    * The expiry filter lived in the JOIN condition, so the selected rows
      never carried the previous weather type. It is now fetched with a
      sub-select.
    """
    import datetime

    try:
        async with aiosqlite.connect(self.database.db_path) as db:
            # NOTE(review): assumes every writer of location_weather stores
            # naive local time like this class does - confirm for any other
            # code path that sets weather.
            cursor = await db.execute("""
                SELECT l.id, l.name,
                       (SELECT prev.weather_type
                          FROM location_weather prev
                         WHERE prev.location_id = l.id
                         ORDER BY prev.active_until DESC
                         LIMIT 1) AS current_weather
                FROM locations l
                WHERE NOT EXISTS (
                    SELECT 1
                      FROM location_weather lw
                     WHERE lw.location_id = l.id
                       AND datetime(lw.active_until) > datetime('now', 'localtime')
                )
            """)
            expired_locations = await cursor.fetchall()
            if not expired_locations:
                return

            self.logger.info(f"🌤️ Updating weather for {len(expired_locations)} locations with expired weather")

            location_chances = self.weather_patterns.get("location_weather_chances", {})
            weather_types = self.weather_patterns.get("weather_types", {})
            # Used for weather types that have no entry under "weather_types".
            fallback_config = {
                "spawn_modifier": 1.0,
                "affected_types": [],
                "duration_minutes": [90, 180]
            }

            changes = []
            for location_id, location_name, current_weather in expired_locations:
                previous_weather = current_weather if current_weather else "calm"

                # An empty list would make random.choice() raise IndexError.
                possible_weather = location_chances.get(location_name) or ["calm"]
                weather_type = random.choice(possible_weather)
                weather_config = weather_types.get(weather_type, fallback_config)

                # Calculate end time with random duration
                duration_range = weather_config.get("duration_minutes", [90, 180])
                duration_minutes = random.randint(duration_range[0], duration_range[1])
                end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes)

                # Clear old weather and set new
                await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location_id,))
                await db.execute("""
                    INSERT INTO location_weather
                    (location_id, weather_type, active_until, spawn_modifier, affected_types)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    location_id, weather_type, end_time.isoformat(),
                    weather_config.get("spawn_modifier", 1.0),
                    ",".join(weather_config.get("affected_types", []))
                ))
                self.logger.info(f" 🌤️ {location_name}: Weather changed from {previous_weather} to {weather_type} for {duration_minutes} minutes")
                changes.append((location_name, previous_weather, weather_type))

            await db.commit()

        # Announce only after the commit so IRC never reports weather that
        # failed to persist.
        for location_name, previous_weather, weather_type in changes:
            await self.announce_weather_change(location_name, previous_weather, weather_type, "auto")
    except Exception as e:
        self.logger.error(f"Error checking expired weather: {e}", exc_info=True)
async def announce_weather_change(self, location_name: str, previous_weather: str, new_weather: str, source: str = "auto"):
    """Announce a weather change to the IRC channel.

    Nothing is sent when the weather did not change or when no bot is
    attached to the engine. Weather names are compared and mapped to emoji
    case-insensitively, because this class uses both ``"Calm"`` and
    ``"calm"`` as fallback names. All errors are logged, never raised.

    Args:
        location_name: Location whose weather changed.
        previous_weather: Weather type before the change.
        new_weather: Weather type after the change.
        source: ``"admin"``, ``"web"`` or anything else for a natural change;
            it only selects the wording of the message.
    """
    try:
        if str(previous_weather).casefold() == str(new_weather).casefold():
            return  # No change, no announcement

        # Get weather emojis
        weather_emojis = {
            "sunny": "☀️",
            "rainy": "🌧️",
            "storm": "⛈️",
            "blizzard": "❄️",
            "earthquake": "🌍",
            "calm": "🌤️"
        }
        prev_emoji = weather_emojis.get(str(previous_weather).lower(), "🌤️")
        new_emoji = weather_emojis.get(str(new_weather).lower(), "🌤️")

        # Create announcement message
        if source == "admin":
            message = f"🌤️ Weather Update: {location_name} weather has been changed from {prev_emoji} {previous_weather} to {new_emoji} {new_weather} by admin command!"
        elif source == "web":
            message = f"🌤️ Weather Update: {location_name} weather has been changed from {prev_emoji} {previous_weather} to {new_emoji} {new_weather} via web interface!"
        else:
            message = f"🌤️ Weather Update: {location_name} weather has naturally changed from {prev_emoji} {previous_weather} to {new_emoji} {new_weather}!"

        # Send to IRC channel via bot instance; ``bot`` is not set in
        # __init__, so it may be absent.
        bot = getattr(self, "bot", None)
        if not bot:
            return

        from config import IRC_CONFIG
        channel = IRC_CONFIG.get("channel", "#petz")

        if hasattr(bot, "send_message_sync"):
            bot.send_message_sync(channel, message)
        elif hasattr(bot, "send_message"):
            main_loop = getattr(bot, "main_loop", None)
            if main_loop:
                # Hand the coroutine to the bot's own loop.
                asyncio.run_coroutine_threadsafe(
                    bot.send_message(channel, message),
                    main_loop
                )
            else:
                # Await directly instead of create_task(): an unreferenced
                # task may be garbage-collected before it finishes, and its
                # errors would bypass the handler below.
                await bot.send_message(channel, message)
    except Exception as e:
        self.logger.error(f"Error announcing weather change: {e}")
async def get_pet_emoji(self, species_name: str) -> str:
    """Return the emoji stored for *species_name*.

    Falls back to the paw-print emoji when the species is unknown, has no
    emoji, or the lookup fails. Lookup failures are logged rather than
    silently discarded.
    """
    default_emoji = "🐾"
    try:
        async with aiosqlite.connect(self.database.db_path) as db:
            cursor = await db.execute(
                "SELECT emoji FROM pet_species WHERE name = ?",
                (species_name,)
            )
            row = await cursor.fetchone()
    except Exception:
        # Best effort: display code must still get an emoji, but the
        # underlying problem should be visible in the log.
        self.logger.warning(
            "Emoji lookup failed for species %r; using default",
            species_name, exc_info=True,
        )
        return default_emoji
    return row[0] if row and row[0] else default_emoji
def format_pet_name_with_emoji(self, pet_name: str, emoji: str = None, species_name: str = None) -> str:
    """Prefix a pet name with an emoji for display in IRC or web.

    Args:
        pet_name: The pet's nickname or species name.
        emoji: Emoji to use; when empty or None the paw-print default is used.
        species_name: Accepted for future emoji lookup; it currently has no
            effect on the result.

    Returns:
        A string of the form "<emoji> <pet_name>".
    """
    # A species lookup would have to be async, so both the species_name and
    # the bare case fall back to the default emoji for now.
    prefix = emoji if emoji else "🐾"
    return f"{prefix} {pet_name}"
async def start_pet_recovery_system(self):
    """Launch the pet recovery loop unless one is already running."""
    current = self.pet_recovery_task
    if current is not None and not current.done():
        return  # a live task exists; keep it
    self.logger.info("🏥 Starting pet recovery background task...")
    self.pet_recovery_task = asyncio.create_task(self._pet_recovery_loop())
async def stop_pet_recovery_system(self):
    """Cancel the pet recovery task, if one is running, and wait for it to end."""
    self.logger.info("🏥 Stopping pet recovery background task...")

    task = self.pet_recovery_task
    if not task or task.done():
        return  # nothing is running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected result of the cancel() above.
        pass
async def _pet_recovery_loop(self):
    """Background task that auto-recovers eligible pets.

    Runs until ``self.shutdown_event`` is set or the task is cancelled.
    Each pass sleeps 5 minutes, re-checks the shutdown flag, asks the
    database for pets eligible for auto-recovery and recovers them one by
    one, logging the outcome for each pet.
    """
    try:
        while not self.shutdown_event.is_set():
            try:
                # Check every 5 minutes for pets that need recovery
                await asyncio.sleep(300)  # 5 minutes
                # The flag may have been set while this task was sleeping.
                if self.shutdown_event.is_set():
                    break
                # Get pets eligible for auto-recovery. The log text below
                # speaks of 30 minutes and 1 HP; the actual criteria live in
                # the Database methods - confirm there.
                eligible_pets = await self.database.get_pets_for_auto_recovery()
                if eligible_pets:
                    self.logger.info(f"🏥 Auto-recovering {len(eligible_pets)} pet(s) after 30 minutes...")
                    for pet in eligible_pets:
                        success = await self.database.auto_recover_pet(pet["id"])
                        if success:
                            # Prefer the nickname; fall back to the species name.
                            self.logger.info(f" 🏥 Auto-recovered {pet['nickname'] or pet['species_name']} (ID: {pet['id']}) to 1 HP")
                        else:
                            self.logger.error(f" ❌ Failed to auto-recover pet ID: {pet['id']}")
            except asyncio.CancelledError:
                # Cancellation during the sleep or the recovery work ends the
                # loop; the coroutine then returns normally.
                break
            except Exception as e:
                self.logger.error(f"Error in pet recovery loop: {e}")
                # Continue the loop even if there's an error
                await asyncio.sleep(60)  # Wait a minute before retrying
    except asyncio.CancelledError:
        # Reached when cancellation arrives outside the inner try body,
        # e.g. during the 60-second back-off sleep in the handler above.
        self.logger.info("Pet recovery task cancelled")
async def shutdown(self):
    """Stop both background systems: weather first, then pet recovery."""
    self.logger.info("🔄 Shutting down game engine...")
    for stop in (self.stop_weather_system, self.stop_pet_recovery_system):
        await stop()