Найти - Пользователи
Полная версия: Class instance + multiprocessing
Начало » Python для новичков » Class instance + multiprocessing
1
grundic
Привет

Хочу через Pool вызывать метод объекта. В методе внутренней переменной пристаивается некое значение. Если делать последовательно - все работает. В потоках значение не сохраняется.

Небольшой пример:

#!/usr/bin/python

from multiprocessing import Pool

#------------------------------------------------------------
# Some hack to make methods be pickable
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)

import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
#------------------------------------------------------------


class MyClass(object):
''' Simple simple class '''

def __init__(self):
''' got just single variable '''
self.x = 0

def setX(self, value=0):
''' and just a single method. Returns instance variable '''
self.x = value
return self.x

if __name__ == "__main__":

mylist = []

# first - create several objects of MyClass
for i in range(10):
mylist.append(MyClass())
# set internal variable via instance method
mylist[i].setX(i ** 2)

# check - expecting squares from zero to nine
for c in mylist:
print c.x, '\t',

print ''
print '*'*80

# Now we do this in parralel
pool = Pool(5)
results = [pool.apply_async(MyClass.setX, (mylist[i], i ** 3)) for i in range(len(mylist))]

# results via get methos are ok
for r in results:
print r.get(), ' - ',

print ''
print '*'*80

# but class instanses did't changed :(
for c in mylist:
print c.x, '\t',


'''
RESULTS:

0 1 4 9 16 25 36 49 64 81
********************************************************************************
0 - 1 - 8 - 27 - 64 - 125 - 216 - 343 - 512 - 729 -
********************************************************************************
0 1 4 9 16 25 36 49 64 81
'''
Как написать “правильно”? Нужно использовать какую-то магию потоков?
Спасибо.
Ed
На первый взляд проблема в ваших хаках с pickle.
Ed
На второй взгляд проблема в том, что вы не понимаете как работает multiprocessing. А работает он через pickle/unpickle, то есть в процесс передается результат pickle, а не исходный объект. Поэтому исходный объект и остается нетронутым. Хотите получить измененный объект - возвращайте его, например. Тогда он тоже запиклится на стороне вызываемого процесса, а потом распиклится на принимающей стороне и вы сможете, как вариант затолкать его в ваш mylist.
grundic
Ed
На первый взляд проблема в ваших хаках с pickle.
FYI: Can’t pickle <type ‘instancemethod’>


Ed
На второй взгляд проблема в том, что вы не понимаете как работает multiprocessing. А работает он через pickle/unpickle, то есть в процесс передается результат pickle, а не исходный объект.
А вот об этом не знал - спасибо.
Как же тогда вызывать методы объекта в потоках?
Есть идея использовать pool.apply_async и потом получать результат через get, надеясь, что он будет сортирован:
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
16.6. multiprocessing

Есть еще callback - но как понять, к какому объекту относится полученный результат?
Ed
grundic
Can’t pickle <type ‘instancemethod’>
Я уже это сам нашел, когда разбирался с вашим кодом :)

Как же тогда вызывать методы объекта в потоках?
Вызывать-то так, как вы делаете. Только не ожидать, что ваш исходный объект изменится. Это же побочный эффект вызова, а не результат, поэтому он и не вяжется с multiprocessing.

Если же хотите получить назад измененный объект, то верните его, я же написал. Для привязки его к вашему списку как вариант можно передать порядковый номер элемента, и потом вернуть его же. Выглядит не очень красиво, но так всегда бывает, когда инструмент используется не по назначению.
Вот вам пример того, что я имею в виду:
...
def setX(self, index, value=0):
''' and just a single method. Returns instance variable '''
self.x = value
return index, self
...

for r in [pool.apply_async(mylist[i].setX, (i, i ** 3,)) for i in range(len(mylist))]:
i, myobj = r.get()
mylist[i] = myobj
print myobj.x, ' - ',
...
А вообще лучше подумать и сделать нормально, а не выкручивать руки multiprocessingу.
grundic
Нашел небольшую библиотеку ThreadPool - с помощью нее удалось сделать то, что планировал.

#!/usr/bin/python

from ThreadPool import ThreadPool, ThreadPoolTask
import threading
from time import sleep

class MyClass(object):
''' Simple simple class '''

def __init__(self):
''' got just single variable '''
self.x = 0

def setX(self, value=0):
''' and just a single method. Returns instance variable '''
self.x = value
sleep(1)
return self.x

class MyClassRunner(ThreadPoolTask):
def __init__(self, func, args):
self.func = func
self.args = args

def _action(self):
# execute any function with any params
self.func(*self.args)

if __name__ == "__main__":

mylist = []

# first - create several objects of MyClass
for i in range(10):
mylist.append(MyClass())
# set internal variable via instance method
mylist[i].setX(i ** 2)

# check - expecting squares from zero to nine
for c in mylist:
print c.x, '\t',

print ''
print '*'*80

tp = ThreadPool(5, verbose=False)
task_list = []
for i in range(len(mylist)):
task_list.append(MyClassRunner(MyClass.setX, (mylist[i], i ** 3)))
tp.addTask(task_list[i], 1)

tp.start()

while not tp.queueEmpty():
#print 'Some threads are still alive: {0}'.format(tp.nbTasksRunning())
#sleep(0.5)
pass
tp.terminate()


for c in mylist:
print c.x, '\t',
Единственное, что беспокоит, так это одновременный доступ к одному и тому же обекту из нескольких потоков. Вроде как не очень это безопасно и надо использовать локи. К тому же у меня тривиальный пример, а в действительности будет массив обектов в массиве других объектов.
Не очень понятно, почему multiprocessing так коряво работает с классами.
Ed
grundic
Нашел небольшую библиотеку ThreadPool - с помощью нее удалось сделать то, что планировал.
Это можно сделать и на обычных тредах. Только больше одного core/процессора не заюзаете из-за GIL. Я думал вы именно поэтому начали с multiprocessing.

Единственное, что беспокоит, так это одновременный доступ к одному и тому же обекту из нескольких потоков.
К какому это?

Не очень понятно, почему multiprocessing так коряво работает с классами.
Именно потому, что юзает отдельные процессы.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB