Форум сайта python.su
Доброго времени суток, пытаюсь реализовать следующий алгоритм:
1. ф-я get_events - Получает новые данные из Ms SQL (2014) раз в n секунд, сортирует их по параметрам и записывает в очередь
2. ф-и process_email_queue() и process_telegram_queue() постоянно обрабатывают очереди для отправки данных
Вопросы :
1. Данные дублируются (отправляются одни и те же события), не могу додуматься как решить эту проблему
2. Как можно оптимизировать\упростить код
import aioodbc import datetime import time import asyncio import logging from logging.handlers import TimedRotatingFileHandler import re import aiogram from aiogram import Bot, Dispatcher, types from aiogram.utils import executor import traceback import smtplib from email.message import EmailMessage from email.mime.text import MIMEText from email_validator import validate_email, EmailNotValidError TOKEN = '...' bot = Bot(token=TOKEN) dp = Dispatcher(bot) # Queues telegram_queue = asyncio.Queue() email_queue = asyncio.Queue() logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Create loggers for different log types logger_telegram = logging.getLogger('telegram') logger_system = logging.getLogger('system') logger_email = logging.getLogger('email') # Configure TimedRotatingFileHandler for each log type file_handler_telegram = logging.handlers.TimedRotatingFileHandler('logs/telegram.txt', when='midnight', interval=1) file_handler_system = logging.handlers.TimedRotatingFileHandler('logs/system.txt', when='midnight', interval=1) file_handler_email = logging.handlers.TimedRotatingFileHandler('logs/email.txt', when='midnight', interval=1) # Set the log format log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # Configure log levels and handlers for each logger logger_telegram.setLevel(logging.DEBUG) logger_telegram.addHandler(file_handler_telegram) file_handler_telegram.setFormatter(log_format) logger_system.setLevel(logging.DEBUG) logger_system.addHandler(file_handler_system) file_handler_system.setFormatter(log_format) logger_email.setLevel(logging.DEBUG) logger_email.addHandler(file_handler_system) file_handler_email.setFormatter(log_format) # Example usage logger_telegram.info('Start Telegram log message') logger_system.info('Start System log message') logger_email.info('Start Email log message') # Connecting to database dsn = 'DRIVER={ODBC Driver 17 for SQL Server};' \ 'SERVER=192.168.1.150;' \ 'DATABASE=davp;' \ 'UID=...;' \ 'PWD=...' @dp.message_handler(commands=['start']) async def send_welcome(message: types.Message): user_id = message.from_user.id user_firstname = message.from_user.first_name user_lastname = message.from_user.last_name user_login = message.from_user.username print(f"Command '/start' received from Name: {user_firstname}, Surname: {user_lastname}, Login: {user_login}, ID: {user_id}") await message.answer(f"User ID: {user_id}") @dp.message_handler(commands=['chatid']) async def send_chatid(message: types.Message): current_chat_id = message.chat.id await message.answer(f'Current chat ID: {current_chat_id}') # Telegram Sending function async def send_telegram(result): new_dt = result[4].strftime("%d.%m.%Y %H:%M:%S") g_nr = result[2] receivers = result[8] # Add Emoji to reactions reaction_type = str(result[11]) match reaction_type: case "1": # Alarm emoji_text = "🚨" case "2": #Panic emoji_text = "‼" case "3": # Fire emoji_text = "🧯" case "4": # Restore emoji_text = "✅" case "5": # Close emoji_text = "🔐" case "6": # Open emoji_text = "🔓" case "7": # Low Power emoji_text = "⚠️" case "9": # Power Fault emoji_text = "⚠️" case "17": # Late open emoji_text = "🔓" case "18": # Trouble emoji_text = "⚠️" case "20": # Alarm emoji_text = "🚨" case _: # None emoji_text = "" tgID = receivers.replace("t", "") message = f'[{new_dt}] [Nr]: {result[9]} [Event]: {emoji_text} {result[0]} [Rajons]: {g_nr}' try: #await bot.send_message(tgID, message) #print('Telegram Success') logger_telegram.info(f'[INFO]: Telegram sent: User: {result[6]}, ID: {receivers}, Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}') except Exception as e: logger_telegram.error(f'Telegram Error {e}') # E-Mail Sending function async def send_email(result): new_dt = result[4].strftime("%d.%m.%Y %H:%M:%S") g_nr = result[2] receivers = result[8] try: valid_email = validate_email(receivers) # Constructing email body subject = f"User: {receivers} Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}" message_body = "Message Sender" msg = MIMEText(message_body, 'plain', 'utf-8') msg['From'] = 'TEST <test@mymail.com>' msg['To'] = 'testinbox@mymail.com' msg['Subject'] = subject smtpObj = smtplib.SMTP('smtp-relay.gmail.com', 25) smtpObj.sendmail('test@mymail.com', ['testinbox@mymail.com'], msg.as_string()) smtpObj.quit() print('E-Mail Success!') print(f"[INFO]: Mail sent: {receivers}, Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}") except EmailNotValidError as e: print(f'[ERROR]: Can not send message due to an error [Nr]: {result[3]}, data: {receivers}, converted {tgID}') print(str(e)) except Exception as e: print(f'[ERROR]: E-Mail - {e}') async def process_telegram_queue(): while True: result = await telegram_queue.get() if result: await send_telegram(result) telegram_queue.task_done() await asyncio.sleep(2) async def process_email_queue(): while True: results = [] # Collect multiple results from the queue for _ in range(email_queue.qsize()): result = await email_queue.get() if result: results.append(result) email_queue.task_done() # Process each result for result in results: await send_email(result) await asyncio.sleep(2) async def get_events(): last_execution_time = datetime.datetime.now() last_event_id = 0 #receivers = result[8] #result = None # Initialize result outside the loop #receivers = None # Initialize receivers outside the loop while True: try: async with aioodbc.connect(dsn=dsn) as connection: async with connection.cursor() as cursor: query = """ SELECT DISTINCT e.eventname, e.unifier_id, e.groupnr, e.objectdbid, e.receivedtime, d.SendMail, cp.name, cp.objectid, cp.email, ou.unified_id, e.id, e.reactionid FROM (SELECT * FROM davp.dbo.events UNION ALL SELECT * FROM davp_archive.dbo.events) e JOIN davp.dbo.eventsdescription d ON e.eventdescid = d.Id JOIN davp.dbo.object_unifiers ou ON ou.id = e.unifier_id JOIN davp.dbo.corresppersons cp ON e.objectdbid = cp.objectid WHERE d.SendMail = 'Yes' AND cp.SendMail = 'Yes' AND e.repeated = 'No' AND e.receivedtime > ? AND e.id > ? ORDER BY CAST(e.receivedtime AS DATETIME) DESC""" await cursor.execute(query, last_execution_time, last_event_id) results = await cursor.fetchall() # Sort Results by receivedtime results.sort(key=lambda x: x[4], reverse=True) # Check if any results are found for result in results: if result[4] is not None and result is not None: receivers = result[8] last_event_id = result[10] #print(f'Found - {result}') # Check and Split to queues if re.match(r"t\d+$", receivers): await telegram_queue.put(result) #print(f'TG Queue {result}') else: await email_queue.put(result) #print(f'EM Queue {result}') else: print(f'[ERROR]: result[4] value - {result[4]}') await asyncio.sleep(10) except Exception as e: logger_system.error(f'Exception: {type(e).__name__}, Message: {str(e)}') logger_system.error(f'Traceback: {traceback.format_exc()}') logger_system.error(f'Result[4] value - {result[4]}, Event ID : {result[10]}, E-Mail: {result[8]}') await asyncio.sleep(15) async def main(): first_iteration = True while True: try: if not first_iteration: print("Second iteration, sending messages") asyncio.create_task(get_events()) asyncio.create_task(process_telegram_queue()) asyncio.create_task(process_email_queue()) await asyncio.sleep(10) await asyncio.gather(get_events(), process_telegram_queue(), process_email_queue()) else: await asyncio.sleep(5) print("First iteration, skip sending messages") first_iteration = False await asyncio.sleep(5) pass except Exception as e: logger_system.error(f'Error : {e}') await asyncio.sleep(15) if __name__ == "__main__": asyncio.run(main())
Офлайн
То что выложил простынку хорошо
Но оно тут лишнее ты немного не вкуриваешь понятие очереди и так далее
Так что почитай доку по RabbitMQ
Аналогов море но этот пожалуй самый простой для освоения на уровне простых задач
А задача простая - кролик это как раз менеджер очереди
Ты создаешь в нем поток сознания и лупишь в него свои данные раз в нужное тебе время секунд - они все становяться в очередь как бабки
А далее ты уже средстваи самого кролика эту очередь ровняешь/обрабатываешь/выполняешь действия
Если на пальцах
Сервис апдейта сообщений кидает список на отправку в очередь
Кролик(Джон Уик) следит за ней и как только появились новые задачи обрабатывает их (при обработке могут подходить новые бабки)
Выполнил - смотри опять новые и так далее
Таким образом у тебя нет привязки основному процессу ждать всех отправок
Офлайн
Под кролика вроде нужен сервак, чует моя 5-я точка, это можно сделать средствами asyncio, головушка только не догоняет, сложно после с php прыгнуть в змейку…
Можно самый простой пример как для не далёкого, с питоном пока что на вы, и то при помощи gpt, вроде что-то понятно, вроде нет, может заинклюдить проверку на обработанный ID (он всегда уникален) который был обработан?
Офлайн
как я и сказал выше ты ничего не понимаешь не в асинхронности не в мнопогопоточности.
Я могу скинуть код но ты не сможешь его применить в свой набор инструкций потому что ты тупой очередной верующий который думает что программирование это просто
Но если ты все же другой
попробую заюзать python.threads
тогда за усилие поможем кодом
А за копипасту говнобота не будем
Офлайн
Сразу “тупой”, “говнобот” и т.д. я попросил помощи т.к я не понимаю почему идёт дубляж данных, хотя метод .get() должен удалять обработанный элемент из очереди, чего в моём случае почему-то не происходит…
Вы же мне советуете кролика или потоки не понятно зачем, этот как советовать поставить мотор на велосипед для того что бы проехать 20 метров, вместо того что бы подсказать каким болтом закрутить гайку на педали…
По поводу копипасты с “говнобота” какая разница использую я шаблоны из мануалов или из нейронки, если и там и там описана общая конструкция, которую я взял за основу и переделал под свои нужды… К тому же нейрока экономит время, но к сожалению не всегда с её помощью можно до конца понять ньюансы, по этому и пришел к людям за помощью. Если нет желания тратить на меня своё драгоценное время, не пишите, спасибо.
Офлайн
DeeKeiDБлин, я думал, ты шутишь.
По поводу копипасты с “говнобота” какая разница использую я шаблоны из мануалов или из нейронки, если и там и там описана общая конструкция, которую я взял за основу и переделал под свои нужды… К тому же нейрока экономит время
DeeKeiDТы правда через ChatGPT пытаешься программу написать? Да ты совсем дурак что ли? Это же просто расхайпованное говно, которое генерирует всякую ересь и просто врёт, что это мол идеальный исходный код. Для дебилов приманка. Надеюсь, ты её не оплачивал. Они там, помню, её сходу монетизировали, пока никто не успел понять, что это фуфло обычное.
Можно самый простой пример как для не далёкого, с питоном пока что на вы, и то при помощи gpt, вроде что-то понятно, вроде нет
ZerGДа, ему сначала нужно лечение пройти в психоневрологическом диспансере. А то человечек совсем потерялся в этом мире. В ChatGPT он решил бота написать. facepalm
Я могу скинуть код но ты не сможешь его применить в свой набор инструкций потому что ты тупой очередной верующий который думает что программирование это просто
Отредактировано py.user.next (Ноя. 22, 2023 12:58:30)
Офлайн
Под кролика вроде нужен сервак, чует моя 5-я точка, это можно сделать средствами asyncio, головушка только не догоняет, сложно после с php прыгнуть в змейку…
Отредактировано ZerG (Ноя. 22, 2023 14:27:26)
Офлайн
Вы серьёзно обливаете говном человека за то что он воспользовался GPT и попросил его выдать мне примеры реализации той или иной функции что бы посмотреть и узнать какие методы реализации существуют, вместо тонны времени на поиски всех доступных вариантов в куче разнообразных мануалах, что в принципе привело бы к тому же результату?
Про кролика уже изложено выше
Почему нельзя просто, без эмоций и по факту ответить на заданные вопросы вместо унижения людей которые чего-то не понимают и пришли за помощью что бы им подсказали а не обложили со всех сторон в духе - дебил! это же очевидно!
И почему очередь и асинхронность это противоречия? Хотя зачем я задаю этот вопрос, ведь асинхронные очереди придумали и разработали такие же как я тупые
Не думал что тут такое токсичное комьюнити.
В любом случае, спасибо вам и удачи.
Офлайн
Был неправ
не дебил
Дебилоид
Пришел со своим похапе
Нулевой уровень знаний - книги читать не хочет - лупиться в ачьк GPT
Получает ответ - вместо того что бы почитать что такое RabbitMQ спраишвает у гпт чата и отвечате в форум что для него наверное нужен сервер (на самом деле 4 сервера и один эсминец)
почему очередь и асинхронность это противоречия? Хотя зачем я задаю этот вопрос, ведь асинхронные очереди придумали и разработали такие же как я тупые
Офлайн