Petbot/src/database.py
megaproxy 9cf2231a03 Implement secure team builder with PIN verification system
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-14 17:08:02 +01:00

1588 lines
No EOL
67 KiB
Python

import aiosqlite
import json
from typing import Dict, List, Optional, Tuple
from datetime import datetime
class Database:
def __init__(self, db_path: str = "data/petbot.db"):
self.db_path = db_path
async def init_database(self):
    """Create every table used by the bot and apply the legacy column migration.

    Each ``CREATE TABLE`` uses ``IF NOT EXISTS``, so existing tables are left
    untouched. After the schema statements run, the method tries to add
    ``players.current_location_id`` for databases created before that column
    was part of the ``players`` definition.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS players (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            nickname TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            level INTEGER DEFAULT 1,
            experience INTEGER DEFAULT 0,
            money INTEGER DEFAULT 100,
            current_location_id INTEGER DEFAULT NULL,
            FOREIGN KEY (current_location_id) REFERENCES locations (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS pet_species (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            type1 TEXT NOT NULL,
            type2 TEXT,
            base_hp INTEGER NOT NULL,
            base_attack INTEGER NOT NULL,
            base_defense INTEGER NOT NULL,
            base_speed INTEGER NOT NULL,
            evolution_level INTEGER,
            evolution_species_id INTEGER,
            rarity INTEGER DEFAULT 1,
            FOREIGN KEY (evolution_species_id) REFERENCES pet_species (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS pets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            species_id INTEGER NOT NULL,
            nickname TEXT,
            level INTEGER DEFAULT 1,
            experience INTEGER DEFAULT 0,
            hp INTEGER NOT NULL,
            max_hp INTEGER NOT NULL,
            attack INTEGER NOT NULL,
            defense INTEGER NOT NULL,
            speed INTEGER NOT NULL,
            happiness INTEGER DEFAULT 50,
            caught_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_active BOOLEAN DEFAULT FALSE,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (species_id) REFERENCES pet_species (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS moves (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            type TEXT NOT NULL,
            category TEXT NOT NULL,
            power INTEGER,
            accuracy INTEGER DEFAULT 100,
            pp INTEGER DEFAULT 10,
            description TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS pet_moves (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pet_id INTEGER NOT NULL,
            move_id INTEGER NOT NULL,
            current_pp INTEGER,
            FOREIGN KEY (pet_id) REFERENCES pets (id),
            FOREIGN KEY (move_id) REFERENCES moves (id),
            UNIQUE(pet_id, move_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS battles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player1_id INTEGER NOT NULL,
            player2_id INTEGER,
            battle_type TEXT NOT NULL,
            status TEXT DEFAULT 'active',
            winner_id INTEGER,
            started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            ended_at TIMESTAMP,
            FOREIGN KEY (player1_id) REFERENCES players (id),
            FOREIGN KEY (player2_id) REFERENCES players (id),
            FOREIGN KEY (winner_id) REFERENCES players (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS locations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            description TEXT,
            level_min INTEGER DEFAULT 1,
            level_max INTEGER DEFAULT 5
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS location_spawns (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            location_id INTEGER NOT NULL,
            species_id INTEGER NOT NULL,
            spawn_rate REAL DEFAULT 0.1,
            min_level INTEGER DEFAULT 1,
            max_level INTEGER DEFAULT 5,
            FOREIGN KEY (location_id) REFERENCES locations (id),
            FOREIGN KEY (species_id) REFERENCES pet_species (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS achievements (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            description TEXT,
            requirement_type TEXT NOT NULL,
            requirement_data TEXT,
            unlock_location_id INTEGER,
            FOREIGN KEY (unlock_location_id) REFERENCES locations (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS player_achievements (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            achievement_id INTEGER NOT NULL,
            completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (achievement_id) REFERENCES achievements (id),
            UNIQUE(player_id, achievement_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS location_weather (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            location_id INTEGER NOT NULL,
            weather_type TEXT NOT NULL,
            active_until TIMESTAMP,
            spawn_modifier REAL DEFAULT 1.0,
            affected_types TEXT,
            FOREIGN KEY (location_id) REFERENCES locations (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS active_battles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            wild_pet_data TEXT NOT NULL,
            player_pet_id INTEGER NOT NULL,
            player_hp INTEGER NOT NULL,
            wild_hp INTEGER NOT NULL,
            turn_count INTEGER DEFAULT 1,
            current_turn TEXT DEFAULT 'player',
            battle_status TEXT DEFAULT 'active',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (player_pet_id) REFERENCES pets (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            description TEXT NOT NULL,
            category TEXT NOT NULL,
            rarity TEXT NOT NULL,
            effect TEXT,
            effect_value INTEGER DEFAULT 0,
            consumable BOOLEAN DEFAULT TRUE,
            sell_value INTEGER DEFAULT 0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS player_inventory (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            item_id INTEGER NOT NULL,
            quantity INTEGER DEFAULT 1,
            obtained_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (item_id) REFERENCES items (id),
            UNIQUE(player_id, item_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS gyms (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            location_id INTEGER NOT NULL,
            name TEXT UNIQUE NOT NULL,
            leader_name TEXT NOT NULL,
            description TEXT,
            theme TEXT NOT NULL,
            badge_name TEXT NOT NULL,
            badge_icon TEXT NOT NULL,
            badge_description TEXT,
            FOREIGN KEY (location_id) REFERENCES locations (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS gym_teams (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            gym_id INTEGER NOT NULL,
            species_id INTEGER NOT NULL,
            base_level INTEGER NOT NULL,
            move_set TEXT,
            team_position INTEGER NOT NULL,
            FOREIGN KEY (gym_id) REFERENCES gyms (id),
            FOREIGN KEY (species_id) REFERENCES pet_species (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS player_gym_battles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            gym_id INTEGER NOT NULL,
            victories INTEGER DEFAULT 0,
            highest_difficulty INTEGER DEFAULT 0,
            first_victory_date TIMESTAMP,
            last_battle_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (gym_id) REFERENCES gyms (id),
            UNIQUE(player_id, gym_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS active_gym_battles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            gym_id INTEGER NOT NULL,
            difficulty_level INTEGER NOT NULL,
            current_pet_index INTEGER DEFAULT 0,
            gym_team_data TEXT NOT NULL,
            battle_status TEXT DEFAULT 'active',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (gym_id) REFERENCES gyms (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS player_encounters (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            species_id INTEGER NOT NULL,
            first_encounter_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            total_encounters INTEGER DEFAULT 1,
            last_encounter_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            caught_count INTEGER DEFAULT 0,
            FOREIGN KEY (player_id) REFERENCES players (id),
            FOREIGN KEY (species_id) REFERENCES pet_species (id),
            UNIQUE(player_id, species_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS pending_team_changes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            new_team_data TEXT NOT NULL,
            verification_pin TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL,
            is_verified BOOLEAN DEFAULT FALSE,
            verified_at TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS verification_pins (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            player_id INTEGER NOT NULL,
            pin_code TEXT NOT NULL,
            request_type TEXT NOT NULL,
            request_data TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL,
            is_used BOOLEAN DEFAULT FALSE,
            used_at TIMESTAMP,
            FOREIGN KEY (player_id) REFERENCES players (id)
        )
        """,
    )

    async with aiosqlite.connect(self.db_path) as db:
        for statement in schema_statements:
            await db.execute(statement)

        # Legacy migration: add current_location_id to a players table that
        # predates the column. On an up-to-date schema the ALTER fails.
        try:
            await db.execute("ALTER TABLE players ADD COLUMN current_location_id INTEGER DEFAULT 1")
            await db.commit()
            print("Added current_location_id column to players table")
        except aiosqlite.OperationalError:
            # Column already exists. Only the SQLite error is swallowed so
            # task cancellation and programming errors still propagate.
            pass

        await db.commit()
async def get_player(self, nickname: str) -> Optional[Dict]:
    """Fetch the ``players`` row whose nickname matches exactly.

    Args:
        nickname: Value compared against ``players.nickname``.

    Returns:
        The row as a dict keyed by column name, or ``None`` when no row matches.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(
            "SELECT * FROM players WHERE nickname = ?", (nickname,)
        )
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def create_player(self, nickname: str) -> int:
    """Insert a new player placed in the 'Starter Town' location.

    Args:
        nickname: Nickname stored for the new player.

    Returns:
        The row id of the inserted ``players`` row.

    Raises:
        Exception: If no location named 'Starter Town' exists.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        # New players always begin at the location named 'Starter Town'.
        lookup = await conn.execute("SELECT id FROM locations WHERE name = 'Starter Town'")
        town_row = await lookup.fetchone()
        if not town_row:
            raise Exception("Starter Town location not found in database - ensure game data is loaded first")

        insert = await conn.execute(
            "INSERT INTO players (nickname, current_location_id) VALUES (?, ?)",
            (nickname, town_row[0])
        )
        await conn.commit()
        return insert.lastrowid
async def get_player_pets(self, player_id: int, active_only: bool = False) -> List[Dict]:
    """List a player's pets joined with their species name and types.

    Args:
        player_id: Owner whose pets are returned.
        active_only: When true, restrict the result to rows with ``is_active = TRUE``.

    Returns:
        One dict per pet: all ``pets`` columns plus ``species_name``,
        ``type1`` and ``type2``.
    """
    statement = """
        SELECT p.*, ps.name as species_name, ps.type1, ps.type2
        FROM pets p
        JOIN pet_species ps ON p.species_id = ps.id
        WHERE p.player_id = ?
    """
    if active_only:
        statement = statement + " AND p.is_active = TRUE"

    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(statement, [player_id])
        records = await result.fetchall()
        return list(map(dict, records))
async def get_player_location(self, player_id: int) -> Optional[Dict]:
    """Return the location row referenced by the player's ``current_location_id``.

    Args:
        player_id: Id of the player.

    Returns:
        The ``locations`` row as a dict, or ``None`` when the join yields
        nothing (unknown player or no matching location).
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT l.* FROM locations l
            JOIN players p ON p.current_location_id = l.id
            WHERE p.id = ?
        """, (player_id,))
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def update_player_location(self, player_id: int, location_id: int):
    """Set the player's ``current_location_id`` and commit.

    Args:
        player_id: Id of the player to move.
        location_id: Id stored as the new current location.
    """
    move_player = "UPDATE players SET current_location_id = ? WHERE id = ?"
    async with aiosqlite.connect(self.db_path) as conn:
        await conn.execute(move_player, (location_id, player_id))
        await conn.commit()
async def get_location_by_name(self, location_name: str) -> Optional[Dict]:
    """Fetch the ``locations`` row whose name matches exactly.

    Args:
        location_name: Value compared against ``locations.name``.

    Returns:
        The row as a dict, or ``None`` when no location has that name.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(
            "SELECT * FROM locations WHERE name = ?", (location_name,)
        )
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def get_all_locations(self) -> List[Dict]:
    """Return every row of ``locations`` as a dict, ordered by id."""
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("SELECT * FROM locations ORDER BY id")
        records = await result.fetchall()
        return list(map(dict, records))
async def check_player_achievements(self, player_id: int, achievement_type: str, data: str):
    """Award every not-yet-earned achievement of one type whose requirement is met.

    Args:
        player_id: Player being evaluated.
        achievement_type: Matched against ``achievements.requirement_type``.
        data: Passed through unchanged to ``_check_achievement_requirement``.

    Returns:
        A list of dicts, one per achievement awarded by this call.
    """
    unlocked = []
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row

        # Candidates: achievements of this type the player has no row for yet.
        pending = await conn.execute("""
            SELECT a.* FROM achievements a
            WHERE a.requirement_type = ?
            AND a.id NOT IN (
                SELECT pa.achievement_id FROM player_achievements pa
                WHERE pa.player_id = ?
            )
        """, (achievement_type, player_id))
        candidates = await pending.fetchall()

        for candidate in candidates:
            satisfied = await self._check_achievement_requirement(player_id, candidate, data)
            if not satisfied:
                continue
            await conn.execute("""
                INSERT INTO player_achievements (player_id, achievement_id)
                VALUES (?, ?)
            """, (player_id, candidate["id"]))
            unlocked.append(dict(candidate))

        # One commit covers every award made in the loop.
        await conn.commit()
        return unlocked
async def _check_achievement_requirement(self, player_id: int, achievement: Dict, data: str) -> bool:
    """Check whether a player currently satisfies one achievement's requirement.

    Supported ``requirement_type`` values:

    * ``"catch_type"`` - ``requirement_data`` is ``"<count>:<type>"``; met when
      the player owns at least ``count`` distinct species whose ``type1`` or
      ``type2`` equals ``type``.
    * ``"catch_total"`` - ``requirement_data`` is an integer; met when the
      player owns at least that many pets.
    * ``"explore_count"`` and anything else - never met (no tracking exists).

    Args:
        player_id: Player being evaluated.
        achievement: Mapping (dict or row) exposing ``requirement_type`` and
            ``requirement_data``.
        data: Not used by the current checks.

    Returns:
        ``True`` when the requirement is met. ``False`` otherwise, including
        when ``requirement_data`` is missing or malformed.
    """
    req_type = achievement["requirement_type"]
    req_data = achievement["requirement_data"]

    # Parse first so malformed or NULL data is rejected without touching
    # the database, and unsupported types never open a connection.
    if req_type == "catch_type":
        try:
            count_text, pet_type = req_data.split(":")
            required_count = int(count_text)
        except (AttributeError, ValueError):
            return False
        query = """
            SELECT COUNT(DISTINCT ps.id) as count
            FROM pets p
            JOIN pet_species ps ON p.species_id = ps.id
            WHERE p.player_id = ? AND (ps.type1 = ? OR ps.type2 = ?)
        """
        params = (player_id, pet_type, pet_type)
    elif req_type == "catch_total":
        try:
            required_count = int(req_data)
        except (TypeError, ValueError):
            return False
        query = """
            SELECT COUNT(*) as count FROM pets WHERE player_id = ?
        """
        params = (player_id,)
    else:
        # "explore_count" needs exploration tracking that does not exist yet.
        return False

    async with aiosqlite.connect(self.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(query, params)
        result = await cursor.fetchone()
        return result["count"] >= required_count
async def get_player_achievements(self, player_id: int) -> List[Dict]:
    """List the achievements a player has earned, most recently completed first.

    Args:
        player_id: Player whose achievements are returned.

    Returns:
        One dict per earned achievement: all ``achievements`` columns plus
        ``completed_at``.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT a.*, pa.completed_at
            FROM achievements a
            JOIN player_achievements pa ON a.id = pa.achievement_id
            WHERE pa.player_id = ?
            ORDER BY pa.completed_at DESC
        """, (player_id,))
        records = await result.fetchall()
        return list(map(dict, records))
async def can_access_location(self, player_id: int, location_id: int) -> bool:
    """Check whether a player holds every achievement that unlocks a location.

    A location is gated by the achievements whose ``unlock_location_id``
    points at it. A location with no such achievements is open to everyone.

    Args:
        player_id: Player requesting access.
        location_id: Location being checked.

    Returns:
        ``True`` when no gating achievement is missing, ``False`` otherwise.
    """
    async with aiosqlite.connect(self.db_path) as db:
        # One query looks for any gating achievement the player lacks,
        # instead of one lookup per achievement.
        cursor = await db.execute("""
            SELECT 1 FROM achievements a
            WHERE a.unlock_location_id = ?
              AND NOT EXISTS (
                  SELECT 1 FROM player_achievements pa
                  WHERE pa.player_id = ? AND pa.achievement_id = a.id
              )
            LIMIT 1
        """, (location_id, player_id))
        first_missing = await cursor.fetchone()
        return first_missing is None
async def get_missing_location_requirements(self, player_id: int, location_id: int) -> List[Dict]:
    """List the achievements gating a location that the player has not earned.

    Args:
        player_id: Player being checked.
        location_id: Location whose unlock requirements are inspected.

    Returns:
        One dict per missing achievement with the keys ``id``, ``name``,
        ``description``, ``requirement_type``, ``requirement_data`` and
        ``unlock_location_id``, ordered by achievement id. Empty when
        nothing is missing.
    """
    async with aiosqlite.connect(self.db_path) as db:
        db.row_factory = aiosqlite.Row
        # Columns are named explicitly so the returned keys do not depend
        # on the physical column order of the achievements table.
        cursor = await db.execute("""
            SELECT a.id, a.name, a.description, a.requirement_type,
                   a.requirement_data, a.unlock_location_id
            FROM achievements a
            WHERE a.unlock_location_id = ?
              AND NOT EXISTS (
                  SELECT 1 FROM player_achievements pa
                  WHERE pa.player_id = ? AND pa.achievement_id = a.id
              )
            ORDER BY a.id
        """, (location_id, player_id))
        rows = await cursor.fetchall()
        return [dict(row) for row in rows]
async def get_location_weather(self, location_id: int) -> Optional[Dict]:
    """Return the newest unexpired weather row for a location.

    Args:
        location_id: Location whose weather is requested.

    Returns:
        The ``location_weather`` row with the highest id whose
        ``active_until`` is later than SQLite's ``datetime('now')``, as a
        dict, or ``None`` when there is no such row.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT * FROM location_weather
            WHERE location_id = ? AND active_until > datetime('now')
            ORDER BY id DESC LIMIT 1
        """, (location_id,))
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def activate_pet(self, player_id: int, pet_identifier: str) -> Dict:
    """Mark one of the player's inactive pets as active.

    Args:
        player_id: Owner of the pet.
        pet_identifier: Matched exactly against the pet's nickname or its
            species name; the first matching inactive pet is used.

    Returns:
        ``{"success": True, "pet": {...}}`` with the pet row as read before
        the update, or ``{"success": False, "error": "..."}`` when no
        inactive pet matches.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        match = await conn.execute("""
            SELECT p.*, ps.name as species_name
            FROM pets p
            JOIN pet_species ps ON p.species_id = ps.id
            WHERE p.player_id = ? AND p.is_active = FALSE
            AND (p.nickname = ? OR ps.name = ?)
            LIMIT 1
        """, (player_id, pet_identifier, pet_identifier))
        chosen = await match.fetchone()

        if chosen:
            await conn.execute("UPDATE pets SET is_active = TRUE WHERE id = ?", (chosen["id"],))
            await conn.commit()
            return {"success": True, "pet": dict(chosen)}

        return {"success": False, "error": f"No inactive pet found named '{pet_identifier}'"}
async def deactivate_pet(self, player_id: int, pet_identifier: str) -> Dict:
    """Move one of the player's active pets to storage.

    The request is refused when the player has one active pet or fewer, so
    at least one pet always stays active.

    Args:
        player_id: Owner of the pet.
        pet_identifier: Matched exactly against the pet's nickname or its
            species name; the first matching active pet is used.

    Returns:
        ``{"success": True, "pet": {...}}`` with the pet row as read before
        the update, or ``{"success": False, "error": "..."}``.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        match = await conn.execute("""
            SELECT p.*, ps.name as species_name
            FROM pets p
            JOIN pet_species ps ON p.species_id = ps.id
            WHERE p.player_id = ? AND p.is_active = TRUE
            AND (p.nickname = ? OR ps.name = ?)
            LIMIT 1
        """, (player_id, pet_identifier, pet_identifier))
        chosen = await match.fetchone()
        if not chosen:
            return {"success": False, "error": f"No active pet found named '{pet_identifier}'"}

        # Refuse to deactivate the last active pet.
        tally = await conn.execute("SELECT COUNT(*) as count FROM pets WHERE player_id = ? AND is_active = TRUE", (player_id,))
        tally_row = await tally.fetchone()
        if not tally_row["count"] > 1:
            return {"success": False, "error": "You must have at least one active pet!"}

        await conn.execute("UPDATE pets SET is_active = FALSE WHERE id = ?", (chosen["id"],))
        await conn.commit()
        return {"success": True, "pet": dict(chosen)}
async def swap_pets(self, player_id: int, pet1_identifier: str, pet2_identifier: str) -> Dict:
    """Exchange the active/storage status of two of the player's pets.

    Each identifier is matched exactly against a pet's nickname or its
    species name; the first matching pet is used. Pet 1 receives pet 2's
    status and vice versa. When both already share the same status the
    exchange is a no-op, so the number of active pets never changes.

    Args:
        player_id: Owner of both pets.
        pet1_identifier: Nickname or species name of the first pet.
        pet2_identifier: Nickname or species name of the second pet.

    Returns:
        On success: ``success``, ``pet1`` and ``pet2`` (rows as read before
        the swap) and ``pet1_now`` / ``pet2_now`` (``"active"`` or
        ``"storage"``). On failure: ``{"success": False, "error": "..."}``.
    """
    async with aiosqlite.connect(self.db_path) as db:
        db.row_factory = aiosqlite.Row

        async def find_pet(identifier: str):
            # First pet of this player whose nickname or species matches.
            cursor = await db.execute("""
                SELECT p.*, ps.name as species_name
                FROM pets p
                JOIN pet_species ps ON p.species_id = ps.id
                WHERE p.player_id = ?
                AND (p.nickname = ? OR ps.name = ?)
                LIMIT 1
            """, (player_id, identifier, identifier))
            return await cursor.fetchone()

        pet1 = await find_pet(pet1_identifier)
        pet2 = await find_pet(pet2_identifier)

        if not pet1:
            return {"success": False, "error": f"Pet '{pet1_identifier}' not found"}
        if not pet2:
            return {"success": False, "error": f"Pet '{pet2_identifier}' not found"}
        if pet1["id"] == pet2["id"]:
            return {"success": False, "error": "Cannot swap a pet with itself"}

        pet1_was_active = bool(pet1["is_active"])
        pet2_was_active = bool(pet2["is_active"])

        # Exchange the flags rather than inverting each one: inverting sends
        # two active pets to storage together, or activates two stored pets.
        if pet1_was_active != pet2_was_active:
            await db.execute("UPDATE pets SET is_active = ? WHERE id = ?", (pet2_was_active, pet1["id"]))
            await db.execute("UPDATE pets SET is_active = ? WHERE id = ?", (pet1_was_active, pet2["id"]))
            await db.commit()

        return {
            "success": True,
            "pet1": dict(pet1),
            "pet2": dict(pet2),
            "pet1_now": "active" if pet2_was_active else "storage",
            "pet2_now": "active" if pet1_was_active else "storage"
        }
# Item and Inventory Methods
async def add_item_to_inventory(self, player_id: int, item_name: str, quantity: int = 1) -> bool:
    """Give a player some quantity of a named item.

    Args:
        player_id: Player receiving the item.
        item_name: Matched exactly against ``items.name``.
        quantity: Amount added to the player's stack.

    Returns:
        ``False`` when no item has that name, ``True`` once the inventory
        row has been inserted or updated and committed.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        lookup = await conn.execute("SELECT id FROM items WHERE name = ?", (item_name,))
        item_row = await lookup.fetchone()
        if not item_row:
            return False
        item_id = item_row[0]

        stack = await conn.execute(
            "SELECT quantity FROM player_inventory WHERE player_id = ? AND item_id = ?",
            (player_id, item_id)
        )
        owned = await stack.fetchone()

        if not owned:
            # First copy of this item for the player.
            await conn.execute(
                "INSERT INTO player_inventory (player_id, item_id, quantity) VALUES (?, ?, ?)",
                (player_id, item_id, quantity)
            )
        else:
            # Grow the existing stack.
            await conn.execute(
                "UPDATE player_inventory SET quantity = ? WHERE player_id = ? AND item_id = ?",
                (owned[0] + quantity, player_id, item_id)
            )

        await conn.commit()
        return True
async def get_player_inventory(self, player_id: int) -> List[Dict]:
    """List the items a player holds, with item details and quantities.

    Args:
        player_id: Player whose inventory is returned.

    Returns:
        One dict per inventory row with ``name``, ``description``,
        ``category``, ``rarity``, ``effect``, ``effect_value``, ``quantity``
        and ``obtained_at``, sorted by rarity descending then name ascending.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT i.name, i.description, i.category, i.rarity, i.effect,
            i.effect_value, pi.quantity, pi.obtained_at
            FROM player_inventory pi
            JOIN items i ON pi.item_id = i.id
            WHERE pi.player_id = ?
            ORDER BY i.rarity DESC, i.name ASC
        """, (player_id,))
        records = await result.fetchall()
        return list(map(dict, records))
async def use_item(self, player_id: int, item_name: str) -> Dict:
    """Consume one unit of an item from the player's inventory.

    Non-consumable items are reported as used but their quantity is left
    unchanged. This method only updates the inventory; the caller receives
    the item's ``effect`` and ``effect_value`` in the result.

    Args:
        player_id: Player using the item.
        item_name: Matched exactly against ``items.name``.

    Returns:
        ``{"success": True, "item", "effect", "effect_value"}`` with the
        item row as read before the update, or
        ``{"success": False, "error": "..."}``.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT i.*, pi.quantity
            FROM items i
            JOIN player_inventory pi ON i.id = pi.item_id
            WHERE pi.player_id = ? AND i.name = ?
        """, (player_id, item_name))
        held = await result.fetchone()

        if not held:
            return {"success": False, "error": "Item not found in inventory"}
        if held["quantity"] <= 0:
            return {"success": False, "error": "No items of this type available"}

        if held["consumable"]:
            owner_and_item = (player_id, held["id"])
            if held["quantity"] != 1:
                await conn.execute(
                    "UPDATE player_inventory SET quantity = quantity - 1 WHERE player_id = ? AND item_id = ?",
                    owner_and_item
                )
            else:
                # Last unit: remove the row instead of leaving a zero stack.
                await conn.execute(
                    "DELETE FROM player_inventory WHERE player_id = ? AND item_id = ?",
                    owner_and_item
                )
        await conn.commit()

        return {
            "success": True,
            "item": dict(held),
            "effect": held["effect"],
            "effect_value": held["effect_value"]
        }
async def initialize_items(self):
    """Reload the ``items`` table from ``config/items.json``.

    The config is a JSON object whose values are lists of item entries.
    Values that are not non-empty lists, and entries without an ``"id"``
    key, are skipped. Existing rows are deleted first; each entry is
    inserted under the id given in the config. If the config file is
    missing, a message is printed and nothing changes.
    """
    try:
        # JSON interchange files are UTF-8; do not rely on the locale default.
        with open("config/items.json", "r", encoding="utf-8") as f:
            items_data = json.load(f)
    except FileNotFoundError:
        print("Items config file not found")
        return

    async with aiosqlite.connect(self.db_path) as db:
        # Clear existing items so removed config entries disappear too.
        await db.execute("DELETE FROM items")

        for category_items in items_data.values():
            if not category_items or not isinstance(category_items, list):
                continue
            for item in category_items:
                if "id" not in item:
                    continue  # Not a valid item entry

                # Only items whose effect is "sell_value" get a sell price.
                sell_value = 0
                if item.get("effect") == "sell_value":
                    sell_value = item.get("effect_value", 0)

                await db.execute("""
                    INSERT OR REPLACE INTO items
                    (id, name, description, category, rarity, effect, effect_value, sell_value)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    item["id"],
                    item["name"],
                    item["description"],
                    item["category"],
                    item["rarity"],
                    item.get("effect", "none"),
                    item.get("effect_value", 0),
                    sell_value
                ))

        await db.commit()
        print("Items initialized from config")
async def update_pet_hp(self, pet_id: int, new_hp: int) -> bool:
    """Overwrite a pet's current HP and commit.

    Args:
        pet_id: Id of the pet to update.
        new_hp: Value written to ``pets.hp``.

    Returns:
        Always ``True``; the number of affected rows is not checked.
    """
    set_hp = "UPDATE pets SET hp = ? WHERE id = ?"
    async with aiosqlite.connect(self.db_path) as conn:
        await conn.execute(set_hp, (new_hp, pet_id))
        await conn.commit()
        return True
async def get_active_pets(self, player_id: int) -> List[Dict]:
    """List a player's active pets together with their species base stats.

    Args:
        player_id: Owner whose active pets are returned.

    Returns:
        One dict per active pet, ordered by pet id: all ``pets`` columns
        plus ``species_name``, ``type1``, ``type2`` and the four species
        ``base_*`` stats.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT p.*, ps.name as species_name, ps.type1, ps.type2,
            ps.base_hp, ps.base_attack, ps.base_defense, ps.base_speed
            FROM pets p
            JOIN pet_species ps ON p.species_id = ps.id
            WHERE p.player_id = ? AND p.is_active = 1
            ORDER BY p.id ASC
        """, (player_id,))
        records = await result.fetchall()
        return list(map(dict, records))
def calculate_exp_for_level(self, level: int) -> int:
    """Return the total experience required to reach ``level``.

    The curve is cubic, ``4 * level**3 - 12``, floored at zero so that
    level 1 (and anything below it) costs no experience.
    """
    required = 4 * level ** 3 - 12
    return required if required > 0 else 0
def calculate_level_from_exp(self, exp: int) -> int:
    """Return the level a pet with ``exp`` total experience should have.

    The result is the highest level whose requirement (per
    ``calculate_exp_for_level``) does not exceed ``exp``, never below 1 and
    never above 100.
    """
    max_level = 100
    level = 1
    # Stop at the cap so a huge experience total cannot keep the loop
    # running far past the maximum level.
    while level < max_level and self.calculate_exp_for_level(level + 1) <= exp:
        level += 1
    return level
async def award_experience(self, pet_id: int, exp_amount: int) -> Dict:
    """Add experience to a pet and level it up when a threshold is crossed.

    The new experience total is always stored. When the level implied by
    that total is higher than the stored level, the pet's level and stats
    are updated via ``_handle_level_up``. The reported level never drops
    below the stored one, even if the stored level is higher than the
    experience total alone would imply.

    Args:
        pet_id: Id of the pet receiving experience.
        exp_amount: Experience added to the pet's current total.

    Returns:
        ``{"success": False, "error": "Pet not found"}`` for an unknown pet.
        Otherwise a dict with ``success``, ``pet_id``, ``pet_name``,
        ``species_name``, ``old_level``, ``new_level``, ``old_exp``,
        ``new_exp``, ``exp_gained``, ``leveled_up`` and ``levels_gained``,
        plus ``stat_increases`` when a level-up occurred.
    """
    async with aiosqlite.connect(self.db_path) as db:
        db.row_factory = aiosqlite.Row

        # The species base stats are needed to recompute stats on level-up.
        cursor = await db.execute("""
            SELECT p.*, ps.name as species_name, ps.base_hp, ps.base_attack,
            ps.base_defense, ps.base_speed
            FROM pets p
            JOIN pet_species ps ON p.species_id = ps.id
            WHERE p.id = ?
        """, (pet_id,))
        pet = await cursor.fetchone()
        if not pet:
            return {"success": False, "error": "Pet not found"}

        pet_dict = dict(pet)
        old_level = pet_dict["level"]
        old_exp = pet_dict["experience"]
        new_exp = old_exp + exp_amount
        # Gaining experience must never report a lower level than the pet
        # already has.
        new_level = max(old_level, self.calculate_level_from_exp(new_exp))
        leveled_up = new_level > old_level

        result = {
            "success": True,
            "pet_id": pet_id,
            "pet_name": pet_dict["nickname"] or pet_dict["species_name"],
            "species_name": pet_dict["species_name"],
            "old_level": old_level,
            "new_level": new_level,
            "old_exp": old_exp,
            "new_exp": new_exp,
            "exp_gained": exp_amount,
            "leveled_up": leveled_up,
            "levels_gained": new_level - old_level
        }

        await db.execute("""
            UPDATE pets SET experience = ? WHERE id = ?
        """, (new_exp, pet_id))

        if leveled_up:
            await self._handle_level_up(db, pet_dict, new_level)
            result["stat_increases"] = await self._calculate_stat_increases(pet_dict, old_level, new_level)

        await db.commit()
        return result
async def _handle_level_up(self, db, pet_dict: Dict, new_level: int):
    """Write a pet's new level and recomputed stats, restoring it to full HP.

    The update is executed on ``db`` but not committed here.

    Args:
        db: Open connection on which the UPDATE is executed.
        pet_dict: Pet row including ``id`` and the species ``base_*`` stats.
        new_level: Level the pet has reached.
    """
    stats = self._calculate_pet_stats(pet_dict, new_level)
    full_hp = stats["hp"]

    # max_hp and hp both receive the new HP value, so the pet is fully healed.
    values = (
        new_level,
        full_hp,
        stats["attack"],
        stats["defense"],
        stats["speed"],
        full_hp,
        pet_dict["id"],
    )
    await db.execute("""
        UPDATE pets
        SET level = ?, max_hp = ?, attack = ?, defense = ?, speed = ?, hp = ?
        WHERE id = ?
    """, values)
def _calculate_pet_stats(self, pet_dict: Dict, level: int) -> Dict:
    """Compute a pet's stats at ``level`` from its species base stats.

    Each stat starts from ``int((2 * base + 31) * level / 100)``. HP then
    adds ``level + 10``; attack, defense and speed add 5.

    Args:
        pet_dict: Mapping with ``base_hp``, ``base_attack``,
            ``base_defense`` and ``base_speed``.
        level: Level to compute the stats for.

    Returns:
        Dict with the keys ``hp``, ``attack``, ``defense`` and ``speed``.
    """
    def level_share(base_key):
        # Level-scaled part of the formula, common to every stat.
        return int((2 * pet_dict[base_key] + 31) * level / 100)

    stats = {"hp": level_share("base_hp") + level + 10}
    for stat in ("attack", "defense", "speed"):
        stats[stat] = level_share("base_" + stat) + 5
    return stats
async def _calculate_stat_increases(self, pet_dict: Dict, old_level: int, new_level: int) -> Dict:
    """Return how much each stat grows between two levels.

    Args:
        pet_dict: Mapping with the species ``base_*`` stats.
        old_level: Level before the change.
        new_level: Level after the change.

    Returns:
        Dict with ``hp``, ``attack``, ``defense`` and ``speed`` holding the
        stat at ``new_level`` minus the stat at ``old_level``.
    """
    before = self._calculate_pet_stats(pet_dict, old_level)
    after = self._calculate_pet_stats(pet_dict, new_level)
    return {
        stat: after[stat] - before[stat]
        for stat in ("hp", "attack", "defense", "speed")
    }
# Gym Battle System Methods
async def get_gyms_in_location(self, location_id: int, player_id: int = None) -> List[Dict]:
    """List the gyms at a location, optionally with one player's progress.

    Args:
        location_id: Location whose gyms are returned.
        player_id: When truthy, each gym also carries that player's
            ``victories``, ``highest_difficulty`` (both 0 when the player
            has no record) and ``first_victory_date``.

    Returns:
        One dict per gym ordered by gym id: all ``gyms`` columns plus
        ``location_name`` and, if requested, the progress columns.
    """
    if player_id:
        statement = """
            SELECT g.*, l.name as location_name,
            COALESCE(pgb.victories, 0) as victories,
            COALESCE(pgb.highest_difficulty, 0) as highest_difficulty,
            pgb.first_victory_date
            FROM gyms g
            JOIN locations l ON g.location_id = l.id
            LEFT JOIN player_gym_battles pgb ON g.id = pgb.gym_id AND pgb.player_id = ?
            WHERE g.location_id = ?
            ORDER BY g.id
        """
        arguments = (player_id, location_id)
    else:
        statement = """
            SELECT g.*, l.name as location_name
            FROM gyms g
            JOIN locations l ON g.location_id = l.id
            WHERE g.location_id = ?
            ORDER BY g.id
        """
        arguments = (location_id,)

    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(statement, arguments)
        records = await result.fetchall()
        return list(map(dict, records))
async def get_gym_by_name(self, gym_name: str) -> Optional[Dict]:
    """Find a gym by name, comparing with SQL ``LOWER()`` on both sides.

    Args:
        gym_name: Name to look up.

    Returns:
        The gym row plus ``location_name`` as a dict, or ``None`` when no
        gym matches.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT g.*, l.name as location_name
            FROM gyms g
            JOIN locations l ON g.location_id = l.id
            WHERE LOWER(g.name) = LOWER(?)
        """, (gym_name,))
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def get_gym_by_name_in_location(self, gym_name: str, location_id: int) -> Optional[Dict]:
    """Find a gym by name within one location, comparing names with SQL ``LOWER()``.

    Args:
        gym_name: Name to look up.
        location_id: Location the gym must belong to.

    Returns:
        The gym row plus ``location_name`` as a dict, or ``None`` when no
        gym at that location matches.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT g.*, l.name as location_name
            FROM gyms g
            JOIN locations l ON g.location_id = l.id
            WHERE g.location_id = ? AND LOWER(g.name) = LOWER(?)
        """, (location_id, gym_name))
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def get_gym_team(self, gym_id: int, difficulty_multiplier: float = 1.0) -> List[Dict]:
    """Build a gym's team with levels and stats scaled for a difficulty.

    Levels are multiplied by ``difficulty_multiplier`` and truncated.
    Attack, defense and speed are additionally multiplied by a stat
    multiplier that grows at half the rate of the difficulty multiplier;
    HP is derived from the scaled level only.

    Args:
        gym_id: Gym whose team is loaded.
        difficulty_multiplier: Scale applied to each member's base level.

    Returns:
        One dict per team member in ``team_position`` order with
        ``species_id``, ``species_name``, ``level``, ``type1``, ``type2``,
        ``hp``, ``max_hp``, ``attack``, ``defense``, ``speed``, ``moves``
        and ``position``.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT gt.*, ps.name as species_name, ps.type1, ps.type2,
            ps.base_hp, ps.base_attack, ps.base_defense, ps.base_speed
            FROM gym_teams gt
            JOIN pet_species ps ON gt.species_id = ps.id
            WHERE gt.gym_id = ?
            ORDER BY gt.team_position
        """, (gym_id,))
        roster = await result.fetchall()

        scaled_team = []
        for member in roster:
            scaled_level = int(member["base_level"] * difficulty_multiplier)
            # Stats scale less aggressively than levels do.
            stat_multiplier = 1.0 + (difficulty_multiplier - 1.0) * 0.5

            hit_points = int((2 * member["base_hp"] + 31) * scaled_level / 100) + scaled_level + 10
            combat_stats = {
                stat: int(((2 * member["base_" + stat] + 31) * scaled_level / 100) + 5) * stat_multiplier
                for stat in ("attack", "defense", "speed")
            }

            # A missing or empty move set falls back to two default moves.
            if member["move_set"]:
                known_moves = member["move_set"].split(",")
            else:
                known_moves = ["Tackle", "Growl"]

            scaled_team.append({
                "species_id": member["species_id"],
                "species_name": member["species_name"],
                "level": scaled_level,
                "type1": member["type1"],
                "type2": member["type2"],
                "hp": int(hit_points),
                "max_hp": int(hit_points),
                "attack": int(combat_stats["attack"]),
                "defense": int(combat_stats["defense"]),
                "speed": int(combat_stats["speed"]),
                "moves": known_moves,
                "position": member["team_position"]
            })
        return scaled_team
async def get_player_gym_progress(self, player_id: int, gym_id: int) -> Optional[Dict]:
    """Return a player's ``player_gym_battles`` record for one gym.

    Args:
        player_id: Player whose record is requested.
        gym_id: Gym the record belongs to.

    Returns:
        The row as a dict, or ``None`` when no record exists for that
        player and gym.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute("""
            SELECT * FROM player_gym_battles
            WHERE player_id = ? AND gym_id = ?
        """, (player_id, gym_id))
        record = await result.fetchone()
        if not record:
            return None
        return dict(record)
async def record_gym_victory(self, player_id: int, gym_id: int) -> Dict:
    """Record one more victory for a player at a gym.

    The first victory inserts a record with ``victories`` and
    ``highest_difficulty`` set to 1. Later victories increment
    ``victories``, set ``highest_difficulty`` to the same number and
    refresh ``last_battle_date``.

    Args:
        player_id: Player who won.
        gym_id: Gym that was beaten.

    Returns:
        Dict with ``victories`` (the new total), ``is_first_victory`` and
        ``next_difficulty`` (new total plus one).
    """
    async with aiosqlite.connect(self.db_path) as conn:
        lookup = await conn.execute("""
            SELECT victories, first_victory_date FROM player_gym_battles
            WHERE player_id = ? AND gym_id = ?
        """, (player_id, gym_id))
        previous = await lookup.fetchone()

        if not previous:
            total_victories = 1
            await conn.execute("""
                INSERT INTO player_gym_battles
                (player_id, gym_id, victories, highest_difficulty, first_victory_date, last_battle_date)
                VALUES (?, ?, 1, 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            """, (player_id, gym_id))
        else:
            total_victories = previous[0] + 1
            # highest_difficulty is written with the same value as victories.
            await conn.execute("""
                UPDATE player_gym_battles
                SET victories = ?, highest_difficulty = ?, last_battle_date = CURRENT_TIMESTAMP
                WHERE player_id = ? AND gym_id = ?
            """, (total_victories, total_victories, player_id, gym_id))

        await conn.commit()
        return {
            "victories": total_victories,
            "is_first_victory": previous is None,
            "next_difficulty": total_victories + 1
        }
async def initialize_gyms(self):
    """Rebuild the gyms and gym_teams tables from config/gyms.json.

    All existing rows in ``gym_teams`` and ``gyms`` are deleted and then
    re-created from the config file. A gym whose location name is not found
    in ``locations`` is skipped with a printed message, and so is a team
    member whose species name is not found in ``pet_species``. If the config
    file does not exist, a message is printed and the database is not
    touched.

    Raises:
        json.JSONDecodeError: if the config file is not valid JSON.
        KeyError: if a gym or team entry lacks one of the keys read below.
    """
    import json
    try:
        # Decode explicitly as UTF-8 (the standard JSON encoding) instead of
        # the platform's locale encoding, so non-ASCII text in the config is
        # read the same way on every host.
        with open("config/gyms.json", "r", encoding="utf-8") as f:
            gyms_data = json.load(f)
    except FileNotFoundError:
        print("Gyms config file not found")
        return

    async with aiosqlite.connect(self.db_path) as db:
        # Clear existing gym data (teams first, then the gyms they belong to)
        await db.execute("DELETE FROM gym_teams")
        await db.execute("DELETE FROM gyms")

        for gym_config in gyms_data:
            # Resolve the gym's location name to its id
            cursor = await db.execute(
                "SELECT id FROM locations WHERE name = ?",
                (gym_config["location"],)
            )
            location_row = await cursor.fetchone()
            if not location_row:
                print(f"Location '{gym_config['location']}' not found for gym '{gym_config['name']}'")
                continue
            location_id = location_row[0]

            # Insert gym
            cursor = await db.execute("""
                INSERT OR REPLACE INTO gyms
                (location_id, name, leader_name, description, theme, badge_name, badge_icon, badge_description)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                location_id,
                gym_config["name"],
                gym_config["leader_name"],
                gym_config["description"],
                gym_config["theme"],
                gym_config["badge"]["name"],
                gym_config["badge"]["icon"],
                gym_config["badge"]["description"]
            ))
            gym_id = cursor.lastrowid

            # Insert gym team
            for member in gym_config["team"]:
                species_cursor = await db.execute(
                    "SELECT id FROM pet_species WHERE name = ?",
                    (member["species"],)
                )
                species_row = await species_cursor.fetchone()
                if not species_row:
                    # Report the skipped member instead of silently building
                    # a shorter team than the config describes.
                    print(f"Species '{member['species']}' not found for gym '{gym_config['name']}'")
                    continue

                # Moves are stored as one comma-separated string
                moves_str = ",".join(member["moves"]) if member["moves"] else ""
                await db.execute("""
                    INSERT INTO gym_teams
                    (gym_id, species_id, base_level, move_set, team_position)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    gym_id,
                    species_row[0],
                    member["base_level"],
                    moves_str,
                    member["position"]
                ))

        await db.commit()
    print("Gyms initialized from config")
