Уведомления

Группа в Telegram: @pythonsu

#1 Ноя. 21, 2023 13:45:24

DeeKeiD
Зарегистрирован: 2023-11-21
Сообщения: 5
Репутация: +  0  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

Доброго времени суток, пытаюсь реализовать следующий алгоритм:

1. ф-я get_events - Получает новые данные из Ms SQL (2014) раз в n секунд, сортирует их по параметрам и записывает в очередь
2. ф-и process_email_queue() и process_telegram_queue() постоянно обрабатывают очереди для отправки данных

Вопросы :
1. Данные дублируются (отправляются одни и те же события), не могу додуматься как решить эту проблему
2. Как можно оптимизировать\упростить код

 import aioodbc
import datetime
import time
import asyncio
import logging
from logging.handlers import TimedRotatingFileHandler
import re
import aiogram
from aiogram import Bot, Dispatcher, types
from aiogram.utils import executor
import traceback
import smtplib
from email.message import EmailMessage
from email.mime.text import MIMEText
from email_validator import validate_email, EmailNotValidError
TOKEN = '...'
bot = Bot(token=TOKEN)
dp = Dispatcher(bot)
# Queues
telegram_queue = asyncio.Queue()
email_queue = asyncio.Queue()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Create loggers for different log types
logger_telegram = logging.getLogger('telegram')
logger_system = logging.getLogger('system')
logger_email = logging.getLogger('email')
# Configure TimedRotatingFileHandler for each log type
file_handler_telegram = logging.handlers.TimedRotatingFileHandler('logs/telegram.txt', when='midnight', interval=1)
file_handler_system = logging.handlers.TimedRotatingFileHandler('logs/system.txt', when='midnight', interval=1)
file_handler_email = logging.handlers.TimedRotatingFileHandler('logs/email.txt', when='midnight', interval=1)
# Set the log format
log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Configure log levels and handlers for each logger
logger_telegram.setLevel(logging.DEBUG)
logger_telegram.addHandler(file_handler_telegram)
file_handler_telegram.setFormatter(log_format)
logger_system.setLevel(logging.DEBUG)
logger_system.addHandler(file_handler_system)
file_handler_system.setFormatter(log_format)
logger_email.setLevel(logging.DEBUG)
logger_email.addHandler(file_handler_system)
file_handler_email.setFormatter(log_format)
# Example usage
logger_telegram.info('Start Telegram log message')
logger_system.info('Start System log message')
logger_email.info('Start Email log message')
# Connecting to database
dsn = 'DRIVER={ODBC Driver 17 for SQL Server};' \
          'SERVER=192.168.1.150;' \
          'DATABASE=davp;' \
          'UID=...;' \
          'PWD=...'
@dp.message_handler(commands=['start'])
async def send_welcome(message: types.Message):
    user_id = message.from_user.id
    user_firstname = message.from_user.first_name
    user_lastname = message.from_user.last_name
    user_login = message.from_user.username
    print(f"Command '/start' received from Name: {user_firstname}, Surname: {user_lastname}, Login: {user_login}, ID: {user_id}")
    await message.answer(f"User ID: {user_id}")
@dp.message_handler(commands=['chatid'])
async def send_chatid(message: types.Message):
    current_chat_id = message.chat.id
    await message.answer(f'Current chat ID: {current_chat_id}')
    
# Telegram Sending function
async def send_telegram(result):
    new_dt = result[4].strftime("%d.%m.%Y %H:%M:%S")
    g_nr = result[2]
    receivers = result[8]
    # Add Emoji to reactions
    reaction_type = str(result[11])
    match reaction_type:
        case "1": # Alarm
            emoji_text = "🚨"
        case "2": #Panic
            emoji_text = "‼"
        case "3": # Fire
            emoji_text = "🧯"
        case "4": # Restore
            emoji_text = "✅"
        case "5": # Close
            emoji_text = "🔐"
        case "6": # Open
            emoji_text = "🔓"
        case "7": # Low Power
            emoji_text = "⚠️"
        case "9": # Power Fault
            emoji_text = "⚠️"
        case "17": # Late open
            emoji_text = "🔓"
        case "18": # Trouble
            emoji_text = "⚠️"
        case "20": # Alarm
            emoji_text = "🚨"
        case _: # None
            emoji_text = ""
    tgID = receivers.replace("t", "")
    message = f'[{new_dt}] [Nr]: {result[9]} [Event]: {emoji_text} {result[0]} [Rajons]: {g_nr}'
    try:
        #await bot.send_message(tgID, message)
        #print('Telegram Success')
        logger_telegram.info(f'[INFO]: Telegram sent: User: {result[6]}, ID: {receivers}, Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}')
    except Exception as e:
        logger_telegram.error(f'Telegram Error {e}')
# E-Mail Sending function
async def send_email(result):
    new_dt = result[4].strftime("%d.%m.%Y %H:%M:%S")
    g_nr = result[2]
    receivers = result[8]
    try:
        valid_email = validate_email(receivers)
        # Constructing email body
        subject = f"User: {receivers} Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}"
        message_body = "Message Sender"
        
        msg = MIMEText(message_body, 'plain', 'utf-8')
        msg['From'] = 'TEST <test@mymail.com>'
        msg['To'] = 'testinbox@mymail.com'
        msg['Subject'] = subject
        
        smtpObj = smtplib.SMTP('smtp-relay.gmail.com', 25)
        smtpObj.sendmail('test@mymail.com', ['testinbox@mymail.com'], msg.as_string())
        smtpObj.quit()
        print('E-Mail Success!')
        print(f"[INFO]: Mail sent: {receivers}, Nr.:{result[9]} {result[0]}, Rajons: {g_nr}, Laiks: {new_dt}")
        
    except EmailNotValidError as e:
        print(f'[ERROR]: Can not send message due to an error [Nr]: {result[3]}, data: {receivers}, converted {tgID}')
        print(str(e))
    except Exception as e:
        print(f'[ERROR]: E-Mail - {e}')
async def process_telegram_queue():
    while True:
        result = await telegram_queue.get()
        if result:
            await send_telegram(result)
        telegram_queue.task_done()
        await asyncio.sleep(2)
async def process_email_queue():
    while True:
        results = []
        # Collect multiple results from the queue
        for _ in range(email_queue.qsize()):
            result = await email_queue.get()
            if result:
                results.append(result)
            email_queue.task_done()
        
        # Process each result
        for result in results:
            await send_email(result)
        await asyncio.sleep(2)
async def get_events():
    last_execution_time = datetime.datetime.now()
    last_event_id = 0
    #receivers = result[8]
    #result = None  # Initialize result outside the loop
    #receivers = None  # Initialize receivers outside the loop
    
    while True:
        try:
            async with aioodbc.connect(dsn=dsn) as connection:
                async with connection.cursor() as cursor:
                    query = """
                    SELECT DISTINCT
                            e.eventname,
                            e.unifier_id,
                            e.groupnr,
                            e.objectdbid,
                            e.receivedtime,
                            d.SendMail,
                            cp.name,
                            cp.objectid,
                            cp.email,
                            ou.unified_id,
                            e.id,
                            e.reactionid
                        FROM
                            (SELECT * FROM davp.dbo.events
                            UNION ALL
                            SELECT * FROM davp_archive.dbo.events) e
                        JOIN davp.dbo.eventsdescription d
                            ON e.eventdescid = d.Id
                        JOIN davp.dbo.object_unifiers ou
                            ON ou.id = e.unifier_id
                        JOIN davp.dbo.corresppersons cp
                            ON e.objectdbid = cp.objectid
                        WHERE d.SendMail = 'Yes' AND cp.SendMail = 'Yes' AND e.repeated = 'No'
                            AND e.receivedtime > ? AND e.id > ?
                            ORDER BY CAST(e.receivedtime AS DATETIME) DESC"""
                    await cursor.execute(query, last_execution_time, last_event_id)
                    results = await cursor.fetchall()
                    # Sort Results by receivedtime
                    results.sort(key=lambda x: x[4], reverse=True)
                    # Check if any results are found
                    for result in results:
                        if result[4] is not None and result is not None:
                            receivers = result[8]
                            last_event_id = result[10]
                            #print(f'Found - {result}')
                            # Check and Split to queues
                            if re.match(r"t\d+$", receivers):
                                await telegram_queue.put(result)
                                #print(f'TG Queue {result}')
                            else:
                                await email_queue.put(result)
                                #print(f'EM Queue {result}')
                        else:
                            print(f'[ERROR]: result[4] value - {result[4]}')
            await asyncio.sleep(10)
        except Exception as e:
            logger_system.error(f'Exception: {type(e).__name__}, Message: {str(e)}')
            logger_system.error(f'Traceback: {traceback.format_exc()}')
            logger_system.error(f'Result[4] value - {result[4]}, Event ID : {result[10]}, E-Mail: {result[8]}')
            await asyncio.sleep(15)
