Фильтр на права пользователя в чате

This commit is contained in:
2026-02-23 14:11:08 +07:00
parent d0cc3a458d
commit b4fd4934a9

324
bot/filters/chat_rights.py Normal file
View File

@@ -0,0 +1,324 @@
"""
Фильтры для проверки прав пользователей в чатах
"""
from typing import Any, Union
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from aiogram.filters import BaseFilter
from aiogram.types import Message, CallbackQuery
from aiogram.enums import ChatMemberStatus
from configs import settings
from middleware.loggers import logger
__all__ = (
'IsBotOwner',
'IsChatCreator',
'IsChatAdmin',
'IsModerator',
'CanDeleteMessages',
'CanRestrictMembers',
'CanPinMessages'
)
class IsBotOwner(BaseFilter):
"""
Проверяет, является ли пользователь владельцем бота (из .env).
Attributes:
send_error_message: Отправлять ли сообщение об ошибке доступа
Example:
```python
# Без сообщения об ошибке
@router.message(Command("reset"), IsOwner())
async def reset_command(message: Message):
await message.answer("🔄 Сброс данных...")
# С сообщением об ошибке
@router.message(Command("secret"), IsOwner(send_error_message=True))
async def secret_command(message: Message):
await message.answer("🔐 Секретная команда выполнена")
```
"""
def __init__(self, send_error_message: bool = False) -> None:
"""
Args:
send_error_message: Если True, отправляет сообщение при отказе в доступе
"""
self.send_error_message = send_error_message
async def __call__(
self,
event: Union[Message, CallbackQuery],
bot: Bot
) -> Union[bool, dict[str, Any]]:
"""
Проверка владельца бота.
Returns:
bool или dict: True/dict если владелец, False иначе
"""
if not event.from_user:
return False
user_id = event.from_user.id
is_owner = user_id in settings.OWNER_ID
if not is_owner:
logger.warning(
f"Попытка доступа к команде владельца от user_id={user_id}",
log_type='SECURITY',
message=event if isinstance(event, Message) else None
)
if self.send_error_message:
error_text = "⛔ Эта команда доступна только владельцу бота!"
if isinstance(event, Message):
await event.answer(error_text)
elif isinstance(event, CallbackQuery):
await event.answer(error_text, show_alert=True)
return False
# Возвращаем информацию для handler
return {
'is_owner': True,
'user_id': user_id,
'owner_ids': settings.OWNER_ID
}
class IsChatCreator(BaseFilter):
"""
Проверяет, является ли пользователь создателем чата.
Example:
```python
@router.message(Command("transfer"), IsChatCreator())
async def transfer_ownership(message: Message):
await message.answer("👑 Передача владения чатом...")
```
"""
async def __call__(self, message: Message, bot: Bot) -> Union[bool, dict]:
try:
member = await bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
is_creator = member.status == ChatMemberStatus.CREATOR
if is_creator:
return {
'is_creator': True,
'user_id': message.from_user.id,
'chat_id': message.chat.id
}
return False
except (TelegramBadRequest, TelegramForbiddenError) as e:
logger.error(
f"Ошибка проверки создателя чата: {e}",
log_type='CHAT_RIGHTS',
message=message
)
return False
class IsChatAdmin(BaseFilter):
"""
Проверяет, является ли пользователь администратором чата (или создателем).
Example:
```python
@router.message(Command("ban"), IsChatAdmin())
async def ban_user(message: Message):
await message.answer("🔨 Бан пользователя...")
```
"""
async def __call__(self, message: Message, bot: Bot) -> Union[bool, dict]:
try:
member = await bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
is_admin = member.status in (
ChatMemberStatus.ADMINISTRATOR,
ChatMemberStatus.CREATOR
)
if is_admin:
return {
'is_admin': True,
'status': member.status.value,
'user_id': message.from_user.id,
'chat_id': message.chat.id
}
return False
except (TelegramBadRequest, TelegramForbiddenError) as e:
logger.error(
f"Ошибка проверки администратора чата: {e}",
log_type='CHAT_RIGHTS',
message=message
)
return False
class IsModerator(BaseFilter):
"""
Проверяет, имеет ли администратор модераторские права:
- Удаление сообщений
- Ограничение пользователей
- Закрепление сообщений
Example:
```python
@router.message(Command("warn"), IsModerator())
async def warn_user(message: Message):
await message.answer("⚠️ Предупреждение пользователю...")
```
"""
async def __call__(self, message: Message, bot: Bot) -> Union[bool, dict]:
try:
member = await bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
# Создатель всегда модератор
if member.status == ChatMemberStatus.CREATOR:
return {
'is_moderator': True,
'status': 'creator',
'user_id': message.from_user.id
}
# Проверка прав администратора
if member.status != ChatMemberStatus.ADMINISTRATOR:
return False
# Проверка модераторских прав
required_rights = [
getattr(member, 'can_delete_messages', False),
getattr(member, 'can_restrict_members', False),
getattr(member, 'can_pin_messages', False),
]
has_all_rights = all(required_rights)
if has_all_rights:
return {
'is_moderator': True,
'status': 'administrator',
'can_delete': required_rights[0],
'can_restrict': required_rights[1],
'can_pin': required_rights[2],
'user_id': message.from_user.id
}
return False
except (TelegramBadRequest, TelegramForbiddenError) as e:
logger.error(
f"Ошибка проверки модератора: {e}",
log_type='CHAT_RIGHTS',
message=message
)
return False
class CanDeleteMessages(BaseFilter):
"""
Проверяет право на удаление сообщений.
Example:
```python
@router.message(Command("clear"), CanDeleteMessages())
async def clear_messages(message: Message):
await message.answer("🗑️ Очистка сообщений...")
```
"""
async def __call__(self, message: Message, bot: Bot) -> bool:
try:
member = await bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
if member.status == ChatMemberStatus.CREATOR:
return True
return getattr(member, 'can_delete_messages', False)
except (TelegramBadRequest, TelegramForbiddenError):
return False
class CanRestrictMembers(BaseFilter):
"""
Проверяет право на ограничение пользователей (бан, мут).
Example:
```python
@router.message(Command("mute"), CanRestrictMembers())
async def mute_user(message: Message):
await message.answer("🔇 Мут пользователя...")
```
"""
async def __call__(self, message: Message, bot: Bot) -> bool:
try:
member = await bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
if member.status == ChatMemberStatus.CREATOR:
return True
return getattr(member, 'can_restrict_members', False)
except (TelegramBadRequest, TelegramForbiddenError):
return False
class CanPinMessages(BaseFilter):
"""
Проверяет право на закрепление сообщений.
Example:
```python
@router.message(Command("pin"), CanPinMessages())
async def pin_message(message: Message):
if message.reply_to_message:
await message.reply_to_message.pin()
```
"""
async def __call__(self, message: Message, bot: Bot) -> bool:
try:
member = await bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
if member.status == ChatMemberStatus.CREATOR:
return True
return getattr(member, 'can_pin_messages', False)
except (TelegramBadRequest, TelegramForbiddenError):
return False