#!/usr/bin/env python3
"""
PetBot Web Server
Provides web interface for bot data including help, player stats, and pet collections
"""
import os
import sys
import asyncio
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from threading import Thread
import time
# Add the project directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from src.database import Database
from src.rate_limiter import RateLimiter, CommandCategory
class PetBotRequestHandler(BaseHTTPRequestHandler):
"""HTTP request handler for PetBot web server"""
@property
def database(self):
    """Return the ``database`` attribute of the server that owns this handler."""
    owner = self.server
    return owner.database
@property
def bot(self):
    """Return the server's ``bot`` attribute, or None when the server has none."""
    owner = self.server
    try:
        return owner.bot
    except AttributeError:
        return None
@property
def rate_limiter(self):
    """Return the bot's ``rate_limiter`` attribute.

    Yields None when there is no (truthy) bot or the bot has no
    ``rate_limiter`` attribute.
    """
    current_bot = self.bot
    if not current_bot:
        return None
    return getattr(current_bot, 'rate_limiter', None)
def get_client_ip(self):
    """Return the client address used as the rate-limiting identity.

    Sources are tried in order: the first entry of ``X-Forwarded-For``,
    then ``X-Real-IP``, then the socket peer address. A header that is
    present but blank after stripping is skipped instead of being returned
    as an empty string, so clients cannot collapse into one shared
    ``web:`` rate-limit bucket.

    Returns:
        str: The first non-empty candidate address.
    """
    # NOTE(review): both headers are supplied by the client unless a trusted
    # proxy overwrites them, so they can be spoofed on a directly exposed port.
    forwarded_for = self.headers.get('X-Forwarded-For')
    if forwarded_for:
        # The header is a comma-separated chain; the first entry is used.
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop

    real_ip = self.headers.get('X-Real-IP')
    if real_ip:
        candidate = real_ip.strip()
        if candidate:
            return candidate

    # Fallback to the address of the TCP peer.
    return self.client_address[0]
def check_rate_limit(self):
    """Check the web rate limit for the requesting client.

    The client is identified as ``"web:<ip>"`` using ``get_client_ip()`` and
    checked against ``CommandCategory.WEB``.

    Returns:
        tuple: ``(True, None)`` when no rate limiter is available, otherwise
        the ``(allowed, message)`` pair produced by the rate limiter.
    """
    limiter = self.rate_limiter
    if not limiter:
        return True, None

    # Use IP address as user identifier for web requests
    user_identifier = f"web:{self.get_client_ip()}"

    # asyncio.run creates a private loop, closes it and clears the thread's
    # current loop afterwards. The previous hand-rolled version left a
    # *closed* loop installed via set_event_loop for later code to trip over.
    allowed, message = asyncio.run(
        limiter.check_rate_limit(user_identifier, CommandCategory.WEB)
    )
    return allowed, message
def send_rate_limit_error(self, message):
"""Send rate limit error response"""
# NOTE(review): `message` is never used in the visible body, and after the
# 429 headers are ended a second status line (200) with its own headers is
# emitted before the page is written. This looks like the end of this method
# and the start of another (a "PetBot Game Hub" page) were fused together;
# confirm against version control before relying on this behavior.
# 429 status plus a fixed 60 second Retry-After hint.
self.send_response(429)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.send_header('Retry-After', '60')
self.end_headers()
# As visible here the page content is just a newline.
content = f"""
"""
# get_page_template is defined outside this view; it is called with an
# empty third argument here.
html = self.get_page_template("PetBot Game Hub", content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html.encode())
def serve_help(self):
    """Serve the command help page using the unified page template.

    Builds the static help content, renders it through
    ``get_page_template("Command Help", content, "help")``, injects the
    help-specific CSS rules in front of the template's closing ``</style>``
    tag and writes the page as a 200 response.
    """
    content = """
๐ PetBot Commands
Complete guide to Pokemon-style pet collecting in IRC
๐ Getting Started
!start
Begin your pet collecting journey! Creates your trainer account and gives you your first starter pet.
Example: !start
!help
Get a link to this comprehensive command reference page.
Example: !help
!stats
View your basic trainer information including level, experience, and money.
Example: !stats
๐ Exploration & Travel
!explore
Search your current location for wild pets or items. You might find pets to battle/catch or discover useful items!
Example: !explore
!travel <location>
Move to a different location. Each area has unique pets and gyms. Some locations require achievements to unlock.
Example: !travel whispering woods
!weather
Check the current weather effects in your location. Weather affects which pet types spawn more frequently.
Example: !weather
!where / !location
See which location you're currently in and get information about the area.
Example: !where
๐บ๏ธ Available Locations
Starter Town - Peaceful starting area (Fire/Water/Grass pets)
Whispering Woods - Ancient forest (Grass pets + new species: Vinewrap, Bloomtail)
Electric Canyon - Charged valley (Electric/Rock pets)
Thunderstorm - 2.0x Electric spawns (30-60 minutes)
Blizzard - 1.7x Ice/Water spawns (1-2 hours)
Earthquake - 1.8x Rock spawns (30-90 minutes)
Calm - Normal spawns (1.5-3 hours)
โ๏ธ Battle System
!catch / !capture
Attempt to catch a wild pet that appeared during exploration. Success depends on the pet's level and rarity.
Example: !catch
!battle
Start a turn-based battle with a wild pet. Defeat it to gain experience and money for your active pet.
Example: !battle
!attack <move>
Use a specific move during battle. Each move has different power, type, and effects.
Example: !attack flamethrower
!moves
View all available moves for your active pet, including their types and power levels.
Example: !moves
!flee
Attempt to escape from the current battle. Not always successful!
Example: !flee
๐๏ธ Gym Battles NEW!
!gym
List all gyms in your current location with your progress. Shows victories and next difficulty level.
Example: !gym
!gym list
Show all gyms across all locations with your badge collection progress.
Example: !gym list
!gym challenge "<name>"
Challenge a gym leader! You must be in the same location as the gym. Difficulty increases with each victory.
Example: !gym challenge "Forest Guardian"
!gym info "<name>"
Get detailed information about a gym including leader, theme, team, and badge details.
Example: !gym info "Storm Master"
๐ก Gym Strategy: Each gym specializes in a specific type. Bring pets with type advantages! The more you beat a gym, the harder it gets, but the better the rewards!
Access detailed information through the web dashboard at http://petz.rdx4.com/
Player Profiles - Complete stats, pet collections, and inventories
Leaderboard - Top players by level and achievements
Locations Guide - All areas with spawn information
Gym Badges - Display your earned badges and progress
"""
    # Add command-specific CSS to the unified styles
    additional_css = """
.section {
background: var(--bg-secondary);
border-radius: 15px;
margin-bottom: 30px;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
overflow: hidden;
}
.section-header {
background: var(--gradient-primary);
color: white;
padding: 20px 25px;
font-size: 1.3em;
font-weight: 700;
}
.section-content {
padding: 25px;
}
.command-grid {
display: grid;
gap: 20px;
}
.command {
border: 1px solid var(--border-color);
border-radius: 12px;
overflow: hidden;
transition: all 0.3s ease;
background: var(--bg-tertiary);
}
.command:hover {
transform: translateY(-3px);
box-shadow: 0 8px 25px rgba(102, 255, 102, 0.15);
border-color: var(--text-accent);
}
.command-name {
background: var(--bg-primary);
padding: 15px 20px;
font-family: 'Fira Code', 'Courier New', monospace;
font-weight: bold;
color: var(--text-accent);
border-bottom: 1px solid var(--border-color);
font-size: 1.2em;
text-shadow: 0 0 10px rgba(102, 255, 102, 0.3);
}
.command-desc {
padding: 20px;
line-height: 1.7;
color: var(--text-primary);
}
.command-example {
background: var(--bg-primary);
padding: 12px 20px;
font-family: 'Fira Code', 'Courier New', monospace;
color: var(--text-secondary);
border-top: 1px solid var(--border-color);
font-size: 0.95em;
}
.info-box {
background: var(--bg-tertiary);
padding: 20px;
border-radius: 12px;
margin: 20px 0;
border: 1px solid var(--border-color);
}
.info-box h4 {
margin: 0 0 15px 0;
color: var(--text-accent);
font-size: 1.1em;
font-weight: 600;
}
.info-box ul {
margin: 0;
padding-left: 25px;
}
.info-box li {
margin: 8px 0;
color: var(--text-primary);
}
.info-box strong {
color: var(--text-accent);
}
.footer {
text-align: center;
margin-top: 50px;
padding: 30px;
background: var(--bg-secondary);
border-radius: 15px;
color: var(--text-secondary);
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
border: 1px solid var(--border-color);
}
.tip {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
border-radius: 12px;
margin: 20px 0;
font-weight: 500;
text-shadow: 0 2px 4px rgba(0,0,0,0.3);
}
.gym-list {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 15px;
margin: 15px 0;
}
.gym-card {
background: var(--bg-primary);
padding: 15px;
border-radius: 8px;
border: 1px solid var(--border-color);
}
.gym-card strong {
color: var(--text-accent);
}
"""
    # Get the unified template
    html_content = self.get_page_template("Command Help", content, "help")

    # Insert the additional CSS before the closing style tag. The previous
    # code searched for the empty string, which makes str.replace insert the
    # CSS between every character of the page.
    # NOTE(review): assumes get_page_template emits a <style>...</style>
    # block; if it does not, the page is sent without the extra rules.
    html_content = html_content.replace("</style>", additional_css + "</style>")

    self.send_response(200)
    # encode() below produces UTF-8, so declare it (matches the 429 handler).
    self.send_header('Content-type', 'text/html; charset=utf-8')
    self.end_headers()
    self.wfile.write(html_content.encode())