async def main():
    first_iteration = True
    while True:
        try:
            if not first_iteration:
                print("Second iteration, sending messages")
                asyncio.create_task(get_events())
                asyncio.create_task(process_telegram_queue())
                asyncio.create_task(process_email_queue())
                await asyncio.sleep(10)
                await asyncio.gather(get_events(), process_telegram_queue(), process_email_queue())
            else:
                await asyncio.sleep(5)
                print("First iteration, skip sending messages")
                first_iteration = False
                await asyncio.sleep(5)
                pass
        except Exception as e:
            logger_system.error(f'Error : {e}')
            await asyncio.sleep(15)
if __name__ == "__main__":
    asyncio.run(main())

Офлайн

#2 Ноя. 21, 2023 17:45:58

ZerG
Зарегистрирован: 2012-04-05
Сообщения: 2627
Репутация: +  61  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

То что выложил простынку хорошо
Но оно тут лишнее ты немного не вкуриваешь понятие очереди и так далее
Так что почитай доку по RabbitMQ
Аналогов море но этот пожалуй самый простой для освоения на уровне простых задач

А задача простая - кролик это как раз менеджер очереди
Ты создаешь в нем поток сознания и лупишь в него свои данные раз в нужное тебе время секунд - они все становяться в очередь как бабки
А далее ты уже средстваи самого кролика эту очередь ровняешь/обрабатываешь/выполняешь действия

Если на пальцах
Сервис апдейта сообщений кидает список на отправку в очередь
Кролик(Джон Уик) следит за ней и как только появились новые задачи обрабатывает их (при обработке могут подходить новые бабки)
Выполнил - смотри опять новые и так далее
Таким образом у тебя нет привязки основному процессу ждать всех отправок



Влодение рускай арфаграфией - это как владение кунг-фу: настаящие мастира не преминяют ево бес ниабхадимости

Офлайн

#3 Ноя. 21, 2023 20:29:02

DeeKeiD
Зарегистрирован: 2023-11-21
Сообщения: 5
Репутация: +  0  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

Под кролика вроде нужен сервак, чует моя 5-я точка, это можно сделать средствами asyncio, головушка только не догоняет, сложно после с php прыгнуть в змейку…
Можно самый простой пример как для не далёкого, с питоном пока что на вы, и то при помощи gpt, вроде что-то понятно, вроде нет, может заинклюдить проверку на обработанный ID (он всегда уникален) который был обработан?

Офлайн

#4 Ноя. 21, 2023 22:59:21

ZerG
Зарегистрирован: 2012-04-05
Сообщения: 2627
Репутация: +  61  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

как я и сказал выше ты ничего не понимаешь не в асинхронности не в мнопогопоточности.
Я могу скинуть код но ты не сможешь его применить в свой набор инструкций потому что ты тупой очередной верующий который думает что программирование это просто

Но если ты все же другой
попробую заюзать python.threads
тогда за усилие поможем кодом

А за копипасту говнобота не будем



Влодение рускай арфаграфией - это как владение кунг-фу: настаящие мастира не преминяют ево бес ниабхадимости

Офлайн

#5 Ноя. 22, 2023 10:55:41

DeeKeiD
Зарегистрирован: 2023-11-21
Сообщения: 5
Репутация: +  0  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

Сразу “тупой”, “говнобот” и т.д. я попросил помощи т.к я не понимаю почему идёт дубляж данных, хотя метод .get() должен удалять обработанный элемент из очереди, чего в моём случае почему-то не происходит…

Вы же мне советуете кролика или потоки не понятно зачем, этот как советовать поставить мотор на велосипед для того что бы проехать 20 метров, вместо того что бы подсказать каким болтом закрутить гайку на педали…

По поводу копипасты с “говнобота” какая разница использую я шаблоны из мануалов или из нейронки, если и там и там описана общая конструкция, которую я взял за основу и переделал под свои нужды… К тому же нейрока экономит время, но к сожалению не всегда с её помощью можно до конца понять ньюансы, по этому и пришел к людям за помощью. Если нет желания тратить на меня своё драгоценное время, не пишите, спасибо.

