'''
else:
# Regular nav link
nav_links += f'{page_name}'
# Add authentication status to navigation
auth_links = ""
authenticated_player = self.check_player_session()
if authenticated_player:
# Player is logged in - show profile link and logout
auth_links = f'''
Thunderstorm - 2.0x Electric spawns (30-60 minutes)
Blizzard - 1.7x Ice/Water spawns (1-2 hours)
Earthquake - 1.8x Rock spawns (30-90 minutes)
Calm - Normal spawns (1.5-3 hours)
āļø Battle System
!catch / !capture
Attempt to catch a wild pet that appeared during exploration. Success depends on the pet's level and rarity.
Example: !catch
!battle
Start a turn-based battle with a wild pet. Defeat it to gain experience and money for your active pet.
Example: !battle
!attack <move>
Use a specific move during battle. Each move has different power, type, and effects.
Example: !attack flamethrower
!moves
View all available moves for your active pet, including their types and power levels.
Example: !moves
!flee
Attempt to escape from the current battle. Not always successful!
Example: !flee
šļø Gym Battles NEW!
!gym
List all gyms in your current location with your progress. Shows victories and next difficulty level.
Example: !gym
!gym list
Show all gyms across all locations with your badge collection progress.
Example: !gym list
!gym challenge "<name>"
Challenge a gym leader! You must be in the same location as the gym. Difficulty increases with each victory.
Example: !gym challenge "Forest Guardian"
!gym info "<name>"
Get detailed information about a gym including leader, theme, team, and badge details.
Example: !gym info "Storm Master"
š” Gym Strategy: Each gym specializes in a specific type. Bring pets with type advantages! The more you beat a gym, the harder it gets, but the better the rewards!
Access your team builder web interface for drag-and-drop team management with PIN verification.
Example: !team
!pets
View your complete pet collection with detailed stats and information via web interface.
Example: !pets
!activate <pet>
Add a pet to your active battle team. You can have multiple active pets for different situations.
Example: !activate flamey
!deactivate <pet>
Remove a pet from your active team and put it in storage.
Example: !deactivate aqua
!nickname <pet> <new_name>
Give a custom nickname to one of your pets. Nicknames must be 20 characters or less.
Example: !nickname flamey FireStorm
š Inventory System NEW!
!inventory / !inv / !items
View all items in your inventory organized by category. Shows quantities and item descriptions.
Example: !inventory
!use <item name>
Use a consumable item from your inventory. Items can heal pets, boost stats, or provide other benefits.
Example: !use Small Potion
šÆ Item Categories & Rarities
ā Common (15%) - Small Potions, basic healing items
ā Uncommon (8-12%) - Large Potions, battle boosters, special berries
ā Rare (3-6%) - Super Potions, speed elixirs, location treasures
ā Epic (2-3%) - Evolution stones, rare crystals, ancient artifacts
⦠Legendary (1%) - Lucky charms, ancient fossils, ultimate items
š” Item Discovery: Find items while exploring! Each location has unique treasures including rare Coin Pouches with 1-3 coins. Items stack in your inventory and can be used anytime.
š Achievements & Progress
!achievements
View your achievement progress and see which new locations you've unlocked.
Example: !achievements
šÆ Location Unlock Requirements
Pet Collector (5 pets) ā Unlocks Whispering Woods
Spark Collector (2 Electric species) ā Unlocks Electric Canyon
Rock Hound (3 Rock species) ā Unlocks Crystal Caves
"""
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html_content.encode())
except FileNotFoundError:
self.serve_error_page("Help", "Help file not found")
except Exception as e:
self.serve_error_page("Help", f"Error loading help file: {str(e)}")
def serve_faq(self):
"""Serve the FAQ page using unified template"""
try:
with open('faq.html', 'r', encoding='utf-8') as f:
faq_content = f.read()
# Extract CSS from faq.html
css_match = re.search(r'', faq_content, re.DOTALL)
faq_css = css_match.group(1) if css_match else ""
# Extract body content (everything between tags)
body_match = re.search(r']*>(.*?)', faq_content, re.DOTALL)
if body_match:
body_content = body_match.group(1)
# Remove the back link since we'll have the navigation bar
body_content = re.sub(r'.*?', '', body_content, flags=re.DOTALL)
else:
# Fallback: use original content if we can't parse it
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(faq_content.encode())
return
# Create template with merged CSS
html_content = f"""
PetBot - FAQ
{self.get_navigation_bar("faq")}
{body_content}
"""
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html_content.encode())
except FileNotFoundError:
self.serve_error_page("FAQ", "FAQ file not found")
except Exception as e:
self.serve_error_page("FAQ", f"Error loading FAQ file: {str(e)}")
def serve_players(self):
"""Serve the players page with real data"""
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_error_page("Players", "Database not available")
return
# Fetch players data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
players_data = loop.run_until_complete(self.fetch_players_data(database))
loop.close()
self.serve_players_data(players_data)
except Exception as e:
print(f"Error fetching players data: {e}")
self.serve_error_page("Players", f"Error loading players: {str(e)}")
async def fetch_players_data(self, database):
"""Fetch all players data from database"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
# Get all players with basic stats
cursor = await db.execute("""
SELECT p.nickname, p.level, p.experience, p.money, p.created_at,
l.name as location_name,
(SELECT COUNT(*) FROM pets WHERE player_id = p.id) as pet_count,
(SELECT COUNT(*) FROM pets WHERE player_id = p.id AND is_active = 1) as active_pets,
(SELECT COUNT(*) FROM player_achievements WHERE player_id = p.id) as achievement_count
FROM players p
LEFT JOIN locations l ON p.current_location_id = l.id
ORDER BY p.level DESC, p.experience DESC
""")
rows = await cursor.fetchall()
# Convert SQLite rows to dictionaries properly
players = []
for row in rows:
player_dict = {
'nickname': row[0],
'level': row[1],
'experience': row[2],
'money': row[3],
'created_at': row[4],
'location_name': row[5],
'pet_count': row[6],
'active_pets': row[7],
'achievement_count': row[8]
}
players.append(player_dict)
return players
except Exception as e:
print(f"Database error fetching players: {e}")
return []
def serve_players_data(self, players_data):
"""Serve players page with real data"""
# Calculate statistics
total_players = len(players_data)
total_pets = sum(p['pet_count'] for p in players_data) if players_data else 0
total_achievements = sum(p['achievement_count'] for p in players_data) if players_data else 0
highest_level = max((p['level'] for p in players_data), default=0) if players_data else 0
# Build statistics cards
stats_content = f"""
š Total Players
{total_players}
š¾ Total Pets
{total_pets}
š Achievements
{total_achievements}
ā Highest Level
{highest_level}
"""
# Build players table HTML
if players_data:
players_html = ""
for i, player in enumerate(players_data, 1):
rank_emoji = {"1": "š„", "2": "š„", "3": "š„"}.get(str(i), f"{i}.")
safe_nickname = escape_attr(player['nickname'])
safe_nickname_display = escape_html(player['nickname'])
safe_location = escape_html(player.get('location_name', 'Unknown'))
players_html += f"""
'
# Data rows
content += ''
for i, player in enumerate(data):
row_data = row_formatter(player, i)
rank_class = f"rank-{i+1}" if i < 3 else ""
content += f'
'
for cell in row_data:
content += f'
{cell}
'
content += '
'
content += ''
content += '
'
content += '
'
return content
def serve_locations(self):
"""Serve the locations page with real data"""
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_error_page("Locations", "Database not available")
return
# Fetch locations data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
locations_data = loop.run_until_complete(self.fetch_locations_data(database))
player_locations = loop.run_until_complete(self.fetch_player_locations(database))
loop.close()
self.serve_locations_data(locations_data, player_locations)
except Exception as e:
print(f"Error fetching locations data: {e}")
self.serve_error_page("Locations", f"Error loading locations: {str(e)}")
async def fetch_locations_data(self, database):
"""Fetch all locations and their spawn data from database"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
# Get all locations
cursor = await db.execute("""
SELECT l.*,
GROUP_CONCAT(DISTINCT ps.name || ' (' || ps.type1 ||
CASE WHEN ps.type2 IS NOT NULL THEN '/' || ps.type2 ELSE '' END || ')') as spawns
FROM locations l
LEFT JOIN location_spawns ls ON l.id = ls.location_id
LEFT JOIN pet_species ps ON ls.species_id = ps.id
GROUP BY l.id
ORDER BY l.id
""")
rows = await cursor.fetchall()
# Convert SQLite rows to dictionaries properly
locations = []
for row in rows:
location_dict = {
'id': row[0],
'name': row[1],
'description': row[2],
'level_min': row[3],
'level_max': row[4],
'spawns': row[5] if len(row) > 5 else None
}
locations.append(location_dict)
return locations
except Exception as e:
print(f"Database error fetching locations: {e}")
return []
async def fetch_player_locations(self, database):
"""Fetch player locations for the interactive map"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("""
SELECT p.nickname, p.current_location_id, l.name as location_name
FROM players p
JOIN locations l ON p.current_location_id = l.id
ORDER BY p.nickname
""")
rows = await cursor.fetchall()
players = []
for row in rows:
player_dict = {
'nickname': row['nickname'],
'location_id': row['current_location_id'],
'location_name': row['location_name']
}
players.append(player_dict)
return players
except Exception as e:
print(f"Database error fetching player locations: {e}")
return []
def create_interactive_map(self, locations_data, player_locations):
"""Create an interactive SVG map showing player locations"""
if not locations_data:
return ""
# Define map layout - create a unique visual design
map_positions = {
1: {"x": 200, "y": 400, "shape": "circle", "color": "#4CAF50"}, # Starter Town - central
2: {"x": 100, "y": 200, "shape": "hexagon", "color": "#2E7D32"}, # Whispering Woods - forest
3: {"x": 400, "y": 150, "shape": "diamond", "color": "#FF9800"}, # Thunder Peaks - mountain
4: {"x": 550, "y": 300, "shape": "octagon", "color": "#795548"}, # Stone Caverns - cave
5: {"x": 300, "y": 500, "shape": "star", "color": "#2196F3"}, # Frozen Lake - ice
6: {"x": 500, "y": 450, "shape": "triangle", "color": "#F44336"} # Volcanic Crater - fire
}
# Create player location groups
location_players = {}
for player in player_locations or []:
loc_id = player['location_id']
if loc_id not in location_players:
location_players[loc_id] = []
location_players[loc_id].append(player['nickname'])
# SVG map content
svg_content = ""
# Add connecting paths between locations
paths = [
(1, 2), (1, 3), (1, 5), # Starter Town connections
(2, 5), (3, 4), (4, 6), (5, 6) # Other connections
]
for start, end in paths:
if start in map_positions and end in map_positions:
start_pos = map_positions[start]
end_pos = map_positions[end]
svg_content += f"""
"""
# Add location shapes
for location in locations_data:
loc_id = location['id']
if loc_id not in map_positions:
continue
pos = map_positions[loc_id]
players_here = location_players.get(loc_id, [])
player_count = len(players_here)
# Create shape based on type
shape_svg = self.create_location_shape(pos, location, player_count)
svg_content += shape_svg
# Add location label
svg_content += f"""
{location['name']}
"""
# Add player names if any
if players_here:
player_text = ", ".join(players_here)
svg_content += f"""
{player_text}
"""
return f"""
šŗļø Interactive World Map
Current player locations - shapes represent different terrain types
Towns
Forests
Mountains
Caves
Ice Areas
Volcanic
"""
def create_location_shape(self, pos, location, player_count):
"""Create SVG shape for a location based on its type"""
x, y = pos['x'], pos['y']
color = pos['color']
shape = pos['shape']
# Add glow effect if players are present
glow = 'filter="url(#glow)"' if player_count > 0 else ''
# Base size with scaling for player count
base_size = 25 + (player_count * 3)
if shape == "circle":
return f"""
"""
elif shape == "hexagon":
points = []
for i in range(6):
angle = i * 60 * math.pi / 180
px = x + base_size * math.cos(angle)
py = y + base_size * math.sin(angle)
points.append(f"{px},{py}")
return f"""
"""
elif shape == "diamond":
return f"""
"""
elif shape == "triangle":
return f"""
"""
elif shape == "star":
# Create 5-pointed star
points = []
for i in range(10):
angle = i * 36 * math.pi / 180
radius = base_size if i % 2 == 0 else base_size * 0.5
px = x + radius * math.cos(angle)
py = y + radius * math.sin(angle)
points.append(f"{px},{py}")
return f"""
"""
elif shape == "octagon":
points = []
for i in range(8):
angle = i * 45 * math.pi / 180
px = x + base_size * math.cos(angle)
py = y + base_size * math.sin(angle)
points.append(f"{px},{py}")
return f"""
"""
else:
# Default to circle
return f"""
"""
def serve_locations_data(self, locations_data, player_locations=None):
"""Serve locations page with real data using unified template"""
# Build locations HTML
locations_html = ""
if locations_data:
for location in locations_data:
spawns = location.get('spawns', 'No pets found')
if not spawns or spawns == 'None':
spawns = "No pets spawn here yet"
# Split spawns into a readable list and remove duplicates
if spawns != "No pets spawn here yet":
spawn_list = list(set([spawn.strip() for spawn in spawns.split(',') if spawn.strip()]))
spawn_list.sort() # Sort alphabetically for consistency
else:
spawn_list = []
spawn_badges = ""
visible_spawns = spawn_list[:6] # Show first 6
hidden_spawns = spawn_list[6:] # Hide the rest
# Add visible spawn badges
for spawn in visible_spawns:
spawn_badges += f'{spawn}'
# Add hidden spawn badges (initially hidden)
if hidden_spawns:
location_id = location['id']
for spawn in hidden_spawns:
spawn_badges += f'{spawn}'
# Add functional "show more" button
spawn_badges += f'+{len(hidden_spawns)} more'
if not spawn_badges:
spawn_badges = 'No pets spawn here yet'
locations_html += f"""
Explore all areas and discover what pets await you!
{map_html}
šÆ How Locations Work
Travel: Use !travel <location> to move between areas
Explore: Use !explore to find wild pets in your current location
Unlock: Some locations require achievements - catch specific pet types to unlock new areas!
Weather: Check !weather for conditions that boost certain pet spawn rates
{locations_html}
š” Use !wild <location> in #petz to see what pets spawn in a specific area
"""
# Add locations-specific CSS
additional_css = """
.map-section {
background: var(--bg-secondary);
border-radius: 15px;
padding: 30px;
margin: 30px 0;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
}
.map-section h2 {
color: var(--text-accent);
text-align: center;
margin-bottom: 10px;
}
.map-container {
display: flex;
justify-content: center;
margin: 20px 0;
}
.map-container svg {
border-radius: 10px;
box-shadow: 0 4px 15px rgba(0,0,0,0.5);
max-width: 100%;
height: auto;
}
.map-legend {
display: flex;
justify-content: center;
flex-wrap: wrap;
gap: 20px;
margin-top: 20px;
}
.legend-item {
display: flex;
align-items: center;
gap: 8px;
color: var(--text-primary);
font-size: 0.9em;
}
.legend-shape {
width: 16px;
height: 16px;
border-radius: 3px;
border: 1px solid white;
}
.legend-shape.circle {
border-radius: 50%;
}
.legend-shape.hexagon {
border-radius: 3px;
transform: rotate(45deg);
}
.legend-shape.diamond {
transform: rotate(45deg);
}
.legend-shape.triangle {
clip-path: polygon(50% 0%, 0% 100%, 100% 100%);
}
.legend-shape.star {
clip-path: polygon(50% 0%, 61% 35%, 98% 35%, 68% 57%, 79% 91%, 50% 70%, 21% 91%, 32% 57%, 2% 35%, 39% 35%);
}
.legend-shape.octagon {
clip-path: polygon(30% 0%, 70% 0%, 100% 30%, 100% 70%, 70% 100%, 30% 100%, 0% 70%, 0% 30%);
}
.locations-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(400px, 1fr));
gap: 25px;
margin-top: 30px;
}
.location-card {
background: var(--bg-secondary);
border-radius: 15px;
overflow: hidden;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
transition: transform 0.3s ease;
}
.location-card:hover {
transform: translateY(-5px);
}
.location-header {
background: var(--gradient-primary);
color: white;
padding: 20px;
display: flex;
justify-content: space-between;
align-items: center;
}
.location-header h3 {
margin: 0;
font-size: 1.3em;
font-weight: 700;
}
.location-id {
background: rgba(255,255,255,0.2);
padding: 5px 10px;
border-radius: 20px;
font-size: 0.8em;
}
.location-description {
padding: 20px;
color: var(--text-primary);
font-style: italic;
border-bottom: 1px solid var(--border-color);
}
.location-levels {
padding: 20px;
padding-bottom: 10px;
border-bottom: 1px solid var(--border-color);
}
.location-spawns {
padding: 20px;
}
.spawn-badge {
background: var(--bg-tertiary);
color: var(--text-accent);
padding: 4px 10px;
border-radius: 12px;
font-size: 0.85em;
margin: 3px;
display: inline-block;
border: 1px solid var(--border-color);
}
.hidden-spawn {
display: none;
}
.more-button {
background: var(--gradient-primary) !important;
color: white !important;
cursor: pointer;
transition: transform 0.2s ease;
}
.more-button:hover {
transform: scale(1.05);
}
.less-button {
background: #ff6b6b !important;
color: white !important;
cursor: pointer;
transition: transform 0.2s ease;
}
.less-button:hover {
transform: scale(1.05);
}
.info-section {
background: var(--bg-secondary);
border-radius: 15px;
padding: 25px;
margin-bottom: 30px;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
}
.info-section h2 {
color: var(--text-accent);
margin-top: 0;
}
"""
# Get the unified template with additional CSS
html_content = self.get_page_template("Game Locations", content, "locations")
# Insert additional CSS before closing tag
html_content = html_content.replace("", additional_css + "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html_content.encode())
def serve_petdex(self):
"""Serve the petdex page with all pet species data"""
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_error_page("Petdex", "Database not available")
return
# Parse URL parameters for sorting
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
sort_mode = query_params.get('sort', ['rarity'])[0] # Default to rarity
search_query = query_params.get('search', [''])[0] # Default to empty search
# Fetch petdex data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
petdex_data = loop.run_until_complete(self.fetch_petdex_data(database))
loop.close()
self.serve_petdex_data(petdex_data, sort_mode, search_query)
except Exception as e:
print(f"Error fetching petdex data: {e}")
self.serve_error_page("Petdex", f"Error loading petdex: {str(e)}")
async def fetch_petdex_data(self, database):
"""Fetch all pet species data from database"""
try:
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
# Get all pet species with evolution information (no duplicates)
cursor = await db.execute("""
SELECT DISTINCT ps.id, ps.name, ps.type1, ps.type2, ps.base_hp, ps.base_attack,
ps.base_defense, ps.base_speed, ps.evolution_level, ps.evolution_species_id,
ps.rarity, ps.emoji,
evolve_to.name as evolves_to_name,
(SELECT COUNT(*) FROM location_spawns ls WHERE ls.species_id = ps.id) as location_count
FROM pet_species ps
LEFT JOIN pet_species evolve_to ON ps.evolution_species_id = evolve_to.id
ORDER BY ps.rarity ASC, ps.name ASC
""")
rows = await cursor.fetchall()
pets = []
for row in rows:
pet_dict = {
'id': row[0], 'name': row[1], 'type1': row[2], 'type2': row[3],
'base_hp': row[4], 'base_attack': row[5], 'base_defense': row[6],
'base_speed': row[7], 'evolution_level': row[8],
'evolution_species_id': row[9], 'rarity': row[10], 'emoji': row[11],
'evolves_to_name': row[12], 'location_count': row[13]
}
pets.append(pet_dict)
# Get spawn locations for each pet
for pet in pets:
cursor = await db.execute("""
SELECT l.name, ls.min_level, ls.max_level, ls.spawn_rate
FROM location_spawns ls
JOIN locations l ON ls.location_id = l.id
WHERE ls.species_id = ?
ORDER BY l.name ASC
""", (pet['id'],))
spawn_rows = await cursor.fetchall()
pet['spawn_locations'] = []
for spawn_row in spawn_rows:
spawn_dict = {
'location_name': spawn_row[0], 'min_level': spawn_row[1],
'max_level': spawn_row[2], 'spawn_rate': spawn_row[3]
}
pet['spawn_locations'].append(spawn_dict)
return pets
except Exception as e:
print(f"Database error fetching petdex: {e}")
return []
def remove_pet_duplicates(self, pets_list):
"""Remove duplicate pets based on ID and sort by name"""
seen_ids = set()
unique_pets = []
for pet in pets_list:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
return sorted(unique_pets, key=lambda x: x['name'])
def serve_petdex_data(self, petdex_data, sort_mode='rarity', search_query=''):
"""Serve petdex page with all pet species data"""
# Remove duplicates from input data first
petdex_data = self.remove_pet_duplicates(petdex_data)
# Apply search filter if provided
if search_query:
search_query = search_query.lower()
filtered_data = []
for pet in petdex_data:
# Search in name, type1, type2
if (search_query in pet['name'].lower() or
search_query in pet['type1'].lower() or
(pet['type2'] and search_query in pet['type2'].lower())):
filtered_data.append(pet)
petdex_data = filtered_data
# Build pet cards HTML grouped by rarity
rarity_names = {1: "Common", 2: "Uncommon", 3: "Rare", 4: "Epic", 5: "Legendary"}
rarity_colors = {1: "#ffffff", 2: "#1eff00", 3: "#0070dd", 4: "#a335ee", 5: "#ff8000"}
# Calculate statistics
total_species = len(petdex_data)
type_counts = {}
for pet in petdex_data:
if pet['type1'] not in type_counts:
type_counts[pet['type1']] = 0
type_counts[pet['type1']] += 1
if pet['type2'] and pet['type2'] not in type_counts:
type_counts[pet['type2']] = 0
if pet['type2']:
type_counts[pet['type2']] += 1
# Build statistics section
stats_content = f"""
š Total Species
{total_species}
šØ Types
{len(type_counts)}
ā Rarities
{len(set(pet['rarity'] for pet in petdex_data))}
𧬠Evolutions
{len([p for p in petdex_data if p['evolution_level']])}
"""
# Sort and group pets based on sort_mode
petdex_html = ""
total_species = len(petdex_data)
if sort_mode == 'type':
# Group by type1
pets_by_type = {}
for pet in petdex_data:
pet_type = pet['type1']
if pet_type not in pets_by_type:
pets_by_type[pet_type] = []
# Check for duplicates within this type
if pet not in pets_by_type[pet_type]:
pets_by_type[pet_type].append(pet)
# Sort each type group by name and remove any remaining duplicates
for type_name in pets_by_type:
# Remove duplicates based on pet ID and sort by name
seen_ids = set()
unique_pets = []
for pet in pets_by_type[type_name]:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
pets_by_type[type_name] = sorted(unique_pets, key=lambda x: x['name'])
type_colors = {
'Fire': '#F08030', 'Water': '#6890F0', 'Grass': '#78C850', 'Electric': '#F8D030',
'Psychic': '#F85888', 'Ice': '#98D8D8', 'Dragon': '#7038F8', 'Dark': '#705848',
'Fighting': '#C03028', 'Poison': '#A040A0', 'Ground': '#E0C068', 'Flying': '#A890F0',
'Bug': '#A8B820', 'Rock': '#B8A038', 'Ghost': '#705898', 'Steel': '#B8B8D0',
'Normal': '#A8A878', 'Fairy': '#EE99AC'
}
for type_name in sorted(pets_by_type.keys()):
pets_in_type = pets_by_type[type_name]
type_color = type_colors.get(type_name, '#A8A878')
petdex_html += f"""
{type_name} Type ({len(pets_in_type)} species)
"""
for pet in pets_in_type:
type_str = pet['type1']
if pet['type2']:
type_str += f" / {pet['type2']}"
petdex_html += f"""
"""
elif sort_mode == 'location':
# Group by spawn locations
pets_by_location = {}
pets_no_location = []
for pet in petdex_data:
if pet['spawn_locations']:
for location in pet['spawn_locations']:
loc_name = location['location_name']
if loc_name not in pets_by_location:
pets_by_location[loc_name] = []
# Check for duplicates within this location
if pet not in pets_by_location[loc_name]:
pets_by_location[loc_name].append(pet)
else:
pets_no_location.append(pet)
# Sort each location group by name and remove any remaining duplicates
for location_name in pets_by_location:
# Remove duplicates based on pet ID and sort by name
seen_ids = set()
unique_pets = []
for pet in pets_by_location[location_name]:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
pets_by_location[location_name] = sorted(unique_pets, key=lambda x: x['name'])
location_colors = {
'Starter Town': '#4CAF50',
'Whispering Woods': '#2E7D32',
'Thunder Peaks': '#FF9800',
'Stone Caverns': '#795548',
'Frozen Lake': '#2196F3',
'Volcanic Crater': '#F44336'
}
for location_name in sorted(pets_by_location.keys()):
pets_in_location = pets_by_location[location_name]
location_color = location_colors.get(location_name, '#A8A878')
petdex_html += f"""
"""
for pet in pets_in_location:
type_str = pet['type1']
if pet['type2']:
type_str += f" / {pet['type2']}"
rarity_color = rarity_colors.get(pet['rarity'], '#ffffff')
# Get level range for this location
level_range = ""
for location in pet['spawn_locations']:
if location['location_name'] == location_name:
level_range = f"Lv.{location['min_level']}-{location['max_level']}"
break
petdex_html += f"""
"""
# Add pets with no location at the end (remove duplicates)
if pets_no_location:
seen_ids = set()
unique_no_location = []
for pet in pets_no_location:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_no_location.append(pet)
pets_no_location = sorted(unique_no_location, key=lambda x: x['name'])
if pets_no_location:
petdex_html += f"""
"""
elif sort_mode == 'all':
# Show all pets in a grid format without grouping (duplicates already removed)
sorted_pets = sorted(petdex_data, key=lambda x: (x['rarity'], x['name']))
petdex_html += f"""
š All Pet Species ({len(sorted_pets)} total)
"""
for pet in sorted_pets:
type_str = pet['type1']
if pet['type2']:
type_str += f" / {pet['type2']}"
rarity_color = rarity_colors.get(pet['rarity'], '#ffffff')
# Get all spawn locations
location_text = ""
if pet['spawn_locations']:
locations = [f"{loc['location_name']} (Lv.{loc['min_level']}-{loc['max_level']})"
for loc in pet['spawn_locations'][:2]]
if len(pet['spawn_locations']) > 2:
locations.append(f"+{len(pet['spawn_locations']) - 2} more")
location_text = f"š {', '.join(locations)}"
else:
location_text = "š Location Unknown"
petdex_html += f"""
"""
else: # Default to rarity sorting
pets_by_rarity = {}
for pet in petdex_data:
rarity = pet['rarity']
if rarity not in pets_by_rarity:
pets_by_rarity[rarity] = []
# Check for duplicates within this rarity
if pet not in pets_by_rarity[rarity]:
pets_by_rarity[rarity].append(pet)
# Sort each rarity group by name and remove any remaining duplicates
for rarity in pets_by_rarity:
# Remove duplicates based on pet ID and sort by name
seen_ids = set()
unique_pets = []
for pet in pets_by_rarity[rarity]:
if pet['id'] not in seen_ids:
seen_ids.add(pet['id'])
unique_pets.append(pet)
pets_by_rarity[rarity] = sorted(unique_pets, key=lambda x: x['name'])
for rarity in sorted(pets_by_rarity.keys()):
pets_in_rarity = pets_by_rarity[rarity]
rarity_name = rarity_names.get(rarity, f"Rarity {rarity}")
rarity_color = rarity_colors.get(rarity, "#ffffff")
petdex_html += f"""
{rarity_name} ({len(pets_in_rarity)} species)
"""
for pet in pets_in_rarity:
# Build type display
type_str = pet['type1']
if pet['type2']:
type_str += f"/{pet['type2']}"
# Build evolution info
evolution_info = ""
if pet['evolution_level'] and pet['evolves_to_name']:
evolution_info = f" Evolves: Level {pet['evolution_level']} ā {pet['evolves_to_name']}"
elif pet['evolution_level']:
evolution_info = f" Evolves: Level {pet['evolution_level']}"
# Build spawn locations
spawn_info = ""
if pet['spawn_locations']:
locations = [f"{loc['location_name']} (Lv.{loc['min_level']}-{loc['max_level']})"
for loc in pet['spawn_locations'][:3]]
if len(pet['spawn_locations']) > 3:
locations.append(f"+{len(pet['spawn_locations']) - 3} more")
spawn_info = f" Found in: {', '.join(locations)}"
else:
spawn_info = " Found in: Not yet available"
# Calculate total base stats
total_stats = pet['base_hp'] + pet['base_attack'] + pet['base_defense'] + pet['base_speed']
petdex_html += f"""
š Found {len(petdex_data)} pets matching "{search_query}"
' if search_query else ''}
"""
# Determine header text based on sort mode
if sort_mode == 'type':
header_text = "š Pet Collection by Type"
description = "š· Pets are organized by their primary type. Each type has different strengths and weaknesses!"
elif sort_mode == 'name':
header_text = "š Pet Collection (A-Z)"
description = "š¤ All pets sorted alphabetically by name. Perfect for finding specific species!"
elif sort_mode == 'location':
header_text = "š Pet Collection by Location"
description = "šŗļø Pets are organized by where they can be found. Use !travel <location> to visit these areas!"
elif sort_mode == 'all':
header_text = "š Complete Pet Collection"
description = "š All pets displayed in a comprehensive grid view with locations and stats!"
else:
header_text = "š Pet Collection by Rarity"
description = "š Pets are organized by rarity. Use !wild <location> in #petz to see what spawns where!"
# Combine all content
content = f"""
š Petdex
Complete encyclopedia of all available pets
{stats_content}
{search_interface}
{header_text}
{description}
{petdex_html}
"""
html = self.get_page_template("Petdex", content, "petdex")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html.encode())
def serve_player_profile(self, nickname):
"""Serve individual player profile page"""
# URL decode the nickname in case it has special characters
from urllib.parse import unquote
nickname = unquote(nickname)
# Get database instance from the server class
database = self.server.database if hasattr(self.server, 'database') else None
if not database:
self.serve_player_error(nickname, "Database not available")
return
# Fetch player data
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
player_data = loop.run_until_complete(self.fetch_player_data(database, nickname))
loop.close()
if player_data is None:
self.serve_player_not_found(nickname)
return
self.serve_player_data(nickname, player_data)
except Exception as e:
print(f"Error fetching player data for {nickname}: {e}")
self.serve_player_error(nickname, f"Error loading player data: {str(e)}")
return
async def fetch_player_data(self, database, nickname):
"""Fetch all player data from database"""
try:
# Get player info
import aiosqlite
async with aiosqlite.connect(database.db_path) as db:
db.row_factory = aiosqlite.Row
# Get player basic info
cursor = await db.execute("""
SELECT p.*, l.name as location_name, l.description as location_desc
FROM players p
LEFT JOIN locations l ON p.current_location_id = l.id
WHERE p.nickname = ?
""", (nickname,))
player = await cursor.fetchone()
if not player:
return None
# Convert Row to dict
player_dict = dict(player)
# Get player pets
cursor = await db.execute("""
SELECT p.*, ps.name as species_name, ps.type1, ps.type2, ps.emoji
FROM pets p
JOIN pet_species ps ON p.species_id = ps.id
WHERE p.player_id = ?
ORDER BY p.is_active DESC, p.level DESC, p.id ASC
""", (player_dict['id'],))
pets_rows = await cursor.fetchall()
pets = [dict(row) for row in pets_rows]
# Get player achievements
cursor = await db.execute("""
SELECT pa.*, a.name as achievement_name, a.description as achievement_desc
FROM player_achievements pa
JOIN achievements a ON pa.achievement_id = a.id
WHERE pa.player_id = ?
ORDER BY pa.completed_at DESC
""", (player_dict['id'],))
achievements_rows = await cursor.fetchall()
achievements = [dict(row) for row in achievements_rows]
# Get player inventory
cursor = await db.execute("""
SELECT i.name, i.description, i.category, i.rarity, pi.quantity
FROM player_inventory pi
JOIN items i ON pi.item_id = i.id
WHERE pi.player_id = ?
ORDER BY i.rarity DESC, i.name ASC
""", (player_dict['id'],))
inventory_rows = await cursor.fetchall()
inventory = [dict(row) for row in inventory_rows]
# Get player gym badges
cursor = await db.execute("""
SELECT g.name, g.badge_name, g.badge_icon, l.name as location_name,
pgb.victories, pgb.first_victory_date, pgb.highest_difficulty
FROM player_gym_battles pgb
JOIN gyms g ON pgb.gym_id = g.id
JOIN locations l ON g.location_id = l.id
WHERE pgb.player_id = ? AND pgb.victories > 0
ORDER BY pgb.first_victory_date ASC
""", (player_dict['id'],))
gym_badges_rows = await cursor.fetchall()
gym_badges = [dict(row) for row in gym_badges_rows]
# Get player encounters using database method
encounters = []
try:
# Use the existing database method which handles row factory properly
temp_encounters = await database.get_player_encounters(player_dict['id'])
for enc in temp_encounters:
encounter_dict = {
'species_name': enc['species_name'],
'type1': enc['type1'],
'type2': enc['type2'],
'rarity': enc['rarity'],
'total_encounters': enc['total_encounters'],
'caught_count': enc['caught_count'],
'first_encounter_date': enc['first_encounter_date']
}
encounters.append(encounter_dict)
except Exception as e:
print(f"Error fetching encounters: {e}")
encounters = []
# Get encounter stats
try:
encounter_stats = await database.get_encounter_stats(player_dict['id'])
except Exception as e:
print(f"Error fetching encounter stats: {e}")
encounter_stats = {
'species_encountered': 0,
'total_encounters': 0,
'total_species': 0,
'completion_percentage': 0.0
}
return {
'player': player_dict,
'pets': pets,
'achievements': achievements,
'inventory': inventory,
'gym_badges': gym_badges,
'encounters': encounters,
'encounter_stats': encounter_stats
}
except Exception as e:
print(f"Database error fetching player {nickname}: {e}")
return None
def serve_player_pets(self, nickname):
"""Serve pet management page for a player"""
try:
# Get player data using database method directly
player = asyncio.run(self.database.get_player(nickname))
if not player:
self.serve_player_not_found(nickname)
return
# Get player pets for management
player_pets = asyncio.run(self.database.get_player_pets_for_rename(player['id']))
# Render the pets management page
self.serve_pets_management_interface(nickname, player_pets)
except Exception as e:
print(f"Error serving player pets page: {e}")
self.serve_player_error(nickname, str(e))
def get_iv_grade(self, iv_value):
"""Get color grade for IV value"""
if iv_value >= 27: # 27-31 (Perfect)
return "perfect"
elif iv_value >= 21: # 21-26 (Excellent)
return "excellent"
elif iv_value >= 15: # 15-20 (Good)
return "good"
elif iv_value >= 10: # 10-14 (Fair)
return "fair"
else: # 0-9 (Poor)
return "poor"
def serve_pets_management_interface(self, nickname, pets):
"""Serve the pet management interface"""
if not pets:
self.serve_no_pets_error(nickname)
return
# Generate pet cards
pet_cards = []
for pet in pets:
status_badge = ""
if pet.get('is_active'):
team_order = pet.get('team_order', 0)
if team_order > 0:
status_badge = f'Team #{team_order}'
else:
status_badge = 'Active'
else:
status_badge = 'Storage'
fainted_badge = ""
if pet.get('fainted_at'):
fainted_badge = 'š Fainted'
current_name = escape_html(pet.get('nickname') or pet.get('species_name'))
pet_id = pet.get('id')
safe_species = escape_html(pet.get('species_name'))
pet_card = f"""
{'š' if pet['happiness'] > 70 else 'š' if pet['happiness'] > 40 else 'š'}Happiness: {pet['happiness']}/100
"""
# Create 6 numbered slots and place pets in their positions
team_slots = [''] * 6 # Initialize 6 empty slots
# Place active pets in their team_order positions
for pet in active_pets:
team_order = pet.get('team_order')
if team_order and 1 <= team_order <= 6:
team_slots[team_order - 1] = make_pet_card(pet, True)
storage_cards = ''.join(make_pet_card(pet, False) for pet in inactive_pets)
# Old template removed - using new unified template system below
# Generate storage pets HTML first
storage_pets_html = ""
for pet in inactive_pets:
storage_pets_html += make_pet_card(pet, False)
# Generate active pets HTML for team slots
active_pets_html = ""
for pet in active_pets:
if pet.get('team_order'):
active_pets_html += make_pet_card(pet, True)
# Create content using string concatenation instead of f-strings to avoid CSS brace issues
team_builder_content = """
š¾ Team Builder
Choose a team to edit, then drag pets between Active Team and Storage.
Changes are saved securely with PIN verification via IRC
š PIN Verification Required
A 6-digit PIN has been sent to you via IRC private message.
Enter the PIN below to confirm your team changes:
š” How to use:
⢠Drag pets to team slots
⢠Double-click to move pets
⢠Empty slots show placeholders
"""
# Generate team cards HTML
print(f"Debug: Generating team cards for {len(team_configs)} configs")
team_cards_html = ""
# If no team configs exist, create default slots with Team 1 showing current active team
if not team_configs:
print("Debug: No team configs found, creating default empty slots")
for slot in range(1, 4):
# For Team 1, show current active pets
if slot == 1:
pet_previews = ""
active_count = 0
for pos in range(1, 7):
# Find pet in position pos
pet_in_slot = next((pet for pet in active_pets if pet.get('team_order') == pos), None)
if pet_in_slot:
pet_name = pet_in_slot['nickname'] or pet_in_slot['species_name']
pet_emoji = pet_in_slot.get('emoji', 'š¾')
pet_previews += f'
{pet_emoji} {pet_name}
'
active_count += 1
else:
pet_previews += '
Empty
'
status_text = f"{active_count}/6 pets" if active_count > 0 else "Empty team"
else:
# Teams 2 and 3 are empty by default
pet_previews = '
'''
else:
for config in team_configs:
print(f"Debug: Processing config: {config}")
pet_previews = ""
# Special handling for Team 1 - show current active team instead of saved config
if config['slot'] == 1:
active_count = 0
for pos in range(1, 7):
# Find pet in position pos from current active team
pet_in_slot = next((pet for pet in active_pets if pet.get('team_order') == pos), None)
if pet_in_slot:
pet_name = pet_in_slot['nickname'] or pet_in_slot['species_name']
pet_emoji = pet_in_slot.get('emoji', 'š¾')
pet_previews += f'
{pet_emoji} {pet_name}
'
active_count += 1
else:
pet_previews += '
Empty
'
status_text = f"{active_count}/6 pets" if active_count > 0 else "Empty team"
else:
# For Teams 2 and 3, use saved configuration data
if config['team_data']:
for pos in range(1, 7):
if str(pos) in config['team_data'] and config['team_data'][str(pos)]:
pet_info = config['team_data'][str(pos)]
pet_emoji = pet_info.get('emoji', 'š¾')
pet_previews += f'
'''
# Replace placeholder with actual team cards
team_builder_content = team_builder_content.replace('', team_cards_html)
# Get the unified template
html_content = self.get_page_template(f"Team Builder - {nickname}", team_builder_content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html_content.encode())
def serve_test_teambuilder(self, nickname):
"""Serve the test team builder interface with simplified team management"""
from urllib.parse import unquote
nickname = unquote(nickname)
try:
from src.database import Database
database = Database()
# Get event loop
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
player_data = loop.run_until_complete(self.fetch_player_data(database, nickname))
if player_data is None:
self.serve_player_not_found(nickname)
return
pets = player_data['pets']
if not pets:
self.serve_test_teambuilder_no_pets(nickname)
return
self.serve_test_teambuilder_interface(nickname, pets)
except Exception as e:
print(f"Error loading test team builder for {nickname}: {e}")
self.serve_player_error(nickname, f"Error loading test team builder: {str(e)}")
def serve_test_teambuilder_no_pets(self, nickname):
"""Show message when player has no pets using unified template"""
content = f"""
š¾ Test Team Builder
Simplified team management (Test Version)
š¾ No Pets Found
{nickname}, you need to catch some pets before using the team builder!
Head to the IRC channel and use !explore to find wild pets!
"""
html_content = self.get_page_template(f"Test Team Builder - {nickname}", content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html_content.encode())
def serve_test_teambuilder_interface(self, nickname, pets):
"""Serve the simplified test team builder interface"""
# Get team configurations for this player
import asyncio
try:
from src.database import Database
database = Database()
# Get event loop
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(f"Debug: Getting player data for {nickname}")
# Get player info
player = loop.run_until_complete(database.get_player(nickname))
if not player:
self.serve_player_error(nickname, "Player not found")
return
print(f"Debug: Player found: {player}")
# Get team configurations
team_configs = loop.run_until_complete(database.get_player_team_configurations(player['id']))
print(f"Debug: Team configs: {team_configs}")
# Create the simplified interface
print(f"Debug: Creating content with {len(pets)} pets")
# TEMPORARY: Use simple content to test
content = f"""
Test Team Builder - {nickname}
Found {len(pets)} pets and {len(team_configs)} team configs
First pet: {pets[0]['nickname'] if pets else 'No pets'}
"""
print("Debug: Content created successfully")
html_content = self.get_page_template(f"Test Team Builder - {nickname}", content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html_content.encode())
except Exception as e:
print(f"Error in serve_test_teambuilder_interface: {e}")
self.serve_player_error(nickname, f"Error loading interface: {str(e)}")
def create_test_teambuilder_content(self, nickname, pets, team_configs):
"""Create the simplified test team builder HTML content"""
import json
# Pre-process pets data for JavaScript - escape user data
pets_data = []
for pet in pets:
pets_data.append({
'id': pet['id'],
'name': pet['nickname'], # Will be safely encoded by safe_json()
'level': pet['level'],
'type_primary': pet['type1'],
'rarity': 1
})
pets_json = safe_json(pets_data)
# Build team cards for each configuration
team_cards_html = ""
for config in team_configs:
pet_previews = ""
if config['team_data']:
for pos in range(1, 7):
if str(pos) in config['team_data'] and config['team_data'][str(pos)]:
pet_info = config['team_data'][str(pos)]
pet_previews += f'
Choose a team to edit, make changes, and save with PIN verification
Choose Team to Edit
Select one of your 3 teams to edit. Each team can have up to 6 pets.
{team_cards_html}
Editing Team 1
š PIN Verification Required
A 6-digit PIN has been sent to you via IRC private message.
Enter the PIN below to confirm your team changes:
'''
def handle_team_save(self, nickname):
"""Handle team save request and generate PIN"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
# Parse JSON data
import json
try:
team_data = json.loads(post_data)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_save_async(nickname, team_data))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_team_save: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_save_async(self, nickname, save_data):
"""Async handler for team save"""
try:
# Extract team data and slot from new structure
if isinstance(save_data, dict) and 'teamData' in save_data:
team_data = save_data['teamData']
team_slot = save_data.get('teamSlot', 1) # Default to slot 1
else:
# Backwards compatibility - old format
team_data = save_data
team_slot = 1
# Validate team slot
if not isinstance(team_slot, int) or team_slot < 1 or team_slot > 3:
return {"success": False, "error": "Invalid team slot. Must be 1, 2, or 3"}
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate team composition
validation = await self.database.validate_team_composition(player["id"], team_data)
if not validation["valid"]:
return {"success": False, "error": validation["error"]}
# Use unified PIN service to request verification
change_data = {
'teamData': team_data,
'teamSlot': team_slot
}
result = await self.pin_service.request_verification(
player_id=player["id"],
nickname=nickname,
action_type="team_change",
action_data=change_data,
expiration_minutes=10
)
if result["success"]:
return {
"success": True,
"message": "PIN sent to your IRC private messages",
"expires_in_minutes": result["expires_in_minutes"]
}
else:
return {"success": False, "error": result.get("error", "Failed to generate PIN")}
except Exception as e:
print(f"Error in _handle_team_save_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_verify(self, nickname):
"""Handle PIN verification and apply team changes"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No PIN provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
# Parse JSON data
import json
try:
data = json.loads(post_data)
pin_code = data.get("pin", "").strip()
except (json.JSONDecodeError, AttributeError):
self.send_json_response({"success": False, "error": "Invalid data format"}, 400)
return
if not pin_code:
self.send_json_response({"success": False, "error": "PIN code is required"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_verify_async(nickname, pin_code))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_team_verify: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_verify_async(self, nickname, pin_code):
"""Async handler for PIN verification"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Define callback to apply team changes
async def apply_team_changes(player_id, action_data):
# Validate team composition again before applying
validation = await self.database.validate_team_composition(player_id, action_data['teamData'])
if not validation["valid"]:
raise Exception(validation["error"])
# Apply the team changes directly using database operations
import aiosqlite
team_changes = action_data['teamData']
team_slot = action_data.get('teamSlot', 1)
# Validate team slot
if not isinstance(team_slot, int) or team_slot < 1 or team_slot > 3:
raise Exception("Invalid team slot. Must be 1, 2, or 3")
# Apply team changes atomically
async with aiosqlite.connect(self.database.db_path) as db:
try:
# Begin transaction
await db.execute("BEGIN TRANSACTION")
if team_slot == 1:
# Team 1: Apply directly to active team (immediate effect)
# First, deactivate all pets for this player
await db.execute("""
UPDATE pets SET is_active = FALSE, team_order = NULL
WHERE player_id = ?
""", (player_id,))
# Then activate and position the selected pets
for pet_id, position in team_changes.items():
if position: # If position is a number (1-6), pet is active
await db.execute("""
UPDATE pets SET is_active = TRUE, team_order = ?
WHERE id = ? AND player_id = ?
""", (position, int(pet_id), player_id))
changes_applied = sum(1 for pos in team_changes.values() if pos)
else:
# Teams 2-3: Save as configuration
import json
# Get pet details for the configuration
pets_list = []
for pet_id, position in team_changes.items():
if position:
cursor = await db.execute("""
SELECT nickname, species_name, level, happiness
FROM pets WHERE id = ? AND player_id = ?
""", (int(pet_id), player_id))
pet_row = await cursor.fetchone()
if pet_row:
pet_dict = {
"id": int(pet_id),
"nickname": pet_row[0],
"species": pet_row[1],
"level": pet_row[2],
"happiness": pet_row[3],
"position": position
}
pets_list.append(pet_dict)
# Save configuration
await db.execute("""
INSERT OR REPLACE INTO team_configurations
(player_id, slot_number, config_name, team_data, updated_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
""", (player_id, team_slot, f"Team {team_slot}", json.dumps(pets_list)))
changes_applied = len(pets_list)
await db.commit()
return {
"success": True,
"changes_applied": changes_applied,
"team_slot": team_slot
}
except Exception as e:
await db.execute("ROLLBACK")
raise Exception(f"Database error: {str(e)}")
# Use unified PIN service to verify and execute
result = await self.pin_service.verify_and_execute(
player_id=player["id"],
pin_code=pin_code,
action_type="team_change",
action_callback=apply_team_changes
)
if result["success"]:
callback_result = result.get("callback_result", {})
changes_applied = callback_result.get("changes_applied", 0) if callback_result else 0
return {
"success": True,
"message": f"Team changes applied successfully! {changes_applied} pets updated.",
"changes_applied": changes_applied
}
else:
return {"success": False, "error": result.get("error", "PIN verification failed")}
except Exception as e:
print(f"Error in _handle_team_verify_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_config_save(self, nickname, slot):
"""Handle saving team configuration to a slot"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
# Parse JSON data
import json
try:
data = json.loads(post_data)
config_name = data.get("name", f"Team Config {slot}")
team_data = data.get("team", [])
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Validate slot number
try:
slot_num = int(slot)
if slot_num < 1 or slot_num > 3:
self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
return
except ValueError:
self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_config_save_async(nickname, slot_num, config_name, team_data))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_team_config_save: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_save_async(self, nickname, slot_num, config_name, team_data):
"""Async handler for team configuration save"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Save configuration
import json
success = await self.database.save_team_configuration(
player["id"], slot_num, config_name, json.dumps(team_data)
)
if success:
return {"success": True, "message": f"Team configuration '{config_name}' saved to slot {slot_num}"}
else:
return {"success": False, "error": "Failed to save team configuration"}
except Exception as e:
print(f"Error in _handle_team_config_save_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_config_load(self, nickname, slot):
"""Handle loading team configuration from a slot"""
try:
# Validate slot number
try:
slot_num = int(slot)
if slot_num < 1 or slot_num > 3:
self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
return
except ValueError:
self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_config_load_async(nickname, slot_num))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 404 if "not found" in result.get("error", "") else 400)
except Exception as e:
print(f"Error in handle_team_config_load: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_load_async(self, nickname, slot_num):
"""Async handler for team configuration load"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Load configuration
config = await self.database.load_team_configuration(player["id"], slot_num)
if config:
import json
team_data = json.loads(config["team_data"])
return {
"success": True,
"config_name": config["config_name"],
"team_data": team_data,
"updated_at": config["updated_at"]
}
else:
return {"success": False, "error": f"No team configuration found in slot {slot_num}"}
except Exception as e:
print(f"Error in _handle_team_config_load_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_config_rename(self, nickname, slot):
"""Handle renaming team configuration in a slot"""
try:
# Validate slot number
try:
slot_num = int(slot)
if slot_num < 1 or slot_num > 3:
self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
return
except ValueError:
self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
return
# Get the new name from request body
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
import json
try:
data = json.loads(post_data.decode('utf-8'))
new_name = data.get('new_name', '').strip()
if not new_name:
self.send_json_response({"success": False, "error": "Configuration name cannot be empty"}, 400)
return
if len(new_name) > 50:
self.send_json_response({"success": False, "error": "Configuration name too long (max 50 characters)"}, 400)
return
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_config_rename_async(nickname, slot_num, new_name))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 404 if "not found" in result.get("error", "") else 400)
except Exception as e:
print(f"Error in handle_team_config_rename: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_rename_async(self, nickname, slot_num, new_name):
"""Async handler for team configuration rename"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Check if configuration exists in the slot
existing_config = await self.database.load_team_configuration(player["id"], slot_num)
if not existing_config:
return {"success": False, "error": f"No team configuration found in slot {slot_num}"}
# Rename the configuration
success = await self.database.rename_team_configuration(player["id"], slot_num, new_name)
if success:
return {
"success": True,
"message": f"Configuration renamed to '{new_name}'",
"new_name": new_name
}
else:
return {"success": False, "error": "Failed to rename configuration"}
except Exception as e:
print(f"Error in _handle_team_config_rename_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_config_apply(self, nickname, slot):
"""Handle applying team configuration to active team"""
try:
# Validate slot number
try:
slot_num = int(slot)
if slot_num < 1 or slot_num > 3:
self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
return
except ValueError:
self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_config_apply_async(nickname, slot_num))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 404 if "not found" in result.get("error", "") else 400)
except Exception as e:
print(f"Error in handle_team_config_apply: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_config_apply_async(self, nickname, slot_num):
"""Async handler for team configuration apply"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Apply the team configuration
result = await self.database.apply_team_configuration(player["id"], slot_num)
return result
except Exception as e:
print(f"Error in _handle_team_config_apply_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_swap_request(self, nickname, slot):
"""Handle team swap request to change active team"""
try:
# Validate slot number
try:
slot_num = int(slot)
if slot_num < 1 or slot_num > 3:
self.send_json_response({"success": False, "error": "Slot must be 1, 2, or 3"}, 400)
return
except ValueError:
self.send_json_response({"success": False, "error": "Invalid slot number"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_team_swap_async(nickname, slot_num))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 404 if "not found" in result.get("error", "") else 400)
except Exception as e:
print(f"Error in handle_team_swap_request: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_swap_async(self, nickname, slot_num):
"""Async handler for team swapping"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Set the new active team slot
result = await self.database.set_active_team_slot(player["id"], slot_num)
return result
except Exception as e:
print(f"Error in _handle_team_swap_async: {e}")
return {"success": False, "error": str(e)}
def handle_test_team_save(self, nickname):
"""Handle test team builder save request and generate PIN"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
try:
import json
data = json.loads(post_data)
team_slot = data.get('team_slot')
team_data = data.get('team_data', {})
# Validate team slot
if not isinstance(team_slot, int) or team_slot < 1 or team_slot > 3:
self.send_json_response({"success": False, "error": "Invalid team slot. Must be 1, 2, or 3"}, 400)
return
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_test_team_save_async(nickname, team_slot, team_data))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_test_team_save: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_test_team_save_async(self, nickname, team_slot, team_data):
"""Async handler for test team save"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Generate PIN and store pending change
import json
team_json = json.dumps(team_data)
config_name = f"Team {team_slot}"
result = await self.database.save_team_configuration(
player["id"], team_slot, config_name, team_json
)
if result:
# Generate PIN for verification (using existing PIN system)
pin_result = await self.database.create_team_change_pin(
player["id"], team_json
)
if pin_result["success"]:
# Send PIN to IRC
if hasattr(self.server, 'bot') and self.server.bot:
self.server.bot.send_private_message(
nickname,
f"š Team {team_slot} Save PIN: {pin_result['pin_code']} (expires in 10 minutes)"
)
return {"success": True, "message": "PIN sent to IRC"}
else:
return {"success": False, "error": "Failed to generate PIN"}
else:
return {"success": False, "error": "Failed to save team configuration"}
except Exception as e:
print(f"Error in _handle_test_team_save_async: {e}")
return {"success": False, "error": str(e)}
def handle_test_team_verify(self, nickname):
"""Handle test team builder PIN verification"""
try:
# Get PIN from request body
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
import json
try:
data = json.loads(post_data.decode('utf-8'))
pin_code = data.get('pin', '').strip()
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid request data"}, 400)
return
if not pin_code or len(pin_code) != 6:
self.send_json_response({"success": False, "error": "PIN must be 6 digits"}, 400)
return
# Run async verification
import asyncio
result = asyncio.run(self._handle_test_team_verify_async(nickname, pin_code))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_test_team_verify: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_test_team_verify_async(self, nickname, pin_code):
"""Async handler for test team PIN verification"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Verify PIN
result = await self.database.verify_team_change_pin(player["id"], pin_code)
if result["success"]:
return {"success": True, "message": "Team configuration saved successfully!"}
else:
return {"success": False, "error": result.get("error", "Invalid PIN")}
except Exception as e:
print(f"Error in _handle_test_team_verify_async: {e}")
return {"success": False, "error": str(e)}
def handle_individual_team_save(self, nickname, team_slot):
"""Handle individual team save request and generate PIN"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
try:
import json
data = json.loads(post_data)
team_identifier = data.get('team_identifier', team_slot)
is_active_team = data.get('is_active_team', False)
pets = data.get('pets', [])
# Validate team slot
if team_slot not in ['1', '2', '3', 'active']:
self.send_json_response({"success": False, "error": "Invalid team slot"}, 400)
return
# Convert team_slot to numeric for database operations
if team_slot == 'active':
team_slot_num = 1
is_active_team = True
else:
team_slot_num = int(team_slot)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_individual_team_save_async(nickname, team_slot_num, pets, is_active_team))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_individual_team_save: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_individual_team_save_async(self, nickname, team_slot, pets, is_active_team):
"""Async handler for individual team save"""
try:
# Get player
player = await self.server.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate pets exist and belong to player
if pets:
player_pets = await self.server.database.get_player_pets(player['id'])
player_pet_ids = [pet['id'] for pet in player_pets]
for pet_data in pets:
pet_id = pet_data.get('pet_id')
if not pet_id:
continue
# Convert pet_id to int for comparison with database IDs
try:
pet_id_int = int(pet_id)
except (ValueError, TypeError):
return {"success": False, "error": WebSecurity.sanitize_error_message("Invalid pet ID", pet_id)}
# Check if pet belongs to player
if pet_id_int not in player_pet_ids:
return {"success": False, "error": WebSecurity.sanitize_error_message("Pet {} not found or doesn't belong to you", str(pet_id))}
# Convert pets array to the expected format for database
# Expected format: {"pet_id": position, "pet_id": position, ...}
team_changes = {}
if pets: # Ensure pets is not None or empty
for pet_data in pets:
if isinstance(pet_data, dict): # Ensure pet_data is a dictionary
pet_id = str(pet_data.get('pet_id')) # Ensure pet_id is string
position = pet_data.get('position', False) # Position or False for inactive
if pet_id and pet_id != 'None': # Only add valid pet IDs
team_changes[pet_id] = position
# Generate PIN and store pending change
import json
team_data = {
'teamSlot': int(team_slot), # Convert to int and use expected key name
'teamData': team_changes, # Use the dictionary format expected by database
'is_active_team': is_active_team
}
# Use unified PIN service for team changes
pin_result = await self.server.pin_service.request_verification(
player_id=player["id"],
nickname=nickname,
action_type="team_change",
action_data=team_data
)
return {"success": True, "requires_pin": True}
except Exception as e:
print(f"Error in _handle_individual_team_save_async: {e}")
return {"success": False, "error": str(e)}
def handle_individual_team_verify(self, nickname, team_slot):
"""Handle individual team PIN verification"""
try:
# Get PIN from request body
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
import json
try:
data = json.loads(post_data.decode('utf-8'))
pin_code = data.get('pin', '').strip()
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid request data"}, 400)
return
if not pin_code or len(pin_code) != 6:
self.send_json_response({"success": False, "error": "PIN must be 6 digits"}, 400)
return
# Run async verification
import asyncio
result = asyncio.run(self._handle_individual_team_verify_async(nickname, pin_code))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_individual_team_verify: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_individual_team_verify_async(self, nickname, pin_code):
"""Async handler for individual team PIN verification"""
try:
# Get player
player = await self.server.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Verify PIN and apply changes using simplified method
result = await self.server.database.apply_individual_team_change(player["id"], pin_code)
if result["success"]:
return {"success": True, "message": "Team saved successfully!"}
else:
return {"success": False, "error": result.get("error", "Invalid PIN")}
except Exception as e:
print(f"Error in _handle_individual_team_verify_async: {e}")
return {"success": False, "error": str(e)}
def handle_pet_rename_request(self, nickname):
"""Handle pet rename request and generate PIN"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
try:
import json
data = json.loads(post_data)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Run async operations
import asyncio
try:
result = asyncio.run(self._handle_pet_rename_request_async(nickname, data))
except Exception as async_error:
print(f"Async error in pet rename: {async_error}")
self.send_json_response({"success": False, "error": f"Async error: {str(async_error)}"}, 500)
return
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_pet_rename_request: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_pet_rename_request_async(self, nickname, data):
"""Async handler for pet rename request"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate required fields
if "pet_id" not in data or "new_nickname" not in data:
return {"success": False, "error": "Missing pet_id or new_nickname"}
pet_id = data["pet_id"]
new_nickname = data["new_nickname"]
# Validate nickname (same validation as database method)
if not new_nickname or len(new_nickname.strip()) == 0:
return {"success": False, "error": "Pet nickname cannot be empty"}
new_nickname = new_nickname.strip()
if len(new_nickname) > 20:
return {"success": False, "error": "Pet nickname must be 20 characters or less"}
# Check for profanity/inappropriate content (basic check)
inappropriate_words = ["admin", "bot", "system", "null", "undefined"]
if any(word in new_nickname.lower() for word in inappropriate_words):
return {"success": False, "error": "Pet nickname contains inappropriate content"}
# Verify pet ownership
import aiosqlite
async with aiosqlite.connect(self.database.db_path) as db:
cursor = await db.execute("""
SELECT nickname FROM pets WHERE id = ? AND player_id = ?
""", (pet_id, player["id"]))
pet_row = await cursor.fetchone()
if not pet_row:
return {"success": False, "error": "Pet not found or not owned by player"}
# Use unified PIN service to request verification
rename_data = {
"pet_id": pet_id,
"new_nickname": new_nickname,
"old_nickname": pet_row[0]
}
result = await self.pin_service.request_verification(
player_id=player["id"],
nickname=nickname,
action_type="pet_rename",
action_data=rename_data,
expiration_minutes=5 # Shorter expiration for pet renames
)
if result["success"]:
return {
"success": True,
"message": f"PIN sent to {nickname} via IRC. Check your messages!"
}
else:
return {"success": False, "error": result.get("error", "Failed to generate PIN")}
except Exception as e:
print(f"Error in _handle_pet_rename_request_async: {e}")
return {"success": False, "error": str(e)}
def handle_pet_rename_verify(self, nickname):
"""Handle PIN verification for pet rename"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
try:
import json
data = json.loads(post_data)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
# Run async operations
import asyncio
result = asyncio.run(self._handle_pet_rename_verify_async(nickname, data))
if result["success"]:
self.send_json_response(result, 200)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_pet_rename_verify: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_pet_rename_verify_async(self, nickname, data):
"""Async handler for pet rename PIN verification"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate required field
if "pin" not in data:
return {"success": False, "error": "Missing PIN"}
pin_code = data["pin"]
# Define callback to apply pet rename
async def apply_pet_rename(player_id, action_data):
pet_id = action_data["pet_id"]
new_nickname = action_data["new_nickname"]
# Verify pet ownership again before applying
import aiosqlite
async with aiosqlite.connect(self.database.db_path) as db:
# Check if pet still exists and belongs to player
cursor = await db.execute("""
SELECT id FROM pets WHERE id = ? AND player_id = ?
""", (pet_id, player_id))
pet_row = await cursor.fetchone()
if not pet_row:
raise Exception("Pet not found or no longer owned by player")
# Apply the rename
await db.execute("""
UPDATE pets SET nickname = ? WHERE id = ? AND player_id = ?
""", (new_nickname, pet_id, player_id))
await db.commit()
return {"new_nickname": new_nickname, "pet_id": pet_id}
# Use unified PIN service to verify and execute
result = await self.pin_service.verify_and_execute(
player_id=player["id"],
pin_code=pin_code,
action_type="pet_rename",
action_callback=apply_pet_rename
)
if result["success"]:
callback_result = result.get("callback_result", {})
new_nickname = callback_result.get("new_nickname", "Unknown")
return {
"success": True,
"message": f"Pet renamed to '{new_nickname}' successfully!",
"new_nickname": new_nickname
}
else:
return {"success": False, "error": result.get("error", "PIN verification failed")}
except Exception as e:
print(f"Error in _handle_pet_rename_verify_async: {e}")
return {"success": False, "error": str(e)}
def serve_admin_login(self):
"""Serve the admin login page"""
import sys
sys.path.append('.')
from config import ADMIN_USER
content = """
š Admin Control Panel
Authorized access only
Authentication Required
This area is restricted to bot administrators.
A PIN will be sent to your IRC private messages
"""
html = self.get_page_template("Admin Login", content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html.encode())
def serve_player_login(self):
"""Serve the player login page"""
content = """
š Player Login
Access your PetBot profile
Login to Your Profile
Enter your IRC nickname to receive a login PIN via private message.
A PIN will be sent to your IRC private messages
Don't have an account? Type !start in IRC to register!
"""
html = self.get_page_template("Player Login", content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html.encode())
def handle_admin_auth(self):
"""Handle admin authentication request and generate PIN"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
try:
import json
data = json.loads(post_data)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
nickname = data.get('nickname', '').strip()
# Verify admin user
import sys
sys.path.append('.')
from config import ADMIN_USER
if nickname.lower() != ADMIN_USER.lower():
self.send_json_response({"success": False, "error": "Access denied"}, 403)
return
# Use a special admin_user_id (-1) for admin authentication with PIN service
admin_user_id = -1
# Use unified PIN service to request verification
import asyncio
result = asyncio.run(self._request_admin_pin_async(admin_user_id, nickname))
if result["success"]:
self.send_json_response({"success": True, "message": "PIN sent via IRC"})
else:
self.send_json_response({"success": False, "error": result.get("error", "Failed to generate PIN")}, 500)
except Exception as e:
print(f"Error in handle_admin_auth: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _request_admin_pin_async(self, admin_user_id, nickname):
"""Request admin PIN using unified PIN service"""
try:
# Use unified PIN service with special admin action type
result = await self.pin_service.request_verification(
player_id=admin_user_id, # Special ID for admin
nickname=nickname,
action_type="admin_auth",
action_data={"admin_nickname": nickname},
expiration_minutes=15
)
return result
except Exception as e:
return {"success": False, "error": f"Failed to request admin PIN: {str(e)}"}
def handle_admin_verify(self):
"""Handle admin PIN verification"""
try:
# Get POST data
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_json_response({"success": False, "error": "No data provided"}, 400)
return
post_data = self.rfile.read(content_length).decode('utf-8')
try:
import json
data = json.loads(post_data)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
return
nickname = data.get('nickname', '').strip()
pin = data.get('pin', '').strip()
# Use special admin_user_id (-1) for admin authentication
admin_user_id = -1
# Define callback to create admin session
def create_admin_session(player_id, action_data):
admin_nickname = action_data["admin_nickname"]
# Create session token
import hashlib
import time
session_token = hashlib.sha256(f"{admin_nickname}:{pin}:{time.time()}".encode()).hexdigest()
# Store session
self.admin_sessions[session_token] = {
'nickname': admin_nickname,
'expires': time.time() + (60 * 60) # 1 hour session
}
return {"session_token": session_token, "admin_nickname": admin_nickname}
# Verify PIN using unified PIN service
import asyncio
result = asyncio.run(self._verify_admin_pin_async(admin_user_id, pin, create_admin_session))
if result["success"]:
callback_result = result.get("callback_result", {})
session_token = callback_result.get("session_token")
# Set cookie
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Set-Cookie', f'admin_session={session_token}; Path=/admin; HttpOnly')
self.add_security_headers()
self.end_headers()
self.wfile.write(json.dumps({"success": True}).encode())
else:
self.send_json_response({"success": False, "error": result.get("error", "Invalid or expired PIN")}, 401)
except Exception as e:
print(f"Error in handle_admin_verify: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _verify_admin_pin_async(self, admin_user_id, pin_code, callback):
"""Verify admin PIN using unified PIN service"""
try:
# Use unified PIN service to verify and execute
result = await self.pin_service.verify_and_execute(
player_id=admin_user_id,
pin_code=pin_code,
action_type="admin_auth",
action_callback=callback
)
return result
except Exception as e:
return {"success": False, "error": f"Admin PIN verification failed: {str(e)}"}
def handle_player_auth(self):
"""Handle player authentication request and generate PIN"""
try:
# Parse request data
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')
data = json.loads(post_data)
nickname = data.get('nickname', '').strip()
if not nickname:
self.send_json_response({"success": False, "error": "Nickname is required"}, 400)
return
# Check if player exists
import asyncio
async def check_player_exists():
player = await self.database.get_player(nickname)
return player
player = asyncio.run(check_player_exists())
if not player:
self.send_json_response({"success": False, "error": "Player not found. You need to register first by using !start in IRC."}, 404)
return
# Generate PIN using PinAuthenticationService
async def generate_login_pin():
from src.pin_authentication import PinAuthenticationService
pin_service = PinAuthenticationService(self.database, self.bot)
result = await pin_service.request_verification(
player_id=player['id'],
nickname=nickname,
action_type="web_login",
action_data={"login_time": datetime.now().isoformat()},
expiration_minutes=10,
message_template="""š Web Login PIN: {pin_code}
This PIN will expire in {expiration_minutes} minutes.
Enter this PIN on the login page to access your profile.
ā ļø Keep this PIN private! Do not share it with anyone."""
)
return result
pin_result = asyncio.run(generate_login_pin())
if pin_result["success"]:
self.send_json_response({"success": True, "message": "PIN sent via IRC"})
else:
self.send_json_response({"success": False, "error": "Failed to generate PIN"}, 500)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
except Exception as e:
print(f"Error in player auth: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
def handle_player_verify(self):
"""Handle player PIN verification and create session"""
try:
# Parse request data
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')
data = json.loads(post_data)
nickname = data.get('nickname', '').strip()
pin = data.get('pin', '').strip()
if not nickname or not pin:
self.send_json_response({"success": False, "error": "Nickname and PIN are required"}, 400)
return
if len(pin) != 6 or not pin.isdigit():
self.send_json_response({"success": False, "error": "PIN must be 6 digits"}, 400)
return
# Get player
import asyncio
async def verify_login_pin():
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Verify PIN using PinAuthenticationService
from src.pin_authentication import PinAuthenticationService
pin_service = PinAuthenticationService(self.database, self.bot)
result = await pin_service.verify_and_execute(
player_id=player['id'],
pin_code=pin,
action_type="web_login"
)
if result["success"]:
return {"success": True, "player": player}
else:
return result
result = asyncio.run(verify_login_pin())
if result["success"]:
# Create player session
session_token = self.create_player_session(nickname)
# Set cookie
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Set-Cookie', f'player_session={session_token}; Path=/; HttpOnly; Max-Age=86400') # 24 hours
self.add_security_headers()
self.end_headers()
response_data = {"success": True, "message": "Login successful", "nickname": nickname}
self.wfile.write(json.dumps(response_data).encode('utf-8'))
else:
self.send_json_response({"success": False, "error": result.get("error", "Invalid PIN")}, 401)
except json.JSONDecodeError:
self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
except Exception as e:
print(f"Error in player verify: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
def handle_player_logout(self):
"""Handle player logout"""
try:
# Remove player session
self.logout_player_session()
# Clear cookie
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Set-Cookie', 'player_session=; Path=/; HttpOnly; Max-Age=0') # Clear cookie
self.add_security_headers()
self.end_headers()
response_data = {"success": True, "message": "Logged out successfully"}
self.wfile.write(json.dumps(response_data).encode('utf-8'))
except Exception as e:
print(f"Error in player logout: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
def check_admin_session(self):
"""Check if user has valid admin session"""
# Get cookie
cookie_header = self.headers.get('Cookie', '')
session_token = None
for cookie in cookie_header.split(';'):
if cookie.strip().startswith('admin_session='):
session_token = cookie.strip()[14:]
break
if not session_token:
return None
# Check if session is valid
import time
session = self.admin_sessions.get(session_token)
if session and session['expires'] > time.time():
# Extend session
session['expires'] = time.time() + (60 * 60)
return session['nickname']
# Invalid or expired session
if session_token in self.admin_sessions:
del self.admin_sessions[session_token]
return None
def check_player_session(self, required_nickname=None):
"""Check if user has valid player session
Args:
required_nickname: If provided, checks if the session matches this specific player
Returns:
str: Player nickname if session is valid, None otherwise
"""
# Get cookie
cookie_header = self.headers.get('Cookie', '')
session_token = None
for cookie in cookie_header.split(';'):
if cookie.strip().startswith('player_session='):
session_token = cookie.strip()[15:]
break
if not session_token:
return None
# Check if session is valid
import time
session = self.player_sessions.get(session_token)
if session and session['expires'] > time.time():
# Extend session
session['expires'] = time.time() + (60 * 60 * 24) # 24 hour session for players
# If a specific nickname is required, check if it matches
if required_nickname and session['nickname'].lower() != required_nickname.lower():
return None
return session['nickname']
# Invalid or expired session
if session_token in self.player_sessions:
del self.player_sessions[session_token]
return None
def create_player_session(self, nickname):
"""Create a new player session
Args:
nickname: Player's nickname
Returns:
str: Session token
"""
import hashlib
import time
session_token = hashlib.sha256(f"{nickname}:player:{time.time()}".encode()).hexdigest()
# Store session
self.player_sessions[session_token] = {
'nickname': nickname,
'expires': time.time() + (60 * 60 * 24) # 24 hour session
}
return session_token
def logout_player_session(self, nickname=None):
"""Logout player session(s)
Args:
nickname: If provided, logout only sessions for this player
"""
if nickname:
# Remove sessions for specific player
to_remove = []
for token, session in self.player_sessions.items():
if session['nickname'].lower() == nickname.lower():
to_remove.append(token)
for token in to_remove:
del self.player_sessions[token]
else:
# Get current session from cookie and remove it
cookie_header = self.headers.get('Cookie', '')
for cookie in cookie_header.split(';'):
if cookie.strip().startswith('player_session='):
session_token = cookie.strip()[15:]
if session_token in self.player_sessions:
del self.player_sessions[session_token]
break
def redirect_to_login(self, message=None):
"""Redirect user to login page with optional message"""
login_url = "/login"
if message:
# You could encode the message in URL params if needed
# For now, just redirect to login
pass
self.send_response(302)
self.send_header('Location', login_url)
self.add_security_headers()
self.end_headers()
def serve_admin_dashboard(self):
"""Serve the admin dashboard page"""
# Check admin session
admin_user = self.check_admin_session()
if not admin_user:
# Redirect to login
self.send_response(302)
self.send_header('Location', '/admin')
self.add_security_headers()
self.end_headers()
return
# Get system statistics
import asyncio
stats = asyncio.run(self._get_admin_stats_async())
content = f"""
š® Admin Control Panel
Welcome, {admin_user}!
š System Statistics
š„ Total Players
{stats['total_players']}
š¾ Total Pets
{stats['total_pets']}
āļø Active Battles
{stats['active_battles']}
š¾ Database Size
{stats['db_size']}
š„ Player Management
Search Player
š§ System Controls
Database Management
IRC Management
š Game Management
Weather Control
Rate Limiting
"""
html = self.get_page_template("Admin Dashboard", content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.add_security_headers()
self.end_headers()
self.wfile.write(html.encode())
def _get_location_options(self, locations):
"""Generate HTML options for locations"""
options = ""
for location in locations:
options += f''
return options
async def _get_admin_stats_async(self):
"""Get admin dashboard statistics"""
import aiosqlite
import os
stats = {
'total_players': 0,
'total_pets': 0,
'active_battles': 0,
'db_size': '0 MB',
'total_achievements': 0,
'total_badges': 0,
'total_items': 0,
'locations': []
}
try:
async with aiosqlite.connect(self.database.db_path) as db:
# Get player count
cursor = await db.execute("SELECT COUNT(*) FROM players")
stats['total_players'] = (await cursor.fetchone())[0]
# Get pet count
cursor = await db.execute("SELECT COUNT(*) FROM pets")
stats['total_pets'] = (await cursor.fetchone())[0]
# Get active battles (if table exists)
try:
cursor = await db.execute("SELECT COUNT(*) FROM active_battles")
stats['active_battles'] = (await cursor.fetchone())[0]
except:
stats['active_battles'] = 0
# Get achievement count
try:
cursor = await db.execute("SELECT COUNT(*) FROM player_achievements")
stats['total_achievements'] = (await cursor.fetchone())[0]
except:
stats['total_achievements'] = 0
# Get badge count (if table exists)
try:
cursor = await db.execute("SELECT COUNT(*) FROM player_badges")
stats['total_badges'] = (await cursor.fetchone())[0]
except:
stats['total_badges'] = 0
# Get item count (if table exists)
try:
cursor = await db.execute("SELECT COUNT(*) FROM player_items")
stats['total_items'] = (await cursor.fetchone())[0]
except:
stats['total_items'] = 0
# Get locations
cursor = await db.execute("SELECT id, name FROM locations ORDER BY name")
locations = await cursor.fetchall()
stats['locations'] = [{'id': loc[0], 'name': loc[1]} for loc in locations]
# Get database size
if os.path.exists(self.database.db_path):
size_bytes = os.path.getsize(self.database.db_path)
stats['db_size'] = f"{size_bytes / 1024 / 1024:.2f} MB"
except Exception as e:
print(f"Error getting admin stats: {e}")
return stats
def handle_admin_api(self, endpoint):
"""Handle admin API requests"""
print(f"Admin API request: {endpoint}")
# Check admin session
admin_user = self.check_admin_session()
if not admin_user:
print(f"Unauthorized admin API request for endpoint: {endpoint}")
self.send_json_response({"success": False, "error": "Unauthorized"}, 401)
return
print(f"Authorized admin API request from {admin_user} for endpoint: {endpoint}")
# Get POST data if it's a POST request
content_length = int(self.headers.get('Content-Length', 0))
post_data = {}
if content_length > 0 and self.command == 'POST':
try:
import json
post_data = json.loads(self.rfile.read(content_length).decode('utf-8'))
except:
pass
# Parse query parameters for GET requests
query_params = {}
if '?' in endpoint:
endpoint, query_string = endpoint.split('?', 1)
for param in query_string.split('&'):
if '=' in param:
key, value = param.split('=', 1)
query_params[key] = value
# Route to appropriate handler
if endpoint.startswith('player/'):
# Handle player endpoints - support both info and updates
player_path = endpoint[7:] # Remove 'player/' prefix
if player_path.endswith('/update'):
# Player update endpoint
player_name = player_path[:-7] # Remove '/update' suffix
if self.command == 'POST':
self.handle_admin_player_update(player_name, post_data)
else:
self.send_json_response({"success": False, "error": "Method not allowed"}, 405)
else:
# Player info endpoint
self.handle_admin_player_get(player_path)
elif endpoint == 'backup':
if self.command == 'POST':
self.handle_admin_backup_create()
else:
self.send_json_response({"success": False, "error": "Method not allowed"}, 405)
elif endpoint == 'backups':
self.handle_admin_backups_list()
elif endpoint == 'broadcast':
self.handle_admin_broadcast(post_data)
elif endpoint == 'irc-status':
print(f"IRC status endpoint hit!")
# Add a simple test first
try:
print(f"Calling handle_admin_irc_status...")
self.handle_admin_irc_status()
print(f"handle_admin_irc_status completed")
except Exception as e:
print(f"Exception in IRC status handler: {e}")
import traceback
traceback.print_exc()
# Send a simple fallback response
self.send_json_response({
"success": False,
"error": "IRC status handler failed",
"details": str(e)
}, 500)
elif endpoint == 'weather':
self.handle_admin_weather_set(post_data)
elif endpoint == 'rate-stats':
self.handle_admin_rate_stats(query_params)
elif endpoint == 'rate-reset':
self.handle_admin_rate_reset(post_data)
elif endpoint == 'test':
# Simple test endpoint
self.send_json_response({
"success": True,
"message": "Test endpoint working",
"timestamp": str(datetime.now())
})
else:
self.send_json_response({"success": False, "error": "Unknown endpoint"}, 404)
def handle_admin_player_search(self, data):
"""Search for a player"""
nickname = data.get('nickname', '').strip()
if not nickname:
self.send_json_response({"success": False, "error": "No nickname provided"})
return
import asyncio
player = asyncio.run(self.database.get_player(nickname))
if player:
# Get additional stats
pet_count = asyncio.run(self._get_player_pet_count_async(player['id']))
self.send_json_response({
"success": True,
"player": {
"nickname": player['nickname'],
"level": player['level'],
"money": player['money'],
"pet_count": pet_count,
"experience": player['experience']
}
})
else:
self.send_json_response({"success": False, "error": "Player not found"})
async def _get_player_pet_count_async(self, player_id):
"""Get player's pet count"""
import aiosqlite
async with aiosqlite.connect(self.database.db_path) as db:
cursor = await db.execute("SELECT COUNT(*) FROM pets WHERE player_id = ?", (player_id,))
return (await cursor.fetchone())[0]
def handle_admin_backup_create(self):
"""Create a database backup"""
if self.bot and hasattr(self.bot, 'backup_manager'):
import asyncio
result = asyncio.run(self.bot.backup_manager.create_backup("manual", "Admin web interface"))
if result['success']:
self.send_json_response({
"success": True,
"message": f"Backup created: {result['filename']}"
})
else:
self.send_json_response({
"success": False,
"error": result.get('error', 'Backup failed')
})
else:
self.send_json_response({
"success": False,
"error": "Backup system not available"
})
def handle_admin_weather_set(self, data):
"""Set weather for a location"""
location = data.get('location', '').strip()
weather = data.get('weather', '').strip()
if not location or not weather:
self.send_json_response({"success": False, "error": "Missing location or weather"})
return
# Execute weather change using database directly
try:
import asyncio
result = asyncio.run(self._set_weather_for_location_async(location, weather))
if result.get("success"):
self.send_json_response({
"success": True,
"message": result.get("message", f"Weather set to {weather} in {location}")
})
else:
self.send_json_response({
"success": False,
"error": result.get("error", "Failed to set weather")
})
except Exception as e:
print(f"Error setting weather: {e}")
self.send_json_response({
"success": False,
"error": f"Weather system error: {str(e)}"
})
async def _set_weather_for_location_async(self, location, weather):
"""Async helper to set weather for location"""
try:
import json
import random
# Load weather patterns
try:
with open("config/weather_patterns.json", "r") as f:
weather_data = json.load(f)
except FileNotFoundError:
return {
"success": False,
"error": "Weather configuration file not found"
}
# Validate weather type
weather_types = list(weather_data["weather_types"].keys())
if weather not in weather_types:
return {
"success": False,
"error": f"Invalid weather type. Valid types: {', '.join(weather_types)}"
}
weather_config = weather_data["weather_types"][weather]
# Calculate duration (3 hours = 180 minutes)
duration_range = weather_config.get("duration_minutes", [90, 180])
duration = random.randint(duration_range[0], duration_range[1])
end_time = datetime.now() + timedelta(minutes=duration)
# Set weather for the location
result = await self.database.set_weather_for_location(
location, weather, end_time.isoformat(),
weather_config.get("spawn_modifier", 1.0),
",".join(weather_config.get("affected_types", []))
)
if result.get("success"):
# Announce weather change if it actually changed and bot is available
if result.get("changed") and self.bot and hasattr(self.bot, 'game_engine'):
await self.bot.game_engine.announce_weather_change(
location, result.get("previous_weather"), weather, "web"
)
return {
"success": True,
"message": f"Weather set to {weather} in {location} for {duration} minutes"
}
else:
return {
"success": False,
"error": result.get("error", "Failed to set weather")
}
except Exception as e:
print(f"Error in _set_weather_for_location_async: {e}")
return {
"success": False,
"error": str(e)
}
def handle_admin_announce(self, data):
"""Send announcement to IRC"""
message = data.get('message', '').strip()
if not message:
self.send_json_response({"success": False, "error": "No message provided"})
return
if self.bot and hasattr(self.bot, 'send_message_sync'):
try:
# Send to main channel
self.bot.send_message_sync("#petz", f"š¢ ANNOUNCEMENT: {message}")
self.send_json_response({"success": True, "message": "Announcement sent"})
except Exception as e:
self.send_json_response({"success": False, "error": str(e)})
else:
self.send_json_response({"success": False, "error": "IRC not available"})
def handle_admin_monitor(self, monitor_type):
"""Get monitoring data"""
# TODO: Implement real monitoring data
self.send_json_response({
"success": True,
"type": monitor_type,
"data": []
})
def handle_admin_player_get(self, nickname):
"""Get player information"""
if not nickname:
self.send_json_response({"success": False, "error": "No nickname provided"}, 400)
return
try:
import asyncio
result = asyncio.run(self._get_player_info_async(nickname))
if result["success"]:
self.send_json_response(result)
else:
self.send_json_response(result, 404)
except Exception as e:
print(f"Error in handle_admin_player_get: {e}")
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
def handle_admin_player_update(self, nickname, data):
"""Update player information"""
if not nickname:
self.send_json_response({"success": False, "error": "No nickname provided"}, 400)
return
if not data:
self.send_json_response({"success": False, "error": "No update data provided"}, 400)
return
try:
import asyncio
result = asyncio.run(self._update_player_async(nickname, data))
if result["success"]:
self.send_json_response(result)
else:
self.send_json_response(result, 400)
except Exception as e:
print(f"Error in handle_admin_player_update: {e}")
import traceback
traceback.print_exc()
self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _update_player_async(self, nickname, data):
"""Update player information asynchronously"""
try:
# Validate input data
allowed_fields = ['level', 'experience', 'money']
updates = {}
for field in allowed_fields:
if field in data:
value = data[field]
# Validate each field
if field == 'level':
if not isinstance(value, int) or value < 1 or value > 100:
return {"success": False, "error": "Level must be between 1 and 100"}
updates[field] = value
elif field == 'experience':
if not isinstance(value, int) or value < 0:
return {"success": False, "error": "Experience cannot be negative"}
updates[field] = value
elif field == 'money':
if not isinstance(value, int) or value < 0:
return {"success": False, "error": "Money cannot be negative"}
updates[field] = value
if not updates:
return {"success": False, "error": "No valid fields to update"}
# Check if player exists
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Update player data
success = await self.database.update_player_admin(player["id"], updates)
if success:
return {
"success": True,
"message": f"Updated {', '.join(updates.keys())} for player {nickname}",
"updated_fields": list(updates.keys())
}
else:
return {"success": False, "error": "Failed to update player data"}
except Exception as e:
print(f"Error updating player: {e}")
return {"success": False, "error": str(e)}
async def _get_player_info_async(self, nickname):
"""Get player information asynchronously"""
try:
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Get additional stats
pets = await self.database.get_player_pets(player["id"])
# Get location name if current_location_id is set
location_name = "Unknown"
if player.get("current_location_id"):
try:
location_data = await self.database.get_location_by_id(player["current_location_id"])
if location_data:
location_name = location_data["name"]
else:
location_name = f"Location ID {player['current_location_id']}"
except Exception as loc_error:
print(f"Error resolving location: {loc_error}")
location_name = f"Location ID {player['current_location_id']}"
# Get team composition
team_info = await self.database.get_team_composition(player["id"])
return {
"success": True,
"player": {
"nickname": player["nickname"],
"level": player["level"],
"experience": player["experience"],
"money": player["money"],
"current_location": location_name,
"pet_count": len(pets),
"active_pets": team_info.get("active_pets", 0),
"total_pets": team_info.get("total_pets", 0)
}
}
except Exception as e:
print(f"Error getting player info: {e}")
return {"success": False, "error": str(e)}
def handle_admin_backups_list(self):
"""List available backups"""
try:
import os
backup_dir = "backups"
if not os.path.exists(backup_dir):
self.send_json_response({
"success": True,
"backups": []
})
return
backups = []
for filename in os.listdir(backup_dir):
if filename.endswith('.gz'):
filepath = os.path.join(backup_dir, filename)
size = os.path.getsize(filepath)
# Convert size to human readable format
if size < 1024:
size_str = f"{size} B"
elif size < 1024 * 1024:
size_str = f"{size / 1024:.1f} KB"
else:
size_str = f"{size / (1024 * 1024):.1f} MB"
backups.append({
"name": filename,
"size": size_str
})
# Sort by name (newest first)
backups.sort(key=lambda x: x['name'], reverse=True)
self.send_json_response({
"success": True,
"backups": backups[:10] # Show only last 10 backups
})
except Exception as e:
print(f"Error listing backups: {e}")
self.send_json_response({"success": False, "error": str(e)}, 500)
def handle_admin_broadcast(self, data):
"""Send IRC broadcast message"""
message = data.get('message', '').strip()
if not message:
self.send_json_response({"success": False, "error": "No message provided"}, 400)
return
try:
# Send to IRC channel via bot
if self.bot and hasattr(self.bot, 'send_message_sync'):
from config import IRC_CONFIG
channel = IRC_CONFIG.get("channel", "#petz")
self.bot.send_message_sync(channel, f"š¢ Admin Announcement: {message}")
self.send_json_response({
"success": True,
"message": "Broadcast sent successfully"
})
else:
self.send_json_response({"success": False, "error": "IRC bot not available"}, 500)
except Exception as e:
print(f"Error sending broadcast: {e}")
self.send_json_response({"success": False, "error": str(e)}, 500)
def handle_admin_irc_status(self):
"""Get comprehensive IRC connection status and activity"""
try:
print(f"IRC status request - checking bot availability...")
bot = self.bot
print(f"Bot instance: {bot}")
if bot and hasattr(bot, 'connection_manager') and bot.connection_manager:
connection_manager = bot.connection_manager
print(f"Connection manager: {connection_manager}")
if hasattr(connection_manager, 'get_connection_stats'):
try:
stats = connection_manager.get_connection_stats()
print(f"Got comprehensive connection stats")
# Return comprehensive IRC status
response_data = {
"success": True,
"irc_status": stats
}
print(f"Sending comprehensive IRC response")
self.send_json_response(response_data)
except Exception as stats_error:
print(f"Error getting connection stats: {stats_error}")
import traceback
traceback.print_exc()
self.send_json_response({
"success": False,
"error": f"Failed to get connection stats: {str(stats_error)}"
}, 500)
else:
print("Connection manager has no get_connection_stats method")
self.send_json_response({
"success": False,
"error": "Connection manager missing get_connection_stats method"
}, 500)
else:
print("No bot instance or connection manager available")
self.send_json_response({
"success": True,
"irc_status": {
"connected": False,
"state": "disconnected",
"error": "Bot instance or connection manager not available"
}
})
except Exception as e:
print(f"Error getting IRC status: {e}")
import traceback
traceback.print_exc()
try:
self.send_json_response({"success": False, "error": str(e)}, 500)
except Exception as json_error:
print(f"Failed to send JSON error response: {json_error}")
# Send a basic HTTP error response if JSON fails
self.send_error(500, f"IRC status error: {str(e)}")
def handle_admin_rate_stats(self, query_params):
"""Get rate limiting statistics"""
try:
username = query_params.get('user', '').strip()
if self.bot and hasattr(self.bot, 'rate_limiter') and self.bot.rate_limiter:
if username:
# Get stats for specific user
user_stats = self.bot.rate_limiter.get_user_stats(username)
self.send_json_response({
"success": True,
"stats": {
"violations": user_stats.get("violations", 0),
"banned": user_stats.get("banned", False),
"last_violation": user_stats.get("last_violation", "Never")
}
})
else:
# Get global stats
global_stats = self.bot.rate_limiter.get_global_stats()
self.send_json_response({
"success": True,
"stats": {
"total_users": global_stats.get("total_users", 0),
"active_bans": global_stats.get("active_bans", 0),
"total_violations": global_stats.get("total_violations", 0)
}
})
else:
self.send_json_response({
"success": True,
"stats": {
"total_users": 0,
"active_bans": 0,
"total_violations": 0
}
})
except Exception as e:
print(f"Error getting rate stats: {e}")
self.send_json_response({"success": False, "error": str(e)}, 500)
def handle_admin_rate_reset(self, data):
"""Reset rate limiting for user or globally"""
try:
username = data.get('user', '').strip() if data.get('user') else None
if self.bot and hasattr(self.bot, 'rate_limiter') and self.bot.rate_limiter:
if username:
# Reset for specific user
success = self.bot.rate_limiter.reset_user(username)
if success:
self.send_json_response({
"success": True,
"message": f"Rate limits reset for user: {username}"
})
else:
self.send_json_response({"success": False, "error": f"User {username} not found"}, 404)
else:
# Global reset
self.bot.rate_limiter.reset_all()
self.send_json_response({
"success": True,
"message": "All rate limits reset successfully"
})
else:
self.send_json_response({"success": False, "error": "Rate limiter not available"}, 500)
except Exception as e:
print(f"Error resetting rate limits: {e}")
self.send_json_response({"success": False, "error": str(e)}, 500)
# ================================================================
# NEW TEAM BUILDER METHODS - Separated Team Management
# ================================================================
def serve_team_selection_hub(self, nickname):
"""Serve the team selection hub showing all teams with swap options."""
try:
# Get database and bot from server
database = self.server.database if hasattr(self.server, 'database') else None
bot = self.server.bot if hasattr(self.server, 'bot') else None
if not database:
self.send_error(500, "Database not available")
return
# Get team management service
if not hasattr(self, 'team_service'):
from src.team_management import TeamManagementService
from src.pin_authentication import PinAuthenticationService
pin_service = PinAuthenticationService(database, bot)
self.team_service = TeamManagementService(database, pin_service)
# Get team overview
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
player = loop.run_until_complete(database.get_player(nickname))
if not player:
self.send_error(404, "Player not found")
return
team_overview = loop.run_until_complete(self.team_service.get_team_overview(player["id"]))
if not team_overview["success"]:
self.send_error(500, f"Failed to load teams: {team_overview['error']}")
return
teams = team_overview["teams"]
finally:
loop.close()
# Generate team hub HTML
content = self.generate_team_hub_content(nickname, teams)
full_page = self.get_page_template(f"Team Management - {nickname}", content, "teambuilder")
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.add_security_headers()
self.end_headers()
self.wfile.write(full_page.encode('utf-8'))
except Exception as e:
print(f"Error serving team selection hub: {e}")
self.send_error(500, "Internal server error")
def serve_individual_team_editor(self, nickname, team_identifier):
"""Serve individual team editor page."""
try:
# Get database and bot from server
database = self.server.database if hasattr(self.server, 'database') else None
bot = self.server.bot if hasattr(self.server, 'bot') else None
if not database:
self.send_error(500, "Database not available")
return
# Get team management service
if not hasattr(self, 'team_service'):
from src.team_management import TeamManagementService
from src.pin_authentication import PinAuthenticationService
pin_service = PinAuthenticationService(database, bot)
self.team_service = TeamManagementService(database, pin_service)
# Get team data
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
player = loop.run_until_complete(database.get_player(nickname))
if not player:
self.send_error(404, "Player not found")
return
team_data = loop.run_until_complete(
self.team_service.get_individual_team_data(player["id"], team_identifier)
)
if not team_data["success"]:
self.send_error(500, f"Failed to load team: {team_data['error']}")
return
# Get player's pets for the editor
player_pets = loop.run_until_complete(database.get_player_pets(player["id"]))
finally:
loop.close()
# Generate individual team editor HTML
content = self.generate_individual_team_editor_content(nickname, team_identifier, team_data, player_pets)
full_page = self.get_page_template(f"{team_data['team_name']} - {nickname}", content, "teambuilder")
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.add_security_headers()
self.end_headers()
self.wfile.write(full_page.encode('utf-8'))
except Exception as e:
print(f"Error serving individual team editor: {e}")
self.send_error(500, "Internal server error")
def generate_team_hub_content(self, nickname, teams):
"""Generate HTML content for team selection hub."""
return f'''
š Team Management Hub
Manage your teams and swap configurations with PIN verification
'''
def _generate_team_preview(self, team_data, team_identifier):
"""Generate preview for a single team."""
team_name = team_data.get('name', f'Team {team_identifier}')
pet_count = team_data.get('count', 0)
is_active = team_data.get('is_active', False)
if is_active:
actions = f'''
āļø Edit Active Team
'''
else:
actions = f'''
āļø Edit Team {team_identifier}
'''
status = "š ACTIVE TEAM" if is_active else f"š¾ Saved Team"
return f'''
{team_name}
{status}
š¾ {pet_count} pets
{actions}
'''
def _generate_active_team_display(self, active_team_data, nickname):
"""Generate detailed display for active team with individual pet cards."""
pets_dict = active_team_data.get('pets', {})
pet_count = active_team_data.get('count', 0)
# Convert dictionary format to list for consistent processing
pets = []
if isinstance(pets_dict, dict):
# Active team format: {"1": {pet_data}, "2": {pet_data}}
for position, pet_data in pets_dict.items():
if pet_data:
pet_data['team_order'] = int(position)
pets.append(pet_data)
elif isinstance(pets_dict, list):
# Saved team format: [{pet_data}, {pet_data}]
pets = pets_dict
if not pets or pet_count == 0:
return f'''
'''
# Generate individual pet cards for active team
pet_cards = []
for pet in pets:
# Handle both active team format and saved team format
name = pet.get('nickname') or pet.get('name') or pet.get('species_name', 'Unknown')
level = pet.get('level', 1)
hp = pet.get('hp', 0)
max_hp = pet.get('max_hp', 0)
attack = pet.get('attack', 0)
defense = pet.get('defense', 0)
speed = pet.get('speed', 0)
happiness = pet.get('happiness', 50)
species_name = pet.get('species_name', 'Unknown')
# Handle type field variations between active and saved teams
type1 = (pet.get('type1') or pet.get('type_primary') or
pet.get('type1', 'Normal'))
type2 = (pet.get('type2') or pet.get('type_secondary') or
pet.get('type2'))
team_order = pet.get('team_order', 0)
# Calculate HP percentage for health bar
hp_percent = (hp / max_hp) * 100 if max_hp > 0 else 0
hp_color = "#4CAF50" if hp_percent > 60 else "#FF9800" if hp_percent > 25 else "#f44336"
# Happiness emoji
if happiness >= 80:
happiness_emoji = "š"
elif happiness >= 60:
happiness_emoji = "š"
elif happiness >= 40:
happiness_emoji = "š"
elif happiness >= 20:
happiness_emoji = "š"
else:
happiness_emoji = "š¢"
# Type display
type_display = type1
if type2:
type_display += f"/{type2}"
pet_card = f'''
'''
def generate_individual_team_editor_content(self, nickname, team_identifier, team_data, player_pets):
"""Generate HTML content for individual team editor."""
team_name = team_data.get('team_name', 'Unknown Team')
is_active_team = team_data.get('is_active_team', False)
team_pets = team_data.get('pets', [])
# Separate pets into team and storage
team_pet_ids = [p['id'] for p in team_pets]
storage_pets = [p for p in player_pets if p['id'] not in team_pet_ids]
# Helper function to create detailed pet card
def make_pet_card(pet, in_team=False):
name = pet.get('nickname') or pet.get('species_name', 'Unknown')
pet_id = pet.get('id', 0)
level = pet.get('level', 1)
species = pet.get('species_name', 'Unknown')
# Type info
type_str = pet.get('type1', 'Normal')
if pet.get('type2'):
type_str += f"/{pet['type2']}"
# HP calculation
hp = pet.get('hp', 0)
max_hp = pet.get('max_hp', 1)
hp_percent = (hp / max_hp * 100) if max_hp > 0 else 0
hp_color = "#4CAF50" if hp_percent > 60 else "#FF9800" if hp_percent > 25 else "#f44336"
# Get pet moves
moves = pet.get('moves', [])
moves_html = ''
if moves:
moves_html = '
Moves: ' + ', '.join([m.get('name', 'Unknown') for m in moves[:4]]) + '
'
return f"""
{pet.get('emoji', 'š¾')} {name}
Lv.{level}
{species}
{type_str}
HP{hp}/{max_hp}
ATK{pet.get('attack', 0)}
DEF{pet.get('defense', 0)}
SPD{pet.get('speed', 0)}
{moves_html}
{'š' if pet.get('happiness', 50) > 70 else 'š' if pet.get('happiness', 50) > 40 else 'š'}Happiness: {pet.get('happiness', 50)}/100
"""
# Create team slots (6 slots)
team_slots_html = ''
for i in range(1, 7):
# Find pet in this slot
slot_pet = None
for pet in team_pets:
if pet.get('team_order') == i or (i == 1 and not any(p.get('team_order') == 1 for p in team_pets) and team_pets and pet == team_pets[0]):
slot_pet = pet
break
if slot_pet:
team_slots_html += f"""
#{i}
{make_pet_card(slot_pet, True)}
"""
else:
team_slots_html += f"""
#{i}
āDrop pet here
"""
# Create storage pet cards
storage_cards_html = ''.join([make_pet_card(pet, False) for pet in storage_pets])
if not storage_cards_html:
storage_cards_html = '
No pets in storage. All your pets are in teams!
'
return f'''
āļø {team_name}
{"ā” Active battle team" if is_active_team else f"š¾ Saved team configuration (Slot {team_identifier})"}
'''
class PetBotWebServer:
"""Standalone web server for PetBot"""
def __init__(self, database=None, port=8080, bot=None):
self.database = database or Database()
self.port = port
self.bot = bot
self.server = None
# Initialize PIN authentication service
self.pin_service = PinAuthenticationService(self.database, self.bot)
# Add custom message template for admin authentication
self.pin_service.add_message_template("admin_auth", """š Admin Panel Verification PIN: {pin_code}
This PIN will expire in {expiration_minutes} minutes.
Enter this PIN at the admin login page to access the control panel.
ā ļø This is an administrative access PIN. Keep it private!""")
# Add custom message template for pet renames with shorter expiration
self.pin_service.add_message_template("pet_rename", """š¾ Pet Rename Verification PIN: {pin_code}
This PIN will expire in {expiration_minutes} minutes.
Enter this PIN on the pet management page to confirm your pet rename.
ā ļø Keep this PIN private! Do not share it with anyone.""")
def run(self):
"""Start the web server"""
self.server = HTTPServer(('0.0.0.0', self.port), PetBotRequestHandler)
self.server.database = self.database
self.server.bot = self.bot
self.server.pin_service = self.pin_service
print(f'š Starting PetBot web server on http://0.0.0.0:{self.port}')
print(f'š” Accessible from WSL at: http://172.27.217.61:{self.port}')
print(f'š” Accessible from Windows at: http://localhost:{self.port}')
print('')
try:
self.server.serve_forever()
except KeyboardInterrupt:
print('\nš Server stopped')
finally:
self.server.server_close()
def start_in_thread(self):
"""Start the web server in a separate thread"""
import threading
def run_server():
self.server = HTTPServer(('0.0.0.0', self.port), PetBotRequestHandler)
self.server.database = self.database
self.server.bot = self.bot
self.server.pin_service = self.pin_service
try:
self.server.serve_forever()
except Exception as e:
print(f"Web server error: {e}")
finally:
self.server.server_close()
self.server_thread = threading.Thread(target=run_server, daemon=True)
self.server_thread.start()
def stop(self):
"""Stop the web server"""
if self.server:
print('š Stopping web server...')
self.server.shutdown()
self.server.server_close()
def run_standalone():
"""Run the web server in standalone mode"""
import sys
port = 8080
if len(sys.argv) > 1:
try:
port = int(sys.argv[1])
except ValueError:
print('Usage: python webserver.py [port]')
sys.exit(1)
server = PetBotWebServer(port=port)
print('š PetBot Web Server')
print('=' * 50)
print(f'Port: {port}')
print('')
print('š Local URLs:')
print(f' http://localhost:{port}/ - Game Hub (local)')
print(f' http://localhost:{port}/help - Command Help (local)')
print(f' http://localhost:{port}/players - Player List (local)')
print(f' http://localhost:{port}/leaderboard - Leaderboard (local)')
print(f' http://localhost:{port}/locations - Locations (local)')
print('')
print('š Public URLs:')
print(' http://petz.rdx4.com/ - Game Hub')
print(' http://petz.rdx4.com/help - Command Help')
print(' http://petz.rdx4.com/players - Player List')
print(' http://petz.rdx4.com/leaderboard - Leaderboard')
print(' http://petz.rdx4.com/locations - Locations')
print('')
print('š± Example Player Profile:')
print(' http://petz.rdx4.com/player/megasconed')
print('')
print('āļø Press Ctrl+C to stop the server')
print('')
try:
server.run()
except KeyboardInterrupt:
print('\nš Shutting down web server...')
server.stop()
if __name__ == '__main__':
run_standalone()