Petbot/modules/admin.py
megaproxy d758d6b924 Add !heal command and automatic pet recovery background system
- Implement !heal command with 1-hour cooldown available to all users
- Add comprehensive cooldown tracking with database last_heal_time validation
- Heal command restores all active pets to full health
- Add background pet recovery system to game engine:
  - Automatic 30-minute recovery timer for fainted pets
  - Background task checks every 5 minutes for eligible pets
  - Auto-recovery restores pets to 1 HP after 30 minutes
  - Proper startup/shutdown integration with game engine
- Add pet_recovery_task to game engine with graceful shutdown
- Include detailed logging for recovery operations
- Ensure system resilience with error handling and task cancellation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-16 11:32:25 +00:00

510 lines
No EOL
23 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Admin commands module for PetBot"""
import sys
import os
# Add parent directory to path for config import
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from .base_module import BaseModule
from config import ADMIN_USER # Import admin user from central config
# =============================================================================
# ADMIN CONFIGURATION
# =============================================================================
# To change the admin user, edit config.py in the project root
# The current admin user is the ADMIN_USER value imported from config above
# =============================================================================
class Admin(BaseModule):
    """Admin and utility commands for PetBot.

    The reload, rate_*, weather, setweather, spawnevent and startevent
    commands check ``is_admin`` before doing anything. The status, uptime,
    ping and heal commands perform no admin check and are open to all users.
    """
def get_commands(self):
    """Return a fresh list of every command name this module answers to."""
    command_names = (
        "reload rate_stats rate_user rate_unban rate_reset "
        "weather setweather spawnevent startevent "
        "status uptime ping heal"
    )
    return command_names.split()
async def handle_command(self, channel, nickname, command, args):
    """Route *command* to the matching ``cmd_<command>`` coroutine.

    Commands fall into two groups: those whose handler takes only
    ``(channel, nickname)`` and those that also receive ``args``.
    A command in neither group is ignored.
    """
    plain_commands = ("reload", "rate_stats", "status", "uptime", "ping", "heal")
    arg_commands = (
        "rate_user", "rate_unban", "rate_reset",
        "weather", "setweather", "spawnevent", "startevent",
    )
    if command in plain_commands:
        handler = getattr(self, "cmd_" + command)
        await handler(channel, nickname)
    elif command in arg_commands:
        handler = getattr(self, "cmd_" + command)
        await handler(channel, nickname, args)
async def cmd_reload(self, channel, nickname):
    """Ask the bot to reload its modules and report the outcome (admin only)."""
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    try:
        # The main bot owns the reload; this command only relays the result.
        reloaded = await self.bot.reload_modules()
        outcome = "✅ Modules reloaded successfully!" if reloaded else "❌ Module reload failed!"
        self.send_message(channel, f"{nickname}: {outcome}")
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Reload error: {err}")
def is_admin(self, nickname):
    """Return True when *nickname* matches the configured admin user.

    The comparison is case-insensitive. A missing, empty or non-string
    nickname is never an admin, and neither is anyone when ``ADMIN_USER``
    itself is empty or not a string. Without these guards an empty
    ``ADMIN_USER`` would compare equal to an empty nickname.
    """
    # NOTE(review): this is a nickname match only; it does not verify that
    # the nick is authenticated with the IRC network.
    if not isinstance(nickname, str) or not nickname:
        return False
    admin = ADMIN_USER
    if not isinstance(admin, str) or not admin:
        return False
    return nickname.lower() == admin.lower()
async def cmd_rate_stats(self, channel, nickname):
    """Send the rate limiter's global statistics to the channel (admin only)."""
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    limiter = self.bot.rate_limiter
    if not limiter:
        self.send_message(channel, f"{nickname}: Rate limiter not available.")
        return
    try:
        stats = limiter.get_global_stats()
        # One entry per output line; joined with newlines into one message.
        report = [
            f"{nickname}: 📊 Rate Limiter Stats:",
            f"• Status: {'Enabled' if stats['enabled'] else 'Disabled'}",
            f"• Requests this minute: {stats['requests_this_minute']}",
            f"• Active users: {stats['active_users']}",
            f"• Total requests: {stats['total_requests']}",
            f"• Blocked requests: {stats['blocked_requests']}",
            f"• Banned users: {stats['banned_users']}",
            f"• Tracked users: {stats['tracked_users']}",
            f"• Total violations: {stats['total_violations']}",
        ]
        self.send_message(channel, "\n".join(report))
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error getting rate stats: {err}")
async def cmd_rate_user(self, channel, nickname, args):
    """Report one user's rate-limiter state to the channel (admin only).

    ``args[0]`` names the user to inspect; a usage hint is sent when
    ``args`` is empty.
    """
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    if not args:
        self.send_message(channel, f"{nickname}: Usage: !rate_user <username>")
        return
    limiter = self.bot.rate_limiter
    if not limiter:
        self.send_message(channel, f"{nickname}: Rate limiter not available.")
        return
    try:
        stats = limiter.get_user_stats(args[0])
        report = [
            f"{nickname}: 👤 Rate Stats for {stats['user']}:",
            f"• Admin exemption: {'Yes' if stats['admin_exemption'] else 'No'}",
            f"• Currently banned: {'Yes' if stats['is_banned'] else 'No'}",
        ]
        # The expiry line is only shown when a ban expiry is recorded.
        if stats['ban_expires']:
            report.append(f"• Ban expires: {stats['ban_expires']}")
        report.append(f"• Total violations: {stats['violations']}")
        bucket = stats['buckets']
        if bucket:
            report.append(f"• Available tokens: {bucket['tokens']}")
            report.append(f"• Last request: {bucket['last_request']}")
        else:
            report.append("• No recent activity")
        self.send_message(channel, "\n".join(report))
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error getting user stats: {err}")
async def cmd_rate_unban(self, channel, nickname, args):
    """Lift a rate-limit ban from the user named in ``args[0]`` (admin only)."""
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    if not args:
        self.send_message(channel, f"{nickname}: Usage: !rate_unban <username>")
        return
    limiter = self.bot.rate_limiter
    if not limiter:
        self.send_message(channel, f"{nickname}: Rate limiter not available.")
        return
    try:
        target_user = args[0]
        # A falsy result from the limiter is reported as "was not banned".
        if limiter.unban_user(target_user):
            reply = f"{nickname}: ✅ User {target_user} has been unbanned."
        else:
            reply = f"{nickname}: User {target_user} was not banned."
        self.send_message(channel, reply)
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error unbanning user: {err}")
async def cmd_rate_reset(self, channel, nickname, args):
    """Clear recorded rate-limit violations for ``args[0]`` (admin only)."""
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    if not args:
        self.send_message(channel, f"{nickname}: Usage: !rate_reset <username>")
        return
    limiter = self.bot.rate_limiter
    if not limiter:
        self.send_message(channel, f"{nickname}: Rate limiter not available.")
        return
    try:
        target_user = args[0]
        # A falsy result from the limiter is reported as "no violations found".
        if limiter.reset_user_violations(target_user):
            reply = f"{nickname}: ✅ Violations reset for user {target_user}."
        else:
            reply = f"{nickname}: No violations found for user {target_user}."
        self.send_message(channel, reply)
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error resetting violations: {err}")
async def cmd_weather(self, channel, nickname, args):
    """Report current weather for one location or for all of them (admin only).

    With no arguments, or when the first argument is "all", every
    location's weather is listed on one line. Otherwise all arguments
    are joined with spaces and treated as a single location name.
    """
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    try:
        wants_single = bool(args) and args[0].lower() != "all"
        if not wants_single:
            # Summary of every location, separated by " | ".
            all_weather = await self.database.get_all_location_weather()
            if not all_weather:
                self.send_message(channel, f"{nickname}: No weather data available.")
                return
            summaries = [
                f"{entry['location_name']}: {entry['weather_type']} ({entry['spawn_modifier']}x)"
                for entry in all_weather
            ]
            self.send_message(channel, f"🌤️ {nickname}: Current weather - " + " | ".join(summaries))
            return
        # Detail for a single, possibly multi-word, location.
        location_name = " ".join(args)
        weather = await self.database.get_location_weather_by_name(location_name)
        if not weather:
            self.send_message(channel, f"{nickname}: Location '{location_name}' not found or no weather data.")
            return
        self.send_message(channel,
            f"🌤️ {nickname}: {location_name} - {weather['weather_type']} "
            f"(modifier: {weather['spawn_modifier']}x, "
            f"until: {weather['active_until'][:16]})")
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error checking weather: {err}")
async def cmd_setweather(self, channel, nickname, args):
    """Force a weather type onto one location or all locations (admin only).

    Usage: ``!setweather <location|all> <weather_type> [duration_minutes]``

    The weather type may appear anywhere in ``args``; it is recognised by
    matching against the keys of ``weather_types`` in
    ``config/weather_patterns.json``. Of the remaining arguments, words up
    to the first integer form the (possibly multi-word) location name and
    that integer is the duration in minutes. When no location is given,
    or it is "all", every location is updated. When no duration is given
    (or it is 0), a random one is drawn from the weather type's
    ``duration_minutes`` range, defaulting to 90-180.

    Negative durations are rejected with an "Invalid duration" message.
    """
    import datetime
    import json
    import random

    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    if not args:
        self.send_message(channel,
            f"{nickname}: Usage: !setweather <location|all> <weather_type> [duration_minutes]\n"
            f"Weather types: sunny, rainy, storm, blizzard, earthquake, calm")
        return
    try:
        # NOTE: the path is relative, so it resolves against the process's
        # current working directory.
        with open("config/weather_patterns.json", "r", encoding="utf-8") as f:
            weather_data = json.load(f)
        weather_catalog = weather_data["weather_types"]

        # The first argument that names a known weather type wins; all
        # other arguments are left for location/duration parsing.
        weather_type = None
        remaining_args = []
        for i, arg in enumerate(args):
            if arg.lower() in weather_catalog:
                weather_type = arg.lower()
                remaining_args = args[:i] + args[i + 1:]
                break
        if not weather_type:
            self.send_message(channel, f"{nickname}: Please specify a valid weather type.")
            return

        duration = None
        location_words = []
        if remaining_args and remaining_args[0].lower() == "all":
            # "all [duration]" - a non-numeric trailing word is ignored on
            # purpose and the random default duration is used instead.
            if len(remaining_args) > 1:
                try:
                    duration = int(remaining_args[1])
                except ValueError:
                    pass
        else:
            for arg in remaining_args:
                try:
                    # The first integer ends the location name.
                    duration = int(arg)
                    break
                except ValueError:
                    location_words.append(arg)
        location_arg = " ".join(location_words) if location_words else "all"

        if duration is not None and duration < 0:
            # A negative duration would create weather that has already
            # expired; report it through the ValueError handler below.
            raise ValueError("duration must be a positive number of minutes")

        weather_config = weather_catalog[weather_type]
        if not duration:
            duration_range = weather_config.get("duration_minutes", [90, 180])
            duration = random.randint(duration_range[0], duration_range[1])
        end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration)
        spawn_modifier = weather_config.get("spawn_modifier", 1.0)
        affected_types = ",".join(weather_config.get("affected_types", []))

        if location_arg.lower() == "all":
            success = await self.database.set_weather_all_locations(
                weather_type, end_time.isoformat(), spawn_modifier, affected_types
            )
            if success:
                self.send_message(channel,
                    f"🌤️ {nickname}: Set {weather_type} weather for ALL locations! "
                    f"Duration: {duration} minutes, Modifier: {spawn_modifier}x")
            else:
                self.send_message(channel, f"{nickname}: Failed to set weather for all locations.")
        else:
            # Use the parsed location. Rebuilding it from args[:-1] pulled
            # the weather type into the name whenever a duration was given.
            location_name = location_arg
            success = await self.database.set_weather_for_location(
                location_name, weather_type, end_time.isoformat(),
                spawn_modifier, affected_types
            )
            if success:
                self.send_message(channel,
                    f"🌤️ {nickname}: Set {weather_type} weather for {location_name}! "
                    f"Duration: {duration} minutes, Modifier: {spawn_modifier}x")
            else:
                self.send_message(channel, f"{nickname}: Failed to set weather for '{location_name}'. Location may not exist.")
    except FileNotFoundError:
        self.send_message(channel, f"{nickname}: ❌ Weather configuration file not found.")
    except json.JSONDecodeError:
        # JSONDecodeError subclasses ValueError; catch it first so a broken
        # config file is not reported as a bad duration.
        self.send_message(channel, f"{nickname}: ❌ Weather configuration file is not valid JSON.")
    except ValueError as e:
        self.send_message(channel, f"{nickname}: ❌ Invalid duration: {str(e)}")
    except Exception as e:
        self.send_message(channel, f"{nickname}: ❌ Error setting weather: {str(e)}")
