#!/usr/bin/env python3
"""
PetBot Web Server
Provides web interface for bot data including help, player stats, and pet collections
"""
import os
import sys
import asyncio
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from threading import Thread
import time
import math
# Add the project directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from src.database import Database
from src.rate_limiter import RateLimiter, CommandCategory
class PetBotRequestHandler(BaseHTTPRequestHandler):
"""HTTP request handler for PetBot web server"""
# Class-level admin sessions storage
admin_sessions = {}
@property
def database(self):
"""Get database instance from server"""
return self.server.database
@property
def bot(self):
"""Get bot instance from server"""
return getattr(self.server, 'bot', None)
@property
def rate_limiter(self):
"""Get rate limiter from bot instance"""
bot = self.bot
return getattr(bot, 'rate_limiter', None) if bot else None
def get_client_ip(self):
"""Get client IP address for rate limiting"""
# Check for X-Forwarded-For header (in case of proxy)
forwarded_for = self.headers.get('X-Forwarded-For')
if forwarded_for:
return forwarded_for.split(',')[0].strip()
# Check for X-Real-IP header
real_ip = self.headers.get('X-Real-IP')
if real_ip:
return real_ip.strip()
# Fallback to client address
return self.client_address[0]
def check_rate_limit(self):
"""Check rate limit for web requests"""
if not self.rate_limiter:
return True, None
client_ip = self.get_client_ip()
# Use IP address as user identifier for web requests
user_identifier = f"web:{client_ip}"
# Run async rate limit check
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
allowed, message = loop.run_until_complete(
self.rate_limiter.check_rate_limit(user_identifier, CommandCategory.WEB)
)
return allowed, message
finally:
loop.close()
def send_rate_limit_error(self, message):
"""Send rate limit error response.

NOTE(review): in the code visible here `message` is never used, the
`content` template holds only a newline, and a second status line (200)
is emitted after the 429 headers have already been ended. This looks like
two handlers fused together with their markup lost -- confirm against the
repository before changing behavior.
"""
# 429 Too Many Requests; Retry-After tells the client to wait 60 seconds.
self.send_response(429)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.send_header('Retry-After', '60')
self.end_headers()
content = f"""
"""
# Wrap the content in the shared page template with an empty third argument.
html = self.get_page_template("PetBot Game Hub", content, "")
# NOTE(review): second status line on the same response -- see docstring.
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html.encode())
def serve_help(self):
"""Serve the help page using unified template"""
content = """
š PetBot Commands
Complete guide to Pokemon-style pet collecting in IRC
š Getting Started
!start
Begin your pet collecting journey! Creates your trainer account and gives you your first starter pet.
Example: !start
!help
Get a link to this comprehensive command reference page.
Example: !help
!stats
View your basic trainer information including level, experience, and money.
Example: !stats
š Exploration & Travel
!explore
Search your current location for wild pets or items. You might find pets to battle/catch or discover useful items!
Example: !explore
!travel <location>
Move to a different location. Each area has unique pets and gyms. Some locations require achievements to unlock.
Example: !travel whispering woods
!where / !location
See which location you're currently in and get information about the area.
Example: !where
šŗļø Available Locations
Starter Town - Peaceful starting area (Fire/Water/Grass pets)
Whispering Woods - Ancient forest (Grass pets + new species: Vinewrap, Bloomtail)
Electric Canyon - Charged valley (Electric/Rock pets)
Thunderstorm - 2.0x Electric spawns (30-60 minutes)
Blizzard - 1.7x Ice/Water spawns (1-2 hours)
Earthquake - 1.8x Rock spawns (30-90 minutes)
Calm - Normal spawns (1.5-3 hours)
āļø Battle System
!catch / !capture
Attempt to catch a wild pet that appeared during exploration. Success depends on the pet's level and rarity.
Example: !catch
!battle
Start a turn-based battle with a wild pet. Defeat it to gain experience and money for your active pet.
Example: !battle
!attack <move>
Use a specific move during battle. Each move has different power, type, and effects.
Example: !attack flamethrower
!moves
View all available moves for your active pet, including their types and power levels.
Example: !moves
!flee
Attempt to escape from the current battle. Not always successful!
Example: !flee
šļø Gym Battles NEW!
!gym
List all gyms in your current location with your progress. Shows victories and next difficulty level.
Example: !gym
!gym list
Show all gyms across all locations with your badge collection progress.
Example: !gym list
!gym challenge "<name>"
Challenge a gym leader! You must be in the same location as the gym. Difficulty increases with each victory.
Example: !gym challenge "Forest Guardian"
!gym info "<name>"
Get detailed information about a gym including leader, theme, team, and badge details.
Example: !gym info "Storm Master"
š” Gym Strategy: Each gym specializes in a specific type. Bring pets with type advantages! The more you beat a gym, the harder it gets, but the better the rewards!
Access your team builder web interface for drag-and-drop team management with PIN verification.
Example: !team
!pets
View your complete pet collection with detailed stats and information via web interface.
Example: !pets
!activate <pet>
Add a pet to your active battle team. You can have multiple active pets for different situations.
Example: !activate flamey
!deactivate <pet>
Remove a pet from your active team and put it in storage.
Example: !deactivate aqua
!nickname <pet> <new_name>
Give a custom nickname to one of your pets. Nicknames must be 20 characters or less.
Example: !nickname flamey FireStorm
š Inventory System NEW!
!inventory / !inv / !items
View all items in your inventory organized by category. Shows quantities and item descriptions.
Example: !inventory
!use <item name>
Use a consumable item from your inventory. Items can heal pets, boost stats, or provide other benefits.
Example: !use Small Potion
šÆ Item Categories & Rarities
ā Common (15%) - Small Potions, basic healing items
ā Uncommon (8-12%) - Large Potions, battle boosters, special berries
ā Rare (3-6%) - Super Potions, speed elixirs, location treasures
ā Epic (2-3%) - Evolution stones, rare crystals, ancient artifacts
⦠Legendary (1%) - Lucky charms, ancient fossils, ultimate items
š” Item Discovery: Find items while exploring! Each location has unique treasures including rare Coin Pouches with 1-3 coins. Items stack in your inventory and can be used anytime.
š Achievements & Progress
!achievements
View your achievement progress and see which new locations you've unlocked.
Example: !achievements
šÆ Location Unlock Requirements
Pet Collector (5 pets) ā Unlocks Whispering Woods
Spark Collector (2 Electric species) ā Unlocks Electric Canyon
Rock Hound (3 Rock species) ā Unlocks Crystal Caves
"""
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
except FileNotFoundError:
self.serve_error_page("Help", "Help file not found")
except Exception as e:
self.serve_error_page("Help", f"Error loading help file: {str(e)}")
def serve_faq(self):
"""Serve the FAQ page using unified template.

Reads `faq.html`, pulls a CSS section and a body section out of it with
regular expressions, and re-wraps the body together with the navigation
bar. A missing file or any other exception is reported through
`serve_error_page`.

NOTE(review): the three regex literals below appear to have lost their
HTML tag text (the first pattern is empty and has no capture group, so
`group(1)` cannot succeed as written). Restore the patterns from the
repository before relying on this method.
"""
try:
# The path is relative, so it resolves against the process working directory.
with open('faq.html', 'r', encoding='utf-8') as f:
faq_content = f.read()
import re
# Extract CSS from faq.html
css_match = re.search(r'', faq_content, re.DOTALL)
# NOTE(review): faq_css is not referenced by the template visible below.
faq_css = css_match.group(1) if css_match else ""
# Extract body content (everything between tags)
body_match = re.search(r']*>(.*?)', faq_content, re.DOTALL)
if body_match:
body_content = body_match.group(1)
# Remove the back link since we'll have the navigation bar
body_content = re.sub(r'.*?', '', body_content, flags=re.DOTALL)
else:
# Fallback: use original content if we can't parse it
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(faq_content.encode())
return
# Create template with merged CSS
html_content = f"""
PetBot - FAQ
{self.get_navigation_bar("faq")}
{body_content}
"""
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
except FileNotFoundError:
self.serve_error_page("FAQ", "FAQ file not found")
except Exception as e:
# Catch-all so any parsing or I/O failure still yields an error page.
self.serve_error_page("FAQ", f"Error loading FAQ file: {str(e)}")
def serve_players(self):
"""Serve the players page with real data"""
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_error_page("Players", "Database not available")
return
# Fetch players data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
players_data = loop.run_until_complete(self.fetch_players_data(database))
loop.close()
self.serve_players_data(players_data)
except Exception as e:
print(f"Error fetching players data: {e}")
self.serve_error_page("Players", f"Error loading players: {str(e)}")
async def fetch_players_data(self, database):
"""Fetch all players data from database"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
# Get all players with basic stats
cursor = await db.execute("""
SELECT p.nickname, p.level, p.experience, p.money, p.created_at,
l.name as location_name,
(SELECT COUNT(*) FROM pets WHERE player_id = p.id) as pet_count,
(SELECT COUNT(*) FROM pets WHERE player_id = p.id AND is_active = 1) as active_pets,
(SELECT COUNT(*) FROM player_achievements WHERE player_id = p.id) as achievement_count
FROM players p
LEFT JOIN locations l ON p.current_location_id = l.id
ORDER BY p.level DESC, p.experience DESC
""")
rows = await cursor.fetchall()
# Convert SQLite rows to dictionaries properly
players = []
for row in rows:
player_dict = {
'nickname': row[0],
'level': row[1],
'experience': row[2],
'money': row[3],
'created_at': row[4],
'location_name': row[5],
'pet_count': row[6],
'active_pets': row[7],
'achievement_count': row[8]
}
players.append(player_dict)
return players
except Exception as e:
print(f"Database error fetching players: {e}")
return []
def serve_players_data(self, players_data):
"""Serve players page with real data"""
# Calculate statistics
total_players = len(players_data)
total_pets = sum(p['pet_count'] for p in players_data) if players_data else 0
total_achievements = sum(p['achievement_count'] for p in players_data) if players_data else 0
highest_level = max((p['level'] for p in players_data), default=0) if players_data else 0
# Build statistics cards
stats_content = f"""
š Total Players
{total_players}
š¾ Total Pets
{total_pets}
š Achievements
{total_achievements}
ā Highest Level
{highest_level}
"""
# Build players table HTML
if players_data:
players_html = ""
for i, player in enumerate(players_data, 1):
rank_emoji = {"1": "š„", "2": "š„", "3": "š„"}.get(str(i), f"{i}.")
players_html += f"""
'
# Data rows
content += ''
for i, player in enumerate(data):
row_data = row_formatter(player, i)
rank_class = f"rank-{i+1}" if i < 3 else ""
content += f'
'
for cell in row_data:
content += f'
{cell}
'
content += '
'
content += ''
content += '
'
content += '
'
return content
def serve_locations(self):
"""Serve the locations page with real data"""
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_error_page("Locations", "Database not available")
return
# Fetch locations data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
locations_data = loop.run_until_complete(self.fetch_locations_data(database))
player_locations = loop.run_until_complete(self.fetch_player_locations(database))
loop.close()
self.serve_locations_data(locations_data, player_locations)
except Exception as e:
print(f"Error fetching locations data: {e}")
self.serve_error_page("Locations", f"Error loading locations: {str(e)}")
async def fetch_locations_data(self, database):
"""Fetch all locations and their spawn data from database"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
# Get all locations
cursor = await db.execute("""
SELECT l.*,
GROUP_CONCAT(DISTINCT ps.name || ' (' || ps.type1 ||
CASE WHEN ps.type2 IS NOT NULL THEN '/' || ps.type2 ELSE '' END || ')') as spawns
FROM locations l
LEFT JOIN location_spawns ls ON l.id = ls.location_id
LEFT JOIN pet_species ps ON ls.species_id = ps.id
GROUP BY l.id
ORDER BY l.id
""")
rows = await cursor.fetchall()
# Convert SQLite rows to dictionaries properly
locations = []
for row in rows:
location_dict = {
'id': row[0],
'name': row[1],
'description': row[2],
'level_min': row[3],
'level_max': row[4],
'spawns': row[5] if len(row) > 5 else None
}
locations.append(location_dict)
return locations
except Exception as e:
print(f"Database error fetching locations: {e}")
return []
async def fetch_player_locations(self, database):
"""Fetch player locations for the interactive map"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("""
SELECT p.nickname, p.current_location_id, l.name as location_name
FROM players p
JOIN locations l ON p.current_location_id = l.id
ORDER BY p.nickname
""")
rows = await cursor.fetchall()
players = []
for row in rows:
player_dict = {
'nickname': row['nickname'],
'location_id': row['current_location_id'],
'location_name': row['location_name']
}
players.append(player_dict)
return players
except Exception as e:
print(f"Database error fetching player locations: {e}")
return []
def create_interactive_map(self, locations_data, player_locations):
"""Create an interactive SVG map showing player locations.

Returns an empty string when `locations_data` is empty; otherwise builds
per-location fragments (shape, label, player names) and returns the map
section text.

NOTE(review): the markup inside the f-strings below appears to have been
stripped. As visible here `svg_content`, `start_pos` and `end_pos` are
computed but never interpolated into the returned string -- restore the
templates from the repository before editing the logic.
"""
if not locations_data:
return ""
# Define map layout - create a unique visual design
# Keyed by location id; ids without an entry here are skipped further down.
map_positions = {
1: {"x": 200, "y": 400, "shape": "circle", "color": "#4CAF50"}, # Starter Town - central
2: {"x": 100, "y": 200, "shape": "hexagon", "color": "#2E7D32"}, # Whispering Woods - forest
3: {"x": 400, "y": 150, "shape": "diamond", "color": "#FF9800"}, # Thunder Peaks - mountain
4: {"x": 550, "y": 300, "shape": "octagon", "color": "#795548"}, # Stone Caverns - cave
5: {"x": 300, "y": 500, "shape": "star", "color": "#2196F3"}, # Frozen Lake - ice
6: {"x": 500, "y": 450, "shape": "triangle", "color": "#F44336"} # Volcanic Crater - fire
}
# Create player location groups
# Maps location id -> list of nicknames; a None player list is treated as empty.
location_players = {}
for player in player_locations or []:
loc_id = player['location_id']
if loc_id not in location_players:
location_players[loc_id] = []
location_players[loc_id].append(player['nickname'])
# SVG map content
svg_content = ""
# Add connecting paths between locations
# Each pair is (start location id, end location id).
paths = [
(1, 2), (1, 3), (1, 5), # Starter Town connections
(2, 5), (3, 4), (4, 6), (5, 6) # Other connections
]
for start, end in paths:
if start in map_positions and end in map_positions:
start_pos = map_positions[start]
end_pos = map_positions[end]
svg_content += f"""
"""
# Add location shapes
for location in locations_data:
loc_id = location['id']
# Locations without a predefined map position are not drawn.
if loc_id not in map_positions:
continue
pos = map_positions[loc_id]
players_here = location_players.get(loc_id, [])
player_count = len(players_here)
# Create shape based on type
shape_svg = self.create_location_shape(pos, location, player_count)
svg_content += shape_svg
# Add location label
svg_content += f"""
{location['name']}
"""
# Add player names if any
if players_here:
player_text = ", ".join(players_here)
svg_content += f"""
{player_text}
"""
return f"""
šŗļø Interactive World Map
Current player locations - shapes represent different terrain types
Towns
Forests
Mountains
Caves
Ice Areas
Volcanic
"""
def create_location_shape(self, pos, location, player_count):
"""Create SVG shape for a location based on its type.

Args:
pos: dict with keys 'x', 'y', 'color' and 'shape'.
location: location record; not read by the code visible here.
player_count: number of players at the location; enlarges the shape
and enables the glow attribute when greater than zero.

Returns:
The markup string for the shape.