async def start_gym_battle(self, player_id: int, gym_id: int, difficulty_level: int, gym_team: List[Dict]) -> int:
    """Open a fresh gym battle for a player and return its row id.

    Any battle the player still has in the 'active' state is first marked
    'ended'. The new battle starts at pet index 0, and the gym team is
    stored with it as a JSON string.
    """
    serialized_team = json.dumps(gym_team)

    async with aiosqlite.connect(self.db_path) as conn:
        # Close out whatever battle this player left active
        await conn.execute("""
            UPDATE active_gym_battles
            SET battle_status = 'ended'
            WHERE player_id = ? AND battle_status = 'active'
        """, (player_id,))

        # Register the new battle
        new_row = (player_id, gym_id, difficulty_level, serialized_team)
        inserted = await conn.execute("""
            INSERT INTO active_gym_battles
            (player_id, gym_id, difficulty_level, current_pet_index, gym_team_data)
            VALUES (?, ?, ?, 0, ?)
        """, new_row)
        new_battle_id = inserted.lastrowid

        await conn.commit()
        return new_battle_id
async def get_active_gym_battle(self, player_id: int) -> Optional[Dict]:
    """Fetch the most recent battle the player has in the 'active' state.

    Returns the battle row joined with the gym's name, leader name and badge
    icon, plus a ``gym_team`` key holding the decoded ``gym_team_data``
    JSON. Returns None when the player has no active battle.
    """
    query = """
        SELECT agb.*, g.name as gym_name, g.leader_name, g.badge_icon
        FROM active_gym_battles agb
        JOIN gyms g ON agb.gym_id = g.id
        WHERE agb.player_id = ? AND agb.battle_status = 'active'
        ORDER BY agb.id DESC LIMIT 1
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(query, (player_id,))
        record = await result.fetchone()
        if not record:
            return None

        battle = dict(record)
        # Expose the stored team as Python objects next to the raw JSON
        battle["gym_team"] = json.loads(battle["gym_team_data"])
        return battle
async def advance_gym_battle(self, player_id: int) -> bool:
    """Move the player's active gym battle on to the next gym pet.

    Returns True when the pet index was incremented. Returns False when the
    player has no active battle or the current pet is the last one in the
    stored team.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        result = await conn.execute("""
            SELECT current_pet_index, gym_team_data
            FROM active_gym_battles
            WHERE player_id = ? AND battle_status = 'active'
        """, (player_id,))
        active = await result.fetchone()
        if not active:
            return False

        pet_index, team_json = active[0], active[1]
        team_size = len(json.loads(team_json))

        # Already on the final pet: nothing left to advance to
        if pet_index + 1 >= team_size:
            return False

        await conn.execute("""
            UPDATE active_gym_battles
            SET current_pet_index = current_pet_index + 1
            WHERE player_id = ? AND battle_status = 'active'
        """, (player_id,))
        await conn.commit()
        return True
