Найти - Пользователи
Полная версия: Многопоточность
Начало » Python для новичков » Многопоточность
1 2
Mozart
Всем привет, прошу великих наставить меня на путь истинный, или объяснить почему мой истинный путь не работает.

Есть исходный скрипт (анализирует сайтмап.xml, заходит на каждую ссылку и дергает оттуда ключевики (использует бьютифул суп)) :

# -*- coding: cp1251 -*-
import urllib, urlparse, re
from BeautifulSoup import BeautifulSoup
from BeautifulSoup import BeautifulStoneSoup

url_start = "http://site.ru"
url_end = "sitemap.xml"
url = urlparse.urljoin(url_start, url_end)

print u"Ждите ..."

link = urllib.urlopen(url)
sup = BeautifulStoneSoup(link)
supik = str(sup)
k = 1

p = re.compile(r"<loc>(.*?)</loc>", re.S | re.I)

def starting(gg):
global k
link1 = urllib.urlopen(gg)
soup = BeautifulSoup(link1, fromEncoding="utf-8")
a = soup(attrs={"name": "Keywords"})[0]["content"]
f1 = open("file1.txt", "a")
f1.write(a.encode("cp1251") + "\n")
f1.close()
link1.close()
print u"Спарсили ключевик № " + str(k)
k += 1

q = raw_input("Parse ? y/n: ")

if q == "y":
f = open("file1.txt", "w")
for i in p.findall(supik):
starting(i)
f.close()
link.close()
else:
print "Okay ..."

raw_input()
Работает достаточно медленно, ну я и подумал я добавить к нему многопоточности, вроде сделал все как по статьям с использованием очередей Queue, скрипт работает но скорость почему то не изменяется, не подскажите в чем моя ошибка? Вот модифицированный код :

# -*- coding: cp1251 -*-
import urllib, urlparse, re, Queue, threading, time
from BeautifulSoup import BeautifulSoup
from BeautifulSoup import BeautifulStoneSoup

queue = Queue.Queue()
lock = threading.RLock()
threads_count = 100

url_start = "http://site.ru"
url_end = "sitemap.xml"
url = urlparse.urljoin(url_start, url_end)

print u"Ждите ..."

link = urllib.urlopen(url)
sup = BeautifulStoneSoup(link)
supik = str(sup)
k = 1

p = re.compile(r"<loc>(.*?)</loc>", re.S | re.I)

def worker():
global queue
while True:
try:
target = queue.get_nowait()
except Queue.Empty:
return
starting(target)

def write(data, s):
global k
lock.acquire()
if s == True:
f1 = open("file1.txt", "a")
f1.write(data + "\n")
f1.close()
elif s != True:
pass
lock.release()
print u"Спарсили ключевик № " + str(k)
k += 1

def starting(gg):
try:
link1 = urllib.urlopen(gg)
soup = BeautifulSoup(link1, fromEncoding="utf-8")
a = soup(attrs={"name": "Keywords"})[0]["content"]
b = a.encode("cp1251")
write(b, True)
link1.close()
except:
write(b, False)

def main():
global link
f = open("file1.txt", "w")
for i in p.findall(supik):
queue.put(i)
for _ in xrange(threads_count):
thread_ = threading.Thread(target=worker)
thread_.start()
while threading.active_count() >1:
time.sleep(1)
f.close()
link.close()

q = raw_input("Parse ? y/n: ")

if q == "y":
main()
else:
print "Okay ..."

raw_input()
Благодарю за ответ.
Lexander
Как вы думаете, что вернет threading.active_count() ?
Mozart
Спасибо за наводку, это вернет число объектов представляющих активные потоки, в результате этот мелкий цикл будет бесконечным и выше 2 потоков не уйдет, убрал этот мини бесконечный цикл, но проблема не решилась, теперь запускаются все 10 потоков и срабатывают сразу все моментально, а потом ждут столько же времени как если бы работал всего 1 поток …
Lexander
Каждому созданному потоку все таки нужно сделать join().
После этого программа может ждать только вашего ввода - последний raw_input.
Кстати, напишите в нем сообщение и поймете зачем нужен join.
Soteric
Если не ошибаюсь, правильнее сделать join() очереди, а потоки после обработки каждого target должны делать ей task_done().

http://docs.python.org/library/queue.html#Queue.Queue.join
Mozart
В общем в join() тоже ничего не меняется, я проверил этот код многопоточности для тестирования сокетов в range(1, 1000) - действительно виден прирост скорости в несколько раз в отличие от парсинга страниц по моему коду. Мне кажется что все дело в занятых переменных - тот же link1 и soup, ведь пока они заняты одним потоком они не будут доступны для другого … Может быть я и ошибаюсь.
Soteric
link1 = urllib.urlopen(gg)
Возможно стоит добавить здесь какой-нибудь разумный таймаут. Дефолтный может быть очень большим и все потоки большую часть времени будут проводить в попытке подключиться к недоступному ресурсу.
Lexander
Mozart
В общем в join() тоже ничего не меняется
Это вряд ли ;)
Mozart
Мне кажется что все дело в занятых переменных - тот же link1 и soup, ведь пока они заняты одним потоком они не будут доступны для другого … Может быть я и ошибаюсь.
Они ведь не глобальные, создаются в отдельной функции внутри каждого потока и имеют локальную область видимости.
Я BeautifulSoup не пользуюсь, поэтому запустить ваш код у себя не могу.
Если сделаете тестовый код без него, посмотрю.
Soteric
Если не ошибаюсь, правильнее сделать join() очереди, а потоки после обработки каждого target должны делать ей task_done().
Сомневаюсь.
В коде сначала формируется очередь и только потом запускаются потоки, ее обрабатывающие.
Программа должна быть завершена после завершения именно потоков, а не очереди.
Soteric
В текущем виде да. Правильнее, на мой взгляд, потоки сделать демонами, а join() на очереди. Тогда когда потоки закончат с очередью, то завершится программа, а вместе с ней завершатся потоки. Мы ведь не можем сделать join() на каждом из потоков.
Lexander
Soteric
Мы ведь не можем сделать join() на каждом из потоков.
Это почему же?
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB