1. ф-я get_events - Получает новые данные из Ms SQL (2014) раз в n секунд, сортирует их по параметрам и записывает в очередь
2. ф-и process_email_queue() и process_telegram_queue() постоянно обрабатывают очереди для отправки данных
Вопросы :
1. Данные дублируются (отправляются одни и те же события), не могу додуматься как решить эту проблему
2. Как можно оптимизировать\упростить код
import aioodbc import datetime import time import asyncio import logging from logging.handlers import TimedRotatingFileHandler import re import aiogram from aiogram import Bot, Dispatcher, types from aiogram.utils import executor import traceback import smtplib from email.message import EmailMessage from email.mime.text import MIMEText from email_validator import validate_email, EmailNotValidError TOKEN = '...' bot = Bot(token=TOKEN) dp = Dispatcher(bot) # Queues telegram_queue = asyncio.Queue() email_queue = asyncio.Queue() logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Create loggers for different log types logger_telegram = logging.getLogger('telegram') logger_system = logging.getLogger('system') logger_email = logging.getLogger('email') # Configure TimedRotatingFileHandler for each log type file_handler_telegram = logging.handlers.TimedRotatingFileHandler('logs/telegram.txt', when='midnight', interval=1) file_handler_system = logging.handlers.TimedRotatingFileHandler('logs/system.txt', when='midnight', interval=1) file_handler_email = logging.handlers.TimedRotatingFileHandler('logs/email.txt', when='midnight', interval=1) # Set the log format log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # Configure log levels and handlers for each logger logger_telegram.setLevel(logging.DEBUG) logger_telegram.addHandler(file_handler_telegram) file_handler_telegram.setFormatter(log_format) logger_system.setLevel(logging.DEBUG) logger_system.addHandler(file_handler_system) file_handler_system.setFormatter(log_format) logger_email.setLevel(logging.DEBUG) logger_email.addHandler(file_handler_system) file_handler_email.setFormatter(log_format) # Example usage logger_telegram.info('Start Telegram log message') logger_system.info('Start System log message') logger_email.info('Start Email log message') # Connecting to database dsn = 'DRIVER={ODBC Driver 17 for SQL Server};' \ 'SERVER=192.168.1.150;' \ 'DATABASE=davp;' \ 'UID=...;' \ 'PWD=...' @dp.message_handler(commands=['start']) async def send_welcome(message: types.Message): user_id = message.from_user.id user_firstname = message.from_user.first_name user_lastname = message.from_user.last_name user_login = message.from_user.username print(f"Command '/start' received from Name: {user_firstname}, Surname: {user_lastname}, Login: {user_login}, ID: {user_id}") await message.answer(f"User ID: {user_id}") @dp.message_handler(commands=['chatid']) async def send_chatid(message: types.Message): current_chat_id = message.chat.id await message.answer(f'Current chat ID: {current_chat_id}') # Telegram Sending function async def send_telegram(result): new_dt = result[4].strftime("%d.%m.%Y %H:%M:%S") g_nr = result[2] receivers = result[8] # Add Emoji to reactions reaction_type = str(result[11]) match reaction_type: case "1": # Alarm emoji_text = "🚨" case "2": #Panic emoji_text = "‼" case "3": # Fire emoji_text = "🧯" case "4": # Restore emoji_text = "✅" case "5": # Close emoji_text = "🔐" case "6": # Open emoji_text = "🔓" case "7": # Low Power emoji_text = "⚠️" case "9": # Power Fault emoji_text = "⚠️" case "17": # Late open emoji_text = "🔓" case "18": # Trouble emoji_text = "⚠️" case "20": # Alarm emoji_text = "🚨" case _: # None emoji_text = "" tgID = receivers.replace("t", "") message = f'[{new_dt}] [Nr]: {result[9]} [Event]: {emoji_text} {result[0]} [Rajons]: {g_nr}' try: #await bot.send_message(tgID, message) #print('Telegram Success') logger_telegram.info(f'[INFO]: Telegram sent: User: {result[6]}, ID: {receivers}, Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}') except Exception as e: logger_telegram.error(f'Telegram Error {e}') # E-Mail Sending function async def send_email(result): new_dt = result[4].strftime("%d.%m.%Y %H:%M:%S") g_nr = result[2] receivers = result[8] try: valid_email = validate_email(receivers) # Constructing email body subject = f"User: {receivers} Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}" message_body = "Message Sender" msg = MIMEText(message_body, 'plain', 'utf-8') msg['From'] = 'TEST <test@mymail.com>' msg['To'] = 'testinbox@mymail.com' msg['Subject'] = subject smtpObj = smtplib.SMTP('smtp-relay.gmail.com', 25) smtpObj.sendmail('test@mymail.com', ['testinbox@mymail.com'], msg.as_string()) smtpObj.quit() print('E-Mail Success!') print(f"[INFO]: Mail sent: {receivers}, Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}") except EmailNotValidError as e: print(f'[ERROR]: Can not send message due to an error [Nr]: {result[3]}, data: {receivers}, converted {tgID}') print(str(e)) except Exception as e: print(f'[ERROR]: E-Mail - {e}') async def process_telegram_queue(): while True: result = await telegram_queue.get() if result: await send_telegram(result) telegram_queue.task_done() await asyncio.sleep(2) async def process_email_queue(): while True: results = [] # Collect multiple results from the queue for _ in range(email_queue.qsize()): result = await email_queue.get() if result: results.append(result) email_queue.task_done() # Process each result for result in results: await send_email(result) await asyncio.sleep(2) async def get_events(): last_execution_time = datetime.datetime.now() last_event_id = 0 #receivers = result[8] #result = None # Initialize result outside the loop #receivers = None # Initialize receivers outside the loop while True: try: async with aioodbc.connect(dsn=dsn) as connection: async with connection.cursor() as cursor: query = """ SELECT DISTINCT e.eventname, e.unifier_id, e.groupnr, e.objectdbid, e.receivedtime, d.SendMail, cp.name, cp.objectid, cp.email, ou.unified_id, e.id, e.reactionid FROM (SELECT * FROM davp.dbo.events UNION ALL SELECT * FROM davp_archive.dbo.events) e JOIN davp.dbo.eventsdescription d ON e.eventdescid = d.Id JOIN davp.dbo.object_unifiers ou ON ou.id = e.unifier_id JOIN davp.dbo.corresppersons cp ON e.objectdbid = cp.objectid WHERE d.SendMail = 'Yes' AND cp.SendMail = 'Yes' AND e.repeated = 'No' AND e.receivedtime > ? AND e.id > ? ORDER BY CAST(e.receivedtime AS DATETIME) DESC""" await cursor.execute(query, last_execution_time, last_event_id) results = await cursor.fetchall() # Sort Results by receivedtime results.sort(key=lambda x: x[4], reverse=True) # Check if any results are found for result in results: if result[4] is not None and result is not None: receivers = result[8] last_event_id = result[10] #print(f'Found - {result}') # Check and Split to queues if re.match(r"t\d+$", receivers): await telegram_queue.put(result) #print(f'TG Queue {result}') else: await email_queue.put(result) #print(f'EM Queue {result}') else: print(f'[ERROR]: result[4] value - {result[4]}') await asyncio.sleep(10) except Exception as e: logger_system.error(f'Exception: {type(e).__name__}, Message: {str(e)}') logger_system.error(f'Traceback: {traceback.format_exc()}') logger_system.error(f'Result[4] value - {result[4]}, Event ID : {result[10]}, E-Mail: {result[8]}') await asyncio.sleep(15) async def main(): first_iteration = True while True: try: if not first_iteration: print("Second iteration, sending messages") asyncio.create_task(get_events()) asyncio.create_task(process_telegram_queue()) asyncio.create_task(process_email_queue()) await asyncio.sleep(10) await asyncio.gather(get_events(), process_telegram_queue(), process_email_queue()) else: await asyncio.sleep(5) print("First iteration, skip sending messages") first_iteration = False await asyncio.sleep(5) pass except Exception as e: logger_system.error(f'Error : {e}') await asyncio.sleep(15) if __name__ == "__main__": asyncio.run(main())