Найти - Пользователи
Полная версия: Помогите понять многопоточность.
Начало » Python для новичков » Помогите понять многопоточность.
1 2 3
vlados
Когда я запускаю скрипт, давая ему 100К дроменов, он начинает писать тучу ошибок. Почему? Как от них избавится?
Пример ошибок.
    self.run()
File "C:\work\tic.py", line 14, in run
self.tic = urllib.urlopen('http://bar-navig.yandex.ru/u?ver=2&show=32&url=ht
tp://'+str(self.url)+'/').read()
File "C:\Program Files (x86)\python 2.7\lib\urllib.py", line 84, in urlopen
return opener.open(url)
File "C:\Program Files (x86)\python 2.7\lib\urllib.py", line 205, in open
return getattr(self, name)(url)
File "C:\Program Files (x86)\python 2.7\lib\urllib.py", line 343, in open_http

errcode, errmsg, headers = h.getreply()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 1099, in getrepl
y
response = self._conn.getresponse()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 1027, in getresp
onse
response.begin()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 407, in begin
version, status, reason = self._read_status()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 365, in _read_st
atus
line = self.fp.readline()
File "C:\Program Files (x86)\python 2.7\lib\socket.py", line 430, in readline
data = recv(1)
IOError: [Errno socket error] [Errno 10054]

Exception in thread Thread-8109:
Traceback (most recent call last):
File "C:\Program Files (x86)\python 2.7\lib\threading.py", line 552, in __boot
strap_inner
self.run()
File "C:\work\tic.py", line 14, in run
self.tic = urllib.urlopen('http://bar-navig.yandex.ru/u?ver=2&show=32&url=ht
tp://'+str(self.url)+'/').read()
File "C:\Program Files (x86)\python 2.7\lib\urllib.py", line 84, in urlopen
return opener.open(url)
File "C:\Program Files (x86)\python 2.7\lib\urllib.py", line 205, in open
return getattr(self, name)(url)
File "C:\Program Files (x86)\python 2.7\lib\urllib.py", line 343, in open_http

errcode, errmsg, headers = h.getreply()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 1099, in getrepl
y
response = self._conn.getresponse()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 1027, in getresp
onse
response.begin()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 407, in begin
version, status, reason = self._read_status()
File "C:\Program Files (x86)\python 2.7\lib\httplib.py", line 365, in _read_st
atus
line = self.fp.readline()
File "C:\Program Files (x86)\python 2.7\lib\socket.py", line 430, in readline
data = recv(1)
IOError: [Errno socket error] [Errno 10054]
Soteric
Я подозреваю, что серверу не нравится, что ему практически единомоментно приходит 100К запросов. Подобная задача реализуется при помощи пула потоков и очереди задач (Queue), и является типичным примером задачи Consumer/Producer (Потребитель/Производитель). В вашем случае основной поток является производителем и кладет урлы для обработки в очередь. А некое фиксированное число потоков (например 10) являются потребителями, работают в цикле и достают урлы из очереди один за другим.
vlados
Ну, мне нужно 10 потоков допустим, как это реализовать?
Ed
vlados
Ну, мне нужно 10 потоков допустим, как это реализовать?
Создаете первые десять. По завершении каждого создаете новый. Чтобы вовремя реагировать на завершение можно, как вариант, использовать очередь (from Queue import Queue).
Работу с очередью я бы закрыл например этим: http://docs.python.org/library/threading.html#semaphore-objects во избежание коллизий при работе из разных тредов. Хотя может это и не нужно, поскольку очередь thread safe, то есть сама собой лочится.
В очередь можно совать некий идентификатор треда и результаты выполнения. Индекс в списке тредов сойдет за такой идентификатор. Основной тред будет тупо ждать когда в очередь приедут результаты и создавать по приезду новый тред, кладя его на место отработавшего в список тредов по приехавшему индексу. И так пока урлы не кончатся. Как-то так.
Если это непонятно, то могу набросать некий код, но лучше вы сами попробуйте.
vlados
Да, к сожалению не совсем понял вас, не могли бы ли вы показать пример? Сойдет даже псевдо код.
Ed
Вот, пожалуйста. Только я заменил возню с вытаскиванием урлов просто подсчетом количества символов в урле и 2х секундным слипом.
#!/usr/bin/python -tt

import sys
from threading import Thread
from time import sleep
from Queue import Queue

class Job(Thread):
def __init__(self, tid, url, queue):
Thread.__init__(self)
self.url = url
self.tid = tid
self.queue = queue
def run(self):
sleep(2)
result = (self.tid, self.url, len(self.url))
self.queue.put(result)
return result

def new_thread(tid, url, queue):
print 'Creating new tread %d(%s)' % (tid, url)
thread = Job(tid, url, queue)
thread.start()
return thread

def main(argv):
if len(argv) > 1:
psize = int(argv[1])
else:
psize = 10 # default pool size

count = 100 # number of urls generated
urls = ['http://url%s' % i for i in xrange(count)]
threads = []
queue = Queue()

# run first bunch of threads
for i in range(min(psize, len(urls))):
threads.append(new_thread(i, urls.pop(), queue))

# main loop
# create new threads as soon as jobs are done
for _ in xrange(count):
tid, url, ulen = queue.get()
print 'Thread %d(%s) is finished. Result: %d' % (tid, url, ulen)
if len(urls):
threads[tid] = new_thread(tid, urls.pop(), queue)

if __name__ == '__main__':
sys.exit(main(sys.argv))
Все равно лучше было бы, если бы вы сами это сделали. Так было бы больше толку.
Задавайте вопросы, если что непонятно.
vlados
Что-то я не шибко понял, но нашел интересный пример на хабре.
#!usr/bin/env python
#-*-encoding:UTF-8-*-

#==================<Имортирование необходимых модулей>==================
import urllib2
#Модуль для работы с протоколом HTTP, высокоуровневый
import urllib
#Модуль для работы с протоколом HTTP, более низкоуровневый чем urllib2,
#фактически из него необходима одна функция - urllib.urlquote
import Queue
#Модуль, который представляет собой "Pool", фактически это список, в
#котором на нужных местах вставлены замки таким образом, чтобы к нему
#одновременно мог обращаться только один поток
import threading
#Модуль для работы с потоками, из него понадобится только
#threading.active_count, threading.Thread, threading.Thread.start,
#threading.Rlock
import re
#Модуль для работы с регулярными выражениями, его использование выходит
#за пределы статьи
import time
#Модуль для работы со временем, из него нужна только функция sleep
queue = Queue.Queue()
#Обязательное присваивание, нужно делать именно так (т.е. импортировать
#класс Queue из модуля Queue и инициализировать его)
#==================</Имортирование необходимых модулей>=================

#==============================<Настройки>==============================
PROXY = "10.10.31.103:3128"
#Во время написания статьи сижу за прокси-сервером, поэтому в статье
#затрагивается и этот вопрос, этой строкой обьявляется глобальная
#переменная PROXY, в которой находится адрес прокси-сервера. Для работы
#напрямую необходимо указать значение None
HEADERS = {"User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1",
"Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1",
"Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7",
"Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1",
"Accept-Encoding" : "identity, *;q=0",
"Connection" : "Keep-Alive"}
#Для того чтобы получить страницу с www.google.com НЕОБХОДИМО использовать
#заголовки браузера, они представлены выше в ассоциативном массиве HEADERS,
#соответствуют реальным заголовкам браузера Opera с маленько модификацией, эти
#заголовки означают что клиент не может принимать zlib compressed data, т.е.
#сжатые данные - не хотел я заморачиваться еще и с разархивироанием страниц, тем
#более что не все сайты их сжимают...
THREADS_COUNT = 10
#В принципе это все настройки приложения, это-количество потоков
DEEP = 30
#Это - значение, которое отвечает за глубину страниц поиска, которые
#нужно просматривать, фактически же определяет собой количество ссылок,
#которые будут собраны сборщиком.
ENCODING = "UTF-8"
#Кодировка ваших файлов (для загрузки данных из файла с запросами и
#последующего их перевода в юникод)
#==============================</Настройки>===================================

LOCK = threading.RLock()
# Вот тут то впервые и затрагивается модуль threading
#создается обьект LOCK, который представляет собой класс threading.RLock из
#модуля threading, это -простейший замок, который запрещает исполнение
#несколькими потоками участка кода который идет после вызова его метода
#acquire() Основным отличием threading.RLock от threading.Lock (тоже класс из
#модуля threading) является то, что каждый поток может обращаться к обьекту
#threading.RLock неограниченное количество раз, обьект threading.Lock может
#вызываться каждым потоком только единожды.
#///////////////////////////////////////////////////////////////////////
def worker():
# Обьявление функции worker, входных аргументов нет
global queue
#Здесь и далее я буду обьявлять функции из глобального пространства
#имен в локальном для лучшей читабельности кода, хотя в написании
#софта такое делать строго не рекомендую (!)
while True:
#Запуск бесконечного цикла, в котором будет происходить работа
try:
#Обработка ошибок, блок try/except, когда обработается
#ошибка Queue.Empty это значит, что список задач пуст, и поток
#должен завершить свою работу
target_link = queue.get_nowait()
#Эта строчка олицетворяет собой получение задачи потоком из
#списка задач queue
except Queue.Empty, error:
#сам перехват ошибки
return
#Завершение работы функции
parsed_data = get_and_parse_page(target_link)
#Позже будет реализована функция, которая будет получать
#страницу и доставать из нее необходимые значения
if parsed_data != "ERROR":
#Проверка на то, была ли получена страница
write_to_file(parsed_data)
#Также будет реализована функция для записи собранных данных в файл
else:
queue.put(target_link)
#Если страница не была получена, то забрасываем ее обратно в queue
#///////////////////////////////////////////////////////////////////////
def write_to_file(parsed_data):
#Обявление функции write_to_file, аргумент –массив данных для записи
global LOCK
global ENCODING
LOCK.acquire()
#"Накидывание замка", следующий далее участок кода может выполнятся
#только одним потоком в один и тот же момент времени
with open("parsed_data.txt", "a") as out:
#Используется with statement, открывается файл parsed_data.txt с
#правами "a", что означает дозапись в конец файла, и присваиваевается
#хэндлеру на файл имя out (я так привык)
for site in parsed_data:
#Проход циклом по всем элементам parsed data, имя активного в
#данный момент элемента будет site
link, title = site[0], site[1]
#Присваивание переменным link и title значений из кортежа site
title = title.replace("<em>", "").replace("</em>", "").replace("<b>", "").replace("</b>", "")
#.replace -это замена HTML-тэгов, которые проскакивают в title и совершено не нужны
out.write(u"{link}|{title}\n".format(link=link, title=title).encode("cp1251"))
#Производится сама запись в файл, используется оператор форматирования
#строк .format, в отличие от % он поддерживает именованные аргументы, чем я и не
#преминул воспользоваться, таким образом в файл пишется строка вида:
#ссылка на сайт | title страницы\n -символ переноса строки(все это переводится
#из юникода в cp1251)
LOCK.release()
#"Отпирание" замка, в противном случае ни один из следующих
#потоков не сможет работать с этим участком кода. По-хорошему, тут тоже нужно
#сделать обработку ошибок, но это учебный пример, да и ошибка там может
#возникнуть (после добавки замка в этот участок кода) только если во время
#работы приложения выставить атрибут “только чтение” для данного пользователя
#относительно файла parsed_data.txt
#///////////////////////////////////////////////////////////////////////
def get_and_parse_page(target_link):
#Обьявление функции, аргумент – ссылка на страницу
global PROXY
#Указывает на то, что в данной функции используется переменная PROXY
#из глобального пространства имен
global HEADERS
#То же и для переменной Headers
if PROXY is not None:
#Если значение PROXY не равно None
proxy_handler = urllib2.ProxyHandler( { "http": "http://"+PROXY+"/" } )
#Создается Прокси-Хэндлер с указанным прокси
opener = urllib2.build_opener(proxy_handler)
#Далее создается opener c созданным ранее Прокси-Хэндлером
urllib2.install_opener(opener)
#И наконец-то он устанавливается, теперь нет необходимости в
#шаманствах, все запросы в которых будет использоваться urllib2
#(в пределах этой функции будут направляться через указанный ранее
#PROXY)
page_request = urllib2.Request(url=target_link, headers=HEADERS)
#Создается обьект Request, который олицетворяет собой Request instance,
#фактически это GET запрос к серверу с указанными параметрами, мне
#же необходимо использовать заголовки...
try:
#Обработка всех возможных ошибок, возникающих во время получения
#страницы, это нехорошо, но лучше чем полное отсутствие обработки
page = urllib2.urlopen(url=page_request).read().decode("UTF-8", "replace")
#Переменной page присваиваем прочитанное значение страницы запроса, переведенное
#в unicode из кодировки UTF-8 (кодировка, используемая на www.google.com) (в
#Python 2.6 unicode -это отдельный тип данных(!))
except Exception ,error:
#Сам перехват ошибки и сохранение ее значения в переменную error
print str(error)
#Вывод ошибки в консоль, прведварительно переведя ее в строку
#(просто на всякий случай)
return "ERROR"
#Возврат из функции в том случае, если во время работы возникла ошибка
harvested_data = re.findall(r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''', page)
#Сбор со страницы поиска ссылок и title найденных страниц
#Очистка данных от результатов поиска по блогам, картинкам и др. сервисам гугла
for data in harvested_data:
#Для каждого элемента массива harvested_data присвоить ему имя data
if data[0].startswith("/"):
#Если нулевой элемент массива data(ссылка) начинается с символа /
harvested_data.remove(data)
#Удаляем его из массива harvested_data
if ".google.com" in data[0]:
#Если нулевой элемент массива data(ссылка) имеет в себе .google.com
harvested_data.remove(data)
#Также удаляем его из массива harvested_data
return harvested_data
#Возвращаем собранные значения из функции
#///////////////////////////////////////////////////////////////////////
def main():
#Обявление функции, входных аргментов нет
print "STARTED"
#Вывод в консоль о начале процесса
global THREADS_COUNT
global DEEP
global ENCODING
#Обьявляние о том что эти переменные будут использоваться
#из глобального пространства имен
with open("requests.txt") as requests:
#Открываем файл requests в котором находятся запросы к поисковику
for request in requests:
#На данном файлхэндлере доступен итератор, поэтому можно
#пройтись по файлу циклом, без загрузки файл в оперативку, но это
#тоже не важно, я все равно его туда загружу:)
request = request.translate(None, "\r\n").decode(ENCODING, "replace")
#Очистка запроса от символов конца строки а также их
#перевод в юникод (с заменой конфликтных символов)
empty_link = "http://www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
#Это пустой адрес страницы поиска, отформатирован
for i in xrange(0, DEEP, 10):
#Проход итератором по диапазону #чисел от 0 до DEEP,
#который представляет собой максимальную глубину поиска с
#шагом в 10, т.е. получаем из этого диапазона только
#числа десятков, т.е. 10, 20, 30 (как идет поиск у гугла)
queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))
#Добавление в очередь каждой сгенерированной ссылки
#и перевод её в кодировку UTF-8 (для гугла)
for _ in xrange(THREADS_COUNT):
#Проход циклом по диапазону чисел количества потоков
thread_ = threading.Thread(target=worker)
#Создается поток, target-имя функции, которая являет собой
#участок кода, выполняемый многопоточно
thread_.start()
#Вызывается метод start() , таким образом поток запускается
while threading.active_count() >1:
#До тех пор, пока количество активных потоков больше 1 (значит,
#запущенные потоки продолжают работу)
time.sleep(1)
#Основной поток засыпает на 1 секунду
print "FINISHED"
#Вывод в консоль о завершении работы приложения
#///////////////////////////////////////////////////////////////////////
if __name__ == "__main__":
main()
Меня интересует вопрос получения значения допустим открытой странице с результатом гугла, и спарсеным количеством результатов. Как это записать в файл вместе с (в моем случае с подставляемой ссылкой, т.е. записываться файл в виде “{Site_url}:{Tic}”, где {Site_url} - сайт, который подставляется вот тут - "http://bar-navig.yandex.ru/u?ver=2&show=32&url=http://'+str(url)+'/'“, а {Tic} - это результат действия regx ”'value=“(.*?)”'" или после if else)?
vlados
Ну, я все в принципе понял, но у меня скрипт перезапустился после ошибки indexerror. Что это за ошибка и как ее предотвратить?
Ed
Что именно непонятно? Насчет indexerror - покажите трэйсбэк.
vlados
Хотя может и не из-за ошибки.
Вот посмотрите код, он после завершения(которого не происходит почему-то), начинает работу заново. Вот скрипт:
#!/usr/bin/env python
import Queue
import threading
import urllib2
import time
import datetime
import urllib
import re
import sys

hosts = open('list', 'r').readlines()



goods = open('good', 'w').close()

queue = Queue.Queue()

class ThreadUrl(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue

def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and prints first 1024 bytes of page
self.tic = urllib.urlopen('http://bar-navig.yandex.ru/u?ver=2&show=32&url=http://'+str(host.replace('\r', '').replace('\n', ''))+'/').read()
self.tic = re.findall('value="(.*?)"', self.tic)

if self.tic == None:
self.tic = 0
else:
self.tic = int(self.tic[0])
if int(self.tic) != 0:
open('good', 'a').write(str(host.replace('\r', '').replace('\n', ''))+';'+str(self.tic)+'\n')

#signals to queue job is done
self.queue.task_done()

start = time.time()
def main():

#spawn a pool of threads, and pass them queue instance
for i in range(60):
t = ThreadUrl(queue)
t.setDaemon(True)
t.start()

#populate queue with data
for host in hosts:
queue.put(host)

#wait on the queue until everything has been processed
queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB