|
|
from telethon import TelegramClient, events
|
|
|
from telethon import types
|
|
|
|
|
|
from datetime import timedelta
|
|
|
|
|
|
from PIL import Image
|
|
|
import io
|
|
|
import pytesseract
|
|
|
|
|
|
from db import DBAction, db_action, create_table
|
|
|
|
|
|
from df_utils import detect_intent_text
|
|
|
|
|
|
import random
|
|
|
import asyncio
|
|
|
|
|
|
import os
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
load_dotenv()
|
|
|
|
|
|
languages = ['rus', 'eng']
|
|
|
|
|
|
bot = TelegramClient(os.environ['SESSION_NAME'], int(os.environ['API_ID']),
|
|
|
os.environ['API_HASH']).start(bot_token=os.environ['BOT_TOKEN'])
|
|
|
|
|
|
|
|
|
@bot.on(events.NewMessage(pattern='/help'))
|
|
|
async def on_help(msg):
|
|
|
"""Send detailed help message"""
|
|
|
await msg.reply('Дотупные команды: \n'
|
|
|
'/help - помощь\n'
|
|
|
'/bs [add/rm] <id стикерапка> - запретить, разрешить стикерпак в группе, можно написать в ответ на стикер из нужного стикерпака\n'
|
|
|
'/exc [add/rm] [id пользователя/@упомянуть пользователя/"me"] - сделать пользователя неприкасаемым для бота, можно написать в ответ на любое сообщение нужного пользователя\n'
|
|
|
'/report в ответ на сообщение, отправляет отчет об ошибке распознавания запрещенных слов разработчику, если отправил администратор, размучивает пользователя')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
|
|
|
@bot.on(events.NewMessage())
|
|
|
async def check_dm(msg):
|
|
|
if msg.is_private:
|
|
|
await msg.reply('Привет!, я не работаю в личных сообщениях. Добавь меня в группу и мы будем избавляться от вагонов вместе!')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
|
|
|
async def check_admin(msg):
|
|
|
"""Checks if a user is an admin in the chat"""
|
|
|
return any([user.id == msg.sender_id async for user in bot.iter_participants(msg.chat_id,
|
|
|
filter=types.ChannelParticipantsAdmins)])
|
|
|
|
|
|
|
|
|
@bot.on(events.NewMessage(pattern='/report'))
|
|
|
async def on_report(msg):
|
|
|
"""Send detection error report"""
|
|
|
if msg.is_reply:
|
|
|
reply_message = await msg.get_reply_message()
|
|
|
await bot.edit_permissions(msg.chat_id, reply_message.sender_id, send_messages=True)
|
|
|
await bot.send_message(msg.chat_id, 'пользователь размучен.')
|
|
|
else:
|
|
|
await msg.reply('Ответь командой на сообщение, которое было неправильно распознано, чтобы отправить отчет об ошибке и размутить пользователя')
|
|
|
|
|
|
|
|
|
async def check_user_in_exceptions(user_id, chat_id):
|
|
|
"""Checks if a user is in the exceptions list"""
|
|
|
return await db_action('SELECT id FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, chat_id),
|
|
|
DBAction.fetchone) is not None
|
|
|
|
|
|
|
|
|
@bot.on(events.NewMessage(pattern='/exc'))
|
|
|
async def manage_exceptions(msg):
|
|
|
"""Manage users, that will be ignored by the bot"""
|
|
|
if not await check_admin(msg):
|
|
|
await msg.reply('Добавлять пользователей в исключения может только админ')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
args = msg.message.text.split(' ')
|
|
|
|
|
|
if msg.is_reply and len(args) > 1:
|
|
|
reply_message = await msg.get_reply_message()
|
|
|
user_id = reply_message.sender_id
|
|
|
elif len(args) > 2:
|
|
|
if args[2] == 'me':
|
|
|
user_id = msg.sender_id
|
|
|
elif args[2].isdigit():
|
|
|
try:
|
|
|
await bot.get_entity(args[2])
|
|
|
except ValueError:
|
|
|
await msg.reply(f'Пользователь с id {args[2]} не найден')
|
|
|
raise events.StopPropagation
|
|
|
user_id = args[2]
|
|
|
elif '@' in args[2]:
|
|
|
try:
|
|
|
user = await bot.get_entity(args[2])
|
|
|
user_id = user.id
|
|
|
except ValueError:
|
|
|
await msg.reply(f'Пользователь с ником {args[2]} не найден')
|
|
|
raise events.StopPropagation
|
|
|
else:
|
|
|
await msg.reply('Неверный синтаксис. Возможный вариант: /exception [add/rm] [user_id/username/"me"]')
|
|
|
raise events.StopPropagation
|
|
|
else:
|
|
|
await msg.reply('Неверный синтаксисю Вы можете использовать: /exception [add/rm] [user_id/username/"me"], или ответить на сообщение пользователя, которого хотите добавить в исключения')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
match args[1]:
|
|
|
case 'add':
|
|
|
if await check_user_in_exceptions(user_id, msg.chat_id):
|
|
|
await msg.reply('Пользователь уже есть в списке исключений')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
await db_action('INSERT INTO user_exceptions (user_id, chat_id) VALUES (?, ?)', (user_id, msg.chat_id,),
|
|
|
DBAction.commit)
|
|
|
await msg.reply('Пользователь добавлен в исключения, теперь его сообщения не будут проверяться ботом')
|
|
|
case 'rm':
|
|
|
if not await check_user_in_exceptions(user_id, msg.chat_id):
|
|
|
await msg.reply('Пользователя пока нет в исключениях')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
await db_action('DELETE FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, msg.chat_id,),
|
|
|
DBAction.commit)
|
|
|
await msg.reply('Пользователь удален из списка исключений')
|
|
|
case _:
|
|
|
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
|
|
|
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
|
|
|
async def check_stickerpack_in_banlist(stickerpack_id, chat_id):
|
|
|
"""Checks if a stickerpack is in the banlist"""
|
|
|
return await db_action('SELECT id FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?',
|
|
|
(stickerpack_id, chat_id), DBAction.fetchone) is not None
|
|
|
|
|
|
|
|
|
@bot.on(events.NewMessage(pattern='/bs'))
|
|
|
async def manage_stickerpack(msg):
|
|
|
"""Add or remove stickerpack from banlist"""
|
|
|
if not await check_admin(msg):
|
|
|
await msg.reply('Добавлять стикеры в банлист может только админ')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
args = msg.text.split()
|
|
|
|
|
|
if len(args) > 2:
|
|
|
if not args[2].isdigit():
|
|
|
await msg.reply('Возможно, вы ввели неверный id стикерпака или ввели не id стикерпака')
|
|
|
raise events.StopPropagation
|
|
|
stickerpack_id = args[2]
|
|
|
elif msg.is_reply and len(args) > 1:
|
|
|
reply_message = await msg.get_reply_message()
|
|
|
if not reply_message.sticker:
|
|
|
await msg.reply('Ответьте сообщением с командой именно на стикер, не на медиафайл или текст')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
for attr in reply_message.media.document.attributes:
|
|
|
if isinstance(attr, types.DocumentAttributeSticker):
|
|
|
stickerpack_id = attr.stickerset.id
|
|
|
break
|
|
|
else:
|
|
|
await msg.reply('Неверный синтаксис. Вы можете использовать: /bs [add/rm] [stickerpack_id] или ответить сообщением с командой на стикер')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
match args[1]:
|
|
|
case 'add':
|
|
|
if await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
|
|
|
await msg.reply('Набор стикеров уже в банлисте')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
await db_action("INSERT INTO banned_stickerpacks (pack_id, chat_id) VALUES (?, ?)",
|
|
|
(stickerpack_id, msg.chat_id,), DBAction.commit)
|
|
|
await msg.reply(f"Стикерпак с id {stickerpack_id} добавлен в банлист")
|
|
|
case 'rm':
|
|
|
if not await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
|
|
|
await msg.reply('Набор стикеров не найден в банлисте')
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
await db_action("DELETE FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?",
|
|
|
(stickerpack_id, msg.chat_id,), DBAction.commit)
|
|
|
await msg.reply(f"Стикерпак с id {stickerpack_id} удален из банлиста")
|
|
|
case _:
|
|
|
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
|
|
|
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
|
|
|
async def on_mute(msg):
|
|
|
if random.randint(0, 6) == 1:
|
|
|
await msg.respond(
|
|
|
'Бот работает в режиме обучения, напишите /report в ответ на сообщение, если оно не было распознано или распознано по ошибке')
|
|
|
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=10), send_messages=False)
|
|
|
|
|
|
|
|
|
@bot.on(events.NewMessage())
|
|
|
async def process_message(msg):
|
|
|
"""if await check_admin(msg) or await check_user_in_exceptions(msg.sender_id, msg.chat_id):
|
|
|
raise events.StopPropagation"""
|
|
|
|
|
|
if msg.text:
|
|
|
for lang in languages:
|
|
|
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, msg.text, lang)
|
|
|
if resp:
|
|
|
await msg.reply(random.choice(['Харе уже свои вагоны слать',
|
|
|
'Вагончиками балуемся?',
|
|
|
'Вагоны тут не приветствуются',
|
|
|
'Надоел уже со своими вагонами']))
|
|
|
await on_mute(msg)
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
if msg.sticker:
|
|
|
banstickerpacks = [stickerpack[0] for stickerpack in await db_action('SELECT pack_id FROM banned_stickerpacks WHERE chat_id = ?',
|
|
|
(msg.chat_id,), DBAction.fetchall)]
|
|
|
if any([x.stickerset.id in banstickerpacks for x in msg.sticker.attributes if isinstance(x, types.DocumentAttributeSticker)]):
|
|
|
await msg.reply(random.choice(['Ай-ай-ай, стикеры с вагончиками, не хорошо',
|
|
|
'Стикерами с вагонами балуемся? Негоже так делать',
|
|
|
'Надоел уже со своими вагонами',
|
|
|
'Щас высажу с поезда, будете тут про номерные свои кричать!']))
|
|
|
await on_mute(msg)
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
if msg.photo:
|
|
|
for lang in languages:
|
|
|
image_text = pytesseract.image_to_string(Image.open(io.BytesIO(await bot.download_media(msg.photo, file=bytes))), lang=lang)
|
|
|
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, image_text, lang)
|
|
|
if resp:
|
|
|
await msg.reply(random.choice(['Картинки с текстом про вагончики тоже нельзя 🙃',
|
|
|
'Мемами с вагонами балуемся? Не надо так делать',
|
|
|
'Вагоны нельзя ни в каком виде, даже в в мемах']))
|
|
|
await on_mute(msg)
|
|
|
raise events.StopPropagation
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
await create_table()
|
|
|
|
|
|
await bot.run_until_disconnected()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
asyncio.get_event_loop().run_until_complete(main())
|