async def end_gym_battle(self, player_id: int, victory: bool = False) -> Optional[Dict]:
    """End the player's active gym battle and return its final status.

    The active battle is marked 'completed'. When ``victory`` is True the
    win is also recorded through ``record_gym_victory``.

    Args:
        player_id: Player whose active battle should be ended.
        victory: Whether the player won the battle.

    Returns:
        A dict with ``gym_id``, ``gym_name``, ``difficulty_level`` and
        ``victory``, or None when the player has no active battle.
    """
    async with aiosqlite.connect(self.db_path) as db:
        db.row_factory = aiosqlite.Row
        # Get battle info before ending it
        cursor = await db.execute("""
            SELECT agb.gym_id, agb.difficulty_level, g.name as gym_name
            FROM active_gym_battles agb
            JOIN gyms g ON agb.gym_id = g.id
            WHERE agb.player_id = ? AND agb.battle_status = 'active'
        """, (player_id,))
        battle = await cursor.fetchone()
        if not battle:
            return None
        battle_dict = dict(battle)

        # End the battle
        await db.execute("""
            UPDATE active_gym_battles
            SET battle_status = 'completed'
            WHERE player_id = ? AND battle_status = 'active'
        """, (player_id,))
        # Commit before recording the victory: record_gym_victory opens its
        # own connection and writes through it. SQLite allows a single
        # writer at a time, so leaving this UPDATE uncommitted while waiting
        # on that call would block the second connection's write until it
        # fails with "database is locked".
        await db.commit()

    # This connection is closed by now, so the victory write cannot collide
    # with it. Note the battle stays 'completed' even if recording fails.
    if victory:
        await self.record_gym_victory(player_id, battle_dict["gym_id"])

    return {
        "gym_id": battle_dict["gym_id"],
        "gym_name": battle_dict["gym_name"],
        "difficulty_level": battle_dict["difficulty_level"],
        "victory": victory
    }
async def record_encounter(self, player_id: int, species_name: str, was_caught: bool = False) -> bool:
    """Log that a player came across a pet of the given species.

    The first encounter with a species creates a ``player_encounters`` row;
    later ones bump its encounter total, refresh the last-encounter
    timestamp and, when ``was_caught`` is set, its caught count.

    Returns True once the record is saved. Returns False when the species
    name is unknown or any error occurs (the error is printed).
    """
    caught_delta = 1 if was_caught else 0
    try:
        async with aiosqlite.connect(self.db_path) as conn:
            # Translate the species name into its id
            lookup = await conn.execute(
                "SELECT id FROM pet_species WHERE name = ?", (species_name,)
            )
            species = await lookup.fetchone()
            if not species:
                return False
            species_id = species[0]

            # Has this player met the species before?
            lookup = await conn.execute("""
                SELECT total_encounters, caught_count FROM player_encounters
                WHERE player_id = ? AND species_id = ?
            """, (player_id, species_id))
            prior = await lookup.fetchone()

            if not prior:
                # First sighting: create the record
                await conn.execute("""
                    INSERT INTO player_encounters (player_id, species_id, caught_count)
                    VALUES (?, ?, ?)
                """, (player_id, species_id, caught_delta))
            else:
                # Repeat sighting: bump the counters
                updated_total = prior[0] + 1
                updated_caught = prior[1] + caught_delta
                await conn.execute("""
                    UPDATE player_encounters
                    SET total_encounters = ?, last_encounter_date = CURRENT_TIMESTAMP, caught_count = ?
                    WHERE player_id = ? AND species_id = ?
                """, (updated_total, updated_caught, player_id, species_id))

            await conn.commit()
            return True
    except Exception as e:
        print(f"Error recording encounter: {e}")
        return False
async def get_player_encounters(self, player_id: int) -> List[Dict]:
    """List every species encounter recorded for a player.

    Each entry is a ``player_encounters`` row extended with the species'
    name, both types and rarity, ordered by first encounter date (oldest
    first). The list is empty when the player has no encounters.
    """
    query = """
        SELECT pe.*, ps.name as species_name, ps.type1, ps.type2, ps.rarity
        FROM player_encounters pe
        JOIN pet_species ps ON pe.species_id = ps.id
        WHERE pe.player_id = ?
        ORDER BY pe.first_encounter_date ASC
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(query, (player_id,))
        encounters = []
        for record in await result.fetchall():
            encounters.append(dict(record))
        return encounters
async def get_encounter_stats(self, player_id: int) -> Dict:
    """Summarise how much of the species list a player has encountered.

    Returns a dict with ``species_encountered`` (distinct species rows for
    the player), ``total_encounters`` (sum over those rows, 0 when there
    are none), ``total_species`` (rows in ``pet_species``) and
    ``completion_percentage`` rounded to one decimal place.
    """
    async with aiosqlite.connect(self.db_path) as conn:

        async def first_value(sql, *params):
            # Run a single-row query and hand back its first column
            result = await conn.execute(sql, params)
            return (await result.fetchone())[0]

        seen_species = await first_value("""
            SELECT COUNT(*) FROM player_encounters WHERE player_id = ?
        """, player_id)

        # SUM yields NULL when the player has no rows; report that as 0
        encounter_sum = await first_value("""
            SELECT SUM(total_encounters) FROM player_encounters WHERE player_id = ?
        """, player_id)
        encounter_sum = encounter_sum or 0

        all_species = await first_value("""
            SELECT COUNT(*) FROM pet_species
        """)

        if all_species > 0:
            completion = seen_species / all_species * 100
        else:
            completion = 0

        return {
            "species_encountered": seen_species,
            "total_encounters": encounter_sum,
            "total_species": all_species,
            "completion_percentage": round(completion, 1),
        }
async def set_pet_nickname(self, player_id: int, pet_identifier: str, nickname: str) -> Dict:
    """Rename one of a player's pets.

    The pet is located by its current nickname or by its species name; the
    first match is used. The nickname is stripped of surrounding whitespace
    and must be non-empty, at most 20 characters, and free of the
    characters < > & " '.

    Returns a dict whose ``success`` flag is False together with an
    ``error`` message, or True together with ``pet`` (the row as it was
    before the rename), ``old_name`` and ``new_nickname``.
    """
    def failure(message):
        return {"success": False, "error": message}

    cleaned = nickname.strip()
    if not cleaned:
        return failure("Nickname cannot be empty")
    if len(cleaned) > 20:
        return failure("Nickname must be 20 characters or less")

    # Basic content validation
    forbidden = ('<', '>', '&', '"', "'")
    if not set(cleaned).isdisjoint(forbidden):
        return failure("Nickname contains invalid characters")

    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row

        # Match on the pet's current nickname or on its species name
        result = await conn.execute("""
            SELECT p.*, ps.name as species_name
            FROM pets p
            JOIN pet_species ps ON p.species_id = ps.id
            WHERE p.player_id = ?
            AND (p.nickname = ? OR ps.name = ?)
            LIMIT 1
        """, (player_id, pet_identifier, pet_identifier))
        target = await result.fetchone()
        if not target:
            return failure(f"Pet '{pet_identifier}' not found")

        await conn.execute("UPDATE pets SET nickname = ? WHERE id = ?", (cleaned, target["id"]))
        await conn.commit()

        previous_name = target["nickname"] or target["species_name"]
        return {
            "success": True,
            "pet": dict(target),
            "old_name": previous_name,
            "new_nickname": cleaned,
        }
# Team Builder PIN System Methods
async def generate_verification_pin(self, player_id: int, request_type: str, request_data: str = None) -> Dict:
    """Generate a 6-digit verification PIN that is valid for 10 minutes.

    Any unused PIN the player already holds for the same ``request_type``
    is marked as used, so only the newest PIN can be redeemed.

    Args:
        player_id: Player the PIN is issued to.
        request_type: Kind of request the PIN authorises.
        request_data: Optional payload stored with the PIN.

    Returns:
        A dict with ``success``, ``pin_id``, ``pin_code``, ``expires_at``
        (a naive local-time datetime) and ``expires_in_minutes``.
    """
    import secrets
    import string
    from datetime import datetime, timedelta, timezone

    # Generate cryptographically secure 6-digit PIN
    pin_code = ''.join(secrets.choice(string.digits) for _ in range(6))

    # PIN expires in 10 minutes
    lifetime = timedelta(minutes=10)
    # Value handed back to the caller: naive local time, as before.
    expires_at = datetime.now() + lifetime
    # Value written to the database. verify_pin and cleanup_expired_pins
    # compare this column as text against SQLite's datetime('now'), which is
    # UTC in 'YYYY-MM-DD HH:MM:SS' form. A local-time isoformat() string
    # ('T' separator, local clock) does not order correctly against that,
    # so store the expiry in the same timezone and layout.
    expires_at_db = (datetime.now(timezone.utc) + lifetime).strftime("%Y-%m-%d %H:%M:%S")

    async with aiosqlite.connect(self.db_path) as db:
        # Clear any existing unused PINs for this player and request type
        await db.execute("""
            UPDATE verification_pins
            SET is_used = TRUE, used_at = CURRENT_TIMESTAMP
            WHERE player_id = ? AND request_type = ? AND is_used = FALSE
        """, (player_id, request_type))

        # Insert new PIN
        cursor = await db.execute("""
            INSERT INTO verification_pins
            (player_id, pin_code, request_type, request_data, expires_at)
            VALUES (?, ?, ?, ?, ?)
        """, (player_id, pin_code, request_type, request_data, expires_at_db))
        await db.commit()
        pin_id = cursor.lastrowid

        return {
            "success": True,
            "pin_id": pin_id,
            "pin_code": pin_code,
            "expires_at": expires_at,
            "expires_in_minutes": 10
        }
async def verify_pin(self, player_id: int, pin_code: str, request_type: str) -> Dict:
    """Check a PIN for a player and consume it when it is valid.

    A PIN is accepted when it matches the player, code and request type, is
    still unused, and its ``expires_at`` compares greater than SQLite's
    ``datetime('now')``. An accepted PIN is marked as used.

    Returns a dict with ``success`` False and an ``error`` message, or
    ``success`` True with ``pin_id``, ``request_data`` and ``created_at``.
    """
    async with aiosqlite.connect(self.db_path) as conn:
        conn.row_factory = aiosqlite.Row

        # Newest matching PIN that is still redeemable
        lookup = await conn.execute("""
            SELECT * FROM verification_pins
            WHERE player_id = ? AND pin_code = ? AND request_type = ?
            AND is_used = FALSE AND expires_at > datetime('now')
            ORDER BY created_at DESC LIMIT 1
        """, (player_id, pin_code, request_type))
        match = await lookup.fetchone()
        if not match:
            return {"success": False, "error": "Invalid or expired PIN"}

        matched_id = match["id"]
        # Consume the PIN so it cannot be redeemed twice
        await conn.execute("""
            UPDATE verification_pins
            SET is_used = TRUE, used_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (matched_id,))
        await conn.commit()

        outcome = {"success": True, "pin_id": matched_id}
        outcome["request_data"] = match["request_data"]
        outcome["created_at"] = match["created_at"]
        return outcome