async def cmd_spawnevent(self, channel, nickname, args):
    """Force-spawn an NPC event at an optional difficulty (admin only).

    ``args[0]``, when present, must be 1, 2 or 3; difficulty defaults to 1.
    """
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    level = 1
    if args:
        try:
            level = int(args[0])
            if level not in (1, 2, 3):
                self.send_message(channel, f"{nickname}: ❌ Difficulty must be 1, 2, or 3.")
                return
        except ValueError:
            self.send_message(channel, f"{nickname}: ❌ Invalid difficulty. Use 1, 2, or 3.")
            return
    try:
        # The events manager is optional on the bot; absent or falsy means
        # the subsystem is not available.
        events = getattr(self.bot, 'npc_events', None)
        if not events:
            self.send_message(channel, f"{nickname}: NPC events system not available.")
            return
        event_id = await events.force_spawn_event(level)
        if event_id:
            reply = f"🎯 {nickname}: Spawned new NPC event! Check `!events` to see it."
        else:
            reply = f"{nickname}: Failed to spawn NPC event."
        self.send_message(channel, reply)
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error spawning event: {err}")
async def cmd_startevent(self, channel, nickname, args):
    """Start an NPC event of a named type (admin only).

    ``args[0]`` is the event type and ``args[1]`` an optional difficulty
    (1-3, default 1). With no arguments the available types are listed.
    """
    if not self.is_admin(nickname):
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return
    valid_types = [
        "resource_gathering", "pet_rescue", "community_project",
        "emergency_response", "festival_preparation", "research_expedition",
        "crisis_response", "legendary_encounter", "ancient_mystery",
    ]
    type_listing = ", ".join(valid_types)
    if not args:
        self.send_message(channel, f"{nickname}: Available types: {type_listing}")
        return
    event_type = args[0].lower()
    if event_type not in valid_types:
        self.send_message(channel, f"{nickname}: ❌ Invalid type. Available: {type_listing}")
        return
    level = 1
    if len(args) > 1:
        try:
            level = int(args[1])
            if level not in (1, 2, 3):
                self.send_message(channel, f"{nickname}: ❌ Difficulty must be 1, 2, or 3.")
                return
        except ValueError:
            self.send_message(channel, f"{nickname}: ❌ Invalid difficulty. Use 1, 2, or 3.")
            return
    try:
        # The events manager is optional on the bot; absent or falsy means
        # the subsystem is not available.
        events = getattr(self.bot, 'npc_events', None)
        if not events:
            self.send_message(channel, f"{nickname}: NPC events system not available.")
            return
        event_id = await events.force_spawn_specific_event(event_type, level)
        if event_id:
            reply = f"🎯 {nickname}: Started {event_type} event (ID: {event_id})! Check `!events` to see it."
        else:
            reply = f"{nickname}: Failed to start {event_type} event."
        self.send_message(channel, reply)
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error starting event: {err}")
async def cmd_status(self, channel, nickname):
    """Report the bot's connection state to the channel (no admin check)."""
    try:
        # Without a connection manager the only known fact is that the
        # bot is answering at all.
        manager = getattr(self.bot, 'connection_manager', None)
        if not manager:
            self.send_message(channel, f"🟢 {nickname}: Bot is running")
            return
        stats = manager.get_connection_stats()
        is_connected = stats.get('connected', False)
        state = stats.get('state', 'unknown')
        indicator = "🟢" if is_connected else "🔴"
        self.send_message(channel, f"{indicator} {nickname}: Bot status - {state.upper()}")
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error getting status: {err}")
async def cmd_uptime(self, channel, nickname):
    """Report how long the bot has been running (no admin check).

    The elapsed time is ``datetime.now()`` minus ``self.bot.startup_time``.
    It is shown as days/hours/minutes, hours/minutes, or minutes/seconds,
    depending on the largest non-zero unit.
    """
    try:
        if not hasattr(self.bot, 'startup_time'):
            self.send_message(channel, f"⏱️ {nickname}: Bot is running (uptime unknown)")
            return
        import datetime
        elapsed = datetime.datetime.now() - self.bot.startup_time
        day_count = elapsed.days
        # timedelta.seconds is the sub-day remainder; split it into h/m/s.
        hour_count, leftover = divmod(elapsed.seconds, 3600)
        minute_count, second_count = divmod(leftover, 60)
        if day_count > 0:
            uptime_str = f"{day_count}d {hour_count}h {minute_count}m"
        elif hour_count > 0:
            uptime_str = f"{hour_count}h {minute_count}m"
        else:
            uptime_str = f"{minute_count}m {second_count}s"
        self.send_message(channel, f"⏱️ {nickname}: Bot uptime - {uptime_str}")
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error getting uptime: {err}")
async def cmd_ping(self, channel, nickname):
    """Reply with "Pong!" and a measured response time (no admin check).

    The measured interval spans only two consecutive clock reads, so the
    reported value is effectively the clock-read overhead.
    """
    try:
        import time
        began = time.time()
        elapsed_ms = 1000 * (time.time() - began)  # seconds -> milliseconds
        self.send_message(channel, f"🏓 {nickname}: Pong! Response time: {elapsed_ms:.1f}ms")
    except Exception as err:
        self.send_message(channel, f"{nickname}: ❌ Error with ping: {err}")