def serve_players(self):
    """Serve the players page with data loaded from the database.

    Sends an error page when the server has no database or when loading or
    rendering raises; otherwise hands the fetched rows to
    ``serve_players_data``.
    """
    # Get database instance from the server object
    database = getattr(self.server, 'database', None)
    if not database:
        self.serve_error_page("Players", "Database not available")
        return

    try:
        # asyncio.run always closes its private loop, including when the
        # coroutine raises, and does not leave a closed loop installed as the
        # thread's current loop (the old new_event_loop/close pair did both).
        players_data = asyncio.run(self.fetch_players_data(database))
        self.serve_players_data(players_data)
    except Exception as e:
        print(f"Error fetching players data: {e}")
        self.serve_error_page("Players", f"Error loading players: {str(e)}")
async def fetch_players_data(self, database):
    """Load every player with summary counts, best-ranked first.

    Opens ``database.db_path`` with aiosqlite and returns one dict per row,
    keyed by the names in ``field_names`` below (same order as the SELECT
    list). Any exception is printed and turned into an empty list.
    """
    field_names = (
        'nickname',
        'level',
        'experience',
        'money',
        'created_at',
        'location_name',
        'pet_count',
        'active_pets',
        'achievement_count',
    )
    try:
        import aiosqlite
        async with aiosqlite.connect(database.db_path) as conn:
            # Players joined to their location, plus per-player sub-counts.
            result = await conn.execute("""
SELECT p.nickname, p.level, p.experience, p.money, p.created_at,
l.name as location_name,
(SELECT COUNT(*) FROM pets WHERE player_id = p.id) as pet_count,
(SELECT COUNT(*) FROM pets WHERE player_id = p.id AND is_active = 1) as active_pets,
(SELECT COUNT(*) FROM player_achievements WHERE player_id = p.id) as achievement_count
FROM players p
LEFT JOIN locations l ON p.current_location_id = l.id
ORDER BY p.level DESC, p.experience DESC
""")
            records = await result.fetchall()
            # Positional rows -> dictionaries, indexed explicitly per column.
            return [
                {key: record[position] for position, key in enumerate(field_names)}
                for record in records
            ]
    except Exception as e:
        print(f"Database error fetching players: {e}")
        return []
