diff --git a/src/database.py b/src/database.py index 8a3aea9..e69abe1 100644 --- a/src/database.py +++ b/src/database.py @@ -49,6 +49,43 @@ class Database: # Column already exists or other error, which is fine pass + # Add IV columns to pets table (migration) + iv_columns = [ + "ALTER TABLE pets ADD COLUMN iv_hp INTEGER DEFAULT 15", + "ALTER TABLE pets ADD COLUMN iv_attack INTEGER DEFAULT 15", + "ALTER TABLE pets ADD COLUMN iv_defense INTEGER DEFAULT 15", + "ALTER TABLE pets ADD COLUMN iv_speed INTEGER DEFAULT 15" + ] + + for column_sql in iv_columns: + try: + await db.execute(column_sql) + await db.commit() + except Exception: + # Column already exists or other error, which is fine + pass + + # Add future-ready columns for breeding and personality system (migration) + future_columns = [ + "ALTER TABLE pets ADD COLUMN nature TEXT DEFAULT NULL", + "ALTER TABLE pets ADD COLUMN personality_value INTEGER DEFAULT NULL", + "ALTER TABLE pets ADD COLUMN original_trainer_id INTEGER DEFAULT NULL", + "ALTER TABLE pets ADD COLUMN parent1_id INTEGER DEFAULT NULL", + "ALTER TABLE pets ADD COLUMN parent2_id INTEGER DEFAULT NULL", + "ALTER TABLE pets ADD COLUMN generation INTEGER DEFAULT 1", + "ALTER TABLE pets ADD COLUMN is_shiny BOOLEAN DEFAULT FALSE", + "ALTER TABLE pets ADD COLUMN gender TEXT DEFAULT NULL", + "ALTER TABLE pets ADD COLUMN ability TEXT DEFAULT NULL" + ] + + for column_sql in future_columns: + try: + await db.execute(column_sql) + await db.commit() + except Exception: + # Column already exists or other error, which is fine + pass + await db.execute(""" CREATE TABLE IF NOT EXISTS pets ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -65,8 +102,26 @@ class Database: happiness INTEGER DEFAULT 50, caught_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT FALSE, + -- Individual Values (IVs) for each stat (0-31) + iv_hp INTEGER DEFAULT 15, + iv_attack INTEGER DEFAULT 15, + iv_defense INTEGER DEFAULT 15, + iv_speed INTEGER DEFAULT 15, + -- Future-ready columns for breeding and personality system + nature TEXT DEFAULT NULL, + personality_value INTEGER DEFAULT NULL, + original_trainer_id INTEGER DEFAULT NULL, + parent1_id INTEGER DEFAULT NULL, + parent2_id INTEGER DEFAULT NULL, + generation INTEGER DEFAULT 1, + is_shiny BOOLEAN DEFAULT FALSE, + gender TEXT DEFAULT NULL, + ability TEXT DEFAULT NULL, FOREIGN KEY (player_id) REFERENCES players (id), - FOREIGN KEY (species_id) REFERENCES pet_species (id) + FOREIGN KEY (species_id) REFERENCES pet_species (id), + FOREIGN KEY (original_trainer_id) REFERENCES players (id), + FOREIGN KEY (parent1_id) REFERENCES pets (id), + FOREIGN KEY (parent2_id) REFERENCES pets (id) ) """) @@ -365,6 +420,21 @@ class Database: ) """) + await db.execute(""" + CREATE TABLE IF NOT EXISTS pet_moves ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pet_id INTEGER NOT NULL, + move_name TEXT NOT NULL, + power_iv INTEGER DEFAULT 0, + accuracy_iv INTEGER DEFAULT 0, + pp_iv INTEGER DEFAULT 0, + learned_at_level INTEGER DEFAULT 1, + is_signature BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (pet_id) REFERENCES pets (id) ON DELETE CASCADE + ) + """) + await db.execute(""" CREATE TABLE IF NOT EXISTS npc_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -452,6 +522,17 @@ class Database: ) """) + # Migration: Add columns for pet rename functionality + try: + await db.execute("ALTER TABLE pending_team_changes ADD COLUMN action_type TEXT DEFAULT 'team_change'") + await db.execute("ALTER TABLE pending_team_changes ADD COLUMN pet_id INTEGER") + await db.execute("ALTER TABLE pending_team_changes ADD COLUMN new_nickname TEXT") + await db.commit() + print("Added pet rename columns to pending_team_changes table") + except Exception: + # Columns already exist or other error, which is fine + pass + # Create indexes for performance optimization await db.execute("CREATE INDEX IF NOT EXISTS idx_pets_player_active ON pets (player_id, is_active)") await db.execute("CREATE INDEX IF NOT EXISTS idx_pets_species ON pets (species_id)") @@ -489,6 +570,33 @@ class Database: row = await cursor.fetchone() return dict(row) if row else None + async def update_player_admin(self, player_id: int, updates: Dict) -> bool: + """Update player data for admin purposes""" + try: + if not updates: + return False + + # Build dynamic SQL query for provided fields + set_clauses = [] + values = [] + + for field, value in updates.items(): + set_clauses.append(f"{field} = ?") + values.append(value) + + values.append(player_id) # Add player_id for WHERE clause + + sql = f"UPDATE players SET {', '.join(set_clauses)} WHERE id = ?" + + async with aiosqlite.connect(self.db_path) as db: + await db.execute(sql, values) + await db.commit() + return True + + except Exception as e: + print(f"Error updating player admin data: {e}") + return False + async def create_player(self, nickname: str) -> int: async with aiosqlite.connect(self.db_path) as db: # Get Starter Town ID @@ -554,6 +662,16 @@ class Database: row = await cursor.fetchone() return dict(row) if row else None + async def get_location_by_id(self, location_id: int) -> Optional[Dict]: + """Get location by ID""" + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute( + "SELECT * FROM locations WHERE id = ?", (location_id,) + ) + row = await cursor.fetchone() + return dict(row) if row else None + async def get_all_locations(self) -> List[Dict]: """Get all locations""" async with aiosqlite.connect(self.db_path) as db: @@ -1205,12 +1323,18 @@ class Database: # NOTE: _handle_level_up function removed - now handled atomically in award_experience() def _calculate_pet_stats(self, pet_dict: Dict, level: int) -> Dict: - """Calculate pet stats for a given level""" - # Pokémon-style stat calculation - hp = int((2 * pet_dict["base_hp"] + 31) * level / 100) + level + 10 - attack = int((2 * pet_dict["base_attack"] + 31) * level / 100) + 5 - defense = int((2 * pet_dict["base_defense"] + 31) * level / 100) + 5 - speed = int((2 * pet_dict["base_speed"] + 31) * level / 100) + 5 + """Calculate pet stats for a given level using stored IVs""" + # Use stored IVs, with fallback to defaults for existing pets + iv_hp = pet_dict.get("iv_hp", 15) + iv_attack = pet_dict.get("iv_attack", 15) + iv_defense = pet_dict.get("iv_defense", 15) + iv_speed = pet_dict.get("iv_speed", 15) + + # Pokemon-style stat calculation with individual IVs + hp = int((2 * pet_dict["base_hp"] + iv_hp) * level / 100) + level + 10 + attack = int((2 * pet_dict["base_attack"] + iv_attack) * level / 100) + 5 + defense = int((2 * pet_dict["base_defense"] + iv_defense) * level / 100) + 5 + speed = int((2 * pet_dict["base_speed"] + iv_speed) * level / 100) + 5 return { "hp": hp, @@ -1795,6 +1919,7 @@ class Database: async with aiosqlite.connect(self.db_path) as db: # Clear any existing pending changes for this player + # This prevents race conditions with multiple pending team changes await db.execute(""" DELETE FROM pending_team_changes WHERE player_id = ? AND is_verified = FALSE @@ -1834,7 +1959,25 @@ class Database: return {"success": False, "error": "No team data found for this PIN"} try: - team_changes = json.loads(new_team_data) + change_data = json.loads(new_team_data) + # Handle new format with team slot or old format + if isinstance(change_data, dict) and 'teamData' in change_data: + team_changes = change_data['teamData'] + team_slot = change_data.get('teamSlot', 1) + else: + # Backwards compatibility - old format + team_changes = change_data + team_slot = 1 + + # Validate team slot + if not isinstance(team_slot, int) or team_slot < 1 or team_slot > 3: + return {"success": False, "error": "Invalid team slot. Must be 1, 2, or 3"} + + # Validate team_changes is a dictionary + if not isinstance(team_changes, dict): + return {"success": False, "error": f"Invalid team changes format. Expected dict, got {type(team_changes)}"} + + except json.JSONDecodeError: return {"success": False, "error": "Invalid team data format"} @@ -1844,38 +1987,201 @@ class Database: # Begin transaction await db.execute("BEGIN TRANSACTION") - # Update pet active status and team_order based on new team - for pet_id, position in team_changes.items(): - if position: # If position is a number (1-6), pet is active - await db.execute(""" - UPDATE pets SET is_active = TRUE, team_order = ? - WHERE id = ? AND player_id = ? - """, (position, int(pet_id), player_id)) - else: # If position is False, pet is inactive - await db.execute(""" - UPDATE pets SET is_active = FALSE, team_order = NULL - WHERE id = ? AND player_id = ? - """, (int(pet_id), player_id)) - - # Mark any pending change as verified - await db.execute(""" - UPDATE pending_team_changes - SET is_verified = TRUE, verified_at = CURRENT_TIMESTAMP - WHERE player_id = ? AND verification_pin = ? AND is_verified = FALSE - """, (player_id, pin_code)) - - await db.commit() - - return { - "success": True, - "changes_applied": len(team_changes), - "verified_at": datetime.now() - } + # Handle Team 1 (Active Team) vs Teams 2-3 (Saved Configurations) differently + if team_slot == 1: + # Team 1: Apply directly to active team (immediate effect) + + # First, deactivate all pets for this player + await db.execute(""" + UPDATE pets SET is_active = FALSE, team_order = NULL + WHERE player_id = ? + """, (player_id,)) + + # Then activate and position the selected pets + for pet_id, position in team_changes.items(): + if position: # If position is a number (1-6), pet is active + await db.execute(""" + UPDATE pets SET is_active = TRUE, team_order = ? + WHERE id = ? AND player_id = ? + """, (position, int(pet_id), player_id)) + + # Mark pending change as verified + await db.execute(""" + UPDATE pending_team_changes + SET is_verified = TRUE, verified_at = CURRENT_TIMESTAMP + WHERE player_id = ? AND verification_pin = ? AND is_verified = FALSE + """, (player_id, pin_code)) + + await db.commit() + + # Count changes applied + changes_applied = sum(1 for pos in team_changes.values() if pos) + + return { + "success": True, + "message": f"Active team updated successfully", + "changes_applied": changes_applied, + "team_slot": team_slot + } + + else: + # Teams 2-3: Save as configuration only (no immediate effect on active team) + + # Prepare team configuration data + config_data = {} + for pet_id, position in team_changes.items(): + if position: # Only include pets that are in team slots + # Get pet info for the configuration + cursor = await db.execute(""" + SELECT p.*, ps.name as species_name, ps.type1, ps.type2 + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.id = ? AND p.player_id = ? + """, (pet_id, player_id)) + pet_row = await cursor.fetchone() + + if pet_row: + pet_dict = dict(pet_row) + config_data[str(position)] = { + 'id': pet_dict['id'], + 'name': pet_dict['nickname'] or pet_dict['species_name'], + 'level': pet_dict['level'], + 'type_primary': pet_dict['type1'], + 'hp': pet_dict['hp'], + 'max_hp': pet_dict['max_hp'] + } + + # Save team configuration + config_json = json.dumps(config_data) + await db.execute(""" + INSERT OR REPLACE INTO team_configurations + (player_id, slot_number, config_name, team_data, updated_at) + VALUES (?, ?, ?, ?, datetime('now')) + """, (player_id, team_slot, f'Team {team_slot}', config_json)) + + # Mark pending change as verified + await db.execute(""" + UPDATE pending_team_changes + SET is_verified = TRUE, verified_at = CURRENT_TIMESTAMP + WHERE player_id = ? AND verification_pin = ? AND is_verified = FALSE + """, (player_id, pin_code)) + + await db.commit() + + # Count changes applied + changes_applied = sum(1 for pos in team_changes.values() if pos) + + return { + "success": True, + "message": f"Team {team_slot} configuration saved successfully", + "changes_applied": changes_applied, + "team_slot": team_slot + } except Exception as e: - await db.execute("ROLLBACK") - return {"success": False, "error": f"Failed to apply team changes: {str(e)}"} + await db.rollback() + print(f"Database error in apply_team_change: {e}") + return {"success": False, "error": f"Database error: {str(e)}"} + async def apply_individual_team_change(self, player_id: int, pin_code: str) -> Dict: + """Apply individual team change with simplified logic""" + import json + + # Verify PIN first + pin_result = await self.verify_pin(player_id, pin_code, "team_change") + if not pin_result["success"]: + return pin_result + + # Get team data from request + new_team_data = pin_result["request_data"] + if not new_team_data: + return {"success": False, "error": "No team data found for this PIN"} + + try: + change_data = json.loads(new_team_data) + team_slot = change_data.get('teamSlot', 1) + team_changes = change_data.get('teamData', {}) + + # Simple validation + if not isinstance(team_changes, dict): + return {"success": False, "error": "Invalid team data format"} + + # Apply changes atomically + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + try: + await db.execute("BEGIN TRANSACTION") + + if team_slot == 1: + # Team 1: Update active team + await db.execute(""" + UPDATE pets SET is_active = FALSE, team_order = NULL + WHERE player_id = ? + """, (player_id,)) + + # Activate selected pets + for pet_id_str, position in team_changes.items(): + if position and str(position).isdigit(): + await db.execute(""" + UPDATE pets SET is_active = TRUE, team_order = ? + WHERE id = ? AND player_id = ? + """, (int(position), int(pet_id_str), player_id)) + + else: + # Teams 2-3: Save as configuration with full pet details + pets_list = [] + for pet_id_str, position in team_changes.items(): + if position: + # Get full pet information from database + cursor = await db.execute(""" + SELECT p.id, p.nickname, p.level, p.hp, p.max_hp, p.attack, p.defense, p.speed, p.happiness, + ps.name as species_name, ps.type1, ps.type2 + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.id = ? AND p.player_id = ? + """, (int(pet_id_str), player_id)) + pet_row = await cursor.fetchone() + + if pet_row: + # Convert Row object to dict properly + pet_dict = { + 'id': pet_row['id'], + 'nickname': pet_row['nickname'], + 'level': pet_row['level'], + 'hp': pet_row['hp'], + 'max_hp': pet_row['max_hp'], + 'attack': pet_row['attack'], + 'defense': pet_row['defense'], + 'speed': pet_row['speed'], + 'happiness': pet_row['happiness'], + 'species_name': pet_row['species_name'], + 'type1': pet_row['type1'], + 'type2': pet_row['type2'], + 'team_order': int(position) + } + pets_list.append(pet_dict) + + # Save configuration in format expected by web interface + await db.execute(""" + INSERT OR REPLACE INTO team_configurations + (player_id, slot_number, config_name, team_data, updated_at) + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) + """, (player_id, team_slot, f"Team {team_slot}", json.dumps(pets_list))) + + await db.commit() + return {"success": True, "message": f"Team {team_slot} saved successfully"} + + except Exception as e: + await db.execute("ROLLBACK") + print(f"Database error in apply_individual_team_change: {e}") + return {"success": False, "error": f"Database error: {str(e)}"} + + except json.JSONDecodeError: + return {"success": False, "error": "Invalid team data format"} + except Exception as e: + print(f"Error in apply_individual_team_change: {e}") + return {"success": False, "error": str(e)} + async def cleanup_expired_pins(self) -> Dict: """Clean up expired PINs and pending changes.""" async with aiosqlite.connect(self.db_path) as db: @@ -1994,6 +2300,39 @@ class Database: return {"valid": True, "active_count": active_count} + async def get_active_team(self, player_id: int) -> Dict: + """Get active team pets with their positions""" + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute(""" + SELECT + p.id, p.nickname, ps.name as species_name, p.level, p.hp, p.max_hp, p.team_order, + ps.type1, ps.type2, p.attack, p.defense, p.speed + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.player_id = ? AND p.is_active = TRUE + ORDER BY p.team_order ASC + """, (player_id,)) + + pets = await cursor.fetchall() + team_dict = {} + + for pet in pets: + team_dict[str(pet[6])] = { # team_order as key + 'id': pet[0], + 'name': pet[1] or pet[2], # nickname or species_name + 'species_name': pet[2], + 'level': pet[3], + 'hp': pet[4], + 'max_hp': pet[5], + 'type_primary': pet[7], + 'type_secondary': pet[8], + 'attack': pet[9], + 'defense': pet[10], + 'speed': pet[11] + } + + return team_dict + async def get_team_composition(self, player_id: int) -> Dict: """Get current team composition stats""" async with aiosqlite.connect(self.db_path) as db: @@ -2071,8 +2410,8 @@ class Database: async def set_weather_for_location(self, location_name: str, weather_type: str, active_until: str, spawn_modifier: float, - affected_types: str) -> bool: - """Set weather for a specific location""" + affected_types: str) -> dict: + """Set weather for a specific location and return previous weather info""" try: async with aiosqlite.connect(self.db_path) as db: # Get location ID @@ -2080,10 +2419,14 @@ class Database: location_row = await cursor.fetchone() if not location_row: - return False + return {"success": False, "error": "Location not found"} location_id = location_row[0] + # Get current weather before changing it + previous_weather = await self.get_location_weather_by_name(location_name) + previous_weather_type = previous_weather["weather_type"] if previous_weather else "calm" + # Clear existing weather for this location await db.execute("DELETE FROM location_weather WHERE location_id = ?", (location_id,)) @@ -2095,10 +2438,17 @@ class Database: """, (location_id, weather_type, active_until, spawn_modifier, affected_types)) await db.commit() - return True + + return { + "success": True, + "location": location_name, + "previous_weather": previous_weather_type, + "new_weather": weather_type, + "changed": previous_weather_type != weather_type + } except Exception as e: print(f"Error setting weather for location {location_name}: {e}") - return False + return {"success": False, "error": str(e)} # Team Configuration Methods async def save_team_configuration(self, player_id: int, slot_number: int, config_name: str, team_data: str) -> bool: @@ -2175,6 +2525,204 @@ class Database: print(f"Error renaming team configuration: {e}") return False + async def get_player_team_configurations(self, player_id: int) -> List[Dict]: + """Get all team configurations for a player with team data""" + try: + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + + configs = [] + for slot in range(1, 4): # Slots 1, 2, 3 + cursor = await db.execute(""" + SELECT config_name, team_data, updated_at + FROM team_configurations + WHERE player_id = ? AND slot_number = ? + """, (player_id, slot)) + + row = await cursor.fetchone() + if row: + import json + team_data = json.loads(row['team_data']) + configs.append({ + 'slot': slot, + 'name': row['config_name'], + 'team_data': team_data, + 'updated_at': row['updated_at'], + 'pet_count': len([p for p in team_data.values() if p]) + }) + else: + configs.append({ + 'slot': slot, + 'name': f'Team {slot}', + 'team_data': {}, + 'updated_at': None, + 'pet_count': 0 + }) + + return configs + + except Exception as e: + print(f"Error getting player team configurations: {e}") + return [] + + async def apply_team_configuration(self, player_id: int, slot_number: int) -> Dict: + """Apply a saved team configuration to the player's active team""" + try: + # Load the team configuration + config = await self.load_team_configuration(player_id, slot_number) + if not config: + return {"success": False, "error": f"No team configuration found in slot {slot_number}"} + + import json + team_data = json.loads(config["team_data"]) + + async with aiosqlite.connect(self.db_path) as db: + # First, deactivate all pets for this player + await db.execute(""" + UPDATE pets + SET is_active = FALSE, team_order = NULL + WHERE player_id = ? + """, (player_id,)) + + # Apply the saved team configuration + for position, pet_info in team_data.items(): + if pet_info and 'id' in pet_info: + pet_id = pet_info['id'] + team_order = int(position) # position should be 1-6 + + # Activate the pet and set its team position + await db.execute(""" + UPDATE pets + SET is_active = TRUE, team_order = ? + WHERE id = ? AND player_id = ? + """, (team_order, pet_id, player_id)) + + await db.commit() + + return { + "success": True, + "message": f"Applied team configuration '{config['config_name']}' to active team", + "config_name": config['config_name'] + } + + except Exception as e: + print(f"Error applying team configuration: {e}") + return {"success": False, "error": str(e)} + + # Pet Rename System Methods + async def request_pet_rename(self, player_id: int, pet_id: int, new_nickname: str) -> Dict: + """Request to rename a pet with PIN verification""" + try: + # Input validation + if not new_nickname or len(new_nickname.strip()) == 0: + return {"success": False, "error": "Pet nickname cannot be empty"} + + new_nickname = new_nickname.strip() + if len(new_nickname) > 20: + return {"success": False, "error": "Pet nickname must be 20 characters or less"} + + # Check for profanity/inappropriate content (basic check) + inappropriate_words = ["admin", "bot", "system", "null", "undefined"] + if any(word in new_nickname.lower() for word in inappropriate_words): + return {"success": False, "error": "Pet nickname contains inappropriate content"} + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + + # Verify pet ownership + cursor = await db.execute(""" + SELECT id, nickname, species_id FROM pets + WHERE id = ? AND player_id = ? + """, (pet_id, player_id)) + + pet = await cursor.fetchone() + if not pet: + return {"success": False, "error": "Pet not found or not owned by player"} + + # Check if new nickname is same as current + if pet["nickname"] == new_nickname: + return {"success": False, "error": "New nickname is the same as current nickname"} + + # Check for duplicate nicknames within player's pets + cursor = await db.execute(""" + SELECT id FROM pets + WHERE player_id = ? AND nickname = ? AND id != ? + """, (player_id, new_nickname, pet_id)) + + duplicate = await cursor.fetchone() + if duplicate: + return {"success": False, "error": "You already have a pet with this nickname"} + + # Generate PIN with 15-second timeout + pin_result = await self.generate_verification_pin(player_id, "pet_rename", f"{pet_id}:{new_nickname}") + if not pin_result["success"]: + return {"success": False, "error": "Failed to generate verification PIN"} + + return { + "success": True, + "pin": pin_result["pin_code"], + "expires_at": pin_result["expires_at"], + "pet_id": pet_id, + "new_nickname": new_nickname + } + + except Exception as e: + print(f"Error requesting pet rename: {e}") + return {"success": False, "error": f"Database error: {str(e)}"} + + async def verify_pet_rename(self, player_id: int, pin_code: str) -> Dict: + """Verify PIN and complete pet rename""" + try: + # Use the new PIN verification system + pin_result = await self.verify_pin(player_id, pin_code, "pet_rename") + if not pin_result["success"]: + return pin_result + + # Parse the request data to get pet_id and new_nickname + request_data = pin_result["request_data"] + try: + pet_id, new_nickname = request_data.split(":", 1) + pet_id = int(pet_id) + except (ValueError, IndexError): + return {"success": False, "error": "Invalid request data format"} + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + + # Verify pet ownership and update nickname + cursor = await db.execute(""" + UPDATE pets SET nickname = ? WHERE id = ? AND player_id = ? + """, (new_nickname, pet_id, player_id)) + + if cursor.rowcount == 0: + return {"success": False, "error": "Pet not found or permission denied"} + + await db.commit() + + return { + "success": True, + "new_nickname": new_nickname + } + + except Exception as e: + print(f"Error verifying pet rename: {e}") + return {"success": False, "error": "Database error occurred"} + + async def get_player_pets_for_rename(self, player_id: int) -> List[Dict]: + """Get all pets for a player with rename information""" + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute(""" + SELECT p.*, ps.name as species_name, ps.emoji, ps.type1, ps.type2 + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.player_id = ? + ORDER BY p.is_active DESC, p.team_order ASC, p.level DESC + """, (player_id,)) + + rows = await cursor.fetchall() + return [dict(row) for row in rows] + # NPC Events System Methods async def create_npc_event(self, event_data: Dict) -> int: """Create a new NPC event""" @@ -2472,7 +3020,7 @@ class Database: async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" - SELECT p.*, ps.name as species_name, ps.emoji + SELECT p.*, ps.name as species_name, ps.emoji, ps.type1, ps.type2 FROM pets p JOIN pet_species ps ON p.species_id = ps.id WHERE p.player_id = ? AND p.is_active = 1 AND p.fainted_at IS NULL @@ -2489,7 +3037,7 @@ class Database: if active_only: cursor = await db.execute(""" - SELECT p.*, ps.name as species_name, ps.emoji + SELECT p.*, ps.name as species_name, ps.emoji, ps.type1, ps.type2 FROM pets p JOIN pet_species ps ON p.species_id = ps.id WHERE p.player_id = ? AND p.is_active = 1 AND p.fainted_at IS NULL @@ -2497,7 +3045,7 @@ class Database: """, (player_id,)) else: cursor = await db.execute(""" - SELECT p.*, ps.name as species_name, ps.emoji + SELECT p.*, ps.name as species_name, ps.emoji, ps.type1, ps.type2 FROM pets p JOIN pet_species ps ON p.species_id = ps.id WHERE p.player_id = ? @@ -2505,4 +3053,237 @@ class Database: """, (player_id,)) rows = await cursor.fetchall() - return [dict(row) for row in rows] \ No newline at end of file + return [dict(row) for row in rows] + + # Pet Moves System Methods + async def get_pet_moves(self, pet_id: int) -> List[Dict]: + """Get all moves for a specific pet""" + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute(""" + SELECT pm.*, m.type, m.category, m.power as base_power, + m.accuracy as base_accuracy, m.pp as base_pp, m.description + FROM pet_moves pm + JOIN moves m ON pm.move_name = m.name + WHERE pm.pet_id = ? + ORDER BY pm.learned_at_level, pm.id + """, (pet_id,)) + + moves = await cursor.fetchall() + + # Calculate actual stats with IVs + result = [] + for move in moves: + move_dict = dict(move) + move_dict['actual_power'] = max(1, move_dict['base_power'] + move_dict['power_iv']) + move_dict['actual_accuracy'] = max(10, min(100, move_dict['base_accuracy'] + move_dict['accuracy_iv'])) + move_dict['actual_pp'] = max(1, move_dict['base_pp'] + move_dict['pp_iv']) + result.append(move_dict) + + return result + + async def add_pet_move(self, pet_id: int, move_name: str, power_iv: int = 0, + accuracy_iv: int = 0, pp_iv: int = 0, learned_at_level: int = 1) -> bool: + """Add a move to a pet""" + try: + async with aiosqlite.connect(self.db_path) as db: + # Check if pet already has this move + cursor = await db.execute(""" + SELECT COUNT(*) FROM pet_moves + WHERE pet_id = ? AND move_name = ? + """, (pet_id, move_name)) + + if (await cursor.fetchone())[0] > 0: + return False # Pet already has this move + + # Add the move + await db.execute(""" + INSERT INTO pet_moves + (pet_id, move_name, power_iv, accuracy_iv, pp_iv, learned_at_level) + VALUES (?, ?, ?, ?, ?, ?) + """, (pet_id, move_name, power_iv, accuracy_iv, pp_iv, learned_at_level)) + + await db.commit() + return True + + except Exception as e: + print(f"Error adding pet move: {e}") + return False + + async def remove_pet_move(self, pet_id: int, move_name: str) -> bool: + """Remove a move from a pet""" + try: + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute(""" + DELETE FROM pet_moves + WHERE pet_id = ? AND move_name = ? + """, (pet_id, move_name)) + + await db.commit() + return cursor.rowcount > 0 + + except Exception as e: + print(f"Error removing pet move: {e}") + return False + + async def clear_pet_moves(self, pet_id: int) -> bool: + """Remove all moves from a pet""" + try: + async with aiosqlite.connect(self.db_path) as db: + await db.execute("DELETE FROM pet_moves WHERE pet_id = ?", (pet_id,)) + await db.commit() + return True + + except Exception as e: + print(f"Error clearing pet moves: {e}") + return False + + async def generate_pet_moves(self, pet_id: int, species_type: str, level: int = 1, + rarity: int = 1, is_starter: bool = False) -> bool: + """Generate random moves for a pet based on type and configuration""" + try: + import json + import random + + # Load configuration files + with open("config/move_pools.json", "r") as f: + move_pools = json.load(f) + + with open("config/move_generation.json", "r") as f: + generation_config = json.load(f) + + # Get type-specific move pool + type_pool = move_pools.get(species_type, move_pools.get("Normal", {})) + if not type_pool: + print(f"No move pool found for type {species_type}") + return False + + # Get generation settings + iv_config = generation_config["iv_system"] + level_config = generation_config["level_scaling"] + rarity_config = generation_config["rarity_bonuses"].get(str(rarity), {"bonus_moves": 0, "rare_move_multiplier": 1.0, "iv_bonus": 0}) + + # Determine move count + min_moves, max_moves = type_pool["move_count_range"] + move_count = random.randint(min_moves, max_moves) + + # Add rarity bonus moves + move_count += rarity_config.get("bonus_moves", 0) + move_count = min(move_count, generation_config["balance_settings"]["max_moves_per_pet"]) + + # Generate moves + selected_moves = set() + weights = type_pool["weights"] + + # Ensure basic moves for starters + if is_starter: + starter_rules = generation_config["generation_rules"]["starter_pets"] + basic_moves = type_pool.get("basic_moves", []) + guaranteed_basic = min(starter_rules["guaranteed_basic_moves"], len(basic_moves)) + + for i in range(guaranteed_basic): + if basic_moves and len(selected_moves) < move_count: + move = random.choice(basic_moves) + selected_moves.add(move) + + # Add guaranteed type move for starters + type_moves = type_pool.get("type_moves", []) + if type_moves and len(selected_moves) < move_count: + move = random.choice(type_moves) + selected_moves.add(move) + + # Fill remaining slots + all_possible_moves = [] + + # Add basic moves + for move in type_pool.get("basic_moves", []): + if random.random() < weights.get("basic", 0.8): + all_possible_moves.append(move) + + # Add type moves + for move in type_pool.get("type_moves", []): + if random.random() < weights.get("type", 0.4): + all_possible_moves.append(move) + + # Add rare moves (with level and rarity requirements) + if level >= level_config["rare_move_unlock"]["base_level"]: + rare_chance = weights.get("rare", 0.1) * rarity_config.get("rare_move_multiplier", 1.0) + for move in type_pool.get("rare_moves", []): + if random.random() < rare_chance: + all_possible_moves.append(move) + + # Randomly select remaining moves + while len(selected_moves) < move_count and all_possible_moves: + move = random.choice(all_possible_moves) + selected_moves.add(move) + all_possible_moves.remove(move) + + # Ensure at least one move + if not selected_moves: + fallback_moves = move_pools.get("_settings", {}).get("fallback_moves", ["Tackle"]) + selected_moves.add(random.choice(fallback_moves)) + + # Add moves to database with IVs + for move_name in selected_moves: + # Generate IVs + iv_ranges = iv_config["ranges"] + iv_bonus = rarity_config.get("iv_bonus", 0) + + power_iv = random.randint(iv_ranges["power"]["min"], iv_ranges["power"]["max"]) + iv_bonus + accuracy_iv = random.randint(iv_ranges["accuracy"]["min"], iv_ranges["accuracy"]["max"]) + (iv_bonus // 3) + pp_iv = random.randint(iv_ranges["pp"]["min"], iv_ranges["pp"]["max"]) + (iv_bonus // 2) + + # Clamp IVs to reasonable ranges + power_iv = max(-15, min(25, power_iv)) + accuracy_iv = max(-10, min(15, accuracy_iv)) + pp_iv = max(-10, min(20, pp_iv)) + + await self.add_pet_move(pet_id, move_name, power_iv, accuracy_iv, pp_iv, level) + + if generation_config["admin_controls"]["log_move_generation"]: + print(f"Generated {len(selected_moves)} moves for pet {pet_id}: {list(selected_moves)}") + + return True + + except Exception as e: + print(f"Error generating pet moves: {e}") + return False + + async def migrate_existing_pets_to_move_system(self) -> bool: + """Migrate existing pets to the new move system""" + try: + print("🔄 Migrating existing pets to new move system...") + + # Get all pets that don't have moves yet + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute(""" + SELECT p.id, p.level, ps.type1, ps.rarity, ps.name as species_name + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.id NOT IN (SELECT DISTINCT pet_id FROM pet_moves) + """) + + pets_to_migrate = await cursor.fetchall() + + migrated_count = 0 + for pet in pets_to_migrate: + success = await self.generate_pet_moves( + pet["id"], + pet["type1"], + pet["level"], + pet["rarity"] or 1, + False # Not a starter + ) + + if success: + migrated_count += 1 + if migrated_count % 10 == 0: + print(f" Migrated {migrated_count}/{len(pets_to_migrate)} pets...") + + print(f"✅ Successfully migrated {migrated_count} pets to new move system") + return True + + except Exception as e: + print(f"Error migrating pets to move system: {e}") + return False \ No newline at end of file diff --git a/src/team_management.py b/src/team_management.py new file mode 100644 index 0000000..af68f0e --- /dev/null +++ b/src/team_management.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 +""" +Team Management Service for PetBot +Handles team swapping, individual team editing, and team selection hub functionality. +""" + +import asyncio +import json +from typing import Dict, List, Optional + + +class TeamManagementService: + """Service for managing player teams and team swapping operations.""" + + def __init__(self, database, pin_service): + self.database = database + self.pin_service = pin_service + + async def get_team_overview(self, player_id: int) -> Dict: + """Get overview of all teams for a player.""" + try: + # Get active team + active_team = await self.database.get_active_team(player_id) + + # Get saved team configurations + team_configs = await self.database.get_player_team_configurations(player_id) + + # Structure the data + teams = { + "active": { + "pets": active_team, + "count": len(active_team), + "is_active": True + } + } + + # Add saved configurations + for i in range(1, 4): + config = next((c for c in team_configs if c.get("slot") == i), None) + if config: + # team_data is already parsed by get_player_team_configurations + team_data = config["team_data"] if config["team_data"] else {} + teams[f"slot_{i}"] = { + "name": config.get("name", f"Team {i}"), + "pets": team_data, + "count": len(team_data), + "last_updated": config.get("updated_at"), + "is_active": False + } + else: + teams[f"slot_{i}"] = { + "name": f"Team {i}", + "pets": {}, + "count": 0, + "last_updated": None, + "is_active": False + } + + return {"success": True, "teams": teams} + + except Exception as e: + return {"success": False, "error": f"Failed to get team overview: {str(e)}"} + + async def request_team_swap(self, player_id: int, nickname: str, source_slot: int) -> Dict: + """Request to swap a saved team configuration to active team.""" + try: + # Validate slot + if source_slot < 1 or source_slot > 3: + return {"success": False, "error": "Invalid team slot. Must be 1, 2, or 3"} + + # Get the source team configuration + config = await self.database.load_team_configuration(player_id, source_slot) + if not config: + return {"success": False, "error": f"No team configuration found in slot {source_slot}"} + + # Parse team data + team_data = json.loads(config["team_data"]) + if not team_data: + return {"success": False, "error": f"Team {source_slot} is empty"} + + # Request PIN verification for team swap + pin_request = await self.pin_service.request_verification( + player_id=player_id, + nickname=nickname, + action_type="team_swap", + action_data={ + "source_slot": source_slot, + "team_data": team_data, + "config_name": config["config_name"] + }, + expiration_minutes=10 + ) + + if pin_request["success"]: + return { + "success": True, + "message": f"PIN sent to confirm swapping {config['config_name']} to active team", + "source_slot": source_slot, + "team_name": config["config_name"], + "expires_in_minutes": pin_request["expires_in_minutes"] + } + else: + return pin_request + + except Exception as e: + return {"success": False, "error": f"Failed to request team swap: {str(e)}"} + + async def verify_team_swap(self, player_id: int, pin_code: str) -> Dict: + """Verify PIN and execute team swap.""" + try: + # Define team swap callback + async def apply_team_swap_callback(player_id, action_data): + """Apply the team swap operation.""" + source_slot = action_data["source_slot"] + team_data = action_data["team_data"] + config_name = action_data["config_name"] + + # Get current active team before swapping + current_active = await self.database.get_active_team(player_id) + + # Apply the saved team as active team + result = await self.database.apply_team_configuration(player_id, source_slot) + + if result["success"]: + return { + "success": True, + "message": f"Successfully applied {config_name} as active team", + "source_slot": source_slot, + "pets_applied": len(team_data), + "previous_active_count": len(current_active) + } + else: + raise Exception(f"Failed to apply team configuration: {result.get('error', 'Unknown error')}") + + # Verify PIN and execute swap + result = await self.pin_service.verify_and_execute( + player_id=player_id, + pin_code=pin_code, + action_type="team_swap", + action_callback=apply_team_swap_callback + ) + + return result + + except Exception as e: + return {"success": False, "error": f"Team swap verification failed: {str(e)}"} + + async def get_individual_team_data(self, player_id: int, team_identifier: str) -> Dict: + """Get data for editing an individual team.""" + try: + if team_identifier == "active": + # Get active team + active_pets = await self.database.get_active_team(player_id) + return { + "success": True, + "team_type": "active", + "team_name": "Active Team", + "team_data": active_pets, + "is_active_team": True + } + else: + # Get saved team configuration + try: + slot = int(team_identifier) + if slot < 1 or slot > 3: + return {"success": False, "error": "Invalid team slot"} + except ValueError: + return {"success": False, "error": "Invalid team identifier"} + + config = await self.database.load_team_configuration(player_id, slot) + if config: + # Parse team_data - it should be a JSON string containing list of pets + try: + team_pets = json.loads(config["team_data"]) if config["team_data"] else [] + + # Ensure team_pets is a list (new format) + if isinstance(team_pets, list): + pets_data = team_pets + else: + # Handle old format - convert dict to list + pets_data = [] + if isinstance(team_pets, dict): + for position, pet_info in team_pets.items(): + if pet_info and 'pet_id' in pet_info: + # This is old format, we'll need to get full pet data + pet_data = await self._get_full_pet_data(player_id, pet_info['pet_id']) + if pet_data: + pet_data['team_order'] = int(position) + pets_data.append(pet_data) + + return { + "success": True, + "team_type": "saved", + "team_slot": slot, + "team_name": config["config_name"], + "pets": pets_data, # Use 'pets' key expected by webserver + "is_active_team": False, + "last_updated": config.get("updated_at") + } + except json.JSONDecodeError: + return { + "success": True, + "team_type": "saved", + "team_slot": slot, + "team_name": config["config_name"], + "pets": [], + "is_active_team": False, + "last_updated": config.get("updated_at") + } + else: + return { + "success": True, + "team_type": "saved", + "team_slot": slot, + "team_name": f"Team {slot}", + "pets": [], # Use 'pets' key expected by webserver + "is_active_team": False, + "last_updated": None + } + + except Exception as e: + return {"success": False, "error": f"Failed to get team data: {str(e)}"} + + async def _get_full_pet_data(self, player_id: int, pet_id: int) -> Optional[Dict]: + """Helper method to get full pet data for backward compatibility.""" + try: + import aiosqlite + async with aiosqlite.connect(self.database.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute(""" + SELECT p.id, p.nickname, p.level, p.hp, p.max_hp, p.attack, p.defense, p.speed, p.happiness, + ps.name as species_name, ps.type1, ps.type2 + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.id = ? AND p.player_id = ? + """, (pet_id, player_id)) + row = await cursor.fetchone() + return dict(row) if row else None + except Exception as e: + print(f"Error getting full pet data: {e}") + return None + + async def save_individual_team( + self, + player_id: int, + nickname: str, + team_identifier: str, + team_data: Dict + ) -> Dict: + """Save changes to an individual team.""" + try: + if team_identifier == "active": + # Save to active team + action_type = "active_team_change" + action_data = { + "team_type": "active", + "team_data": team_data + } + else: + # Save to team configuration slot + try: + slot = int(team_identifier) + if slot < 1 or slot > 3: + return {"success": False, "error": "Invalid team slot"} + except ValueError: + return {"success": False, "error": "Invalid team identifier"} + + action_type = f"team_{slot}_change" + action_data = { + "team_type": "saved", + "team_slot": slot, + "team_data": team_data + } + + # Request PIN verification + pin_request = await self.pin_service.request_verification( + player_id=player_id, + nickname=nickname, + action_type=action_type, + action_data=action_data, + expiration_minutes=10 + ) + + return pin_request + + except Exception as e: + return {"success": False, "error": f"Failed to save individual team: {str(e)}"} + + async def verify_individual_team_save(self, player_id: int, pin_code: str, team_identifier: str) -> Dict: + """Verify PIN and save individual team changes.""" + try: + if team_identifier == "active": + action_type = "active_team_change" + else: + try: + slot = int(team_identifier) + action_type = f"team_{slot}_change" + except ValueError: + return {"success": False, "error": "Invalid team identifier"} + + # Define save callback + async def apply_individual_team_save_callback(player_id, action_data): + """Apply individual team save.""" + team_type = action_data["team_type"] + team_data = action_data["team_data"] + + if team_type == "active": + # Apply to active team + changes_applied = await self._apply_to_active_team(player_id, team_data) + return { + "success": True, + "message": "Active team updated successfully", + "changes_applied": changes_applied, + "team_type": "active" + } + else: + # Save to configuration slot + slot = action_data["team_slot"] + changes_applied = await self._save_team_configuration(player_id, slot, team_data) + return { + "success": True, + "message": f"Team {slot} configuration saved successfully", + "changes_applied": changes_applied, + "team_slot": slot, + "team_type": "saved" + } + + # Verify PIN and execute save + result = await self.pin_service.verify_and_execute( + player_id=player_id, + pin_code=pin_code, + action_type=action_type, + action_callback=apply_individual_team_save_callback + ) + + return result + + except Exception as e: + return {"success": False, "error": f"Individual team save verification failed: {str(e)}"} + + async def _apply_to_active_team(self, player_id: int, team_data: Dict) -> int: + """Apply team changes to active pets.""" + changes_count = 0 + + # Deactivate all pets + await self.database.execute(""" + UPDATE pets SET is_active = FALSE, team_order = NULL + WHERE player_id = ? + """, (player_id,)) + + # Activate selected pets + for pet_id, position in team_data.items(): + if position: + await self.database.execute(""" + UPDATE pets SET is_active = TRUE, team_order = ? + WHERE id = ? AND player_id = ? + """, (position, int(pet_id), player_id)) + changes_count += 1 + + return changes_count + + async def _save_team_configuration(self, player_id: int, slot: int, team_data: Dict) -> int: + """Save team as configuration.""" + pets_list = [] + changes_count = 0 + + for pet_id, position in team_data.items(): + if position: + pet_info = await self.database.get_pet_by_id(pet_id) + if pet_info and pet_info["player_id"] == player_id: + # Create full pet data object in new format + pet_dict = { + 'id': pet_info['id'], + 'nickname': pet_info['nickname'] or pet_info.get('species_name', 'Unknown'), + 'level': pet_info['level'], + 'hp': pet_info.get('hp', 0), + 'max_hp': pet_info.get('max_hp', 0), + 'attack': pet_info.get('attack', 0), + 'defense': pet_info.get('defense', 0), + 'speed': pet_info.get('speed', 0), + 'happiness': pet_info.get('happiness', 0), + 'species_name': pet_info.get('species_name', 'Unknown'), + 'type1': pet_info.get('type1'), + 'type2': pet_info.get('type2'), + 'team_order': int(position) + } + pets_list.append(pet_dict) + changes_count += 1 + + # Save configuration in new list format + success = await self.database.save_team_configuration( + player_id, slot, f'Team {slot}', json.dumps(pets_list) + ) + + return changes_count if success else 0 \ No newline at end of file