Офлайн

#6 Ноя. 22, 2023 12:54:59

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 9846
Репутация: +  853  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

DeeKeiD
По поводу копипасты с “говнобота” какая разница использую я шаблоны из мануалов или из нейронки, если и там и там описана общая конструкция, которую я взял за основу и переделал под свои нужды… К тому же нейрока экономит время
Блин, я думал, ты шутишь.

DeeKeiD
Можно самый простой пример как для не далёкого, с питоном пока что на вы, и то при помощи gpt, вроде что-то понятно, вроде нет
Ты правда через ChatGPT пытаешься программу написать? Да ты совсем дурак что ли? Это же просто расхайпованное говно, которое генерирует всякую ересь и просто врёт, что это мол идеальный исходный код. Для дебилов приманка. Надеюсь, ты её не оплачивал. Они там, помню, её сходу монетизировали, пока никто не успел понять, что это фуфло обычное.

ZerG
Я могу скинуть код но ты не сможешь его применить в свой набор инструкций потому что ты тупой очередной верующий который думает что программирование это просто
Да, ему сначала нужно лечение пройти в психоневрологическом диспансере. А то человечек совсем потерялся в этом мире. В ChatGPT он решил бота написать. facepalm



Отредактировано py.user.next (Ноя. 22, 2023 12:58:30)

Офлайн

#7 Ноя. 22, 2023 14:26:28

ZerG
Зарегистрирован: 2012-04-05
Сообщения: 2627
Репутация: +  61  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

Под кролика вроде нужен сервак, чует моя 5-я точка, это можно сделать средствами asyncio, головушка только не догоняет, сложно после с php прыгнуть в змейку…

Тут мои полномочия ВСЁ

Даже не удосужился открыть сайт кролика - судя по всему про кроля ему гпт отвечал
ОЧередь и асихнронность это противоречия
секрет говнокода на пыхе раскрыт - его пишут гптшники



Влодение рускай арфаграфией - это как владение кунг-фу: настаящие мастира не преминяют ево бес ниабхадимости

Отредактировано ZerG (Ноя. 22, 2023 14:27:26)

Офлайн

#8 Ноя. 22, 2023 16:16:11

DeeKeiD
Зарегистрирован: 2023-11-21
Сообщения: 5
Репутация: +  0  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

Вы серьёзно обливаете говном человека за то что он воспользовался GPT и попросил его выдать мне примеры реализации той или иной функции что бы посмотреть и узнать какие методы реализации существуют, вместо тонны времени на поиски всех доступных вариантов в куче разнообразных мануалах, что в принципе привело бы к тому же результату?

Про кролика уже изложено выше

Почему нельзя просто, без эмоций и по факту ответить на заданные вопросы вместо унижения людей которые чего-то не понимают и пришли за помощью что бы им подсказали а не обложили со всех сторон в духе - дебил! это же очевидно!

И почему очередь и асинхронность это противоречия? Хотя зачем я задаю этот вопрос, ведь асинхронные очереди придумали и разработали такие же как я тупые

Не думал что тут такое токсичное комьюнити.

В любом случае, спасибо вам и удачи.

Офлайн

#9 Ноя. 22, 2023 18:47:04

ZerG
Зарегистрирован: 2012-04-05
Сообщения: 2627
Репутация: +  61  -
Профиль   Отправить e-mail  

Асинхронная обработка и отправка данных

Был неправ
не дебил
Дебилоид
Пришел со своим похапе
Нулевой уровень знаний - книги читать не хочет - лупиться в ачьк GPT
Получает ответ - вместо того что бы почитать что такое RabbitMQ спраишвает у гпт чата и отвечате в форум что для него наверное нужен сервер (на самом деле 4 сервера и один эсминец)

почему очередь и асинхронность это противоречия? Хотя зачем я задаю этот вопрос, ведь асинхронные очереди придумали и разработали такие же как я тупые

Сука !!! Да потому что очередь предполагает поочердный проход от А до Я в строгом порядке
А асинхронность это противоположность очереди - там кто первый встал того и тапки
И тут ненужно быть программистом что бы отличать очередь от от асинхронности

Удача пригодиться тебе



Влодение рускай арфаграфией - это как владение кунг-фу: настаящие мастира не преминяют ево бес ниабхадимости

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version