You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

239 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from telethon import TelegramClient, events
from telethon import types
from datetime import timedelta
from PIL import Image
import io
import pytesseract
from db import DBAction, db_action, create_table
from df_utils import detect_intent_text
import random
import asyncio
import os
from dotenv import load_dotenv
load_dotenv()
languages = ['rus', 'eng']
bot = TelegramClient(os.environ['SESSION_NAME'], int(os.environ['API_ID']),
os.environ['API_HASH']).start(bot_token=os.environ['BOT_TOKEN'])
@bot.on(events.NewMessage(pattern='/help'))
async def on_help(msg):
"""Send detailed help message"""
await msg.reply('Дотупные команды: \n'
'/help - помощь\n'
'/bs [add/rm] <id стикерапка> - запретить, разрешить стикерпак в группе, можно написать в ответ на стикер из нужного стикерпака\n'
'/exc [add/rm] [id пользователя/@упомянуть пользователя/"me"] - сделать пользователя неприкасаемым для бота, можно написать в ответ на любое сообщение нужного пользователя\n'
'/report в ответ на сообщение, отправляет отчет об ошибке распознавания запрещенных слов разработчику, если отправил администратор, размучивает пользователя')
raise events.StopPropagation
@bot.on(events.NewMessage())
async def check_dm(msg):
if msg.is_private:
await msg.reply('Привет!, я не работаю в личных сообщениях. Добавь меня в группу и мы будем избавляться от вагонов вместе!')
raise events.StopPropagation
async def check_admin(msg):
"""Checks if a user is an admin in the chat"""
return any([user.id == msg.sender_id async for user in bot.iter_participants(msg.chat_id,
filter=types.ChannelParticipantsAdmins)])
@bot.on(events.NewMessage(pattern='/report'))
async def on_report(msg):
"""Send detection error report"""
if msg.is_reply:
reply_message = await msg.get_reply_message()
await bot.edit_permissions(msg.chat_id, reply_message.sender_id, send_messages=True)
await bot.send_message(msg.chat_id, 'пользователь размучен.')
else:
await msg.reply('Ответь командой на сообщение, которое было неправильно распознано, чтобы отправить отчет об ошибке и размутить пользователя')
async def check_user_in_exceptions(user_id, chat_id):
"""Checks if a user is in the exceptions list"""
return await db_action('SELECT id FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, chat_id),
DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/exc'))
async def manage_exceptions(msg):
"""Manage users, that will be ignored by the bot"""
if not await check_admin(msg):
await msg.reply('Добавлять пользователей в исключения может только админ')
raise events.StopPropagation
args = msg.message.text.split(' ')
if msg.is_reply and len(args) > 1:
reply_message = await msg.get_reply_message()
user_id = reply_message.sender_id
elif len(args) > 2:
if args[2] == 'me':
user_id = msg.sender_id
elif args[2].isdigit():
try:
await bot.get_entity(args[2])
except ValueError:
await msg.reply(f'Пользователь с id {args[2]} не найден')
raise events.StopPropagation
user_id = args[2]
elif '@' in args[2]:
try:
user = await bot.get_entity(args[2])
user_id = user.id
except ValueError:
await msg.reply(f'Пользователь с ником {args[2]} не найден')
raise events.StopPropagation
else:
await msg.reply('Неверный синтаксис. Возможный вариант: /exception [add/rm] [user_id/username/"me"]')
raise events.StopPropagation
else:
await msg.reply('Неверный синтаксисю Вы можете использовать: /exception [add/rm] [user_id/username/"me"], или ответить на сообщение пользователя, которого хотите добавить в исключения')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователь уже есть в списке исключений')
raise events.StopPropagation
await db_action('INSERT INTO user_exceptions (user_id, chat_id) VALUES (?, ?)', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь добавлен в исключения, теперь его сообщения не будут проверяться ботом')
case 'rm':
if not await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователя пока нет в исключениях')
raise events.StopPropagation
await db_action('DELETE FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь удален из списка исключений')
case _:
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
raise events.StopPropagation
async def check_stickerpack_in_banlist(stickerpack_id, chat_id):
"""Checks if a stickerpack is in the banlist"""
return await db_action('SELECT id FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?',
(stickerpack_id, chat_id), DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/bs'))
async def manage_stickerpack(msg):
"""Add or remove stickerpack from banlist"""
if not await check_admin(msg):
await msg.reply('Добавлять стикеры в банлист может только админ')
raise events.StopPropagation
args = msg.text.split()
if len(args) > 2:
if not args[2].isdigit():
await msg.reply('Возможно, вы ввели неверный id стикерпака или ввели не id стикерпака')
raise events.StopPropagation
stickerpack_id = args[2]
elif msg.is_reply and len(args) > 1:
reply_message = await msg.get_reply_message()
if not reply_message.sticker:
await msg.reply('Ответьте сообщением с командой именно на стикер, не на медиафайл или текст')
raise events.StopPropagation
for attr in reply_message.media.document.attributes:
if isinstance(attr, types.DocumentAttributeSticker):
stickerpack_id = attr.stickerset.id
break
else:
await msg.reply('Неверный синтаксис. Вы можете использовать: /bs [add/rm] [stickerpack_id] или ответить сообщением с командой на стикер')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
await msg.reply('Набор стикеров уже в банлисте')
raise events.StopPropagation
await db_action("INSERT INTO banned_stickerpacks (pack_id, chat_id) VALUES (?, ?)",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с id {stickerpack_id} добавлен в банлист")
case 'rm':
if not await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
await msg.reply('Набор стикеров не найден в банлисте')
raise events.StopPropagation
await db_action("DELETE FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с id {stickerpack_id} удален из банлиста")
case _:
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
raise events.StopPropagation
async def on_mute(msg):
if random.randint(0, 6) == 1:
await msg.respond(
'Бот работает в режиме обучения, напишите /report в ответ на сообщение, если оно не было распознано или распознано по ошибке')
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=10), send_messages=False)
@bot.on(events.NewMessage())
async def process_message(msg):
"""if await check_admin(msg) or await check_user_in_exceptions(msg.sender_id, msg.chat_id):
raise events.StopPropagation"""
if msg.text:
for lang in languages:
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, msg.text, lang)
if resp:
await msg.reply(random.choice(['Харе уже свои вагоны слать',
'Вагончиками балуемся?',
'Вагоны тут не приветствуются',
'Надоел уже со своими вагонами']))
await on_mute(msg)
raise events.StopPropagation
if msg.sticker:
banstickerpacks = [stickerpack[0] for stickerpack in await db_action('SELECT pack_id FROM banned_stickerpacks WHERE chat_id = ?',
(msg.chat_id,), DBAction.fetchall)]
if any([x.stickerset.id in banstickerpacks for x in msg.sticker.attributes if isinstance(x, types.DocumentAttributeSticker)]):
await msg.reply(random.choice(['Ай-ай-ай, стикеры с вагончиками, не хорошо',
'Стикерами с вагонами балуемся? Негоже так делать',
'Надоел уже со своими вагонами',
'Щас высажу с поезда, будете тут про номерные свои кричать!']))
await on_mute(msg)
raise events.StopPropagation
if msg.photo:
for lang in languages:
image_text = pytesseract.image_to_string(Image.open(io.BytesIO(await bot.download_media(msg.photo, file=bytes))), lang=lang)
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, image_text, lang)
if resp:
await msg.reply(random.choice(['Картинки с текстом про вагончики тоже нельзя 🙃',
'Мемами с вагонами балуемся? Не надо так делать',
'Вагоны нельзя ни в каком виде, даже в в мемах']))
await on_mute(msg)
raise events.StopPropagation
async def main():
await create_table()
await bot.run_until_disconnected()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())