def serve_players_data(self, players_data):
"""Serve players page with real data"""
# NOTE(review): this body is not intact. The f-string opened at
# `players_html += f"""` below is never closed inside this method, and the
# lines after it use `content`, `data` and `row_formatter`, none of which are
# defined here. They appear to be the tail of a different table-building
# helper whose header is missing. Restore from version control before
# editing; the code below is left exactly as found.
# Calculate statistics
total_players = len(players_data)
total_pets = sum(p['pet_count'] for p in players_data) if players_data else 0
total_achievements = sum(p['achievement_count'] for p in players_data) if players_data else 0
highest_level = max((p['level'] for p in players_data), default=0) if players_data else 0
# Build statistics cards
stats_content = f"""
๐ Total Players
{total_players}
๐พ Total Pets
{total_pets}
๐ Achievements
{total_achievements}
โญ Highest Level
{highest_level}
"""
# Build players table HTML
if players_data:
players_html = ""
for i, player in enumerate(players_data, 1):
rank_emoji = {"1": "๐ฅ", "2": "๐ฅ", "3": "๐ฅ"}.get(str(i), f"{i}.")
players_html += f"""
'
# Data rows
content += ''
for i, player in enumerate(data):
row_data = row_formatter(player, i)
rank_class = f"rank-{i+1}" if i < 3 else ""
content += f'
'
for cell in row_data:
content += f'
{cell}
'
content += '
'
content += ''
content += '
'
content += '
'
return content
def serve_locations(self):
    """Serve the locations page with data loaded from the database.

    Sends an error page when the server has no database or when loading or
    rendering raises; otherwise hands the fetched rows to
    ``serve_locations_data``.
    """
    # Get database instance from the server object
    database = getattr(self.server, 'database', None)
    if not database:
        self.serve_error_page("Locations", "Database not available")
        return

    try:
        # asyncio.run always closes its private loop, including when the
        # coroutine raises, and does not leave a closed loop installed as the
        # thread's current loop (the old new_event_loop/close pair did both).
        locations_data = asyncio.run(self.fetch_locations_data(database))
        self.serve_locations_data(locations_data)
    except Exception as e:
        print(f"Error fetching locations data: {e}")
        self.serve_error_page("Locations", f"Error loading locations: {str(e)}")
async def fetch_locations_data(self, database):
    """Load every location together with a comma-joined list of its spawns.

    Opens ``database.db_path`` with aiosqlite and returns one dict per
    location with the keys ``id``, ``name``, ``description``, ``level_min``,
    ``level_max`` and ``spawns``. Any exception is printed and turned into an
    empty list.
    """
    try:
        import aiosqlite
        async with aiosqlite.connect(database.db_path) as db:
            # All locations, each with "name (type1[/type2])" spawn labels
            cursor = await db.execute("""
