diff --git a/src/database.py b/src/database.py index 3782c86..8a3aea9 100644 --- a/src/database.py +++ b/src/database.py @@ -1063,86 +1063,146 @@ class Database: def calculate_exp_for_level(self, level: int) -> int: """Calculate total experience needed to reach a level""" + # Input validation + if not isinstance(level, int) or level < 1: + return 0 + + # Cap at level 100 to prevent overflow + level = min(level, 100) + # Using a cubic growth formula: level^3 * 4 - 12 return max(0, (level ** 3) * 4 - 12) def calculate_level_from_exp(self, exp: int) -> int: """Calculate what level a pet should be based on experience""" + # Input validation + if not isinstance(exp, int) or exp < 0: + return 1 + level = 1 - while self.calculate_exp_for_level(level + 1) <= exp: + # Prevent infinite loop with reasonable upper bound + max_iterations = 100 + iterations = 0 + + while iterations < max_iterations and self.calculate_exp_for_level(level + 1) <= exp: level += 1 + iterations += 1 + return min(level, 100) # Cap at level 100 async def award_experience(self, pet_id: int, exp_amount: int) -> Dict: - """Award experience to a pet and handle leveling up""" + """Award experience to a pet and handle leveling up with proper transaction isolation""" + # Input validation + if not isinstance(pet_id, int) or pet_id <= 0: + return {"success": False, "error": "Invalid pet ID"} + + if not isinstance(exp_amount, int): + return {"success": False, "error": "Experience amount must be an integer"} + + if exp_amount < 0: + return {"success": False, "error": "Cannot award negative experience"} + + if exp_amount > 10000: # Reasonable cap to prevent abuse + return {"success": False, "error": "Experience amount too large"} + async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row - # Get current pet data - cursor = await db.execute(""" - SELECT p.*, ps.name as species_name, ps.base_hp, ps.base_attack, - ps.base_defense, ps.base_speed - FROM pets p - JOIN pet_species ps ON p.species_id = ps.id - WHERE p.id = ? - """, (pet_id,)) - - pet = await cursor.fetchone() - if not pet: - return {"success": False, "error": "Pet not found"} - - pet_dict = dict(pet) - old_level = pet_dict["level"] - old_exp = pet_dict["experience"] - new_exp = old_exp + exp_amount - new_level = self.calculate_level_from_exp(new_exp) - - result = { - "success": True, - "pet_id": pet_id, - "pet_name": pet_dict["nickname"] or pet_dict["species_name"], - "species_name": pet_dict["species_name"], - "old_level": old_level, - "new_level": new_level, - "old_exp": old_exp, - "new_exp": new_exp, - "exp_gained": exp_amount, - "leveled_up": new_level > old_level, - "levels_gained": new_level - old_level - } - - # Update experience and level - await db.execute(""" - UPDATE pets SET experience = ?, level = ? WHERE id = ? - """, (new_exp, new_level, pet_id)) - - # Handle level up if it occurred - if new_level > old_level: - await self._handle_level_up(db, pet_dict, new_level) - result["stat_increases"] = await self._calculate_stat_increases(pet_dict, old_level, new_level) - - await db.commit() - return result + try: + # Start immediate transaction for isolation + await db.execute("BEGIN IMMEDIATE") + + # Get current pet data with row-level lock + cursor = await db.execute(""" + SELECT p.*, ps.name as species_name, ps.base_hp, ps.base_attack, + ps.base_defense, ps.base_speed + FROM pets p + JOIN pet_species ps ON p.species_id = ps.id + WHERE p.id = ? + """, (pet_id,)) + + pet = await cursor.fetchone() + if not pet: + await db.execute("ROLLBACK") + return {"success": False, "error": "Pet not found"} + + pet_dict = dict(pet) + old_level = pet_dict["level"] + old_exp = pet_dict["experience"] + old_hp = pet_dict["hp"] + old_max_hp = pet_dict["max_hp"] + + # Calculate new experience and level + new_exp = old_exp + exp_amount + new_level = self.calculate_level_from_exp(new_exp) + + # Prepare result data + result = { + "success": True, + "pet_id": pet_id, + "pet_name": pet_dict["nickname"] or pet_dict["species_name"], + "species_name": pet_dict["species_name"], + "old_level": old_level, + "new_level": new_level, + "old_exp": old_exp, + "new_exp": new_exp, + "exp_gained": exp_amount, + "leveled_up": new_level > old_level, + "levels_gained": new_level - old_level + } + + # Single atomic update - no race conditions + if new_level > old_level: + # Pet leveled up - calculate new stats and update everything at once + new_stats = self._calculate_pet_stats(pet_dict, new_level) + + # Calculate stat increases for return data + old_stats = self._calculate_pet_stats(pet_dict, old_level) + stat_increases = { + "hp": new_stats["hp"] - old_stats["hp"], + "attack": new_stats["attack"] - old_stats["attack"], + "defense": new_stats["defense"] - old_stats["defense"], + "speed": new_stats["speed"] - old_stats["speed"] + } + result["stat_increases"] = stat_increases + + # Calculate new current HP (preserve HP percentage but ensure not below 1) + if old_max_hp > 0: + hp_percentage = old_hp / old_max_hp + new_hp = max(1, int(new_stats["hp"] * hp_percentage)) + # Give full HP bonus for leveling up + new_hp = new_stats["hp"] + else: + new_hp = new_stats["hp"] + + # Single atomic update for level up + await db.execute(""" + UPDATE pets + SET experience = ?, level = ?, max_hp = ?, hp = ?, + attack = ?, defense = ?, speed = ? + WHERE id = ? + """, ( + new_exp, new_level, new_stats["hp"], new_hp, + new_stats["attack"], new_stats["defense"], new_stats["speed"], + pet_id + )) + else: + # No level up - just update experience + await db.execute(""" + UPDATE pets SET experience = ? WHERE id = ? + """, (new_exp, pet_id)) + + # Commit the transaction + await db.commit() + return result + + except Exception as e: + # Rollback on any error + await db.execute("ROLLBACK") + print(f"Error in award_experience: {e}") + return {"success": False, "error": f"Database error: {str(e)}"} - async def _handle_level_up(self, db, pet_dict: Dict, new_level: int): - """Handle pet leveling up - recalculate stats and HP""" - # Calculate new stats based on level - new_stats = self._calculate_pet_stats(pet_dict, new_level) - - # Update pet stats and level - await db.execute(""" - UPDATE pets - SET level = ?, max_hp = ?, attack = ?, defense = ?, speed = ?, hp = ? - WHERE id = ? - """, ( - new_level, - new_stats["hp"], - new_stats["attack"], - new_stats["defense"], - new_stats["speed"], - new_stats["hp"], # Restore full HP on level up - pet_dict["id"] - )) + # NOTE: _handle_level_up function removed - now handled atomically in award_experience() def _calculate_pet_stats(self, pet_dict: Dict, level: int) -> Dict: """Calculate pet stats for a given level"""