Форум сайта python.su
Привет
Хочу через Pool вызывать метод объекта. В методе внутренней переменной пристаивается некое значение. Если делать последовательно - все работает. В потоках значение не сохраняется.
Небольшой пример:
#!/usr/bin/python
from multiprocessing import Pool
#------------------------------------------------------------
# Some hack to make methods be pickable
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
#------------------------------------------------------------
class MyClass(object):
''' Simple simple class '''
def __init__(self):
''' got just single variable '''
self.x = 0
def setX(self, value=0):
''' and just a single method. Returns instance variable '''
self.x = value
return self.x
if __name__ == "__main__":
mylist = []
# first - create several objects of MyClass
for i in range(10):
mylist.append(MyClass())
# set internal variable via instance method
mylist[i].setX(i ** 2)
# check - expecting squares from zero to nine
for c in mylist:
print c.x, '\t',
print ''
print '*'*80
# Now we do this in parralel
pool = Pool(5)
results = [pool.apply_async(MyClass.setX, (mylist[i], i ** 3)) for i in range(len(mylist))]
# results via get methos are ok
for r in results:
print r.get(), ' - ',
print ''
print '*'*80
# but class instanses did't changed :(
for c in mylist:
print c.x, '\t',
'''
RESULTS:
0 1 4 9 16 25 36 49 64 81
********************************************************************************
0 - 1 - 8 - 27 - 64 - 125 - 216 - 343 - 512 - 729 -
********************************************************************************
0 1 4 9 16 25 36 49 64 81
'''
Офлайн
На первый взляд проблема в ваших хаках с pickle.
Офлайн
На второй взгляд проблема в том, что вы не понимаете как работает multiprocessing. А работает он через pickle/unpickle, то есть в процесс передается результат pickle, а не исходный объект. Поэтому исходный объект и остается нетронутым. Хотите получить измененный объект - возвращайте его, например. Тогда он тоже запиклится на стороне вызываемого процесса, а потом распиклится на принимающей стороне и вы сможете, как вариант затолкать его в ваш mylist.
Офлайн
EdFYI: Can’t pickle <type ‘instancemethod’>
На первый взляд проблема в ваших хаках с pickle.
EdА вот об этом не знал - спасибо.
На второй взгляд проблема в том, что вы не понимаете как работает multiprocessing. А работает он через pickle/unpickle, то есть в процесс передается результат pickle, а не исходный объект.
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
Отредактировано (Апрель 25, 2010 23:37:10)
Офлайн
grundicЯ уже это сам нашел, когда разбирался с вашим кодом :)
Can’t pickle <type ‘instancemethod’>
Как же тогда вызывать методы объекта в потоках?Вызывать-то так, как вы делаете. Только не ожидать, что ваш исходный объект изменится. Это же побочный эффект вызова, а не результат, поэтому он и не вяжется с multiprocessing.
...
def setX(self, index, value=0):
''' and just a single method. Returns instance variable '''
self.x = value
return index, self
...
for r in [pool.apply_async(mylist[i].setX, (i, i ** 3,)) for i in range(len(mylist))]:
i, myobj = r.get()
mylist[i] = myobj
print myobj.x, ' - ',
...
Офлайн
Нашел небольшую библиотеку ThreadPool - с помощью нее удалось сделать то, что планировал.
#!/usr/bin/python
from ThreadPool import ThreadPool, ThreadPoolTask
import threading
from time import sleep
class MyClass(object):
''' Simple simple class '''
def __init__(self):
''' got just single variable '''
self.x = 0
def setX(self, value=0):
''' and just a single method. Returns instance variable '''
self.x = value
sleep(1)
return self.x
class MyClassRunner(ThreadPoolTask):
def __init__(self, func, args):
self.func = func
self.args = args
def _action(self):
# execute any function with any params
self.func(*self.args)
if __name__ == "__main__":
mylist = []
# first - create several objects of MyClass
for i in range(10):
mylist.append(MyClass())
# set internal variable via instance method
mylist[i].setX(i ** 2)
# check - expecting squares from zero to nine
for c in mylist:
print c.x, '\t',
print ''
print '*'*80
tp = ThreadPool(5, verbose=False)
task_list = []
for i in range(len(mylist)):
task_list.append(MyClassRunner(MyClass.setX, (mylist[i], i ** 3)))
tp.addTask(task_list[i], 1)
tp.start()
while not tp.queueEmpty():
#print 'Some threads are still alive: {0}'.format(tp.nbTasksRunning())
#sleep(0.5)
pass
tp.terminate()
for c in mylist:
print c.x, '\t',
Офлайн
grundicЭто можно сделать и на обычных тредах. Только больше одного core/процессора не заюзаете из-за GIL. Я думал вы именно поэтому начали с multiprocessing.
Нашел небольшую библиотеку ThreadPool - с помощью нее удалось сделать то, что планировал.
Единственное, что беспокоит, так это одновременный доступ к одному и тому же обекту из нескольких потоков.К какому это?
Не очень понятно, почему multiprocessing так коряво работает с классами.Именно потому, что юзает отдельные процессы.
Офлайн