SELECT l.*,
GROUP_CONCAT(DISTINCT ps.name || ' (' || ps.type1 ||
CASE WHEN ps.type2 IS NOT NULL THEN '/' || ps.type2 ELSE '' END || ')') as spawns
FROM locations l
LEFT JOIN location_spawns ls ON l.id = ls.location_id
LEFT JOIN pet_species ps ON ls.species_id = ps.id
GROUP BY l.id
ORDER BY l.id
""")
            rows = await cursor.fetchall()
            # NOTE(review): the first five positions assume the locations
            # table starts with id, name, description, level_min, level_max
            # in that order - confirm against the schema.
            return [
                {
                    'id': row[0],
                    'name': row[1],
                    'description': row[2],
                    'level_min': row[3],
                    'level_max': row[4],
                    # `spawns` is selected after `l.*`, so it is always the
                    # last column no matter how many columns locations has;
                    # a fixed index 5 picks the wrong value on wider tables.
                    'spawns': row[-1],
                }
                for row in rows
            ]
    except Exception as e:
        print(f"Database error fetching locations: {e}")
        return []
def serve_locations_data(self, locations_data):
"""Serve locations page with real data using unified template"""
# NOTE(review): this body is not intact. `locations_html` is built but never
# used afterwards, while the final part renders a "Team Builder" page from
# `nickname` and `content`, neither of which is defined in this method. It
# looks like the end of this method and the start of another were fused;
# restore from version control before editing. Code is left as found.
# Build locations HTML
locations_html = ""
if locations_data:
for location in locations_data:
# 'spawns' is a comma-separated string, or missing/None when a location has none.
spawns = location.get('spawns', 'No pets found')
if not spawns or spawns == 'None':
spawns = "No pets spawn here yet"
# Split spawns into a readable list and remove duplicates
if spawns != "No pets spawn here yet":
spawn_list = list(set([spawn.strip() for spawn in spawns.split(',') if spawn.strip()]))
spawn_list.sort() # Sort alphabetically for consistency
else:
spawn_list = []
spawn_badges = ""
visible_spawns = spawn_list[:6] # Show first 6
hidden_spawns = spawn_list[6:] # Hide the rest
# Add visible spawn badges
for spawn in visible_spawns:
spawn_badges += f'{spawn}'
# Add hidden spawn badges (initially hidden)
if hidden_spawns:
# location_id is assigned but not referenced again in the visible code.
location_id = location['id']
for spawn in hidden_spawns:
spawn_badges += f'{spawn}'
# Add functional "show more" button
spawn_badges += f'+{len(hidden_spawns)} more'
if not spawn_badges:
spawn_badges = 'No pets spawn here yet'
locations_html += f"""
"""
# Add no-pets-specific CSS
additional_css = """
.main-container {
text-align: center;
max-width: 800px;
margin: 0 auto;
}
.no-pets-message {
background: var(--bg-secondary);
padding: 40px;
border-radius: 15px;
border: 2px solid var(--warning-color);
margin-top: 30px;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
}
.no-pets-message h2 {
color: var(--warning-color);
margin-top: 0;
}
"""
html_content = self.get_page_template(f"Team Builder - {nickname}", content, "players")
# NOTE(review): str.replace with an empty search string inserts
# additional_css between every character of html_content; the text being
# searched for appears to be missing here.
html_content = html_content.replace("", additional_css + "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
def serve_teambuilder_interface(self, nickname, pets):
"""Serve the full interactive team builder interface"""
# `pets` is indexed as a list of mappings; the keys read below are is_active,
# nickname, species_name, id, type1, type2, hp, max_hp, level, attack,
# defense, speed, experience, happiness and (optionally) team_order.
# NOTE(review): team_slots, storage_cards, html, storage_pets_html and
# active_pets_html are all built but never referenced again in this body; the
# page that is sent is rendered from the static team_builder_content only.
# The markup that consumed them appears to be missing - confirm against
# version control before changing anything here.
active_pets = [pet for pet in pets if pet['is_active']]
inactive_pets = [pet for pet in pets if not pet['is_active']]
# Debug logging
print(f"Team Builder Debug for {nickname}:")
print(f"Total pets: {len(pets)}")
active_names = [f"{pet['nickname'] or pet['species_name']} (ID:{pet['id']}, is_active:{pet['is_active']})" for pet in active_pets]
inactive_names = [f"{pet['nickname'] or pet['species_name']} (ID:{pet['id']}, is_active:{pet['is_active']})" for pet in inactive_pets]
print(f"Active pets: {len(active_pets)} - {active_names}")
print(f"Inactive pets: {len(inactive_pets)} - {inactive_names}")
# Generate detailed pet cards with debugging
def make_pet_card(pet, is_active):
# Display name falls back to the species name when the pet has no nickname.
name = pet['nickname'] or pet['species_name']
status = "Active" if is_active else "Storage"
status_class = "active" if is_active else "storage"
type_str = pet['type1']
if pet['type2']:
type_str += f"/{pet['type2']}"
# Debug logging
print(f"Making pet card for {name} (ID: {pet['id']}): is_active={pet['is_active']}, passed_is_active={is_active}, status_class={status_class}, team_order={pet.get('team_order', 'None')}")
# Calculate HP percentage for health bar
# Guarded so a max_hp of 0 (or less) yields 0 instead of dividing by zero.
hp_percent = (pet['hp'] / pet['max_hp']) * 100 if pet['max_hp'] > 0 else 0
# Above 60% / above 25% / otherwise; hp_color is not used in the visible card text.
hp_color = "#4CAF50" if hp_percent > 60 else "#FF9800" if hp_percent > 25 else "#f44336"
return f"""
{name}
{status}
Level {pet['level']} {pet['species_name']}
{type_str}
HP: {pet['hp']}/{pet['max_hp']}
ATK{pet['attack']}
DEF{pet['defense']}
SPD{pet['speed']}
EXP{pet['experience']}
{'๐' if pet['happiness'] > 70 else '๐' if pet['happiness'] > 40 else '๐'}Happiness: {pet['happiness']}/100
"""
# Create 6 numbered slots and place pets in their positions
team_slots = [''] * 6 # Initialize 6 empty slots
# Place active pets in their team_order positions
for pet in active_pets:
team_order = pet.get('team_order')
# team_order is 1-based; missing, zero or out-of-range values leave the slot empty.
if team_order and 1 <= team_order <= 6:
team_slots[team_order - 1] = make_pet_card(pet, True)
storage_cards = ''.join(make_pet_card(pet, False) for pet in inactive_pets)
html = f"""
Team Builder - {nickname}โ Back to {nickname}'s Profile
๐พ Team Builder
Drag pets between Active and Storage to build your perfect team
Changes are saved securely with PIN verification via IRC
๐ PIN Verification Required
A 6-digit PIN has been sent to you via IRC private message.
Enter the PIN below to confirm your team changes:
"""
# Generate storage pets HTML first
# (make_pet_card runs a second time for each stored pet here, so its debug
# line is printed again.)
storage_pets_html = ""
for pet in inactive_pets:
storage_pets_html += make_pet_card(pet, False)
# Generate active pets HTML for team slots
active_pets_html = ""
for pet in active_pets:
if pet.get('team_order'):
active_pets_html += make_pet_card(pet, True)
# Create content using string concatenation instead of f-strings to avoid CSS brace issues
team_builder_content = """
๐พ Team Builder
Drag pets between Active Team and Storage. Double-click as backup.
Changes are saved securely with PIN verification via IRC
๐ PIN Verification Required
A 6-digit PIN has been sent to you via IRC private message.
Enter the PIN below to confirm your team changes:
๐ก How to use:
โข Drag pets to team slots
โข Double-click to move pets
โข Empty slots show placeholders
"""
# Get the unified template
html_content = self.get_page_template(f"Team Builder - {nickname}", team_builder_content, "")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_content.encode())
def handle_team_save(self, nickname):
    """Handle a team save request and trigger PIN generation.

    Reads a JSON body of ``Content-Length`` bytes, passes the decoded value
    to ``_handle_team_save_async`` and relays its result as JSON.

    Responses:
        200 - the async handler reported success.
        400 - missing/invalid/non-positive Content-Length, a body that is
              not valid UTF-8 JSON, or the async handler reported failure.
        500 - any unexpected exception.
    """
    import json

    try:
        # Get POST data. A malformed header is a client error, not a 500.
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self.send_json_response({"success": False, "error": "Invalid Content-Length header"}, 400)
            return

        # A negative length would make rfile.read() read until EOF.
        if content_length <= 0:
            self.send_json_response({"success": False, "error": "No data provided"}, 400)
            return

        raw_body = self.rfile.read(content_length)

        # Parse JSON data; undecodable bytes are treated like invalid JSON.
        try:
            team_data = json.loads(raw_body.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            self.send_json_response({"success": False, "error": "Invalid JSON data"}, 400)
            return

        # Run async operations
        result = asyncio.run(self._handle_team_save_async(nickname, team_data))
        self.send_json_response(result, 200 if result["success"] else 400)
    except Exception as e:
        print(f"Error in handle_team_save: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_save_async(self, nickname, team_data):
"""Async handler for team save"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Validate team composition
validation = await self.database.validate_team_composition(player["id"], team_data)
if not validation["valid"]:
return {"success": False, "error": validation["error"]}
# Create pending team change with PIN
import json
result = await self.database.create_pending_team_change(
player["id"],
json.dumps(team_data)
)
if result["success"]:
# Send PIN via IRC
self.send_pin_via_irc(nickname, result["pin_code"])
return {
"success": True,
"message": "PIN sent to your IRC private messages",
"expires_in_minutes": 10
}
else:
return result
except Exception as e:
print(f"Error in _handle_team_save_async: {e}")
return {"success": False, "error": str(e)}
def handle_team_verify(self, nickname):
    """Handle PIN verification and apply the pending team changes.

    Reads a JSON object of ``Content-Length`` bytes, takes its ``"pin"``
    string, passes it to ``_handle_team_verify_async`` and relays the result
    as JSON.

    Responses:
        200 - the async handler reported success.
        400 - missing/invalid/non-positive Content-Length, a body that is
              not a UTF-8 JSON object with a string PIN, an empty PIN, or
              the async handler reported failure.
        500 - any unexpected exception.
    """
    import json

    try:
        # Get POST data. A malformed header is a client error, not a 500.
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self.send_json_response({"success": False, "error": "Invalid Content-Length header"}, 400)
            return

        # A negative length would make rfile.read() read until EOF.
        if content_length <= 0:
            self.send_json_response({"success": False, "error": "No PIN provided"}, 400)
            return

        raw_body = self.rfile.read(content_length)

        # Parse JSON data. AttributeError covers a non-object body (no .get)
        # and a non-string PIN (no .strip).
        try:
            data = json.loads(raw_body.decode('utf-8'))
            pin_code = data.get("pin", "").strip()
        except (UnicodeDecodeError, json.JSONDecodeError, AttributeError):
            self.send_json_response({"success": False, "error": "Invalid data format"}, 400)
            return

        if not pin_code:
            self.send_json_response({"success": False, "error": "PIN code is required"}, 400)
            return

        # Run async operations
        result = asyncio.run(self._handle_team_verify_async(nickname, pin_code))
        self.send_json_response(result, 200 if result["success"] else 400)
    except Exception as e:
        print(f"Error in handle_team_verify: {e}")
        self.send_json_response({"success": False, "error": "Internal server error"}, 500)