NOTE(review): every returned f-string below contains only a newline, so
`x`, `y`, `color`, `glow` and the computed `points` lists are unused as
written. The SVG element text appears to have been stripped -- restore it
from the repository before editing.
"""
x, y = pos['x'], pos['y']
color = pos['color']
shape = pos['shape']
# Add glow effect if players are present
glow = 'filter="url(#glow)"' if player_count > 0 else ''
# Base size with scaling for player count
# 25 units plus 3 per player present.
base_size = 25 + (player_count * 3)
if shape == "circle":
return f"""
"""
elif shape == "hexagon":
# Six vertices spaced 60 degrees apart at distance base_size from (x, y).
points = []
for i in range(6):
angle = i * 60 * math.pi / 180
px = x + base_size * math.cos(angle)
py = y + base_size * math.sin(angle)
points.append(f"{px},{py}")
return f"""
"""
elif shape == "diamond":
return f"""
"""
elif shape == "triangle":
return f"""
"""
elif shape == "star":
# Create 5-pointed star
# Ten vertices at 36-degree steps, alternating full and half radius.
points = []
for i in range(10):
angle = i * 36 * math.pi / 180
radius = base_size if i % 2 == 0 else base_size * 0.5
px = x + radius * math.cos(angle)
py = y + radius * math.sin(angle)
points.append(f"{px},{py}")
return f"""
"""
elif shape == "octagon":
# Eight vertices spaced 45 degrees apart at distance base_size from (x, y).
points = []
for i in range(8):
angle = i * 45 * math.pi / 180
px = x + base_size * math.cos(angle)
py = y + base_size * math.sin(angle)
points.append(f"{px},{py}")
return f"""
"""
else:
# Default to circle
return f"""
"""
def serve_locations_data(self, locations_data, player_locations=None):
"""Serve locations page with real data using unified template"""
# Build locations HTML
locations_html = ""
if locations_data:
for location in locations_data:
spawns = location.get('spawns', 'No pets found')
if not spawns or spawns == 'None':
spawns = "No pets spawn here yet"
# Split spawns into a readable list and remove duplicates
if spawns != "No pets spawn here yet":
spawn_list = list(set([spawn.strip() for spawn in spawns.split(',') if spawn.strip()]))
spawn_list.sort() # Sort alphabetically for consistency
else:
spawn_list = []
spawn_badges = ""
visible_spawns = spawn_list[:6] # Show first 6
hidden_spawns = spawn_list[6:] # Hide the rest
# Add visible spawn badges
for spawn in visible_spawns:
spawn_badges += f'{spawn}'
# Add hidden spawn badges (initially hidden)
if hidden_spawns:
location_id = location['id']
for spawn in hidden_spawns:
spawn_badges += f'{spawn}'
# Add functional "show more" button
spawn_badges += f'+{len(hidden_spawns)} more'
if not spawn_badges:
spawn_badges = 'No pets spawn here yet'
locations_html += f"""
Explore all areas and discover what pets await you!
{map_html}
šÆ How Locations Work
Travel: Use !travel <location> to move between areas
Explore: Use !explore to find wild pets in your current location
Unlock: Some locations require achievements - catch specific pet types to unlock new areas!
Weather: Check !weather for conditions that boost certain pet spawn rates
{locations_html}
š” Use !wild <location> in #petz to see what pets spawn in a specific area
"""
# Add locations-specific CSS
additional_css = """
.map-section {
background: var(--bg-secondary);
border-radius: 15px;
padding: 30px;
margin: 30px 0;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
}
.map-section h2 {
color: var(--text-accent);
text-align: center;
margin-bottom: 10px;
}
.map-container {
display: flex;
justify-content: center;
margin: 20px 0;
}
.map-container svg {
border-radius: 10px;
box-shadow: 0 4px 15px rgba(0,0,0,0.5);
max-width: 100%;
height: auto;
}
.map-legend {
display: flex;
justify-content: center;
flex-wrap: wrap;
gap: 20px;
margin-top: 20px;
}
.legend-item {
display: flex;
align-items: center;
gap: 8px;
color: var(--text-primary);
font-size: 0.9em;
}
.legend-shape {
width: 16px;
height: 16px;
border-radius: 3px;
border: 1px solid white;
}
.legend-shape.circle {
border-radius: 50%;
}
.legend-shape.hexagon {
border-radius: 3px;
transform: rotate(45deg);
}
.legend-shape.diamond {
transform: rotate(45deg);
}
.legend-shape.triangle {
clip-path: polygon(50% 0%, 0% 100%, 100% 100%);
}
.legend-shape.star {
clip-path: polygon(50% 0%, 61% 35%, 98% 35%, 68% 57%, 79% 91%, 50% 70%, 21% 91%, 32% 57%, 2% 35%, 39% 35%);
}
.legend-shape.octagon {
clip-path: polygon(30% 0%, 70% 0%, 100% 30%, 100% 70%, 70% 100%, 30% 100%, 0% 70%, 0% 30%);
}
.locations-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(400px, 1fr));
gap: 25px;
margin-top: 30px;
}
.location-card {
background: var(--bg-secondary);
border-radius: 15px;
overflow: hidden;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
transition: transform 0.3s ease;
}
.location-card:hover {
transform: translateY(-5px);
}
.location-header {
background: var(--gradient-primary);
color: white;
padding: 20px;
display: flex;
justify-content: space-between;
align-items: center;
}
.location-header h3 {
margin: 0;
font-size: 1.3em;
font-weight: 700;
}
.location-id {
background: rgba(255,255,255,0.2);
padding: 5px 10px;
border-radius: 20px;
font-size: 0.8em;
}
.location-description {
padding: 20px;
color: var(--text-primary);
font-style: italic;
border-bottom: 1px solid var(--border-color);
}
.location-levels {
padding: 20px;
padding-bottom: 10px;
border-bottom: 1px solid var(--border-color);
}
.location-spawns {
padding: 20px;
}
.spawn-badge {
background: var(--bg-tertiary);
color: var(--text-accent);
padding: 4px 10px;
border-radius: 12px;
font-size: 0.85em;
margin: 3px;
display: inline-block;
border: 1px solid var(--border-color);
}
.hidden-spawn {
display: none;
}
.more-button {
background: var(--gradient-primary) !important;
color: white !important;
cursor: pointer;
transition: transform 0.2s ease;
}
.more-button:hover {
transform: scale(1.05);
}
.less-button {
background: #ff6b6b !important;
color: white !important;
cursor: pointer;
transition: transform 0.2s ease;
}
.less-button:hover {
transform: scale(1.05);
}
.info-section {
background: var(--bg-secondary);
border-radius: 15px;
padding: 25px;
margin-bottom: 30px;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
}
.info-section h2 {
color: var(--text-accent);
margin-top: 0;
}
"""
# Get the unified template with additional CSS
html_content = self.get_page_template("Game Locations", content, "locations")
# Insert additional CSS before closing tag
html_content = html_content.replace("", additional_css + "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
def serve_petdex(self):
"""Serve the petdex page with all pet species data"""
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_error_page("Petdex", "Database not available")
return
# Parse URL parameters for sorting
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
sort_mode = query_params.get('sort', ['rarity'])[0] # Default to rarity
search_query = query_params.get('search', [''])[0] # Default to empty search
# Fetch petdex data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
petdex_data = loop.run_until_complete(self.fetch_petdex_data(database))
loop.close()
self.serve_petdex_data(petdex_data, sort_mode, search_query)
except Exception as e:
print(f"Error fetching petdex data: {e}")
self.serve_error_page("Petdex", f"Error loading petdex: {str(e)}")
async def fetch_petdex_data(self, database):
"""Fetch all pet species data from database"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
# Get all pet species with evolution information (no duplicates)
cursor = await db.execute("""
SELECT DISTINCT ps.id, ps.name, ps.type1, ps.type2, ps.base_hp, ps.base_attack,
ps.base_defense, ps.base_speed, ps.evolution_level, ps.evolution_species_id,
ps.rarity, ps.emoji,
evolve_to.name as evolves_to_name,
(SELECT COUNT(*) FROM location_spawns ls WHERE ls.species_id = ps.id) as location_count
FROM pet_species ps
LEFT JOIN pet_species evolve_to ON ps.evolution_species_id = evolve_to.id
ORDER BY ps.rarity ASC, ps.name ASC
""")
rows = await cursor.fetchall()
pets = []
for row in rows:
pet_dict = {
'id': row[0], 'name': row[1], 'type1': row[2], 'type2': row[3],
'base_hp': row[4], 'base_attack': row[5], 'base_defense': row[6],
'base_speed': row[7], 'evolution_level': row[8],
'evolution_species_id': row[9], 'rarity': row[10], 'emoji': row[11],
'evolves_to_name': row[12], 'location_count': row[13]
}
pets.append(pet_dict)
# Get spawn locations for each pet
for pet in pets:
cursor = await db.execute("""
SELECT l.name, ls.min_level, ls.max_level, ls.spawn_rate
FROM location_spawns ls
JOIN locations l ON ls.location_id = l.id
WHERE ls.species_id = ?
ORDER BY l.name ASC
""", (pet['id'],))
spawn_rows = await cursor.fetchall()
pet['spawn_locations'] = []
for spawn_row in spawn_rows:
spawn_dict = {
'location_name': spawn_row[0], 'min_level': spawn_row[1],
'max_level': spawn_row[2], 'spawn_rate': spawn_row[3]
}
pet['spawn_locations'].append(spawn_dict)
return pets
except Exception as e:
print(f"Database error fetching petdex: {e}")
return []
def remove_pet_duplicates(self, pets_list):
    """Return the pets with unique ids, sorted by name.

    When several entries share an id, the first one encountered is kept.
    """
    first_by_id = {}
    for entry in pets_list:
        # setdefault only stores when the id is new, so the first occurrence wins.
        first_by_id.setdefault(entry['id'], entry)
    deduplicated = list(first_by_id.values())
    deduplicated.sort(key=lambda entry: entry['name'])
    return deduplicated
def serve_petdex_data(self, petdex_data, sort_mode='rarity', search_query=''):
"""Serve petdex page with all pet species data"""
# Remove duplicates from input data first
petdex_data = self.remove_pet_duplicates(petdex_data)
# Apply search filter if provided
if search_query:
search_query = search_query.lower()
filtered_data = []
for pet in petdex_data:
# Search in name, type1, type2
if (search_query in pet['name'].lower() or
search_query in pet['type1'].lower() or
(pet['type2'] and search_query in pet['type2'].lower())):
filtered_data.append(pet)
petdex_data = filtered_data
# Build pet cards HTML grouped by rarity
rarity_names = {1: "Common", 2: "Uncommon", 3: "Rare", 4: "Epic", 5: "Legendary"}
rarity_colors = {1: "#ffffff", 2: "#1eff00", 3: "#0070dd", 4: "#a335ee", 5: "#ff8000"}
# Calculate statistics
total_species = len(petdex_data)
type_counts = {}
for pet in petdex_data:
if pet['type1'] not in type_counts:
type_counts[pet['type1']] = 0
type_counts[pet['type1']] += 1
if pet['type2'] and pet['type2'] not in type_counts:
type_counts[pet['type2']] = 0
if pet['type2']:
type_counts[pet['type2']] += 1
# Build statistics section
stats_content = f"""
š Total Species
{total_species}
šØ Types
{len(type_counts)}
ā Rarities
{len(set(pet['rarity'] for pet in petdex_data))}
𧬠Evolutions
{len([p for p in petdex_data if p['evolution_level']])}
"""
# Sort and group pets based on sort_mode
petdex_html = ""
total_species = len(petdex_data)
if sort_mode == 'type':
# Group by type1
pets_by_type = {}
for pet in petdex_data:
pet_type = pet['type1']
if pet_type not in pets_by_type:
pets_by_type[pet_type] = []
# Check for duplicates within this type
if pet not in pets_by_type[pet_type]:
pets_by_type[pet_type].append(pet)
# Sort each type group by name and remove any remaining duplicates
for type_name in pets_by_type:
# Remove duplicates based on pet ID and sort by name
seen_ids = set()
unique_pets = []
for pet in pets_by_type[type_name]:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
pets_by_type[type_name] = sorted(unique_pets, key=lambda x: x['name'])
type_colors = {
'Fire': '#F08030', 'Water': '#6890F0', 'Grass': '#78C850', 'Electric': '#F8D030',
'Psychic': '#F85888', 'Ice': '#98D8D8', 'Dragon': '#7038F8', 'Dark': '#705848',
'Fighting': '#C03028', 'Poison': '#A040A0', 'Ground': '#E0C068', 'Flying': '#A890F0',
'Bug': '#A8B820', 'Rock': '#B8A038', 'Ghost': '#705898', 'Steel': '#B8B8D0',
'Normal': '#A8A878', 'Fairy': '#EE99AC'
}
for type_name in sorted(pets_by_type.keys()):
pets_in_type = pets_by_type[type_name]
type_color = type_colors.get(type_name, '#A8A878')
petdex_html += f"""
{type_name} Type ({len(pets_in_type)} species)
"""
for pet in pets_in_type:
type_str = pet['type1']
if pet['type2']:
type_str += f" / {pet['type2']}"
petdex_html += f"""
"""
elif sort_mode == 'location':
# Group by spawn locations
pets_by_location = {}
pets_no_location = []
for pet in petdex_data:
if pet['spawn_locations']:
for location in pet['spawn_locations']:
loc_name = location['location_name']
if loc_name not in pets_by_location:
pets_by_location[loc_name] = []
# Check for duplicates within this location
if pet not in pets_by_location[loc_name]:
pets_by_location[loc_name].append(pet)
else:
pets_no_location.append(pet)
# Sort each location group by name and remove any remaining duplicates
for location_name in pets_by_location:
# Remove duplicates based on pet ID and sort by name
seen_ids = set()
unique_pets = []
for pet in pets_by_location[location_name]:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
pets_by_location[location_name] = sorted(unique_pets, key=lambda x: x['name'])
location_colors = {
'Starter Town': '#4CAF50',
'Whispering Woods': '#2E7D32',
'Thunder Peaks': '#FF9800',
'Stone Caverns': '#795548',
'Frozen Lake': '#2196F3',
'Volcanic Crater': '#F44336'
}
for location_name in sorted(pets_by_location.keys()):
pets_in_location = pets_by_location[location_name]
location_color = location_colors.get(location_name, '#A8A878')
petdex_html += f"""
"""
for pet in pets_in_location:
type_str = pet['type1']
if pet['type2']:
type_str += f" / {pet['type2']}"
rarity_color = rarity_colors.get(pet['rarity'], '#ffffff')
# Get level range for this location
level_range = ""
for location in pet['spawn_locations']:
if location['location_name'] == location_name:
level_range = f"Lv.{location['min_level']}-{location['max_level']}"
break
petdex_html += f"""
"""
# Add pets with no location at the end (remove duplicates)
if pets_no_location:
seen_ids = set()
unique_no_location = []
for pet in pets_no_location:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_no_location.append(pet)
pets_no_location = sorted(unique_no_location, key=lambda x: x['name'])
if pets_no_location:
petdex_html += f"""
"""
elif sort_mode == 'all':
# Show all pets in a grid format without grouping (duplicates already removed)
sorted_pets = sorted(petdex_data, key=lambda x: (x['rarity'], x['name']))
petdex_html += f"""
š All Pet Species ({len(sorted_pets)} total)
"""
for pet in sorted_pets:
type_str = pet['type1']
if pet['type2']:
type_str += f" / {pet['type2']}"
rarity_color = rarity_colors.get(pet['rarity'], '#ffffff')
# Get all spawn locations
location_text = ""
if pet['spawn_locations']:
locations = [f"{loc['location_name']} (Lv.{loc['min_level']}-{loc['max_level']})"
for loc in pet['spawn_locations'][:2]]
if len(pet['spawn_locations']) > 2:
locations.append(f"+{len(pet['spawn_locations']) - 2} more")
location_text = f"š {', '.join(locations)}"
else:
location_text = "š Location Unknown"
petdex_html += f"""
"""
else: # Default to rarity sorting
pets_by_rarity = {}
for pet in petdex_data:
rarity = pet['rarity']
if rarity not in pets_by_rarity:
pets_by_rarity[rarity] = []
# Check for duplicates within this rarity
if pet not in pets_by_rarity[rarity]:
pets_by_rarity[rarity].append(pet)
# Sort each rarity group by name and remove any remaining duplicates
for rarity in pets_by_rarity:
# Remove duplicates based on pet ID and sort by name
seen_ids = set()
unique_pets = []
for pet in pets_by_rarity[rarity]:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
pets_by_rarity[rarity] = sorted(unique_pets, key=lambda x: x['name'])
for rarity in sorted(pets_by_rarity.keys()):
pets_in_rarity = pets_by_rarity[rarity]
rarity_name = rarity_names.get(rarity, f"Rarity {rarity}")
rarity_color = rarity_colors.get(rarity, "#ffffff")
petdex_html += f"""
{rarity_name} ({len(pets_in_rarity)} species)
"""
for pet in pets_in_rarity:
# Build type display
type_str = pet['type1']
if pet['type2']:
type_str += f"/{pet['type2']}"
# Build evolution info
evolution_info = ""
if pet['evolution_level'] and pet['evolves_to_name']:
evolution_info = f" Evolves: Level {pet['evolution_level']} ā {pet['evolves_to_name']}"
elif pet['evolution_level']:
evolution_info = f" Evolves: Level {pet['evolution_level']}"
# Build spawn locations
spawn_info = ""
if pet['spawn_locations']:
locations = [f"{loc['location_name']} (Lv.{loc['min_level']}-{loc['max_level']})"
for loc in pet['spawn_locations'][:3]]
if len(pet['spawn_locations']) > 3:
locations.append(f"+{len(pet['spawn_locations']) - 3} more")
spawn_info = f" Found in: {', '.join(locations)}"
else:
spawn_info = " Found in: Not yet available"
# Calculate total base stats
total_stats = pet['base_hp'] + pet['base_attack'] + pet['base_defense'] + pet['base_speed']
petdex_html += f"""
š Found {len(petdex_data)} pets matching "{search_query}"
' if search_query else ''}
"""
# Determine header text based on sort mode
if sort_mode == 'type':
header_text = "š Pet Collection by Type"
description = "š· Pets are organized by their primary type. Each type has different strengths and weaknesses!"
elif sort_mode == 'name':
header_text = "š Pet Collection (A-Z)"
description = "š¤ All pets sorted alphabetically by name. Perfect for finding specific species!"
elif sort_mode == 'location':
header_text = "š Pet Collection by Location"
description = "šŗļø Pets are organized by where they can be found. Use !travel <location> to visit these areas!"
elif sort_mode == 'all':
header_text = "š Complete Pet Collection"
description = "š All pets displayed in a comprehensive grid view with locations and stats!"
else:
header_text = "š Pet Collection by Rarity"
description = "š Pets are organized by rarity. Use !wild <location> in #petz to see what spawns where!"
# Combine all content
content = f"""
š Petdex
Complete encyclopedia of all available pets
{stats_content}
{search_interface}
{header_text}
{description}
{petdex_html}
"""
html = self.get_page_template("Petdex", content, "petdex")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html.encode())
def serve_player_profile(self, nickname):
    """Serve the profile page for a single player.

    The nickname is URL-decoded, the player's data is fetched through
    ``fetch_player_data`` on a private event loop, and the result is handed
    to ``serve_player_data``. A missing database, a missing player, or any
    exception is reported through the matching ``serve_player_*`` helper.

    Args:
        nickname: Player nickname as it appeared in the request path
            (may still be percent-encoded).
    """
    from urllib.parse import unquote
    import asyncio

    # URL decode the nickname in case it has special characters
    nickname = unquote(nickname)

    # Get database instance from the server object
    database = getattr(self.server, 'database', None)
    if not database:
        self.serve_player_error(nickname, "Database not available")
        return

    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            player_data = loop.run_until_complete(self.fetch_player_data(database, nickname))
        finally:
            # Always release the loop, even when the fetch raises; otherwise
            # every failing request leaves an open event loop behind.
            loop.close()

        if player_data is None:
            self.serve_player_not_found(nickname)
            return
        self.serve_player_data(nickname, player_data)
    except Exception as e:
        print(f"Error fetching player data for {nickname}: {e}")
        self.serve_player_error(nickname, f"Error loading player data: {str(e)}")