async def cmd_heal(self, channel, nickname):
    """Restore the caller's active pets to full HP (all users, 1-hour cooldown).

    Steps:
      1. Resolve the player via ``require_player``; stop if there is none.
      2. Refuse while less than one hour has passed since the player's
         last heal, reporting the minutes left rounded UP so the message
         never says "0 minutes remaining" while the cooldown still blocks.
      3. Set every active pet whose ``hp`` is below ``max_hp`` to ``max_hp``.
      4. Record the heal time, but only if at least one pet was healed.

    Any exception is reported to the channel rather than raised.
    """
    import math
    from datetime import datetime, timedelta

    cooldown = timedelta(hours=1)
    try:
        player = await self.require_player(channel, nickname)
        if not player:
            return

        # NOTE(review): assumes get_last_heal_time returns a naive datetime
        # comparable with datetime.now(), or a falsy value - confirm.
        last_heal = await self.database.get_last_heal_time(player["id"])
        if last_heal:
            remaining = cooldown - (datetime.now() - last_heal)
            if remaining > timedelta(0):
                minutes_remaining = math.ceil(remaining.total_seconds() / 60)
                self.send_message(channel, f"{nickname}: Heal command is on cooldown! {minutes_remaining} minutes remaining.")
                return

        active_pets = await self.database.get_active_pets(player["id"])
        if not active_pets:
            self.send_message(channel, f"{nickname}: You don't have any active pets to heal!")
            return

        # Select the pets that need healing before writing anything, so a
        # malformed pet record fails the command before any HP is changed.
        injured = [pet for pet in active_pets if pet["hp"] < pet["max_hp"]]
        if not injured:
            self.send_message(channel, f"{nickname}: All your active pets are already at full health!")
            return
        for pet in injured:
            await self.database.update_pet_hp(pet["id"], pet["max_hp"])
        pets_healed = len(injured)

        # The cooldown starts only after a heal that actually did something.
        await self.database.update_last_heal_time(player["id"])
        self.send_message(channel, f"💊 {nickname}: Healed {pets_healed} pet{'s' if pets_healed != 1 else ''} to full health! Next heal available in 1 hour.")
    except Exception as e:
        self.send_message(channel, f"{nickname}: ❌ Error with heal command: {str(e)}")