async def create_pending_team_change(self, player_id: int, new_team_data: str) -> Dict:
    """Create a pending team change request and generate its PIN.

    The team data must be a JSON document. A "team_change" PIN carrying the
    data is generated, the player's unverified pending changes are removed,
    and a new pending change valid for 10 minutes is stored.

    Args:
        player_id: Player requesting the change.
        new_team_data: JSON-encoded team description.

    Returns:
        On failure, a dict with ``success`` False and an ``error`` message.
        On success, a dict with ``success``, ``change_id``, ``pin_code``,
        ``expires_at`` (a naive local-time datetime) and
        ``expires_in_minutes``.
    """
    from datetime import datetime, timedelta, timezone
    import json

    # Validate team data is valid JSON. TypeError covers a payload that is
    # not text at all (e.g. None), which json.loads rejects before parsing.
    try:
        json.loads(new_team_data)
    except (json.JSONDecodeError, TypeError):
        return {"success": False, "error": "Invalid team data format"}

    # Generate PIN for this request
    pin_result = await self.generate_verification_pin(
        player_id, "team_change", new_team_data
    )
    if not pin_result["success"]:
        return pin_result

    lifetime = timedelta(minutes=10)
    # Value handed back to the caller: naive local time, as before.
    expires_at = datetime.now() + lifetime
    # Value written to the database. cleanup_expired_pins compares this
    # column as text against SQLite's datetime('now'), which is UTC in
    # 'YYYY-MM-DD HH:MM:SS' form, so store the expiry in that same timezone
    # and layout rather than as a local-time isoformat() string.
    expires_at_db = (datetime.now(timezone.utc) + lifetime).strftime("%Y-%m-%d %H:%M:%S")

    async with aiosqlite.connect(self.db_path) as db:
        # Clear any existing pending changes for this player
        await db.execute("""
            DELETE FROM pending_team_changes
            WHERE player_id = ? AND is_verified = FALSE
        """, (player_id,))

        # Insert new pending change
        cursor = await db.execute("""
            INSERT INTO pending_team_changes
            (player_id, new_team_data, verification_pin, expires_at)
            VALUES (?, ?, ?, ?)
        """, (player_id, new_team_data, pin_result["pin_code"], expires_at_db))
        await db.commit()
        change_id = cursor.lastrowid

        return {
            "success": True,
            "change_id": change_id,
            "pin_code": pin_result["pin_code"],
            "expires_at": expires_at,
            "expires_in_minutes": 10
        }
async def apply_team_change(self, player_id: int, pin_code: str) -> Dict:
    """Redeem a "team_change" PIN and apply the team layout stored with it.

    The PIN's request data is decoded as JSON and read as a mapping of pet
    id to active flag. Each listed pet owned by the player gets its
    ``is_active`` value set, and the matching pending change is marked as
    verified. All writes happen in one transaction, which is rolled back if
    any step raises.

    Returns a dict with ``success`` False and an ``error`` message, or
    ``success`` True with ``changes_applied`` and ``verified_at``.
    """
    # The PIN must check out before anything is touched
    verification = await self.verify_pin(player_id, pin_code, "team_change")
    if not verification["success"]:
        return verification

    payload = verification["request_data"]
    if not payload:
        return {"success": False, "error": "No team data found for this PIN"}

    try:
        requested = json.loads(payload)
    except json.JSONDecodeError:
        return {"success": False, "error": "Invalid team data format"}

    async with aiosqlite.connect(self.db_path) as conn:
        try:
            await conn.execute("BEGIN TRANSACTION")

            # Flip the active flag of every pet named in the request;
            # the player_id filter leaves other players' pets untouched
            for raw_pet_id, active_flag in requested.items():
                await conn.execute("""
                    UPDATE pets SET is_active = ?
                    WHERE id = ? AND player_id = ?
                """, (active_flag, int(raw_pet_id), player_id))

            # Close out the pending request tied to this PIN
            await conn.execute("""
                UPDATE pending_team_changes
                SET is_verified = TRUE, verified_at = CURRENT_TIMESTAMP
                WHERE player_id = ? AND verification_pin = ? AND is_verified = FALSE
            """, (player_id, pin_code))

            await conn.commit()

            applied = {"success": True}
            applied["changes_applied"] = len(requested)
            applied["verified_at"] = datetime.now()
            return applied
        except Exception as exc:
            await conn.execute("ROLLBACK")
            return {"success": False, "error": f"Failed to apply team changes: {exc}"}
async def cleanup_expired_pins(self) -> Dict:
    """Purge verification PINs and pending team changes that are finished.

    Deletes PINs that are used or whose ``expires_at`` compares less than
    SQLite's ``datetime('now')``, and pending team changes that are
    verified or expired by the same comparison.

    Returns a dict with ``success`` and the number of rows removed from
    each table (``pins_cleaned``, ``changes_cleaned``).
    """
    async with aiosqlite.connect(self.db_path) as conn:
        # Used or expired verification pins
        pin_delete = await conn.execute("""
            DELETE FROM verification_pins
            WHERE expires_at < datetime('now') OR is_used = TRUE
        """)
        removed_pins = pin_delete.rowcount

        # Verified or expired pending team changes
        change_delete = await conn.execute("""
            DELETE FROM pending_team_changes
            WHERE expires_at < datetime('now') OR is_verified = TRUE
        """)
        removed_changes = change_delete.rowcount

        await conn.commit()

        report = {"success": True}
        report["pins_cleaned"] = removed_pins
        report["changes_cleaned"] = removed_changes
        return report