async def _handle_team_verify_async(self, nickname, pin_code):
"""Async handler for PIN verification"""
try:
# Get player
player = await self.database.get_player(nickname)
if not player:
return {"success": False, "error": "Player not found"}
# Apply team changes with PIN verification
result = await self.database.apply_team_change(player["id"], pin_code)
if result["success"]:
return {
"success": True,
"message": f"Team changes applied successfully! {result['changes_applied']} pets updated.",
"changes_applied": result["changes_applied"]
}
else:
return result
except Exception as e:
print(f"Error in _handle_team_verify_async: {e}")
return {"success": False, "error": str(e)}
def send_pin_via_irc(self, nickname, pin_code):
    """Deliver a team-builder PIN to ``nickname`` as IRC private messages.

    The PIN is always printed first. When a bot exposing
    ``send_message_sync`` is available, two private messages are sent and a
    confirmation is printed (or the failure, if sending raises). Without
    such a bot, the PIN is printed again for manual delivery.
    """
    print(f"๐ PIN for {nickname}: {pin_code}")

    irc_bot = self.bot
    if not (irc_bot and hasattr(irc_bot, 'send_message_sync')):
        # Nothing to send through: leave the PIN in the output instead.
        print(f"โ No IRC bot available to send PIN to {nickname}")
        print(f"๐ก Manual PIN for {nickname}: {pin_code}")
        return

    try:
        # Private messages go out through the bot's synchronous wrapper.
        deliver = irc_bot.send_message_sync
        deliver(nickname, f"๐ Team Builder PIN: {pin_code}")
        deliver(nickname, f"๐ก Enter this PIN on the web page to confirm your team changes. PIN expires in 10 minutes.")
    except Exception as e:
        print(f"โ Failed to send PIN via IRC: {e}")
    else:
        print(f"โ PIN sent to {nickname} via IRC")
class PetBotWebServer:
    """Owns the HTTP server that exposes PetBot data.

    Holds the database, port and optional bot, and attaches the database and
    bot to the ``HTTPServer`` instance so request handlers can reach them
    through ``self.server``.
    """

    def __init__(self, database=None, port=8080, bot=None):
        """Store configuration; a falsy ``database`` is replaced by ``Database()``."""
        self.database = database if database else Database()
        self.port = port
        self.bot = bot
        self.server = None

    def run(self):
        """Create the HTTP server on all interfaces and serve until shut down."""
        self.server = httpd = HTTPServer(('0.0.0.0', self.port), PetBotRequestHandler)
        # Handlers read these two attributes from the server object.
        httpd.database = self.database
        httpd.bot = self.bot

        banner = (
            f'๐ Starting PetBot web server on http://0.0.0.0:{self.port}',
            f'๐ก Accessible from WSL at: http://172.27.217.61:{self.port}',
            f'๐ก Accessible from Windows at: http://localhost:{self.port}',
            '',
            '๐ Public access at: http://petz.rdx4.com/',
            '',
        )
        for line in banner:
            print(line)

        httpd.serve_forever()

    def start_in_thread(self):
        """Run ``run()`` on a daemon thread, kept in ``self.thread``."""
        import threading
        worker = threading.Thread(target=self.run, daemon=True)
        self.thread = worker
        worker.start()

    def stop(self):
        """Shut the HTTP server down and close its socket, if one was created."""
        if not self.server:
            return
        self.server.shutdown()
        self.server.server_close()