async def fetch_player_data(self, database, nickname):
    """Collect everything the profile pages show for one player.

    Args:
        database: Object exposing ``db_path`` plus the coroutines
            ``get_player_encounters`` and ``get_encounter_stats``.
        nickname: Exact nickname to look up in the ``players`` table.

    Returns:
        A dict with the keys ``player``, ``pets``, ``achievements``,
        ``inventory``, ``gym_badges``, ``encounters`` and
        ``encounter_stats``; ``None`` when no such player exists or when
        any database error occurs (the error is printed, not raised).
    """
    try:
        import aiosqlite

        async with aiosqlite.connect(database.db_path) as conn:
            conn.row_factory = aiosqlite.Row

            async def rows_as_dicts(sql, params):
                # Run one query and turn every Row into a plain dict.
                result = await conn.execute(sql, params)
                fetched = await result.fetchall()
                return [dict(record) for record in fetched]

            # Basic player info, joined with the current location
            lookup = await conn.execute("""
SELECT p.*, l.name as location_name, l.description as location_desc
FROM players p
LEFT JOIN locations l ON p.current_location_id = l.id
WHERE p.nickname = ?
""", (nickname,))
            player_row = await lookup.fetchone()
            if not player_row:
                return None

            profile = dict(player_row)
            owner = (profile['id'],)

            # Pets: active first, then by level, then by id
            pets = await rows_as_dicts("""
SELECT p.*, ps.name as species_name, ps.type1, ps.type2, ps.emoji
FROM pets p
JOIN pet_species ps ON p.species_id = ps.id
WHERE p.player_id = ?
ORDER BY p.is_active DESC, p.level DESC, p.id ASC
""", owner)

            # Achievements, most recently completed first
            achievements = await rows_as_dicts("""
SELECT pa.*, a.name as achievement_name, a.description as achievement_desc
FROM player_achievements pa
JOIN achievements a ON pa.achievement_id = a.id
WHERE pa.player_id = ?
ORDER BY pa.completed_at DESC
""", owner)

            # Inventory
            inventory = await rows_as_dicts("""
SELECT i.name, i.description, i.category, i.rarity, pi.quantity
FROM player_inventory pi
JOIN items i ON pi.item_id = i.id
WHERE pi.player_id = ?
ORDER BY i.rarity DESC, i.name ASC
""", owner)

            # Gym badges (only gyms with at least one victory)
            gym_badges = await rows_as_dicts("""
SELECT g.name, g.badge_name, g.badge_icon, l.name as location_name,
pgb.victories, pgb.first_victory_date, pgb.highest_difficulty
FROM player_gym_battles pgb
JOIN gyms g ON pgb.gym_id = g.id
JOIN locations l ON g.location_id = l.id
WHERE pgb.player_id = ? AND pgb.victories > 0
ORDER BY pgb.first_victory_date ASC
""", owner)

            # Encounters come from the database helper; a failure here is
            # logged and degrades to an empty list instead of failing the page.
            encounter_fields = (
                'species_name',
                'type1',
                'type2',
                'rarity',
                'total_encounters',
                'caught_count',
                'first_encounter_date',
            )
            try:
                raw_encounters = await database.get_player_encounters(profile['id'])
                encounters = [
                    {field: entry[field] for field in encounter_fields}
                    for entry in raw_encounters
                ]
            except Exception as e:
                print(f"Error fetching encounters: {e}")
                encounters = []

            # Encounter stats fall back to zeroed values on failure
            try:
                encounter_stats = await database.get_encounter_stats(profile['id'])
            except Exception as e:
                print(f"Error fetching encounter stats: {e}")
                encounter_stats = {
                    'species_encountered': 0,
                    'total_encounters': 0,
                    'total_species': 0,
                    'completion_percentage': 0.0
                }

            return {
                'player': profile,
                'pets': pets,
                'achievements': achievements,
                'inventory': inventory,
                'gym_badges': gym_badges,
                'encounters': encounters,
                'encounter_stats': encounter_stats
            }
    except Exception as e:
        print(f"Database error fetching player {nickname}: {e}")
        return None
def serve_player_pets(self, nickname):
    """Serve the pet management page for one player.

    Looks the player up, loads their pets through
    ``get_player_pets_for_rename`` and renders the management interface.
    An unknown player gets the not-found page; any exception is printed and
    reported through ``serve_player_error``.

    Args:
        nickname: Player nickname to look up.
    """
    try:
        print(f"DEBUG: serve_player_pets called with nickname: '{nickname}'")

        # Look the player up through the database helper
        player_record = asyncio.run(self.database.get_player(nickname))
        print(f"DEBUG: Player result: {player_record}")
        if not player_record:
            print(f"DEBUG: Player not found for: '{nickname}'")
            self.serve_player_not_found(nickname)
            return

        # Load the pets and render the management page
        owned_pets = asyncio.run(
            self.database.get_player_pets_for_rename(player_record['id'])
        )
        self.serve_pets_management_interface(nickname, owned_pets)
    except Exception as err:
        print(f"Error serving player pets page: {err}")
        self.serve_player_error(nickname, str(err))
def get_iv_grade(self, iv_value):
    """Return the colour-grade label for an IV value.

    Args:
        iv_value: Numeric IV value.

    Returns:
        ``"perfect"`` for 27 and above, ``"excellent"`` for 21-26,
        ``"good"`` for 15-20, ``"fair"`` for 10-14, otherwise ``"poor"``.
    """
    # (lowest value that earns the grade, grade label), highest grade first
    grade_floors = (
        (27, "perfect"),
        (21, "excellent"),
        (15, "good"),
        (10, "fair"),
    )
    for floor, label in grade_floors:
        if iv_value >= floor:
            return label
    return "poor"
