Petbot/modules/admin.py
megaproxy 86902c6b83 Clean up IRC command architecture and eliminate redundancy
Major cleanup of the modular command system:

**Legacy Code Removal:**
- Removed all legacy command handlers from src/bot.py (74 lines)
- Eliminated outdated command implementations superseded by modular system
- Maintained backward compatibility while cleaning up codebase

**Duplicate Command Consolidation:**
- Removed duplicate status/uptime/ping commands from admin.py
- Kept comprehensive implementations in connection_monitor.py
- Eliminated 3 redundant commands and ~70 lines of duplicate code

**Admin System Standardization:**
- Updated backup_commands.py to use central ADMIN_USER from config.py
- Updated connection_monitor.py to use central ADMIN_USER from config.py
- Removed hardcoded admin user lists across modules
- Ensured consistent admin privilege checking system-wide

**Benefits:**
- Cleaner, more maintainable command structure
- No duplicate functionality across modules
- Consistent admin configuration management
- Reduced codebase size while maintaining all functionality
- Better separation of concerns in modular architecture

This cleanup addresses technical debt identified in the command audit
and prepares the codebase for future development.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-17 17:00:06 +00:00

455 lines
No EOL
21 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Admin commands module for PetBot"""
import sys
import os
# Add parent directory to path for config import
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from .base_module import BaseModule
from config import ADMIN_USER # Import admin user from central config
# =============================================================================
# ADMIN CONFIGURATION
# =============================================================================
# To change the admin user, edit config.py in the project root
# Current admin user: {ADMIN_USER}
# =============================================================================
class Admin(BaseModule):
"""Handles admin-only commands like reload"""
def get_commands(self):
return ["reload", "rate_stats", "rate_user", "rate_unban", "rate_reset", "weather", "setweather", "spawnevent", "startevent", "heal"]
async def handle_command(self, channel, nickname, command, args):
if command == "reload":
await self.cmd_reload(channel, nickname)
elif command == "rate_stats":
await self.cmd_rate_stats(channel, nickname)
elif command == "rate_user":
await self.cmd_rate_user(channel, nickname, args)
elif command == "rate_unban":
await self.cmd_rate_unban(channel, nickname, args)
elif command == "rate_reset":
await self.cmd_rate_reset(channel, nickname, args)
elif command == "weather":
await self.cmd_weather(channel, nickname, args)
elif command == "setweather":
await self.cmd_setweather(channel, nickname, args)
elif command == "spawnevent":
await self.cmd_spawnevent(channel, nickname, args)
elif command == "startevent":
await self.cmd_startevent(channel, nickname, args)
elif command == "heal":
await self.cmd_heal(channel, nickname)
async def cmd_reload(self, channel, nickname):
"""Reload bot modules (admin only)"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
try:
# Trigger module reload in main bot
success = await self.bot.reload_modules()
if success:
self.send_message(channel, f"{nickname}: ✅ Modules reloaded successfully!")
else:
self.send_message(channel, f"{nickname}: ❌ Module reload failed!")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Reload error: {str(e)}")
def is_admin(self, nickname):
"""Check if user is admin"""
return nickname.lower() == ADMIN_USER.lower()
async def cmd_rate_stats(self, channel, nickname):
"""Show global rate limiting statistics"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
if not self.bot.rate_limiter:
self.send_message(channel, f"{nickname}: Rate limiter not available.")
return
try:
stats = self.bot.rate_limiter.get_global_stats()
response = f"{nickname}: 📊 Rate Limiter Stats:\n"
response += f"• Status: {'Enabled' if stats['enabled'] else 'Disabled'}\n"
response += f"• Requests this minute: {stats['requests_this_minute']}\n"
response += f"• Active users: {stats['active_users']}\n"
response += f"• Total requests: {stats['total_requests']}\n"
response += f"• Blocked requests: {stats['blocked_requests']}\n"
response += f"• Banned users: {stats['banned_users']}\n"
response += f"• Tracked users: {stats['tracked_users']}\n"
response += f"• Total violations: {stats['total_violations']}"
self.send_message(channel, response)
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error getting rate stats: {str(e)}")
async def cmd_rate_user(self, channel, nickname, args):
"""Show rate limiting stats for a specific user"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
if not args:
self.send_message(channel, f"{nickname}: Usage: !rate_user <username>")
return
if not self.bot.rate_limiter:
self.send_message(channel, f"{nickname}: Rate limiter not available.")
return
try:
target_user = args[0]
stats = self.bot.rate_limiter.get_user_stats(target_user)
response = f"{nickname}: 👤 Rate Stats for {stats['user']}:\n"
response += f"• Admin exemption: {'Yes' if stats['admin_exemption'] else 'No'}\n"
response += f"• Currently banned: {'Yes' if stats['is_banned'] else 'No'}\n"
if stats['ban_expires']:
response += f"• Ban expires: {stats['ban_expires']}\n"
response += f"• Total violations: {stats['violations']}\n"
if stats['buckets']:
response += f"• Available tokens: {stats['buckets']['tokens']}\n"
response += f"• Last request: {stats['buckets']['last_request']}"
else:
response += "• No recent activity"
self.send_message(channel, response)
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error getting user stats: {str(e)}")
async def cmd_rate_unban(self, channel, nickname, args):
"""Manually unban a user from rate limiting"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
if not args:
self.send_message(channel, f"{nickname}: Usage: !rate_unban <username>")
return
if not self.bot.rate_limiter:
self.send_message(channel, f"{nickname}: Rate limiter not available.")
return
try:
target_user = args[0]
success = self.bot.rate_limiter.unban_user(target_user)
if success:
self.send_message(channel, f"{nickname}: ✅ User {target_user} has been unbanned.")
else:
self.send_message(channel, f"{nickname}: User {target_user} was not banned.")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error unbanning user: {str(e)}")
async def cmd_rate_reset(self, channel, nickname, args):
"""Reset rate limiting violations for a user"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
if not args:
self.send_message(channel, f"{nickname}: Usage: !rate_reset <username>")
return
if not self.bot.rate_limiter:
self.send_message(channel, f"{nickname}: Rate limiter not available.")
return
try:
target_user = args[0]
success = self.bot.rate_limiter.reset_user_violations(target_user)
if success:
self.send_message(channel, f"{nickname}: ✅ Violations reset for user {target_user}.")
else:
self.send_message(channel, f"{nickname}: No violations found for user {target_user}.")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error resetting violations: {str(e)}")
async def cmd_weather(self, channel, nickname, args):
"""Check current weather in locations (admin only)"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
try:
if args and args[0].lower() != "all":
# Check weather for specific location
location_name = " ".join(args)
weather = await self.database.get_location_weather_by_name(location_name)
if weather:
self.send_message(channel,
f"🌤️ {nickname}: {location_name} - {weather['weather_type']} "
f"(modifier: {weather['spawn_modifier']}x, "
f"until: {weather['active_until'][:16]})")
else:
self.send_message(channel, f"{nickname}: Location '{location_name}' not found or no weather data.")
else:
# Show weather for all locations
all_weather = await self.database.get_all_location_weather()
if all_weather:
weather_info = []
for w in all_weather:
weather_info.append(f"{w['location_name']}: {w['weather_type']} ({w['spawn_modifier']}x)")
self.send_message(channel, f"🌤️ {nickname}: Current weather - " + " | ".join(weather_info))
else:
self.send_message(channel, f"{nickname}: No weather data available.")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error checking weather: {str(e)}")
async def cmd_setweather(self, channel, nickname, args):
"""Force change weather in a location or all locations (admin only)"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
if not args:
self.send_message(channel,
f"{nickname}: Usage: !setweather <location|all> <weather_type> [duration_minutes]\n"
f"Weather types: sunny, rainy, storm, blizzard, earthquake, calm")
return
try:
import json
import random
import datetime
# Load weather patterns
with open("config/weather_patterns.json", "r") as f:
weather_data = json.load(f)
weather_types = list(weather_data["weather_types"].keys())
# Smart argument parsing - check if any arg is a weather type
location_arg = None
weather_type = None
duration = None
for i, arg in enumerate(args):
if arg.lower() in weather_types:
weather_type = arg.lower()
# Remove weather type from args for location parsing
remaining_args = args[:i] + args[i+1:]
break
if not weather_type:
self.send_message(channel, f"{nickname}: Please specify a valid weather type.")
return
# Parse location from remaining args
if remaining_args:
if remaining_args[0].lower() == "all":
location_arg = "all"
# Check if there's a duration after "all"
if len(remaining_args) > 1:
try:
duration = int(remaining_args[1])
except ValueError:
pass
else:
# Location name (might be multiple words)
location_words = []
for arg in remaining_args:
try:
# If it's a number, it's probably duration
duration = int(arg)
break
except ValueError:
# It's part of location name
location_words.append(arg)
location_arg = " ".join(location_words) if location_words else "all"
else:
location_arg = "all"
weather_config = weather_data["weather_types"][weather_type]
# Calculate duration
if not duration:
duration_range = weather_config.get("duration_minutes", [90, 180])
duration = random.randint(duration_range[0], duration_range[1])
end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration)
if location_arg.lower() == "all":
# Set weather for all locations
success = await self.database.set_weather_all_locations(
weather_type, end_time.isoformat(),
weather_config.get("spawn_modifier", 1.0),
",".join(weather_config.get("affected_types", []))
)
if success:
self.send_message(channel,
f"🌤️ {nickname}: Set {weather_type} weather for ALL locations! "
f"Duration: {duration} minutes, Modifier: {weather_config.get('spawn_modifier', 1.0)}x")
else:
self.send_message(channel, f"{nickname}: Failed to set weather for all locations.")
else:
# Set weather for specific location
location_name = location_arg if len(args) == 2 else " ".join(args[:-1])
result = await self.database.set_weather_for_location(
location_name, weather_type, end_time.isoformat(),
weather_config.get("spawn_modifier", 1.0),
",".join(weather_config.get("affected_types", []))
)
if result.get("success"):
self.send_message(channel,
f"🌤️ {nickname}: Set {weather_type} weather for {location_name}! "
f"Duration: {duration} minutes, Modifier: {weather_config.get('spawn_modifier', 1.0)}x")
# Announce weather change if it actually changed
if result.get("changed"):
await self.game_engine.announce_weather_change(
location_name, result.get("previous_weather"), weather_type, "admin"
)
else:
self.send_message(channel, f"{nickname}: Failed to set weather for '{location_name}'. {result.get('error', 'Location may not exist.')}")
except FileNotFoundError:
self.send_message(channel, f"{nickname}: ❌ Weather configuration file not found.")
except ValueError as e:
self.send_message(channel, f"{nickname}: ❌ Invalid duration: {str(e)}")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error setting weather: {str(e)}")
async def cmd_spawnevent(self, channel, nickname, args):
"""Force spawn an NPC event (admin only)"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
# Default to difficulty 1 if no args provided
difficulty = 1
if args:
try:
difficulty = int(args[0])
if difficulty not in [1, 2, 3]:
self.send_message(channel, f"{nickname}: ❌ Difficulty must be 1, 2, or 3.")
return
except ValueError:
self.send_message(channel, f"{nickname}: ❌ Invalid difficulty. Use 1, 2, or 3.")
return
try:
# Get the NPC events manager from the bot
if hasattr(self.bot, 'npc_events') and self.bot.npc_events:
event_id = await self.bot.npc_events.force_spawn_event(difficulty)
if event_id:
self.send_message(channel, f"🎯 {nickname}: Spawned new NPC event! Check `!events` to see it.")
else:
self.send_message(channel, f"{nickname}: Failed to spawn NPC event.")
else:
self.send_message(channel, f"{nickname}: NPC events system not available.")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error spawning event: {str(e)}")
async def cmd_startevent(self, channel, nickname, args):
"""Start a specific event type (admin only)"""
if not self.is_admin(nickname):
self.send_message(channel, f"{nickname}: Access denied. Admin command.")
return
# If no args provided, show available event types
if not args:
self.send_message(channel, f"{nickname}: Available types: resource_gathering, pet_rescue, community_project, emergency_response, festival_preparation, research_expedition, crisis_response, legendary_encounter, ancient_mystery")
return
event_type = args[0].lower()
valid_types = ["resource_gathering", "pet_rescue", "community_project", "emergency_response",
"festival_preparation", "research_expedition", "crisis_response",
"legendary_encounter", "ancient_mystery"]
if event_type not in valid_types:
self.send_message(channel, f"{nickname}: ❌ Invalid type. Available: {', '.join(valid_types)}")
return
# Optional difficulty parameter
difficulty = 1
if len(args) > 1:
try:
difficulty = int(args[1])
if difficulty not in [1, 2, 3]:
self.send_message(channel, f"{nickname}: ❌ Difficulty must be 1, 2, or 3.")
return
except ValueError:
self.send_message(channel, f"{nickname}: ❌ Invalid difficulty. Use 1, 2, or 3.")
return
try:
# Get the NPC events manager from the bot
if hasattr(self.bot, 'npc_events') and self.bot.npc_events:
event_id = await self.bot.npc_events.force_spawn_specific_event(event_type, difficulty)
if event_id:
self.send_message(channel, f"🎯 {nickname}: Started {event_type} event (ID: {event_id})! Check `!events` to see it.")
else:
self.send_message(channel, f"{nickname}: Failed to start {event_type} event.")
else:
self.send_message(channel, f"{nickname}: NPC events system not available.")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error starting event: {str(e)}")
async def cmd_heal(self, channel, nickname):
"""Heal active pets (available to all users with 1-hour cooldown)"""
try:
player = await self.require_player(channel, nickname)
if not player:
return
# Check cooldown
from datetime import datetime, timedelta
last_heal = await self.database.get_last_heal_time(player["id"])
if last_heal:
time_since_heal = datetime.now() - last_heal
if time_since_heal < timedelta(hours=1):
remaining = timedelta(hours=1) - time_since_heal
minutes_remaining = int(remaining.total_seconds() / 60)
self.send_message(channel, f"{nickname}: Heal command is on cooldown! {minutes_remaining} minutes remaining.")
return
# Get active pets
active_pets = await self.database.get_active_pets(player["id"])
if not active_pets:
self.send_message(channel, f"{nickname}: You don't have any active pets to heal!")
return
# Count how many pets need healing
pets_healed = 0
for pet in active_pets:
if pet["hp"] < pet["max_hp"]:
# Heal pet to full HP
await self.database.update_pet_hp(pet["id"], pet["max_hp"])
pets_healed += 1
if pets_healed == 0:
self.send_message(channel, f"{nickname}: All your active pets are already at full health!")
return
# Update cooldown
await self.database.update_last_heal_time(player["id"])
self.send_message(channel, f"💊 {nickname}: Healed {pets_healed} pet{'s' if pets_healed != 1 else ''} to full health! Next heal available in 1 hour.")
except Exception as e:
self.send_message(channel, f"{nickname}: ❌ Error with heal command: {str(e)}")