Форум сайта python.su
72
Бери ношу по себе, чтоб не падать при ходьбе.
Офлайн
0
Проверьте пожалуйста правильность кода. Подходит ли он под мои задачи?
1.Для параметров используется конфигурационный файл, считывающийся при запуске сервера;
2.“Черный” список доменных имен находится в конфигурационном файле;
3.Адрес вышестоящего сервера также находится в конфигурационном файле;
4.Сервер принимает запросы DNS-клиентов на стандартном порту;
5.Если запрос содержит доменное имя, включенное в “черный” список, сервер возвращает клиенту ответ, заданный конфигурационным файлом (варианты: not resolved, адрес в локальной сети, …).
6.Если запрос содержит доменное имя, не входящее в “черный” список, сервер перенаправляет запрос вышестоящему серверу, дожидается ответа и возвращает его клиенту
import datetime import sys import time import threading import traceback import SocketServer import dns.resolver bls = ["zen.spamhaus.org", "spam.abuse.ch", "cbl.abuseat.org", "virbl.dnsbl.bit.nl", "dnsbl.inps.de", "ix.dnsbl.manitu.net", "dnsbl.sorbs.net", "bl.spamcannibal.org", "bl.spamcop.net", "xbl.spamhaus.org", "pbl.spamhaus.org", "dnsbl-1.uceprotect.net", "dnsbl-2.uceprotect.net", "dnsbl-3.uceprotect.net", "db.wpbl.info"] if len(sys.argv) != 2: print 'Usage: %s <ip>' %(sys.argv[0]) quit() myIP = sys.argv[1] for bl in bls: try: my_resolver = dns.resolver.Resolver() query = '.'.join(reversed(str(myIP).split("."))) + "." + bl answers = my_resolver.query(query, "A") answer_txt = my_resolver.query(query, "TXT") print 'IP: %s IS listed in %s (%s: %s)' %(myIP, bl, answers[0], answer_txt[0]) except dns.resolver.NXDOMAIN: print 'IP: %s NOT resolved %s' %(myIP, bl) raise RuntimeError("Something bad happened") from dnslib import * class DomainName(str): def __getattr__(self, item): return DomainName(item + '.' + self) D = DomainName('example.com') IP = '127.0.0.1' TTL = 60 * 5 PORT = 5053 soa_record = SOA( mname=D.ns1, # primary name server rname=D.andrei, # email of the domain administrator times=( 201307231, # serial number 60 * 60 * 1, # refresh 60 * 60 * 3, # retry 60 * 60 * 24, # expire 60 * 60 * 1, # minimum ) ) ns_records = [NS(D.ns1), NS(D.ns2)] records = { D: [A(IP), AAAA((0,) * 16), MX(D.mail), soa_record] + ns_records, D.ns1: [A(IP)], # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3) D.ns2: [A(IP)], D.mail: [A(IP)], D.andrei: [CNAME(D)], } def dns_response(data): request = DNSRecord.parse(data) print request reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q) qname = request.q.qname qn = str(qname) qtype = request.q.qtype qt = QTYPE[qtype] if qn == D or qn.endswith('.' + D): for name, rrs in records.iteritems(): if name == qn: for rdata in rrs: rqt = rdata.__class__.__name__ if qt in ['*', rqt]: reply.add_answer(RR(rname=qname, rtype=QTYPE[rqt], rclass=1, ttl=TTL, rdata=rdata)) for rdata in ns_records: reply.add_ns(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata)) reply.add_ns(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record)) print "---- Reply:\n", reply return reply.pack() class BaseRequestHandler(SocketServer.BaseRequestHandler): def get_data(self): raise NotImplementedError def send_data(self, data): raise NotImplementedError def handle(self): now = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f') print "\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0], self.client_address[1]) try: data = self.get_data() print len(data), data.encode('hex') # repr(data).replace('\\x', '')[1:-1] self.send_data(dns_response(data)) except Exception: traceback.print_exc(file=sys.stderr) class TCPRequestHandler(BaseRequestHandler): def get_data(self): data = self.request.recv(8192).strip() sz = int(data[:2].encode('hex'), 16) if sz < len(data) - 2: raise Exception("Wrong size of TCP packet") elif sz > len(data) - 2: raise Exception("Too big TCP packet") return data[2:] def send_data(self, data): sz = hex(len(data))[2:].zfill(4).decode('hex') return self.request.sendall(sz + data) class UDPRequestHandler(BaseRequestHandler): def get_data(self): return self.request[0].strip() def send_data(self, data): return self.request[1].sendto(data, self.client_address) if __name__ == '__main__': print "Starting nameserver..." servers = [ SocketServer.ThreadingUDPServer(('', PORT), UDPRequestHandler), SocketServer.ThreadingTCPServer(('', PORT), TCPRequestHandler), ] for s in servers: thread = threading.Thread(target=s.serve_forever) # that thread will start one more thread for each request thread.daemon = True # exit the server thread when the main thread terminates thread.start() print "%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name) try: while 1: time.sleep(1) sys.stderr.flush() sys.stdout.flush() except KeyboardInterrupt: pass finally: for s in servers: s.shutdown()
Прикреплённый файлы:
simple2.py (5,0 KБ)
Офлайн
0
PooHЯ это прекрасно понимаю
Бери ношу по себе, чтоб не падать при ходьбе.
Хочется турбо старт себе устроить) по этому обратился за помощью.Офлайн
568
class DomainName(str): def __getattr__(self, item): return DomainName(item + '.' + self)
c = DomainName("google.ru") print c.3d # EXCEPTION!
Офлайн
0
FishHook
А касательно остального?все нормально?
Отредактировано Tsarneba (Ноя. 24, 2016 15:40:17)
Офлайн