def serve_pets_management_interface(self, nickname, pets):
"""Serve the pet management interface"""
# Renders the page for `nickname` and writes it to the client with a 200
# status. Each entry of `pets` is read through .get() for the keys
# is_active, team_order, fainted_at, nickname, species_name and id.
# An empty `pets` is delegated to serve_no_pets_error and nothing else runs.
if not pets:
self.serve_no_pets_error(nickname)
return
# Generate pet cards
pet_cards = []
for pet in pets:
# Badge text: "Team #n" for an active pet with a positive team_order,
# "Active" for any other active pet, "Storage" for the rest.
status_badge = ""
if pet.get('is_active'):
team_order = pet.get('team_order', 0)
if team_order > 0:
status_badge = f'Team #{team_order}'
else:
status_badge = 'Active'
else:
status_badge = 'Storage'
fainted_badge = ""
if pet.get('fainted_at'):
fainted_badge = 'š Fainted'
# Display name falls back to the species name when no nickname is set.
current_name = pet.get('nickname') or pet.get('species_name')
pet_id = pet.get('id')
# NOTE(review): the card template below is empty and pet_cards is never
# appended to in the visible code; the markup looks to be missing here.
pet_card = f"""
"""
# Add no-pets-specific CSS
additional_css = """
.main-container {
text-align: center;
max-width: 800px;
margin: 0 auto;
}
.no-pets-message {
background: var(--bg-secondary);
padding: 40px;
border-radius: 15px;
border: 2px solid var(--warning-color);
margin-top: 30px;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
}
.no-pets-message h2 {
color: var(--warning-color);
margin-top: 0;
}
"""
# NOTE(review): `content` is not assigned anywhere in this function as shown;
# confirm where it is supposed to come from.
html_content = self.get_page_template(f"Team Builder - {nickname}", content, "players")
# NOTE(review): str.replace with an empty search string inserts the
# replacement between every character; the search token (presumably a
# closing style tag) appears to be missing. Confirm.
html_content = html_content.replace("", additional_css + "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
def serve_teambuilder_interface(self, nickname, pets):
"""Serve the full interactive team builder interface"""
# Splits `pets` into active and stored pets, loads the player's saved team
# configurations, builds the team-selection cards and writes the page with
# a 200 status. Each pet is indexed by key (is_active, nickname,
# species_name, id, type1, type2, hp, max_hp, level, attack, defense,
# speed, experience, happiness) and read through .get() for emoji and
# team_order.
active_pets = [pet for pet in pets if pet['is_active']]
inactive_pets = [pet for pet in pets if not pet['is_active']]
# Get team configurations for team selection interface
import asyncio
from src.database import Database
# NOTE(review): this builds a fresh Database() instead of using the
# server's shared instance (self.database); confirm that is intended.
database = Database()
# Get event loop
# NOTE(review): a loop created in the except branch is never closed in this
# function.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Get player and team configurations
player = loop.run_until_complete(database.get_player(nickname))
team_configs = []
if player:
team_configs = loop.run_until_complete(database.get_player_team_configurations(player['id']))
# Debug logging
print(f"Team Builder Debug for {nickname}:")
print(f"Total pets: {len(pets)}")
active_names = [f"{pet['nickname'] or pet['species_name']} (ID:{pet['id']}, is_active:{pet['is_active']})" for pet in active_pets]
inactive_names = [f"{pet['nickname'] or pet['species_name']} (ID:{pet['id']}, is_active:{pet['is_active']})" for pet in inactive_pets]
print(f"Active pets: {len(active_pets)} - {active_names}")
print(f"Inactive pets: {len(inactive_pets)} - {inactive_names}")
# Generate detailed pet cards with debugging
# make_pet_card returns the card text for one pet; `is_active` selects the
# "Active"/"Storage" label and the matching status class.
def make_pet_card(pet, is_active):
name = pet['nickname'] or pet['species_name']
status = "Active" if is_active else "Storage"
status_class = "active" if is_active else "storage"
type_str = pet['type1']
if pet['type2']:
type_str += f"/{pet['type2']}"
# Get emoji for the pet species
emoji = pet.get('emoji', 'š¾') # Default to paw emoji if none specified
# Debug logging
print(f"Making pet card for {name} (ID: {pet['id']}): is_active={pet['is_active']}, passed_is_active={is_active}, status_class={status_class}, team_order={pet.get('team_order', 'None')}")
# Calculate HP percentage for health bar
# (0 when max_hp is not positive, which avoids dividing by zero)
hp_percent = (pet['hp'] / pet['max_hp']) * 100 if pet['max_hp'] > 0 else 0
# Green above 60%, orange above 25%, red otherwise.
# NOTE(review): hp_color and status_class are not interpolated into the
# card text as shown; presumably used by markup missing from this literal.
hp_color = "#4CAF50" if hp_percent > 60 else "#FF9800" if hp_percent > 25 else "#f44336"
return f"""
{emoji} {name}
{status}
Level {pet['level']} {pet['species_name']}
{type_str}
HP: {pet['hp']}/{pet['max_hp']}
ATK{pet['attack']}
DEF{pet['defense']}
SPD{pet['speed']}
EXP{pet['experience']}
{'š' if pet['happiness'] > 70 else 'š' if pet['happiness'] > 40 else 'š'}Happiness: {pet['happiness']}/100
"""
# Create 6 numbered slots and place pets in their positions
team_slots = [''] * 6 # Initialize 6 empty slots
# Place active pets in their team_order positions
# (team_order is 1-based; values outside 1..6 or missing are skipped)
for pet in active_pets:
team_order = pet.get('team_order')
if team_order and 1 <= team_order <= 6:
team_slots[team_order - 1] = make_pet_card(pet, True)
storage_cards = ''.join(make_pet_card(pet, False) for pet in inactive_pets)
# Old template removed - using new unified template system below
# NOTE(review): team_slots, storage_cards, storage_pets_html and
# active_pets_html are not referenced again in the visible code; each
# make_pet_card call still emits its debug print.
# Generate storage pets HTML first
storage_pets_html = ""
for pet in inactive_pets:
storage_pets_html += make_pet_card(pet, False)
# Generate active pets HTML for team slots
active_pets_html = ""
for pet in active_pets:
if pet.get('team_order'):
active_pets_html += make_pet_card(pet, True)
# Create content using string concatenation instead of f-strings to avoid CSS brace issues
team_builder_content = """
š¾ Team Builder
Choose a team to edit, then drag pets between Active Team and Storage.
Changes are saved securely with PIN verification via IRC
š PIN Verification Required
A 6-digit PIN has been sent to you via IRC private message.
Enter the PIN below to confirm your team changes:
š” How to use:
⢠Drag pets to team slots
⢠Double-click to move pets
⢠Empty slots show placeholders
"""
# Generate team cards HTML
print(f"Debug: Generating team cards for {len(team_configs)} configs")
team_cards_html = ""
# If no team configs exist, create default slots with Team 1 showing current active team
# NOTE(review): several string literals in the two branches below open a
# quote with no markup before the line break and are not valid as shown;
# confirm them against the original template.
if not team_configs:
print("Debug: No team configs found, creating default empty slots")
for slot in range(1, 4):
# For Team 1, show current active pets
if slot == 1:
pet_previews = ""
active_count = 0
for pos in range(1, 7):
# Find pet in position pos
pet_in_slot = next((pet for pet in active_pets if pet.get('team_order') == pos), None)
if pet_in_slot:
pet_name = pet_in_slot['nickname'] or pet_in_slot['species_name']
pet_emoji = pet_in_slot.get('emoji', 'š¾')
pet_previews += f'
{pet_emoji} {pet_name}
'
active_count += 1
else:
pet_previews += '
Empty
'
status_text = f"{active_count}/6 pets" if active_count > 0 else "Empty team"
else:
# Teams 2 and 3 are empty by default
pet_previews = '
'''
else:
for config in team_configs:
print(f"Debug: Processing config: {config}")
pet_previews = ""
# Special handling for Team 1 - show current active team instead of saved config
if config['slot'] == 1:
active_count = 0
for pos in range(1, 7):
# Find pet in position pos from current active team
pet_in_slot = next((pet for pet in active_pets if pet.get('team_order') == pos), None)
if pet_in_slot:
pet_name = pet_in_slot['nickname'] or pet_in_slot['species_name']
pet_emoji = pet_in_slot.get('emoji', 'š¾')
pet_previews += f'
{pet_emoji} {pet_name}
'
active_count += 1
else:
pet_previews += '
Empty
'
status_text = f"{active_count}/6 pets" if active_count > 0 else "Empty team"
else:
# For Teams 2 and 3, use saved configuration data
# (team_data is looked up with string position keys "1".."6")
if config['team_data']:
for pos in range(1, 7):
if str(pos) in config['team_data'] and config['team_data'][str(pos)]:
pet_info = config['team_data'][str(pos)]
pet_emoji = pet_info.get('emoji', 'š¾')
pet_previews += f'
{pet_emoji} {pet_info["name"]}
'
else:
pet_previews += '
Empty
'
else:
pet_previews = '
Empty
' * 6
status_text = f"{config['pet_count']}/6 pets" if config['pet_count'] > 0 else "Empty team"
active_class = "active" if config['slot'] == 1 else "" # Default to team 1 as active
team_cards_html += f'''
{config['name']}
{status_text}
{pet_previews}
'''
# Replace placeholder with actual team cards
# NOTE(review): the placeholder token is an empty string here; str.replace
# with an empty search string inserts team_cards_html between every
# character. The placeholder text appears to be missing. Confirm.
team_builder_content = team_builder_content.replace('', team_cards_html)
# Get the unified template
html_content = self.get_page_template(f"Team Builder - {nickname}", team_builder_content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
def serve_test_teambuilder(self, nickname):
    """Serve the test team builder interface with simplified team management.

    URL-decodes the nickname, loads the player's data through
    ``fetch_player_data`` and dispatches to the not-found page, the no-pets
    page or the test team builder interface. Any exception is printed and
    reported through ``serve_player_error``.

    Args:
        nickname: Player nickname from the request path (may be
            percent-encoded).
    """
    from urllib.parse import unquote

    nickname = unquote(nickname)
    print(f"DEBUG: serve_test_teambuilder called with nickname: {nickname}")
    try:
        from src.database import Database
        # NOTE(review): a fresh Database() is created here instead of using
        # the server's shared instance (self.database); confirm intended.
        database = Database()

        # Reuse a running loop if there is one, otherwise create our own.
        owns_loop = False
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            owns_loop = True

        try:
            player_data = loop.run_until_complete(self.fetch_player_data(database, nickname))
        finally:
            # Close only the loop created here; without this every request
            # leaves an open event loop (and its selector) behind.
            if owns_loop:
                loop.close()

        if player_data is None:
            self.serve_player_not_found(nickname)
            return

        pets = player_data['pets']
        if not pets:
            self.serve_test_teambuilder_no_pets(nickname)
            return

        self.serve_test_teambuilder_interface(nickname, pets)
    except Exception as e:
        print(f"Error loading test team builder for {nickname}: {e}")
        self.serve_player_error(nickname, f"Error loading test team builder: {str(e)}")
def serve_test_teambuilder_no_pets(self, nickname):
    """Show the "no pets" message for the test team builder.

    The page is wrapped in the unified template and sent with a 200 status.

    Args:
        nickname: Player nickname; shown in the page body and title.
    """
    from html import escape

    # The nickname originates from the request URL, so escape it before it
    # is embedded in the HTML body.
    safe_nickname = escape(nickname)
    content = f"""
š¾ Test Team Builder
Simplified team management (Test Version)
š¾ No Pets Found
{safe_nickname}, you need to catch some pets before using the team builder!
Head to the IRC channel and use !explore to find wild pets!
"""
    # NOTE(review): the title is passed through unescaped, as in the other
    # pages; confirm whether get_page_template escapes it.
    html_content = self.get_page_template(f"Test Team Builder - {nickname}", content, "")
    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()
    self.wfile.write(html_content.encode())
def serve_test_teambuilder_interface(self, nickname, pets):
    """Serve the simplified test team builder interface.

    Looks up the player and their saved team configurations, then sends a
    temporary summary page with a 200 status. An unknown player or any
    exception is reported through ``serve_player_error``.

    Args:
        nickname: Player nickname to look up.
        pets: Sequence of pet mappings; only its length and the first
            entry's ``nickname`` are used.
    """
    import asyncio
    try:
        from src.database import Database
        # NOTE(review): a fresh Database() is created here instead of using
        # the server's shared instance (self.database); confirm intended.
        database = Database()

        # Reuse a running loop if there is one, otherwise create our own.
        owns_loop = False
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            owns_loop = True

        try:
            print(f"Debug: Getting player data for {nickname}")
            # Get player info
            player = loop.run_until_complete(database.get_player(nickname))
            if not player:
                self.serve_player_error(nickname, "Player not found")
                return
            print(f"Debug: Player found: {player}")

            # Get team configurations
            team_configs = loop.run_until_complete(
                database.get_player_team_configurations(player['id'])
            )
        finally:
            # Close only the loop created here; without this every request
            # leaves an open event loop (and its selector) behind.
            if owns_loop:
                loop.close()

        print(f"Debug: Team configs: {team_configs}")

        # Create the simplified interface
        print(f"Debug: Creating content with {len(pets)} pets")
        # TEMPORARY: Use simple content to test
        content = f"""
Test Team Builder - {nickname}
Found {len(pets)} pets and {len(team_configs)} team configs
First pet: {pets[0]['nickname'] if pets else 'No pets'}
"""
        print("Debug: Content created successfully")

        html_content = self.get_page_template(f"Test Team Builder - {nickname}", content, "")
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(html_content.encode())
    except Exception as e:
        print(f"Error in serve_test_teambuilder_interface: {e}")
        self.serve_player_error(nickname, f"Error loading interface: {str(e)}")
def create_test_teambuilder_content(self, nickname, pets, team_configs):
"""Create the simplified test team builder HTML content"""
# Builds a JSON list of pets and a preview card per entry of `team_configs`.
# Each pet is indexed by id, nickname, level and type1; each config is
# indexed by team_data, which is looked up with string keys "1".."6".
import json
# Pre-process pets data for JavaScript
pets_data = []
for pet in pets:
# NOTE(review): 'name' takes pet['nickname'] as-is (other code in this file
# falls back to species_name when it is empty) and 'rarity' is hard-coded
# to 1; confirm both are intended.
pets_data.append({
'id': pet['id'],
'name': pet['nickname'],
'level': pet['level'],
'type_primary': pet['type1'],
'rarity': 1
})
# NOTE(review): pets_json is not referenced again in the visible code;
# presumably interpolated into markup missing from the literal below.
pets_json = json.dumps(pets_data)
# Build team cards for each configuration
team_cards_html = ""
for config in team_configs:
pet_previews = ""
if config['team_data']:
for pos in range(1, 7):
if str(pos) in config['team_data'] and config['team_data'][str(pos)]:
pet_info = config['team_data'][str(pos)]
# NOTE(review): the literal opened on the next line runs into the page
# template text and is not valid as shown; the rest of the loop and the
# return statement appear to be missing. Confirm against the original.
pet_previews += f'
Choose a team to edit, make changes, and save with PIN verification
Choose Team to Edit
Select one of your 3 teams to edit. Each team can have up to 6 pets.
{team_cards_html}
Editing Team 1
š PIN Verification Required
A 6-digit PIN has been sent to you via IRC private message.
Enter the PIN below to confirm your team changes:
'''
def handle_team_save(self, nickname):
    """Accept a team-save POST and start PIN verification.

    Reads the JSON request body, passes it to ``_handle_team_save_async``
    and relays the resulting dict as JSON: 200 when ``success`` is truthy,
    400 otherwise. An empty body or malformed JSON yields 400; any
    unexpected exception is printed and answered with 500.

    Args:
        nickname: Player nickname the team belongs to.
    """
    import asyncio
    import json

    try:
        body_size = int(self.headers.get('Content-Length', 0))
        if body_size == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return

        raw_body = self.rfile.read(body_size).decode('utf-8')
        try:
            payload = json.loads(raw_body)
        except json.JSONDecodeError:
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        # Run the async part to completion on a fresh event loop
        outcome = asyncio.run(self._handle_team_save_async(nickname, payload))
        self.send_json_response(outcome, 200 if outcome["success"] else 400)
    except Exception as exc:
        print(f"Error in handle_team_save: {exc}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_save_async(self, nickname, save_data):
    """Validate a team submission and create a PIN-protected pending change.

    Args:
        nickname: Player nickname.
        save_data: Either ``{"teamData": ..., "teamSlot": n}`` or, in the
            older format, the team data itself (slot 1 is assumed).

    Returns:
        A dict with ``success``; on success also ``message`` and
        ``expires_in_minutes``, otherwise an ``error`` string (or whatever
        dict ``create_pending_team_change`` returned).
    """
    import json

    try:
        # New structure carries the slot; the old format is the bare team data
        uses_new_format = isinstance(save_data, dict) and 'teamData' in save_data
        if uses_new_format:
            team_data = save_data['teamData']
            team_slot = save_data.get('teamSlot', 1)
        else:
            team_data, team_slot = save_data, 1

        slot_is_valid = isinstance(team_slot, int) and 1 <= team_slot <= 3
        if not slot_is_valid:
            return {"success": False, "error": "Invalid team slot. Must be 1, 2, or 3"}

        player = await self.database.get_player(nickname)
        if not player:
            return {"success": False, "error": "Player not found"}

        verdict = await self.database.validate_team_composition(player["id"], team_data)
        if not verdict["valid"]:
            return {"success": False, "error": verdict["error"]}

        # Store the pending change (team + slot) behind a PIN
        pending_payload = json.dumps({
            'teamData': team_data,
            'teamSlot': team_slot
        })
        pending = await self.database.create_pending_team_change(player["id"], pending_payload)
        if not pending["success"]:
            return pending

        self.send_pin_via_irc(nickname, pending["pin_code"])
        return {
            "success": True,
            "message": "PIN sent to your IRC private messages",
            "expires_in_minutes": 10
        }
    except Exception as exc:
        print(f"Error in _handle_team_save_async: {exc}")
        return {"success": False, "error": str(exc)}
def handle_team_verify(self, nickname):
    """Verify a submitted PIN and apply the pending team change.

    Reads ``{"pin": "..."}`` from the request body, passes the stripped PIN
    to ``_handle_team_verify_async`` and relays its result as JSON (200 on
    success, 400 otherwise). Missing or malformed input yields 400; any
    unexpected exception is printed and answered with 500.

    Args:
        nickname: Player nickname whose pending change is being confirmed.
    """
    import asyncio
    import json

    try:
        body_size = int(self.headers.get('Content-Length', 0))
        if body_size == 0:
            self.send_json_response({"success": False, "error": "No PIN provided"}, 400)
            return

        raw_body = self.rfile.read(body_size).decode('utf-8')
        try:
            # AttributeError covers a non-object body or a non-string pin
            pin_code = json.loads(raw_body).get("pin", "").strip()
        except (json.JSONDecodeError, AttributeError):
            self.send_json_response({"success": False, "error": "Invalid data format"}, 400)
            return

        if not pin_code:
            self.send_json_response({"success": False, "error": "PIN code is required"}, 400)
            return

        outcome = asyncio.run(self._handle_team_verify_async(nickname, pin_code))
        self.send_json_response(outcome, 200 if outcome["success"] else 400)
    except Exception as exc:
        print(f"Error in handle_team_verify: {exc}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_verify_async(self, nickname, pin_code):
    """Apply a player's pending team change if the PIN matches.

    Args:
        nickname: Player nickname.
        pin_code: PIN string submitted by the client.

    Returns:
        On success a dict with ``success``, ``message`` and
        ``changes_applied``; otherwise the failure dict from
        ``apply_team_change`` or ``{"success": False, "error": ...}``.
    """
    try:
        player = await self.database.get_player(nickname)
        if not player:
            return {"success": False, "error": "Player not found"}

        outcome = await self.database.apply_team_change(player["id"], pin_code)
        if not outcome["success"]:
            return outcome

        return {
            "success": True,
            "message": f"Team changes applied successfully! {outcome['changes_applied']} pets updated.",
            "changes_applied": outcome["changes_applied"]
        }
    except Exception as exc:
        print(f"Error in _handle_team_verify_async: {exc}")
        return {"success": False, "error": str(exc)}
def send_pin_via_irc(self, nickname, pin_code):
    """Deliver a team-builder PIN to a player by IRC private message.

    The PIN is always printed to stdout first. When the bot exposes
    ``send_message_sync`` two messages are sent; a send failure is printed
    and not raised. Without a usable bot the PIN is only printed.

    Args:
        nickname: IRC nickname to message.
        pin_code: PIN to deliver.
    """
    print(f"š PIN for {nickname}: {pin_code}")

    irc_bot = self.bot
    if not (irc_bot and hasattr(irc_bot, 'send_message_sync')):
        # No bot to deliver through: leave the PIN in the log for manual use
        print(f"ā No IRC bot available to send PIN to {nickname}")
        print(f"š” Manual PIN for {nickname}: {pin_code}")
        return

    try:
        # Send PIN via private message using the bot's sync wrapper
        irc_bot.send_message_sync(nickname, f"š Team Builder PIN: {pin_code}")
        irc_bot.send_message_sync(
            nickname,
            "š” Enter this PIN on the web page to confirm your team changes. PIN expires in 10 minutes."
        )
        print(f"ā PIN sent to {nickname} via IRC")
    except Exception as exc:
        print(f"ā Failed to send PIN via IRC: {exc}")
def handle_team_config_save(self, nickname, slot):
    """Save a named team configuration into one of the three slots.

    Reads ``{"name": ..., "team": ...}`` from the request body, validates
    the slot and relays the result of ``_handle_team_config_save_async`` as
    JSON (200 on success, 400 otherwise). An empty body, malformed JSON, a
    JSON body that is not an object, or a bad slot yields 400; any
    unexpected exception is printed and answered with 500.

    Args:
        nickname: Player nickname.
        slot: Slot number from the request path; must convert to 1, 2 or 3.
    """
    import asyncio
    import json

    try:
        # Get POST data
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return
        post_data = self.rfile.read(content_length).decode('utf-8')

        # Parse JSON data. AttributeError covers valid JSON that is not an
        # object (e.g. a list), which previously surfaced as a 500.
        try:
            data = json.loads(post_data)
            config_name = data.get("name", f"Team Config {slot}")
            team_data = data.get("team", [])
        except (json.JSONDecodeError, AttributeError):
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        # Validate slot number
        try:
            slot_num = int(slot)
        except ValueError:
            self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
            return
        if slot_num < 1 or slot_num > 3:
            self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
            return

        # Run async operations
        result = asyncio.run(
            self._handle_team_config_save_async(nickname, slot_num, config_name, team_data)
        )
        self.send_json_response(result, 200 if result["success"] else 400)
    except Exception as e:
        print(f"Error in handle_team_config_save: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_save_async(self, nickname, slot_num, config_name, team_data):
    """Persist a team configuration for a player.

    Args:
        nickname: Player nickname.
        slot_num: Target slot number.
        config_name: Display name for the configuration.
        team_data: JSON-serialisable team description.

    Returns:
        ``{"success": True, "message": ...}`` when the database reports
        success, otherwise ``{"success": False, "error": ...}``.
    """
    import json

    try:
        player = await self.database.get_player(nickname)
        if not player:
            return {"success": False, "error": "Player not found"}

        saved = await self.database.save_team_configuration(
            player["id"], slot_num, config_name, json.dumps(team_data)
        )
        if not saved:
            return {"success": False, "error": "Failed to save team configuration"}

        return {"success": True, "message": f"Team configuration '{config_name}' saved to slot {slot_num}"}
    except Exception as exc:
        print(f"Error in _handle_team_config_save_async: {exc}")
        return {"success": False, "error": str(exc)}
def handle_team_config_load(self, nickname, slot):
    """Load the team configuration stored in a slot.

    Validates the slot and relays the result of
    ``_handle_team_config_load_async`` as JSON: 200 on success, 404 when
    the player or the configuration does not exist, 400 for other
    failures. Any unexpected exception is printed and answered with 500.

    Args:
        nickname: Player nickname.
        slot: Slot number from the request path; must convert to 1, 2 or 3.
    """
    import asyncio

    try:
        # Validate slot number
        try:
            slot_num = int(slot)
        except ValueError:
            self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
            return
        if slot_num < 1 or slot_num > 3:
            self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
            return

        # Run async operations
        result = asyncio.run(self._handle_team_config_load_async(nickname, slot_num))
        if result["success"]:
            status = 200
        else:
            # "Player not found" contains "not found", but the missing-slot
            # message reads "No team configuration found in slot N", so it
            # has to be matched separately to get the intended 404.
            error_text = str(result.get("error", "")).lower()
            is_missing = (
                "not found" in error_text
                or "no team configuration found" in error_text
            )
            status = 404 if is_missing else 400
        self.send_json_response(result, status)
    except Exception as e:
        print(f"Error in handle_team_config_load: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_load_async(self, nickname, slot_num):
    """Fetch and decode the team configuration stored in a slot.

    Args:
        nickname: Player nickname.
        slot_num: Slot number to load.

    Returns:
        On success a dict with ``success``, ``config_name``, ``team_data``
        (decoded from its stored JSON text) and ``updated_at``; otherwise
        ``{"success": False, "error": ...}``.
    """
    try:
        player = await self.database.get_player(nickname)
        if not player:
            return {"success": False, "error": "Player not found"}

        stored = await self.database.load_team_configuration(player["id"], slot_num)
        if not stored:
            return {"success": False, "error": f"No team configuration found in slot {slot_num}"}

        import json
        decoded_team = json.loads(stored["team_data"])
        return {
            "success": True,
            "config_name": stored["config_name"],
            "team_data": decoded_team,
            "updated_at": stored["updated_at"]
        }
    except Exception as exc:
        print(f"Error in _handle_team_config_load_async: {exc}")
        return {"success": False, "error": str(exc)}
def handle_team_config_rename(self, nickname, slot):
    """Rename the team configuration stored in a slot.

    Validates the slot, reads ``{"new_name": "..."}`` from the request body
    and relays the result of ``_handle_team_config_rename_async`` as JSON:
    200 on success, 404 when the player or configuration does not exist,
    400 for other failures. A missing or malformed body, an empty name or
    a name longer than 50 characters yields 400; any unexpected exception
    is printed and answered with 500.

    Args:
        nickname: Player nickname.
        slot: Slot number from the request path; must convert to 1, 2 or 3.
    """
    import asyncio
    import json

    try:
        # Validate slot number
        try:
            slot_num = int(slot)
        except ValueError:
            self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
            return
        if slot_num < 1 or slot_num > 3:
            self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
            return

        # Get the new name from the request body. A missing Content-Length
        # is treated as an empty body (like the sibling handlers) instead of
        # failing with a 500.
        content_length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(content_length)
        try:
            data = json.loads(post_data.decode('utf-8'))
            # AttributeError: body is not a JSON object, or new_name is not
            # a string.
            new_name = data.get('new_name', '').strip()
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        if not new_name:
            self.send_json_response({"success": False, "error": "Configuration name cannot be empty"}, 400)
            return
        if len(new_name) > 50:
            self.send_json_response({"success": False, "error": "Configuration name too long (max 50 characters)"}, 400)
            return

        # Run async operations
        result = asyncio.run(self._handle_team_config_rename_async(nickname, slot_num, new_name))
        if result["success"]:
            status = 200
        else:
            # The missing-slot message reads "No team configuration found in
            # slot N", which does not contain "not found"; match it
            # explicitly so it maps to 404 as intended.
            error_text = str(result.get("error", "")).lower()
            is_missing = (
                "not found" in error_text
                or "no team configuration found" in error_text
            )
            status = 404 if is_missing else 400
        self.send_json_response(result, status)
    except Exception as e:
        print(f"Error in handle_team_config_rename: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_rename_async(self, nickname, slot_num, new_name):
    """Rename an existing team configuration.

    Args:
        nickname: Player nickname.
        slot_num: Slot number of the configuration.
        new_name: Replacement configuration name.

    Returns:
        ``{"success": True, "message": ..., "new_name": ...}`` when the
        rename succeeds, otherwise ``{"success": False, "error": ...}``.
    """
    try:
        player = await self.database.get_player(nickname)
        if not player:
            return {"success": False, "error": "Player not found"}

        # The slot must already hold a configuration
        current = await self.database.load_team_configuration(player["id"], slot_num)
        if not current:
            return {"success": False, "error": f"No team configuration found in slot {slot_num}"}

        renamed = await self.database.rename_team_configuration(player["id"], slot_num, new_name)
        if not renamed:
            return {"success": False, "error": "Failed to rename configuration"}

        return {
            "success": True,
            "message": f"Configuration renamed to '{new_name}'",
            "new_name": new_name
        }
    except Exception as exc:
        print(f"Error in _handle_team_config_rename_async: {exc}")
        return {"success": False, "error": str(exc)}
def handle_team_config_apply(self, nickname, slot):
    """Apply a saved team configuration to the player's active team.

    Validates the slot and relays the result of
    ``_handle_team_config_apply_async`` as JSON: 200 on success, 404 when
    the error text contains "not found", 400 otherwise. Any unexpected
    exception is printed and answered with 500.

    Args:
        nickname: Player nickname.
        slot: Slot number from the request path; must convert to 1, 2 or 3.
    """
    import asyncio

    try:
        # Slot validation; the range check shares the ValueError guard
        try:
            slot_num = int(slot)
            if not 1 <= slot_num <= 3:
                self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
                return
        except ValueError:
            self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
            return

        outcome = asyncio.run(self._handle_team_config_apply_async(nickname, slot_num))
        if outcome["success"]:
            status = 200
        elif "not found" in outcome.get("error", ""):
            status = 404
        else:
            status = 400
        self.send_json_response(outcome, status)
    except Exception as exc:
        print(f"Error in handle_team_config_apply: {exc}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_apply_async(self, nickname, slot_num):
    """Apply the configuration in a slot to the player's team.

    Args:
        nickname: Player nickname.
        slot_num: Slot number to apply.

    Returns:
        Whatever ``apply_team_configuration`` returns, or
        ``{"success": False, "error": ...}`` when the player is unknown or
        an exception occurs.
    """
    try:
        player = await self.database.get_player(nickname)
        if player:
            return await self.database.apply_team_configuration(player["id"], slot_num)
        return {"success": False, "error": "Player not found"}
    except Exception as exc:
        print(f"Error in _handle_team_config_apply_async: {exc}")
        return {"success": False, "error": str(exc)}
def handle_test_team_save(self, nickname):
    """Handle a test team builder save request and trigger a PIN.

    Reads ``{"team_slot": n, "team_data": {...}}`` from the request body
    and relays the result of ``_handle_test_team_save_async`` as JSON (200
    on success, 400 otherwise). An empty body, malformed JSON, a JSON body
    that is not an object, or a slot that is not the integer 1, 2 or 3
    yields 400; any unexpected exception is printed and answered with 500.

    Args:
        nickname: Player nickname.
    """
    import asyncio
    import json

    try:
        # Get POST data
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return
        post_data = self.rfile.read(content_length).decode('utf-8')

        # AttributeError covers valid JSON that is not an object, which
        # previously surfaced as a 500.
        try:
            data = json.loads(post_data)
            team_slot = data.get('team_slot')
            team_data = data.get('team_data', {})
        except (json.JSONDecodeError, AttributeError):
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        # Validate team slot. bool is a subclass of int, so JSON `true`
        # would otherwise be accepted as slot 1.
        slot_is_valid = (
            isinstance(team_slot, int)
            and not isinstance(team_slot, bool)
            and 1 <= team_slot <= 3
        )
        if not slot_is_valid:
            self.send_json_response({"success": False, "error": "Invalid team slot. Must be 1, 2, or 3"}, 400)
            return

        # Run async operations
        result = asyncio.run(self._handle_test_team_save_async(nickname, team_slot, team_data))
        self.send_json_response(result, 200 if result["success"] else 400)
    except Exception as e:
        print(f"Error in handle_test_team_save: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_test_team_save_async(self, nickname, team_slot, team_data):
    """Save a test team configuration and send a verification PIN.

    Args:
        nickname: Player nickname.
        team_slot: Slot number the team is saved to.
        team_data: JSON-serialisable team description.

    Returns:
        ``{"success": True, "message": "PIN sent to IRC"}`` when the
        configuration was saved and a PIN was created, otherwise
        ``{"success": False, "error": ...}``.
    """
    import json

    try:
        player = await self.database.get_player(nickname)
        if not player:
            return {"success": False, "error": "Player not found"}

        serialized_team = json.dumps(team_data)
        # NOTE(review): the configuration is written before the PIN is
        # created or verified; confirm this ordering is intended.
        saved = await self.database.save_team_configuration(
            player["id"], team_slot, f"Team {team_slot}", serialized_team
        )
        if not saved:
            return {"success": False, "error": "Failed to save team configuration"}

        # Generate PIN for verification (using existing PIN system)
        pin_result = await self.database.create_team_change_pin(player["id"], serialized_team)
        if not pin_result["success"]:
            return {"success": False, "error": "Failed to generate PIN"}

        # NOTE(review): when no bot is attached nothing is sent, yet the
        # success message below is still returned.
        irc_bot = getattr(self.server, 'bot', None)
        if irc_bot:
            self.server.bot.send_private_message(
                nickname,
                f"š Team {team_slot} Save PIN: {pin_result['pin_code']} (expires in 10 minutes)"
            )
        return {"success": True, "message": "PIN sent to IRC"}
    except Exception as exc:
        print(f"Error in _handle_test_team_save_async: {exc}")
        return {"success": False, "error": str(exc)}
def handle_test_team_verify(self, nickname):
    """Handle test team builder PIN verification.

    Reads ``{"pin": "..."}`` from the request body and relays the result
    of ``_handle_test_team_verify_async`` as JSON (200 on success, 400
    otherwise). A missing or malformed body, a non-string pin, or a pin
    whose stripped length is not 6 yields 400; any unexpected exception is
    printed and answered with 500.

    Args:
        nickname: Player nickname whose PIN is being verified.
    """
    import asyncio
    import json

    try:
        # A missing Content-Length is treated as an empty body (like the
        # sibling handlers) instead of failing with a 500.
        content_length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(content_length)
        try:
            data = json.loads(post_data.decode('utf-8'))
            # AttributeError: body is not a JSON object, or pin is not a
            # string (e.g. sent as a number).
            pin_code = data.get('pin', '').strip()
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            self.send_json_response({"success": False, "error": "Invalid request data"}, 400)
            return

        if not pin_code or len(pin_code) != 6:
            self.send_json_response({"success": False, "error": "PIN must be 6 digits"}, 400)
            return

        # Run async verification
        result = asyncio.run(self._handle_test_team_verify_async(nickname, pin_code))
        self.send_json_response(result, 200 if result["success"] else 400)
    except Exception as e:
        print(f"Error in handle_test_team_verify: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_test_team_verify_async(self, nickname, pin_code):
"""Async handler for test team PIN verification"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Verify PIN
result = await self.database.verify_team_change_pin(player["id"], pin_code)
if result["success"]:
return {"success": True, "message": "Team configuration saved successfully!"}
else:
return {"success": False, "error": result.get("error", "Invalid PIN")}
except Exception as e:
print(f"Error in _handle_test_team_verify_async: {e}")
return {"success": False, "error": str(e)}
def handle_individual_team_save(self, nickname, team_slot):
    """Handle individual team save request and generate PIN.

    Parses the JSON request body, validates ``team_slot`` and hands the
    pets list to ``_handle_individual_team_save_async``. On success the
    client is told that a PIN is required; otherwise the async result is
    returned with status 400.

    Bodies that cannot be decoded, are not valid JSON, or are not a JSON
    object are rejected with 400 rather than falling through to a 500.

    Args:
        nickname: Nickname of the player saving the team.
        team_slot: One of '1', '2', '3' or 'active'. 'active' maps to
            slot 1 and forces the active-team flag on.
    """
    import asyncio
    import json

    try:
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length <= 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return

        try:
            data = json.loads(self.rfile.read(content_length).decode('utf-8'))
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return
        if not isinstance(data, dict):
            # Valid JSON that is not an object has no fields to read
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        # Validate team slot
        if team_slot not in ['1', '2', '3', 'active']:
            self.send_json_response({"success": False, "error": "Invalid team slot"}, 400)
            return

        pets = data.get('pets', [])
        # Convert team_slot to numeric for database operations
        if team_slot == 'active':
            team_slot_num = 1
            is_active_team = True
        else:
            team_slot_num = int(team_slot)
            is_active_team = data.get('is_active_team', False)

        # Run async operations
        result = asyncio.run(
            self._handle_individual_team_save_async(nickname, team_slot_num, pets, is_active_team)
        )
        if result["success"]:
            self.send_json_response({"requires_pin": True, "message": "PIN sent to IRC"}, 200)
        else:
            self.send_json_response(result, 400)
    except Exception as e:
        print(f"Error in handle_individual_team_save: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_individual_team_save_async(self, nickname, team_slot, pets, is_active_team):
"""Async handler for individual team save"""
try:
# Get player
player = await self.server.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate pets exist and belong to player
if pets:
player_pets = await self.server.database.get_player_pets(player['id'])
player_pet_ids = [pet['id'] for pet in player_pets]
for pet_data in pets:
pet_id = pet_data.get('pet_id')
if not pet_id:
continue
# Convert pet_id to int for comparison with database IDs
try:
pet_id_int = int(pet_id)
except (ValueError, TypeError):
return {"success": False, "error": f"Invalid pet ID: {pet_id}"}
# Check if pet belongs to player
if pet_id_int not in player_pet_ids:
return {"success": False, "error": f"Pet {pet_id} not found or doesn't belong to you"}
# Convert pets array to the expected format for database
# Expected format: {"pet_id": position, "pet_id": position, ...}
team_changes = {}
if pets: # Ensure pets is not None or empty
for pet_data in pets:
if isinstance(pet_data, dict): # Ensure pet_data is a dictionary
pet_id = str(pet_data.get('pet_id')) # Ensure pet_id is string
position = pet_data.get('position', False) # Position or False for inactive
if pet_id and pet_id != 'None': # Only add valid pet IDs
team_changes[pet_id] = position
# Generate PIN and store pending change
import json
team_data = {
'teamSlot': int(team_slot), # Convert to int and use expected key name
'teamData': team_changes, # Use the dictionary format expected by database
'is_active_team': is_active_team
}
# Generate PIN
pin_result = await self.server.database.generate_verification_pin(player["id"], "team_change", json.dumps(team_data))
pin_code = pin_result.get("pin_code")
# Send PIN via IRC
self.send_pin_via_irc(nickname, pin_code)
return {"success": True, "requires_pin": True}
except Exception as e:
print(f"Error in _handle_individual_team_save_async: {e}")
return {"success": False, "error": str(e)}
def handle_individual_team_verify(self, nickname, team_slot):
    """Handle individual team PIN verification.

    Reads a JSON object carrying a ``pin`` field from the request body,
    delegates to ``_handle_individual_team_verify_async`` and replies with
    200 when the result reports success, 400 otherwise.

    A missing, non-numeric or non-positive Content-Length, an undecodable
    body, or a body that is not a JSON object is answered with 400 instead
    of surfacing as a 500.

    Args:
        nickname: Nickname of the player confirming the change.
        team_slot: Slot from the request path; not used by this method.
    """
    import asyncio
    import json

    try:
        # A missing or malformed header is a client error, not a server fault
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = 0
        if content_length <= 0:
            self.send_json_response({"success": False, "error": "Invalid request data"}, 400)
            return

        raw_body = self.rfile.read(content_length)
        try:
            data = json.loads(raw_body.decode('utf-8'))
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError
            self.send_json_response({"success": False, "error": "Invalid request data"}, 400)
            return

        # Only a string PIN inside a JSON object is usable; anything else is
        # treated like an empty PIN and rejected by the length check below
        pin_value = data.get('pin', '') if isinstance(data, dict) else ''
        pin_code = pin_value.strip() if isinstance(pin_value, str) else ''
        if len(pin_code) != 6:
            self.send_json_response({"success": False, "error": "PIN must be 6 digits"}, 400)
            return

        # Run async verification
        result = asyncio.run(self._handle_individual_team_verify_async(nickname, pin_code))
        self.send_json_response(result, 200 if result["success"] else 400)
    except Exception as e:
        print(f"Error in handle_individual_team_verify: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_individual_team_verify_async(self, nickname, pin_code):
"""Async handler for individual team PIN verification"""
try:
# Get player
player = await self.server.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Verify PIN and apply changes using simplified method
result = await self.server.database.apply_individual_team_change(player["id"], pin_code)
if result["success"]:
return {"success": True, "message": "Team saved successfully!"}
else:
return {"success": False, "error": result.get("error", "Invalid PIN")}
except Exception as e:
print(f"Error in _handle_individual_team_verify_async: {e}")
return {"success": False, "error": str(e)}
def handle_pet_rename_request(self, nickname):
    """Accept a pet rename request and trigger PIN generation.

    Parses the JSON request body and passes it to
    ``_handle_pet_rename_request_async``. Replies with 200 on success, 400
    on a failed result or bad input, and 500 on unexpected errors.

    Args:
        nickname: Nickname of the player requesting the rename.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        if body_size == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return

        raw_body = self.rfile.read(body_size).decode('utf-8')
        import json
        try:
            data = json.loads(raw_body)
        except json.JSONDecodeError:
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        # Failures of the event loop itself are reported separately
        import asyncio
        try:
            result = asyncio.run(self._handle_pet_rename_request_async(nickname, data))
        except Exception as async_error:
            print(f"Async error in pet rename: {async_error}")
            self.send_json_response({"success": False, "error": f"Async error: {str(async_error)}"}, 500)
            return

        status = 200 if result["success"] else 400
        self.send_json_response(result, status)
    except Exception as e:
        print(f"Error in handle_pet_rename_request: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_pet_rename_request_async(self, nickname, data):
"""Async handler for pet rename request"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate required fields
if "pet_id" not in data or "new_nickname" not in data:
return {"success": False, "error": "Missing pet_id or new_nickname"}
pet_id = data["pet_id"]
new_nickname = data["new_nickname"]
# Request pet rename with PIN
result = await self.database.request_pet_rename(player["id"], pet_id, new_nickname)
if result["success"]:
# Send PIN via IRC
self.send_pet_rename_pin_via_irc(nickname, result["pin"])
return {
"success": True,
"message": f"PIN sent to {nickname} via IRC. Check your messages!"
}
else:
return result
except Exception as e:
print(f"Error in _handle_pet_rename_request_async: {e}")
return {"success": False, "error": str(e)}
def handle_pet_rename_verify(self, nickname):
    """Confirm a pending pet rename with the PIN supplied by the client.

    Parses the JSON request body and passes it to
    ``_handle_pet_rename_verify_async``. Replies with 200 on success, 400
    on a failed result or bad input, and 500 on unexpected errors.

    Args:
        nickname: Nickname of the player confirming the rename.
    """
    try:
        body_size = int(self.headers.get('Content-Length', 0))
        if body_size == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return

        raw_body = self.rfile.read(body_size).decode('utf-8')
        import json
        try:
            data = json.loads(raw_body)
        except json.JSONDecodeError:
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        import asyncio
        result = asyncio.run(self._handle_pet_rename_verify_async(nickname, data))
        status = 200 if result["success"] else 400
        self.send_json_response(result, status)
    except Exception as e:
        print(f"Error in handle_pet_rename_verify: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_pet_rename_verify_async(self, nickname, data):
"""Async handler for pet rename PIN verification"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate required field
if "pin" not in data:
return {"success": False, "error": "Missing PIN"}
pin_code = data["pin"]
# Verify PIN and apply pet rename
result = await self.database.verify_pet_rename(player["id"], pin_code)
if result["success"]:
return {
"success": True,
"message": f"Pet renamed to '{result['new_nickname']}' successfully!",
"new_nickname": result["new_nickname"]
}
else:
return result
except Exception as e:
print(f"Error in _handle_pet_rename_verify_async: {e}")
return {"success": False, "error": str(e)}
def send_pet_rename_pin_via_irc(self, nickname, pin_code):
    """Deliver a pet rename PIN to the player as IRC private messages.

    The PIN is always printed to the console. When the bot exposes
    ``send_message_sync`` two messages are sent; otherwise the PIN is
    printed again for manual hand-off. Send failures are logged, not raised.

    Args:
        nickname: IRC nickname that receives the messages.
        pin_code: PIN to deliver.
    """
    print(f"š Pet rename PIN for {nickname}: {pin_code}")

    irc_bot = self.bot
    if not (irc_bot and hasattr(irc_bot, 'send_message_sync')):
        print(f"ā No IRC bot available to send pet rename PIN to {nickname}")
        print(f"š” Manual pet rename PIN for {nickname}: {pin_code}")
        return

    outgoing = (
        f"š Pet Rename PIN: {pin_code}",
        f"š” Enter this PIN on the web page to confirm your pet rename. PIN expires in 15 seconds.",
    )
    try:
        # Private messages go through the bot's synchronous wrapper
        for line in outgoing:
            irc_bot.send_message_sync(nickname, line)
        print(f"ā Pet rename PIN sent to {nickname} via IRC")
    except Exception as e:
        print(f"ā Failed to send pet rename PIN via IRC: {e}")
def serve_admin_login(self):
    """Serve the admin login page.

    Wraps the static login content in the shared page template and writes
    it to the client as a 200 text/html response.
    """
    import sys
    # NOTE(review): this appends '.' to sys.path on every call, so the list
    # grows by one entry per request; consider guarding the append.
    sys.path.append('.')
    # NOTE(review): ADMIN_USER is imported but not referenced in this body.
    from config import ADMIN_USER
    content = """
š Admin Control Panel
Authorized access only
Authentication Required
This area is restricted to bot administrators.
A PIN will be sent to your IRC private messages
"""
    # The third template argument is passed as an empty string
    html = self.get_page_template("Admin Login", content, "")
    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()
    self.wfile.write(html.encode())
def handle_admin_auth(self):
    """Handle admin authentication request and generate PIN.

    Expects a JSON object with a ``nickname`` field. When the nickname
    matches ``config.ADMIN_USER`` (case-insensitively) a 6-digit PIN is
    generated, stored with a 15 minute expiry and sent over IRC.

    The PIN comes from the ``secrets`` module, because ``random`` is a
    predictable generator and this PIN is an authentication credential.

    Responses: 400 for a missing or invalid body, 403 for a non-admin
    nickname, 500 when the PIN cannot be stored or on unexpected errors.
    """
    import asyncio
    import json
    import secrets
    import sys
    import time

    try:
        # Get POST data
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return
        post_data = self.rfile.read(content_length).decode('utf-8')
        try:
            data = json.loads(post_data)
        except json.JSONDecodeError:
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return
        nickname = data.get('nickname', '').strip()

        # Verify admin user; guard the append so sys.path does not grow
        # by one entry per request
        if '.' not in sys.path:
            sys.path.append('.')
        from config import ADMIN_USER
        if nickname.lower() != ADMIN_USER.lower():
            self.send_json_response({"success": False, "error": "Access denied"}, 403)
            return

        # Generate PIN from the OS CSPRNG
        pin = ''.join(str(secrets.randbelow(10)) for _ in range(6))

        # Store PIN with expiration (15 minutes)
        expiry = time.time() + (15 * 60)
        stored = asyncio.run(self._store_admin_pin_async(nickname, pin, expiry))
        if stored:
            # Send PIN via IRC
            self.send_admin_pin_via_irc(nickname, pin)
            self.send_json_response({"success": True, "message": "PIN sent via IRC"})
        else:
            self.send_json_response({"success": False, "error": "Failed to generate PIN"}, 500)
    except Exception as e:
        print(f"Error in handle_admin_auth: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _store_admin_pin_async(self, nickname, pin, expiry):
    """Store admin PIN in database.

    Creates the ``admin_pins`` table on first use, removes rows that are
    expired or belong to the same nickname, then inserts the new PIN.
    Only the most recently issued PIN for a nickname can therefore
    verify, and stale rows do not accumulate.

    Args:
        nickname: Nickname the PIN was issued for.
        pin: PIN string to store.
        expiry: Expiry as a Unix timestamp (compared against ``time.time()``).

    Returns:
        True when the PIN was stored, False if any error occurred.
    """
    try:
        import aiosqlite
        import time

        async with aiosqlite.connect(self.database.db_path) as db:
            # Create table if it doesn't exist
            await db.execute("""
CREATE TABLE IF NOT EXISTS admin_pins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nickname TEXT NOT NULL,
pin_code TEXT NOT NULL,
expires_at REAL NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
            # Expired PINs can never verify again, and a new request
            # supersedes any PIN issued earlier to the same nickname
            await db.execute(
                "DELETE FROM admin_pins WHERE expires_at <= ? OR nickname = ?",
                (time.time(), nickname),
            )
            # Insert admin PIN
            await db.execute("""
INSERT INTO admin_pins (nickname, pin_code, expires_at)
VALUES (?, ?, ?)
""", (nickname, pin, expiry))
            await db.commit()
        return True
    except Exception as e:
        print(f"Error storing admin PIN: {e}")
        return False
def handle_admin_verify(self):
    """Handle admin PIN verification.

    Expects a JSON object with ``nickname`` and ``pin``. When
    ``_verify_admin_pin_async`` accepts the pair, a one-hour session is
    recorded in ``admin_sessions`` and its token is returned in an
    HttpOnly ``admin_session`` cookie scoped to ``/admin``.

    The token is generated with ``secrets.token_hex`` because it is the
    only credential for the session. The previous value was a hash of
    nickname, PIN and timestamp, all of which are low-entropy inputs.

    Responses: 400 for a missing or invalid body, 401 for a wrong or
    expired PIN, 500 on unexpected errors.
    """
    import asyncio
    import json
    import secrets
    import time

    try:
        # Get POST data
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length == 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return
        post_data = self.rfile.read(content_length).decode('utf-8')
        try:
            data = json.loads(post_data)
        except json.JSONDecodeError:
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return
        nickname = data.get('nickname', '').strip()
        pin = data.get('pin', '').strip()

        # Verify PIN
        if not asyncio.run(self._verify_admin_pin_async(nickname, pin)):
            self.send_json_response({"success": False, "error": "Invalid or expired PIN"}, 401)
            return

        # 32 random bytes -> 64 hex characters, same shape as the old digest
        session_token = secrets.token_hex(32)

        # Store session
        self.admin_sessions[session_token] = {
            'nickname': nickname,
            'expires': time.time() + (60 * 60)  # 1 hour session
        }

        # Set cookie
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Set-Cookie', f'admin_session={session_token}; Path=/admin; HttpOnly')
        self.end_headers()
        self.wfile.write(json.dumps({"success": True}).encode())
    except Exception as e:
        print(f"Error in handle_admin_verify: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _verify_admin_pin_async(self, nickname, pin):
    """Check an admin PIN against the database and consume it on success.

    Args:
        nickname: Nickname the PIN was issued for (matched exactly).
        pin: PIN submitted by the client.

    Returns:
        True when an unexpired matching PIN existed (it is deleted so it
        cannot be reused), False otherwise or on any error.
    """
    try:
        import aiosqlite
        import time

        now = time.time()
        async with aiosqlite.connect(self.database.db_path) as conn:
            lookup = await conn.execute("""
SELECT pin_code FROM admin_pins
WHERE nickname = ? AND pin_code = ? AND expires_at > ?
""", (nickname, pin, now))
            match = await lookup.fetchone()
            if not match:
                return False

            # A PIN is single-use: remove it as soon as it has matched
            await conn.execute("""
DELETE FROM admin_pins
WHERE nickname = ? AND pin_code = ?
""", (nickname, pin))
            await conn.commit()
            return True
    except Exception as e:
        print(f"Error verifying admin PIN: {e}")
        return False
def send_admin_pin_via_irc(self, nickname, pin_code):
    """Deliver an admin PIN to the user as IRC private messages.

    The PIN is always printed to the console. When the bot exposes
    ``send_message_sync`` three messages are sent; otherwise the PIN is
    printed again for manual hand-off. Send failures are logged, not raised.

    Args:
        nickname: IRC nickname that receives the messages.
        pin_code: PIN to deliver.
    """
    print(f"š Admin PIN for {nickname}: {pin_code}")

    irc_bot = self.bot
    if not (irc_bot and hasattr(irc_bot, 'send_message_sync')):
        print(f"ā No IRC bot available to send admin PIN to {nickname}")
        print(f"š” Manual admin PIN for {nickname}: {pin_code}")
        return

    outgoing = (
        f"š Admin Panel PIN: {pin_code}",
        f"ā ļø This PIN expires in 15 minutes. Do not share it with anyone!",
        f"š” Enter this PIN at the admin login page to access the control panel.",
    )
    try:
        for line in outgoing:
            irc_bot.send_message_sync(nickname, line)
        print(f"ā Admin PIN sent to {nickname} via IRC")
    except Exception as e:
        print(f"ā Failed to send admin PIN via IRC: {e}")
def check_admin_session(self):
    """Resolve the ``admin_session`` cookie to an admin nickname.

    Returns:
        The nickname stored for the session when the token is known and
        unexpired (the expiry is pushed out by another hour), otherwise
        None. An unknown or expired token is removed from ``admin_sessions``.
    """
    import time

    cookie_name = 'admin_session='
    token = None
    for raw_cookie in self.headers.get('Cookie', '').split(';'):
        candidate = raw_cookie.strip()
        if candidate.startswith(cookie_name):
            token = candidate[len(cookie_name):]
            break
    if not token:
        return None

    record = self.admin_sessions.get(token)
    if record and record['expires'] > time.time():
        # Sliding expiry: every valid request renews the session
        record['expires'] = time.time() + (60 * 60)
        return record['nickname']

    # Invalid or expired session
    self.admin_sessions.pop(token, None)
    return None
def serve_admin_dashboard(self):
    """Serve the admin dashboard page.

    Redirects to ``/admin`` with a 302 when ``check_admin_session()``
    returns no user. Otherwise renders the statistics returned by
    ``_get_admin_stats_async`` into the dashboard content and sends it as a
    200 text/html response.
    """
    # Check admin session
    admin_user = self.check_admin_session()
    if not admin_user:
        # Redirect to login
        self.send_response(302)
        self.send_header('Location', '/admin')
        self.end_headers()
        return
    # Get system statistics
    import asyncio
    stats = asyncio.run(self._get_admin_stats_async())
    # NOTE(review): admin_user and the stats values are interpolated into the
    # page without escaping — confirm they can never contain markup.
    content = f"""
š® Admin Control Panel
Welcome, {admin_user}!
š System Statistics
š„ Total Players
{stats['total_players']}
š¾ Total Pets
{stats['total_pets']}
āļø Active Battles
{stats['active_battles']}
š¾ Database Size
{stats['db_size']}
š„ Player Management
Search Player
š§ System Controls
Database Management
IRC Management
š Game Management
Weather Control
Rate Limiting
"""
    # The third template argument is passed as an empty string
    html = self.get_page_template("Admin Dashboard", content, "")
    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()
    self.wfile.write(html.encode())
def _get_location_options(self, locations):
    """Generate HTML options for locations.

    Args:
        locations: Iterable of location entries; one fragment is appended
            per entry.

    Returns:
        The concatenated fragments as a single string.
    """
    options = ""
    for location in locations:
        # NOTE(review): the f-string below is empty as written, so nothing is
        # actually appended and the result is always "". The per-location
        # markup may have been lost — confirm against the intended template.
        options += f''
    return options
async def _get_admin_stats_async(self):
    """Get admin dashboard statistics.

    Counts players and pets, counts rows in several tables that may not
    exist, lists locations and measures the database file size.

    A failure on an optional table only resets that counter to 0. Those
    handlers now catch ``sqlite3.Error`` instead of using a bare
    ``except``, so task cancellation and interpreter-exit signals are no
    longer swallowed. Any other failure is logged and the statistics
    gathered so far are returned alongside the defaults.

    Returns:
        Dict with ``total_players``, ``total_pets``, ``active_battles``,
        ``db_size`` (string in MB), ``total_achievements``,
        ``total_badges``, ``total_items`` and ``locations`` (list of
        ``{'id', 'name'}`` dicts).
    """
    import aiosqlite
    import os
    import sqlite3

    stats = {
        'total_players': 0,
        'total_pets': 0,
        'active_battles': 0,
        'db_size': '0 MB',
        'total_achievements': 0,
        'total_badges': 0,
        'total_items': 0,
        'locations': []
    }
    # Tables that may be missing from a given database; order is preserved
    optional_counts = (
        ('active_battles', "SELECT COUNT(*) FROM active_battles"),
        ('total_achievements', "SELECT COUNT(*) FROM player_achievements"),
        ('total_badges', "SELECT COUNT(*) FROM player_badges"),
        ('total_items', "SELECT COUNT(*) FROM player_items"),
    )
    try:
        db_path = self.database.db_path
        async with aiosqlite.connect(db_path) as db:
            # Get player count
            cursor = await db.execute("SELECT COUNT(*) FROM players")
            stats['total_players'] = (await cursor.fetchone())[0]

            # Get pet count
            cursor = await db.execute("SELECT COUNT(*) FROM pets")
            stats['total_pets'] = (await cursor.fetchone())[0]

            for key, query in optional_counts:
                try:
                    cursor = await db.execute(query)
                    stats[key] = (await cursor.fetchone())[0]
                except sqlite3.Error:
                    # e.g. "no such table": keep the zero default
                    stats[key] = 0

            # Get locations
            cursor = await db.execute("SELECT id, name FROM locations ORDER BY name")
            stats['locations'] = [
                {'id': row[0], 'name': row[1]} for row in await cursor.fetchall()
            ]

        # Get database size
        if os.path.exists(db_path):
            size_bytes = os.path.getsize(db_path)
            stats['db_size'] = f"{size_bytes / 1024 / 1024:.2f} MB"
    except Exception as e:
        print(f"Error getting admin stats: {e}")
    return stats
def handle_admin_api(self, endpoint):
    """Handle admin API requests.

    Authenticates the caller through ``check_admin_session``, parses an
    optional JSON body (POST only) and a query string, then dispatches to
    the handler for ``endpoint``.

    A POST body that is not valid JSON, or is JSON but not an object, is
    rejected with 400. Previously a parse error was silently swallowed and
    a non-object body reached handlers that call ``.get`` on it. A
    non-numeric Content-Length is also answered with 400.

    Args:
        endpoint: Path below the admin API prefix, optionally followed by
            ``?key=value&...``. Query values are passed through without
            URL-decoding.
    """
    import json

    print(f"Admin API request: {endpoint}")

    # Check admin session
    admin_user = self.check_admin_session()
    if not admin_user:
        print(f"Unauthorized admin API request for endpoint: {endpoint}")
        self.send_json_response({"success": False, "error": "Unauthorized"}, 401)
        return
    print(f"Authorized admin API request from {admin_user} for endpoint: {endpoint}")

    # Get POST data if it's a POST request
    try:
        content_length = int(self.headers.get('Content-Length', 0))
    except (TypeError, ValueError):
        self.send_json_response({"success": False, "error": "Invalid request data"}, 400)
        return
    post_data = {}
    if content_length > 0 and self.command == 'POST':
        try:
            post_data = json.loads(self.rfile.read(content_length).decode('utf-8'))
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return
        if not isinstance(post_data, dict):
            # Handlers read fields with .get(), so only objects are usable
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

    # Parse query parameters for GET requests
    query_params = {}
    if '?' in endpoint:
        endpoint, query_string = endpoint.split('?', 1)
        for param in query_string.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                query_params[key] = value

    # Route to appropriate handler
    if endpoint.startswith('player/'):
        # Handle player endpoints - support both info and updates
        player_path = endpoint[len('player/'):]
        if player_path.endswith('/update'):
            # Player update endpoint
            player_name = player_path[:-len('/update')]
            if self.command == 'POST':
                self.handle_admin_player_update(player_name, post_data)
            else:
                self.send_json_response({"success": False, "error": "Method not allowed"}, 405)
        else:
            # Player info endpoint
            self.handle_admin_player_get(player_path)
    elif endpoint == 'backup':
        if self.command == 'POST':
            self.handle_admin_backup_create()
        else:
            self.send_json_response({"success": False, "error": "Method not allowed"}, 405)
    elif endpoint == 'backups':
        self.handle_admin_backups_list()
    elif endpoint == 'broadcast':
        self.handle_admin_broadcast(post_data)
    elif endpoint == 'irc-status':
        print(f"IRC status endpoint hit!")
        try:
            print(f"Calling handle_admin_irc_status...")
            self.handle_admin_irc_status()
            print(f"handle_admin_irc_status completed")
        except Exception as e:
            print(f"Exception in IRC status handler: {e}")
            import traceback
            traceback.print_exc()
            # Send a simple fallback response
            self.send_json_response({
                "success": False,
                "error": "IRC status handler failed",
                "details": str(e)
            }, 500)
    elif endpoint == 'weather':
        self.handle_admin_weather_set(post_data)
    elif endpoint == 'rate-stats':
        self.handle_admin_rate_stats(query_params)
    elif endpoint == 'rate-reset':
        self.handle_admin_rate_reset(post_data)
    elif endpoint == 'test':
        # Simple test endpoint
        import datetime
        self.send_json_response({
            "success": True,
            "message": "Test endpoint working",
            "timestamp": str(datetime.datetime.now())
        })
    else:
        self.send_json_response({"success": False, "error": "Unknown endpoint"}, 404)
def handle_admin_player_search(self, data):
    """Look up a player by nickname and report a short profile.

    Args:
        data: Parsed request body; ``nickname`` is read from it.

    Sends a JSON response containing the player's nickname, level, money,
    experience and pet count, or an error when the nickname is empty or
    unknown.
    """
    nickname = data.get('nickname', '').strip()
    if not nickname:
        self.send_json_response({"success": False, "error": "No nickname provided"})
        return

    import asyncio
    player = asyncio.run(self.database.get_player(nickname))
    if not player:
        self.send_json_response({"success": False, "error": "Player not found"})
        return

    # Pet count comes from a separate query
    owned = asyncio.run(self._get_player_pet_count_async(player['id']))
    profile = {
        "nickname": player['nickname'],
        "level": player['level'],
        "money": player['money'],
        "pet_count": owned,
        "experience": player['experience']
    }
    self.send_json_response({"success": True, "player": profile})
async def _get_player_pet_count_async(self, player_id):
    """Count the rows in ``pets`` that belong to ``player_id``.

    Args:
        player_id: Value matched against ``pets.player_id``.

    Returns:
        The count from ``SELECT COUNT(*)``.
    """
    import aiosqlite

    query = "SELECT COUNT(*) FROM pets WHERE player_id = ?"
    async with aiosqlite.connect(self.database.db_path) as conn:
        result_cursor = await conn.execute(query, (player_id,))
        row = await result_cursor.fetchone()
        return row[0]
def handle_admin_backup_create(self):
    """Trigger a manual database backup through the bot's backup manager.

    Sends a JSON response naming the created file, the manager's error, or
    a notice that no backup system is available.
    """
    bot = self.bot
    if not (bot and hasattr(bot, 'backup_manager')):
        self.send_json_response({
            "success": False,
            "error": "Backup system not available"
        })
        return

    import asyncio
    outcome = asyncio.run(bot.backup_manager.create_backup("manual", "Admin web interface"))
    if outcome['success']:
        reply = {
            "success": True,
            "message": f"Backup created: {outcome['filename']}"
        }
    else:
        reply = {
            "success": False,
            "error": outcome.get('error', 'Backup failed')
        }
    self.send_json_response(reply)
def handle_admin_weather_set(self, data):
    """Apply an admin-requested weather change to a location.

    Args:
        data: Parsed request body; ``location`` and ``weather`` are read
            from it and stripped of surrounding whitespace.

    Sends a JSON response with the resulting message or an error.
    """
    location = data.get('location', '').strip()
    weather = data.get('weather', '').strip()
    if not (location and weather):
        self.send_json_response({"success": False, "error": "Missing location or weather"})
        return

    try:
        import asyncio
        outcome = asyncio.run(self._set_weather_for_location_async(location, weather))
        if outcome.get("success"):
            reply = {
                "success": True,
                "message": outcome.get("message", f"Weather set to {weather} in {location}")
            }
        else:
            reply = {
                "success": False,
                "error": outcome.get("error", "Failed to set weather")
            }
        self.send_json_response(reply)
    except Exception as e:
        print(f"Error setting weather: {e}")
        self.send_json_response({
            "success": False,
            "error": f"Weather system error: {str(e)}"
        })
async def _set_weather_for_location_async(self, location, weather):
"""Async helper to set weather for location"""
try:
import json
import datetime
import random
# Load weather patterns
try:
with open("config/weather_patterns.json", "r") as f:
weather_data = json.load(f)
except FileNotFoundError:
return {
"success": False,
"error": "Weather configuration file not found"
}
# Validate weather type
weather_types = list(weather_data["weather_types"].keys())
if weather not in weather_types:
return {
"success": False,
"error": f"Invalid weather type. Valid types: {', '.join(weather_types)}"
}
weather_config = weather_data["weather_types"][weather]
# Calculate duration (3 hours = 180 minutes)
duration_range = weather_config.get("duration_minutes", [90, 180])
duration = random.randint(duration_range[0], duration_range[1])
end_time = datetime.datetime.now() + datetime.timedelta(minutes=duration)
# Set weather for the location
result = await self.database.set_weather_for_location(
location, weather, end_time.isoformat(),
weather_config.get("spawn_modifier", 1.0),
",".join(weather_config.get("affected_types", []))
)
if result.get("success"):
# Announce weather change if it actually changed and bot is available
if result.get("changed") and self.bot and hasattr(self.bot, 'game_engine'):
await self.bot.game_engine.announce_weather_change(
location, result.get("previous_weather"), weather, "web"
)
return {
"success": True,
"message": f"Weather set to {weather} in {location} for {duration} minutes"
}
else:
return {
"success": False,
"error": result.get("error", "Failed to set weather")
}
except Exception as e:
print(f"Error in _set_weather_for_location_async: {e}")
return {
"success": False,
"error": str(e)
}
def handle_admin_announce(self, data):
    """Post an announcement to the ``#petz`` channel.

    Args:
        data: Parsed request body; ``message`` is read from it.

    Sends a JSON response reporting success, a missing message, a send
    failure or an unavailable IRC bot.
    """
    message = data.get('message', '').strip()
    if not message:
        self.send_json_response({"success": False, "error": "No message provided"})
        return

    if not (self.bot and hasattr(self.bot, 'send_message_sync')):
        self.send_json_response({"success": False, "error": "IRC not available"})
        return

    try:
        # Send to main channel
        self.bot.send_message_sync("#petz", f"š¢ ANNOUNCEMENT: {message}")
        self.send_json_response({"success": True, "message": "Announcement sent"})
    except Exception as e:
        self.send_json_response({"success": False, "error": str(e)})
def handle_admin_monitor(self, monitor_type):
    """Return a placeholder monitoring payload.

    Args:
        monitor_type: Echoed back under the ``type`` key.
    """
    # TODO: Implement real monitoring data
    payload = {"success": True, "type": monitor_type, "data": []}
    self.send_json_response(payload)
def handle_admin_player_get(self, nickname):
    """Send a player's admin profile as JSON.

    Args:
        nickname: Player nickname; an empty value yields a 400.

    A failed lookup result is sent with 404, unexpected errors with 500.
    """
    if not nickname:
        self.send_json_response({"success": False, "error": "No nickname provided"}, 400)
        return

    try:
        import asyncio
        info = asyncio.run(self._get_player_info_async(nickname))
        if not info["success"]:
            self.send_json_response(info, 404)
        else:
            self.send_json_response(info)
    except Exception as e:
        print(f"Error in handle_admin_player_get: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
def handle_admin_player_update(self, nickname, data):
    """Apply admin edits to a player and report the outcome as JSON.

    Args:
        nickname: Player nickname; an empty value yields a 400.
        data: Fields to update; an empty value yields a 400.

    A failed update result is sent with 400. Unexpected errors are logged
    with a traceback and answered with 500.
    """
    if not nickname:
        self.send_json_response({"success": False, "error": "No nickname provided"}, 400)
        return
    if not data:
        self.send_json_response({"success": False, "error": "No update data provided"}, 400)
        return

    try:
        import asyncio
        outcome = asyncio.run(self._update_player_async(nickname, data))
        if not outcome["success"]:
            self.send_json_response(outcome, 400)
        else:
            self.send_json_response(outcome)
    except Exception as e:
        print(f"Error in handle_admin_player_update: {e}")
        import traceback
        traceback.print_exc()
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _update_player_async(self, nickname, data):
"""Update player information asynchronously"""
try:
# Validate input data
allowed_fields = ['level', 'experience', 'money']
updates = {}
for field in allowed_fields:
if field in data:
value = data[field]
# Validate each field
if field == 'level':
if not isinstance(value, int) or value < 1 or value > 100:
return {"success": False, "error": "Level must be between 1 and 100"}
updates[field] = value
elif field == 'experience':
if not isinstance(value, int) or value < 0:
return {"success": False, "error": "Experience cannot be negative"}
updates[field] = value
elif field == 'money':
if not isinstance(value, int) or value < 0:
return {"success": False, "error": "Money cannot be negative"}
updates[field] = value
if not updates:
return {"success": False, "error": "No valid fields to update"}
# Check if player exists
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Update player data
success = await self.database.update_player_admin(player["id"], updates)
if success:
return {
"success": True,
"message": f"Updated {', '.join(updates.keys())} for player {nickname}",
"updated_fields": list(updates.keys())
}
else:
return {"success": False, "error": "Failed to update player data"}
except Exception as e:
print(f"Error updating player: {e}")
return {"success": False, "error": str(e)}
async def _get_player_info_async(self, nickname):
"""Get player information asynchronously"""
try:
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Get additional stats
pets = await self.database.get_player_pets(player["id"])
# Get location name if current_location_id is set
location_name = "Unknown"
if player.get("current_location_id"):
try:
location_data = await self.database.get_location_by_id(player["current_location_id"])
if location_data:
location_name = location_data["name"]
else:
location_name = f"Location ID {player['current_location_id']}"
except Exception as loc_error:
print(f"Error resolving location: {loc_error}")
location_name = f"Location ID {player['current_location_id']}"
# Get team composition
team_info = await self.database.get_team_composition(player["id"])
return {
"success": True,
"player": {
"nickname": player["nickname"],
"level": player["level"],
"experience": player["experience"],
"money": player["money"],
"current_location": location_name,
"pet_count": len(pets),
"active_pets": team_info.get("active_pets", 0),
"total_pets": team_info.get("total_pets", 0)
}
}
except Exception as e:
print(f"Error getting player info: {e}")
return {"success": False, "error": str(e)}
def handle_admin_backups_list(self):
    """Send the ten backups that sort last by file name.

    Scans the relative ``backups`` directory for ``.gz`` files and reports
    each name with a human-readable size. A missing directory yields an
    empty list; errors are reported with status 500.
    """
    try:
        import os

        backup_dir = "backups"
        if not os.path.exists(backup_dir):
            self.send_json_response({
                "success": True,
                "backups": []
            })
            return

        def readable(size):
            # Convert size to human readable format
            if size < 1024:
                return f"{size} B"
            if size < 1024 * 1024:
                return f"{size / 1024:.1f} KB"
            return f"{size / (1024 * 1024):.1f} MB"

        found = [
            {
                "name": filename,
                "size": readable(os.path.getsize(os.path.join(backup_dir, filename)))
            }
            for filename in os.listdir(backup_dir)
            if filename.endswith('.gz')
        ]
        # Sort by name (newest first)
        ordered = sorted(found, key=lambda entry: entry['name'], reverse=True)
        self.send_json_response({
            "success": True,
            "backups": ordered[:10]  # Show only last 10 backups
        })
    except Exception as e:
        print(f"Error listing backups: {e}")
        self.send_json_response({"success": False, "error": str(e)}, 500)
def handle_admin_broadcast(self, data):
    """Broadcast an admin announcement to the configured IRC channel.

    Args:
        data: Parsed request body; ``message`` is read from it.

    The channel comes from ``config.IRC_CONFIG["channel"]`` and defaults to
    ``#petz``. An empty message yields 400; an unavailable bot or a send
    failure yields 500.
    """
    message = data.get('message', '').strip()
    if not message:
        self.send_json_response({"success": False, "error": "No message provided"}, 400)
        return

    try:
        if not (self.bot and hasattr(self.bot, 'send_message_sync')):
            self.send_json_response({"success": False, "error": "IRC bot not available"}, 500)
            return

        # Send to IRC channel via bot
        from config import IRC_CONFIG
        target_channel = IRC_CONFIG.get("channel", "#petz")
        self.bot.send_message_sync(target_channel, f"š¢ Admin Announcement: {message}")
        self.send_json_response({
            "success": True,
            "message": "Broadcast sent successfully"
        })
    except Exception as e:
        print(f"Error sending broadcast: {e}")
        self.send_json_response({"success": False, "error": str(e)}, 500)
def handle_admin_irc_status(self):
    """Report the bot's IRC connection status as a JSON response.

    When a bot with a connection manager is attached, the manager's
    ``get_connection_stats()`` result is returned under ``irc_status``.
    Without a bot or manager a "disconnected" placeholder status is sent
    (still HTTP 200). A manager lacking ``get_connection_stats`` or a failure
    while gathering/sending the stats produces a 500 JSON error. Progress is
    traced to stdout along the way.
    """
    import traceback

    try:
        print("IRC status request - checking bot availability...")
        bot = self.bot
        print(f"Bot instance: {bot}")

        # Guard: nothing to query without a bot and its connection manager.
        if not (bot and hasattr(bot, 'connection_manager') and bot.connection_manager):
            print("No bot instance or connection manager available")
            self.send_json_response({
                "success": True,
                "irc_status": {
                    "connected": False,
                    "state": "disconnected",
                    "error": "Bot instance or connection manager not available"
                }
            })
            return

        manager = bot.connection_manager
        print(f"Connection manager: {manager}")

        # Guard: the manager must expose the stats accessor.
        if not hasattr(manager, 'get_connection_stats'):
            print("Connection manager has no get_connection_stats method")
            self.send_json_response({
                "success": False,
                "error": "Connection manager missing get_connection_stats method"
            }, 500)
            return

        try:
            stats = manager.get_connection_stats()
            print("Got comprehensive connection stats")
            payload = {"success": True, "irc_status": stats}
            print("Sending comprehensive IRC response")
            self.send_json_response(payload)
        except Exception as stats_error:
            print(f"Error getting connection stats: {stats_error}")
            traceback.print_exc()
            self.send_json_response({
                "success": False,
                "error": f"Failed to get connection stats: {str(stats_error)}"
            }, 500)
    except Exception as e:
        print(f"Error getting IRC status: {e}")
        traceback.print_exc()
        try:
            self.send_json_response({"success": False, "error": str(e)}, 500)
        except Exception as json_error:
            print(f"Failed to send JSON error response: {json_error}")
            # Last resort: plain HTTP error when even the JSON reply fails.
            self.send_error(500, f"IRC status error: {str(e)}")
def handle_admin_rate_stats(self, query_params):
    """Send rate-limiting statistics as a JSON response.

    Args:
        query_params: Mapping of request parameters; ``user`` (optional)
            selects per-user statistics instead of the global summary.

    With a rate limiter attached to the bot, responds with either the named
    user's ``violations`` / ``banned`` / ``last_violation`` or the global
    ``total_users`` / ``active_bans`` / ``total_violations``. Without a rate
    limiter the global summary is reported as all zeros. Failures produce a
    500 JSON error.
    """
    try:
        username = query_params.get('user', '').strip()
        limiter_available = (
            self.bot and hasattr(self.bot, 'rate_limiter') and self.bot.rate_limiter
        )

        if not limiter_available:
            # No limiter attached: report an empty global summary.
            stats = {
                "total_users": 0,
                "active_bans": 0,
                "total_violations": 0
            }
        elif username:
            per_user = self.bot.rate_limiter.get_user_stats(username)
            stats = {
                "violations": per_user.get("violations", 0),
                "banned": per_user.get("banned", False),
                "last_violation": per_user.get("last_violation", "Never")
            }
        else:
            overall = self.bot.rate_limiter.get_global_stats()
            stats = {
                "total_users": overall.get("total_users", 0),
                "active_bans": overall.get("active_bans", 0),
                "total_violations": overall.get("total_violations", 0)
            }

        self.send_json_response({"success": True, "stats": stats})
    except Exception as e:
        print(f"Error getting rate stats: {e}")
        self.send_json_response({"success": False, "error": str(e)}, 500)
def handle_admin_rate_reset(self, data):
    """Reset rate limits for one user, or for everyone.

    Args:
        data: Parsed request payload. A non-empty ``user`` value (after
            stripping whitespace) targets that user; otherwise every rate
            limit is reset.

    Responds with JSON: success messages for completed resets, 404 when the
    limiter reports the user as unknown, and 500 when no rate limiter is
    available or an error occurs.
    """
    try:
        requested_user = data.get('user')
        username = requested_user.strip() if requested_user else None

        # Guard: nothing can be reset without an attached rate limiter.
        if not (self.bot and hasattr(self.bot, 'rate_limiter') and self.bot.rate_limiter):
            self.send_json_response({"success": False, "error": "Rate limiter not available"}, 500)
            return

        if not username:
            # No user given: global reset.
            self.bot.rate_limiter.reset_all()
            self.send_json_response({
                "success": True,
                "message": "All rate limits reset successfully"
            })
            return

        if self.bot.rate_limiter.reset_user(username):
            self.send_json_response({
                "success": True,
                "message": f"Rate limits reset for user: {username}"
            })
        else:
            self.send_json_response({"success": False, "error": f"User {username} not found"}, 404)
    except Exception as e:
        print(f"Error resetting rate limits: {e}")
        self.send_json_response({"success": False, "error": str(e)}, 500)
# ================================================================
# NEW TEAM BUILDER METHODS - Separated Team Management
# ================================================================
def serve_team_selection_hub(self, nickname):
    """Render the team selection hub page for a player.

    Args:
        nickname: Player nickname used to look up the player and to title
            the page.

    Loads the player and their team overview on a private event loop, then
    writes the generated page as ``text/html``. Sends 500 when the database
    is missing or the overview cannot be loaded, 404 for an unknown player,
    and a generic 500 for any other failure.
    """
    try:
        # The database and bot are attached to the HTTP server object.
        database = getattr(self.server, 'database', None)
        bot = getattr(self.server, 'bot', None)
        if not database:
            self.send_error(500, "Database not available")
            return

        # Build the team service the first time this handler needs it.
        if not hasattr(self, 'team_service'):
            from src.team_management import TeamManagementService
            from src.pin_authentication import PinAuthenticationService
            self.team_service = TeamManagementService(
                database, PinAuthenticationService(database, bot)
            )

        # The database/service APIs are coroutines; drive them on a
        # dedicated event loop that is closed again before rendering.
        import asyncio
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        try:
            player = event_loop.run_until_complete(database.get_player(nickname))
            if not player:
                self.send_error(404, "Player not found")
                return
            overview = event_loop.run_until_complete(
                self.team_service.get_team_overview(player["id"])
            )
            if not overview["success"]:
                self.send_error(500, f"Failed to load teams: {overview['error']}")
                return
            teams = overview["teams"]
        finally:
            event_loop.close()

        page = self.get_page_template(
            f"Team Management - {nickname}",
            self.generate_team_hub_content(nickname, teams),
            "teambuilder",
        )
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(page.encode('utf-8'))
    except Exception as e:
        print(f"Error serving team selection hub: {e}")
        self.send_error(500, "Internal server error")
def serve_individual_team_editor(self, nickname, team_identifier):
    """Render the editor page for one of a player's teams.

    Args:
        nickname: Player nickname used to look up the player and to title
            the page.
        team_identifier: Identifier of the team to edit, passed through to
            the team service and the content generator.

    Loads the player, the requested team and the player's pets on a private
    event loop, then writes the generated page as ``text/html``. Sends 500
    when the database is missing or the team cannot be loaded, 404 for an
    unknown player, and a generic 500 for any other failure.
    """
    try:
        # The database and bot are attached to the HTTP server object.
        database = getattr(self.server, 'database', None)
        bot = getattr(self.server, 'bot', None)
        if not database:
            self.send_error(500, "Database not available")
            return

        # Build the team service the first time this handler needs it.
        if not hasattr(self, 'team_service'):
            from src.team_management import TeamManagementService
            from src.pin_authentication import PinAuthenticationService
            self.team_service = TeamManagementService(
                database, PinAuthenticationService(database, bot)
            )

        # Drive the coroutine-based data access on a dedicated event loop
        # that is closed again before rendering.
        import asyncio
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        try:
            player = event_loop.run_until_complete(database.get_player(nickname))
            if not player:
                self.send_error(404, "Player not found")
                return
            team_data = event_loop.run_until_complete(
                self.team_service.get_individual_team_data(player["id"], team_identifier)
            )
            if not team_data["success"]:
                self.send_error(500, f"Failed to load team: {team_data['error']}")
                return
            # All of the player's pets, for the editor's storage area.
            player_pets = event_loop.run_until_complete(
                database.get_player_pets(player["id"])
            )
        finally:
            event_loop.close()

        editor_html = self.generate_individual_team_editor_content(
            nickname, team_identifier, team_data, player_pets
        )
        page = self.get_page_template(
            f"{team_data['team_name']} - {nickname}", editor_html, "teambuilder"
        )
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(page.encode('utf-8'))
    except Exception as e:
        print(f"Error serving individual team editor: {e}")
        self.send_error(500, "Internal server error")
def generate_team_hub_content(self, nickname, teams):
    """Generate HTML content for team selection hub.

    Args:
        nickname: Player nickname. Not referenced by the template below.
        teams: Team overview data. Not referenced by the template below.

    Returns:
        str: The hub heading and description text.
    """
    # NOTE(review): the template carries no markup or placeholders and its
    # leading characters look mis-encoded -- confirm the intended content.
    return f'''
š Team Management Hub
Manage your teams and swap configurations with PIN verification
'''
def _generate_team_preview(self, team_data, team_identifier):
    """Generate preview for a single team.

    Args:
        team_data: Mapping read for ``name`` (default ``Team <identifier>``),
            ``count`` (default 0) and ``is_active`` (default False).
        team_identifier: Identifier used in the default team name and in the
            edit label of non-active teams.

    Returns:
        str: Text block with the team name, a status line, the pet count and
        the edit action label.
    """
    team_name = team_data.get('name', f'Team {team_identifier}')
    pet_count = team_data.get('count', 0)
    is_active = team_data.get('is_active', False)
    # The action label differs for the active team and for saved teams.
    # NOTE(review): the label and status strings look mis-encoded and carry
    # no markup -- confirm the intended content.
    if is_active:
        actions = f'''
āļø Edit Active Team
'''
    else:
        actions = f'''
āļø Edit Team {team_identifier}
'''
    status = "š ACTIVE TEAM" if is_active else f"š¾ Saved Team"
    return f'''
{team_name}
{status}
š¾ {pet_count} pets
{actions}
'''
def _generate_active_team_display(self, active_team_data, nickname):
    """Generate detailed display for active team with individual pet cards.

    Args:
        active_team_data: Mapping with a ``pets`` entry (either a dict keyed
            by slot position or a list of pet dicts) and a ``count`` entry.
        nickname: Player nickname. Not referenced in the code visible here.

    Returns:
        The empty-team template string when there are no pets or the count
        is zero. No other return statement is visible in this block.
    """
    pets_dict = active_team_data.get('pets', {})
    pet_count = active_team_data.get('count', 0)
    # Convert dictionary format to list for consistent processing
    pets = []
    if isinstance(pets_dict, dict):
        # Active team format: {"1": {pet_data}, "2": {pet_data}}
        for position, pet_data in pets_dict.items():
            if pet_data:
                # Writes the slot number back into the caller's pet dict.
                pet_data['team_order'] = int(position)
                pets.append(pet_data)
    elif isinstance(pets_dict, list):
        # Saved team format: [{pet_data}, {pet_data}]
        pets = pets_dict
    if not pets or pet_count == 0:
        return f'''
'''
    # Generate individual pet cards for active team
    # NOTE(review): in the code visible here pet_cards is never appended to
    # or returned, and pet_card is discarded -- the template and the tail of
    # this function look truncated; confirm against the original file.
    pet_cards = []
    for pet in pets:
        # Handle both active team format and saved team format
        name = pet.get('nickname') or pet.get('name') or pet.get('species_name', 'Unknown')
        level = pet.get('level', 1)
        hp = pet.get('hp', 0)
        max_hp = pet.get('max_hp', 0)
        attack = pet.get('attack', 0)
        defense = pet.get('defense', 0)
        speed = pet.get('speed', 0)
        happiness = pet.get('happiness', 50)
        species_name = pet.get('species_name', 'Unknown')
        # Handle type field variations between active and saved teams
        # NOTE(review): the last fallback only yields 'Normal' when the
        # 'type1' key is absent; a present-but-empty 'type1' leaves type1
        # falsy, and the "/type2" concatenation below would then fail for
        # None -- confirm whether `or 'Normal'` was intended.
        type1 = (pet.get('type1') or pet.get('type_primary') or
                 pet.get('type1', 'Normal'))
        type2 = (pet.get('type2') or pet.get('type_secondary') or
                 pet.get('type2'))
        team_order = pet.get('team_order', 0)
        # Calculate HP percentage for health bar
        hp_percent = (hp / max_hp) * 100 if max_hp > 0 else 0
        # Bar colour by remaining HP: above 60%, above 25%, otherwise.
        hp_color = "#4CAF50" if hp_percent > 60 else "#FF9800" if hp_percent > 25 else "#f44336"
        # Happiness emoji
        if happiness >= 80:
            happiness_emoji = "š"
        elif happiness >= 60:
            happiness_emoji = "š"
        elif happiness >= 40:
            happiness_emoji = "š"
        elif happiness >= 20:
            happiness_emoji = "š"
        else:
            happiness_emoji = "š¢"
        # Type display
        type_display = type1
        if type2:
            type_display += f"/{type2}"
        pet_card = f'''
'''
def generate_individual_team_editor_content(self, nickname, team_identifier, team_data, player_pets):
    """Generate HTML content for individual team editor.

    Args:
        nickname: Player nickname. Not referenced in the code visible here.
        team_identifier: Team slot identifier, shown for saved teams.
        team_data: Mapping read for ``team_name``, ``is_active_team`` and
            ``pets`` (list of pet dicts carrying an ``id``).
        player_pets: All of the player's pet dicts; those whose ``id`` is not
            in the team are treated as storage pets.

    Returns:
        str: The editor heading text for this team.
    """
    team_name = team_data.get('team_name', 'Unknown Team')
    is_active_team = team_data.get('is_active_team', False)
    team_pets = team_data.get('pets', [])
    # Separate pets into team and storage
    team_pet_ids = [p['id'] for p in team_pets]
    storage_pets = [p for p in player_pets if p['id'] not in team_pet_ids]
    # Helper function to create detailed pet card
    def make_pet_card(pet, in_team=False):
        # in_team is accepted but not referenced in the code visible here.
        name = pet.get('nickname') or pet.get('species_name', 'Unknown')
        pet_id = pet.get('id', 0)
        level = pet.get('level', 1)
        species = pet.get('species_name', 'Unknown')
        # Type info
        type_str = pet.get('type1', 'Normal')
        if pet.get('type2'):
            type_str += f"/{pet['type2']}"
        # HP calculation
        hp = pet.get('hp', 0)
        max_hp = pet.get('max_hp', 1)
        hp_percent = (hp / max_hp * 100) if max_hp > 0 else 0
        hp_color = "#4CAF50" if hp_percent > 60 else "#FF9800" if hp_percent > 25 else "#f44336"
        # Get pet moves
        moves = pet.get('moves', [])
        moves_html = ''
        # Only the first four moves are listed.
        # NOTE(review): the single-quoted literals below span physical
        # lines, which Python does not allow -- markup appears to have been
        # lost here; confirm against the original file.
        if moves:
            moves_html = '
Moves: ' + ', '.join([m.get('name', 'Unknown') for m in moves[:4]]) + '
'
        return f"""
{pet.get('emoji', 'š¾')} {name}
Lv.{level}
{species}
{type_str}
HP{hp}/{max_hp}
ATK{pet.get('attack', 0)}
DEF{pet.get('defense', 0)}
SPD{pet.get('speed', 0)}
{moves_html}
{'š' if pet.get('happiness', 50) > 70 else 'š' if pet.get('happiness', 50) > 40 else 'š'}Happiness: {pet.get('happiness', 50)}/100
"""
    # Create team slots (6 slots)
    team_slots_html = ''
    for i in range(1, 7):
        # Find pet in this slot
        # A pet fills slot i when its team_order equals i. Additionally,
        # when no pet claims team_order 1, the first pet in the list is
        # used for slot 1.
        slot_pet = None
        for pet in team_pets:
            if pet.get('team_order') == i or (i == 1 and not any(p.get('team_order') == 1 for p in team_pets) and team_pets and pet == team_pets[0]):
                slot_pet = pet
                break
        if slot_pet:
            team_slots_html += f"""
#{i}
{make_pet_card(slot_pet, True)}
"""
        else:
            team_slots_html += f"""
#{i}
āDrop pet here
"""
    # Create storage pet cards
    storage_cards_html = ''.join([make_pet_card(pet, False) for pet in storage_pets])
    # NOTE(review): same broken multi-line literal as in make_pet_card.
    if not storage_cards_html:
        storage_cards_html = '
No pets in storage. All your pets are in teams!
'
    # NOTE(review): team_slots_html and storage_cards_html are built above
    # but do not appear in the returned template as visible here -- confirm
    # against the original file.
    return f'''
āļø {team_name}
{"ā” Active battle team" if is_active_team else f"š¾ Saved team configuration (Slot {team_identifier})"}
'''
class PetBotWebServer:
    """Standalone web server for PetBot.

    Wraps an ``HTTPServer`` bound to all interfaces on ``port`` that serves
    requests with ``PetBotRequestHandler``. The database and bot instances
    are attached to the server object so handlers can reach them.

    Args:
        database: Database instance to serve from; a new ``Database()`` is
            created when omitted.
        port: TCP port to listen on.
        bot: Optional bot instance exposed to request handlers.
    """

    def __init__(self, database=None, port=8080, bot=None):
        self.database = database or Database()
        self.port = port
        self.bot = bot
        self.server = None
        self.server_thread = None

    def _create_server(self):
        """Bind the HTTP server and attach the shared database/bot to it."""
        server = HTTPServer(('0.0.0.0', self.port), PetBotRequestHandler)
        server.database = self.database
        server.bot = self.bot
        return server

    def run(self):
        """Start the web server and block until it is interrupted."""
        self.server = self._create_server()
        print(f'š Starting PetBot web server on http://0.0.0.0:{self.port}')
        # NOTE(review): the WSL address below is hard-coded for one machine.
        print(f'š” Accessible from WSL at: http://172.27.217.61:{self.port}')
        print(f'š” Accessible from Windows at: http://localhost:{self.port}')
        print('')
        try:
            self.server.serve_forever()
        except KeyboardInterrupt:
            print('\nš Server stopped')
        finally:
            self.server.server_close()

    def start_in_thread(self):
        """Start the web server in a separate daemon thread.

        The socket is bound before the thread starts, so ``self.server`` is
        already set when this method returns and ``stop()`` works right
        away. A bind failure is reported on stdout and leaves the server
        unstarted rather than raising into the caller.
        """
        import threading

        try:
            server = self._create_server()
        except OSError as e:
            # Best effort, as before: the caller keeps running without the
            # web interface (e.g. when the port is already in use).
            print(f"Web server error: {e}")
            self.server = None
            return
        self.server = server

        def run_server():
            try:
                server.serve_forever()
            except Exception as e:
                print(f"Web server error: {e}")
            finally:
                server.server_close()

        self.server_thread = threading.Thread(target=run_server, daemon=True)
        self.server_thread.start()

    def stop(self):
        """Stop the web server if one has been created."""
        if self.server:
            print('š Stopping web server...')
            self.server.shutdown()
            self.server.server_close()


def run_standalone():
    """Run the web server in standalone mode.

    Reads an optional port number from ``sys.argv[1]`` (default 8080),
    prints the local and public URLs, and serves until interrupted. Exits
    with status 1 when the port argument is not an integer.
    """
    import sys

    port = 8080
    if len(sys.argv) > 1:
        try:
            port = int(sys.argv[1])
        except ValueError:
            print('Usage: python webserver.py [port]')
            sys.exit(1)
    server = PetBotWebServer(port=port)
    print('š PetBot Web Server')
    print('=' * 50)
    print(f'Port: {port}')
    print('')
    print('š Local URLs:')
    print(f' http://localhost:{port}/ - Game Hub (local)')
    print(f' http://localhost:{port}/help - Command Help (local)')
    print(f' http://localhost:{port}/players - Player List (local)')
    print(f' http://localhost:{port}/leaderboard - Leaderboard (local)')
    print(f' http://localhost:{port}/locations - Locations (local)')
    print('')
    print('š Public URLs:')
    print(' http://petz.rdx4.com/ - Game Hub')
    print(' http://petz.rdx4.com/help - Command Help')
    print(' http://petz.rdx4.com/players - Player List')
    print(' http://petz.rdx4.com/leaderboard - Leaderboard')
    print(' http://petz.rdx4.com/locations - Locations')
    print('')
    print('š± Example Player Profile:')
    print(' http://petz.rdx4.com/player/megasconed')
    print('')
    print('āļø Press Ctrl+C to stop the server')
    print('')
    try:
        server.run()
    except KeyboardInterrupt:
        print('\nš Shutting down web server...')
        server.stop()


if __name__ == '__main__':
    run_standalone()