Files
balance_bot/bot/handlers/custom/econom.py
2026-01-23 04:45:55 +07:00

286 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# modules/economy.py
import json
from pathlib import Path
from typing import Dict, Optional, Tuple, List
import aiofiles
from aiogram import Router
from aiogram.filters import Command, CommandObject
from aiogram.types import Message, User
from aiogram.utils.markdown import hbold
from bot.filters import IsOwner
# ==================== Конфигурация ====================
ECONOMY_FILE = Path("data/economy.json")
ECONOMY_FILE.parent.mkdir(parents=True, exist_ok=True)
CURRENCY_NAME = "коинов"
# ==================== Хранилище ====================
class Economy:
def __init__(self):
self.data: Dict[int, dict] = {} # user_id → {balance, username, full_name}
self.username_to_id: Dict[str, int] = {} # username.lower() → user_id
async def load(self):
if not ECONOMY_FILE.exists():
return
try:
async with aiofiles.open(ECONOMY_FILE, "r", encoding="utf-8") as f:
content = await f.read()
if not content.strip():
return
raw = json.loads(content)
self.data = {int(uid): info for uid, info in raw.items()}
self.username_to_id = {}
for uid, info in self.data.items():
username = info.get("username")
if username:
self.username_to_id[username.lower()] = uid
except Exception as e:
print(f"[Economy] Load error: {e}")
async def save(self):
try:
async with aiofiles.open(ECONOMY_FILE, "w", encoding="utf-8") as f:
await f.write(json.dumps(self.data, indent=2, ensure_ascii=False))
except Exception as e:
print(f"[Economy] Save error: {e}")
async def ensure_user(self, user_id: int, username: Optional[str] = None, full_name: Optional[str] = None):
"""Создаёт пользователя с 0 балансом, если его нет"""
if user_id not in self.data:
self.data[user_id] = {
"balance": 50,
"username": username,
"full_name": full_name or "Unknown User"
}
if username:
self.username_to_id[username.lower()] = user_id
await self.save()
# Обновляем данные, если изменились
updated = False
if username and self.data[user_id]["username"] != username:
old = self.data[user_id]["username"]
if old and old.lower() in self.username_to_id:
del self.username_to_id[old.lower()]
self.data[user_id]["username"] = username
self.username_to_id[username.lower()] = user_id
updated = True
if full_name and self.data[user_id]["full_name"] != full_name:
self.data[user_id]["full_name"] = full_name
updated = True
if updated:
await self.save()
async def get_balance(self, user: User) -> int:
await self.ensure_user(user.id, user.username, user.full_name)
return self.data[user.id]["balance"]
async def modify_balance(self, user_id: int, delta: int, username: Optional[str] = None, full_name: Optional[str] = None) -> int:
await self.ensure_user(user_id, username, full_name)
self.data[user_id]["balance"] += delta
await self.save()
return self.data[user_id]["balance"]
async def set_balance(self, user_id: int, amount: int, username: Optional[str] = None, full_name: Optional[str] = None):
await self.ensure_user(user_id, username, full_name)
self.data[user_id]["balance"] = amount
await self.save()
async def delete_user(self, user_id: int) -> bool:
if user_id in self.data:
username = self.data[user_id].get("username")
if username and username.lower() in self.username_to_id:
del self.username_to_id[username.lower()]
del self.data[user_id]
await self.save()
return True
return False
def resolve_id(self, username: str) -> Optional[int]:
return self.username_to_id.get(username.removeprefix("@").lower())
def get_top(self, limit: int = 20) -> List[Tuple[int, int, str, str]]:
"""Топ только с положительным балансом"""
items = []
for uid, info in self.data.items():
bal = info["balance"]
if bal <= -1000:
continue # ← НЕ ПОКАЗЫВАЕМ НУЛЕВЫЕ БАЛАНСЫ
items.append((
uid,
bal,
info.get("username") or "",
info.get("full_name") or f"User#{uid}"
))
return sorted(items, key=lambda x: x[1], reverse=True)[:limit]
economy = Economy()
router = Router(name="economy")
# ==================== Утилиты ====================
def fmt(num: int) -> str:
return f"{num:,}".replace(",", " ")
def user_mention(user: Optional[User] = None, username: str = "", full_name: str = "") -> str:
if user:
if user.username:
return f"@{user.username}"
return hbold(user.full_name or "Unknown")
if username:
return f"@{username}"
return hbold(full_name or "Unknown User")
# ==================== Функция для регистрации при любом сообщении ====================
async def register_user_on_message(message: Message):
"""Вызывай эту функцию в глобальном обработчике сообщений"""
if message.from_user:
await economy.ensure_user(
user_id=message.from_user.id,
username=message.from_user.username,
full_name=message.from_user.full_name
)
# ==================== Команды ====================
@router.message(Command("balance"))
async def cmd_balance(message: Message, command: CommandObject):
target = message.from_user
if message.reply_to_message and message.reply_to_message.from_user:
target = message.reply_to_message.from_user
elif command.args:
uid = economy.resolve_id(command.args.strip())
if uid:
info = economy.data[uid]
name = user_mention(username=info.get("username"), full_name=info.get("full_name"))
await message.answer(f"Баланс {name}: {hbold(fmt(info['balance']))} {CURRENCY_NAME}")
return
bal = await economy.get_balance(target)
await message.answer(f"Баланс {user_mention(target)}: {hbold(fmt(bal))} {CURRENCY_NAME}")
async def _get_target(message: Message, arg: Optional[str] = None):
if message.reply_to_message and message.reply_to_message.from_user:
return message.reply_to_message.from_user, None, None
if arg:
username_raw = arg.strip().removeprefix("@")
uid = economy.resolve_id("@" + username_raw) or economy.resolve_id(username_raw)
return None, uid, username_raw
return message.from_user, None, None
@router.message(Command("setbalance"), IsOwner(send_error_message=True))
async def cmd_setbalance(message: Message, command: CommandObject):
if not command.args:
return await message.answer("Использование: /setbalance <сумма> [@username | реплай]")
parts = command.args.strip().split(maxsplit=2)
try:
amount = int(parts[0])
except ValueError:
return await message.answer("Сумма должна быть числом.")
user_obj, uid, username = await _get_target(message, parts[1] if len(parts) > 1 else None)
target_id = user_obj.id if user_obj else uid
if not target_id:
return await message.answer("Пользователь не найден.")
await economy.set_balance(target_id, amount, username, user_obj.full_name if user_obj else None)
await message.answer(f"Баланс {user_mention(user_obj, username)}{hbold(fmt(amount))} {CURRENCY_NAME}")
return None
@router.message(Command("plusbalance"), IsOwner(send_error_message=True))
async def cmd_plusbalance(message: Message, command: CommandObject):
if not command.args:
return await message.answer("Использование: /plusbalance <сумма> [@username | реплай]")
parts = command.args.strip().split(maxsplit=2)
try:
delta = int(parts[0])
except ValueError:
return await message.answer("Сумма должна быть числом.")
user_obj, uid, username = await _get_target(message, parts[1] if len(parts) > 1 else None)
target_id = user_obj.id if user_obj else uid
if not target_id:
return await message.answer("Пользователь не найден.")
new_bal = await economy.modify_balance(target_id, delta, username, user_obj.full_name if user_obj else None)
await message.answer(f"{user_mention(user_obj, username)} +{fmt(delta)}{hbold(fmt(new_bal))} {CURRENCY_NAME}")
return None
@router.message(Command("minbalance"), IsOwner(send_error_message=True))
async def cmd_minbalance(message: Message, command: CommandObject):
if not command.args:
return await message.answer("Использование: /minbalance <сумма> [@username | реплай]")
parts = command.args.strip().split(maxsplit=2)
try:
delta = int(parts[0])
except ValueError:
return await message.answer("Сумма должна быть числом.")
user_obj, uid, username = await _get_target(message, parts[1] if len(parts) > 1 else None)
target_id = user_obj.id if user_obj else uid
if not target_id:
return await message.answer("Пользователь не найден.")
new_bal = await economy.modify_balance(target_id, -delta, username, user_obj.full_name if user_obj else None)
await message.answer(f"{user_mention(user_obj, username)} -{fmt(delta)}{hbold(fmt(new_bal))} {CURRENCY_NAME}")
return None
@router.message(Command("top"))
async def cmd_top(message: Message):
top = economy.get_top(20)
if not top:
return await message.answer("Топ пустой — никто ещё не заработал коины!")
lines = ["Топ-20 богачей:"]
for i, (_, bal, username, full_name) in enumerate(top, 1):
medal = ["1st", "2nd", "3rd"][i-1] if i <= 3 else f"{i}."
name = f"@{username}" if username else hbold(full_name)
lines.append(f"{medal} {name}{hbold(fmt(bal))} {CURRENCY_NAME}")
await message.answer("\n".join(lines))
return None
@router.message(Command("deletebalance"), IsOwner(send_error_message=True))
async def cmd_deletebalance(message: Message):
user_obj, uid, username = await _get_target(message)
if not (user_obj or uid):
return await message.answer("Укажи пользователя реплаем или @username")
target_id = user_obj.id if user_obj else uid
deleted = await economy.delete_user(target_id)
name = user_mention(user_obj, username)
await message.answer(f"Запись {name} {'удалена' if deleted else 'не существовала'}")
return None
# ==================== Запуск ====================
async def on_startup(_):
await economy.load()
__all__ = ["router", "on_startup", "register_user_on_message"]