def run_standalone():
    """Run the web server in standalone mode.

    The optional first command-line argument is the port (default 8080); a
    non-integer value prints usage and exits with status 1. Prints the local
    and public URLs, then serves until interrupted with Ctrl+C.
    """
    import sys

    port = 8080
    if len(sys.argv) > 1:
        try:
            port = int(sys.argv[1])
        except ValueError:
            print('Usage: python webserver.py [port]')
            sys.exit(1)

    # Pass the port by keyword: the first positional parameter of
    # PetBotWebServer is `database`, so PetBotWebServer(port) stored the port
    # as the database and always listened on the default port.
    server = PetBotWebServer(port=port)

    print('๐ PetBot Web Server')
    print('=' * 50)
    print(f'Port: {port}')
    print('')
    print('๐ Local URLs:')
    print(f' http://localhost:{port}/ - Game Hub (local)')
    print(f' http://localhost:{port}/help - Command Help (local)')
    print(f' http://localhost:{port}/players - Player List (local)')
    print(f' http://localhost:{port}/leaderboard - Leaderboard (local)')
    print(f' http://localhost:{port}/locations - Locations (local)')
    print('')
    print('๐ Public URLs:')
    print(' http://petz.rdx4.com/ - Game Hub')
    print(' http://petz.rdx4.com/help - Command Help')
    print(' http://petz.rdx4.com/players - Player List')
    print(' http://petz.rdx4.com/leaderboard - Leaderboard')
    print(' http://petz.rdx4.com/locations - Locations')
    print('')
    print('Press Ctrl+C to stop')

    try:
        server.run()
    except KeyboardInterrupt:
        print('\nโ Web server stopped')
if __name__ == '__main__':
run_standalone()