Найти - Пользователи
Полная версия: concurrent.futures смешивает потоки
Начало » Python для экспертов » concurrent.futures смешивает потоки
1
Marat__
Такая вот функция которая записывает результат в файлы.
 def my_func(item_f): :           
    for item_se in all__2:
        for item_th in all__3:
            X_data_f_train = data_f_with2010[[item_f, item_se, item_th]]
       #     model_f = RandomForestClassifier(n_estimators=n_est, n_jobs=-1, max_depth=deep__, random_state=1)
            model_f = GradientBoostingClassifier(n_estimators=n_est, max_depth=deep__, random_state=1, learning_rate=0.01)
            model_f.fit(X_data_f_train, y_data_f_train)
            pred_f = model_f.predict_proba(X_data_f_train)
            fe_imp  = model_f.feature_importances_
       #     print(item_f)
       #     print(item_se)
       #     print(item_th)
            
            global data_f_analiz_2010
            data_f_analiz_2010 = data_f_analiz_2010.assign(profit = 
                data_f_with2010['res_profit'], 
                        feature_f = str(item_f) + '-' + str(item_se) + '-' + str(item_th), 
                        feature1 = str(item_f),   
                        feature2 = str(item_se),
                        feature3 = str(item_th),
                        namber   = data_f_with2010[item_f],
                        namber1  = data_f_with2010[item_se],
                        namber2  = data_f_with2010[item_th],                                   
                        n_est   = n_est,
                        deep__  = deep__,                                   
                        proba1  = pred_f[:, 0], 
                        proba2  = pred_f[:, 1],
                        Data    = data_f_with2010['Date_test'],
                        fe_imp1  = fe_imp[0],
                        fe_imp2  = fe_imp[1],
                        fe_imp3  = fe_imp[2],
                                                          )
            global data_f_analiz_year
            data_f_analiz_year = data_f_analiz_year.append(data_f_analiz_2010)
            data_f_analiz_2010 = pd.DataFrame()
        #рисуем
        #рисуем
       # print(deep__)
       # print(n_est)
        print(item_se)
        data_f_analiz_year['data_year'] = data_f[column]
        data_f_analiz_year['proba2_range'] = \
        np.where(data_f_analiz_year['proba2']>0.55, 0.55,         
        np.where(data_f_analiz_year['proba2']>0.54, 0.54, 
        np.where(data_f_analiz_year['proba2']>0.53, 0.53,
        np.where(data_f_analiz_year['proba2']>0.52, 0.52, 
        np.where(data_f_analiz_year['proba2']>0.51, 0.51, 
        np.where(data_f_analiz_year['proba2']>0.50, 0.50, 0.49))))))        
     #   display(data_f_analiz_year)
        dddd = table()
     #   display(dddd)
        dddd.to_csv('2c_f' + '_' + str(deep__) + str(n_est) + str(item_f) + str(item_se) +'dddd.csv')
        #ОБНУЛИЛ
        data_f_analiz_year = pd.DataFrame()
        dddd_all = pd.DataFrame()

Которую я засовываю сюда
 %%time
pd.options.display.precision = 3
data_f['res_profit'] = data_f['Profit %_1030']
data_f['res_profit_PM'] = np.where(data_f['res_profit']>0,1,0)
column = 'Year'
#n_est = 1
# for анализ фичей 3
data_f.dropna(0, inplace=True)
data_f_with2010 = pd.DataFrame()
data_f_analiz_year = pd.DataFrame()
data_f_analiz_2010 = pd.DataFrame()
data_f_with2010 = data_f[data_f[column] >= 2010]
y_data_f_train = data_f_with2010['res_profit_PM']
start = time.time()
print(start)
for deep__ in [3]:    
    for n_est in [5]:
               my_func(item_f): 
stop = time.time()
print(stop)
print('spend time {}'.format(stop-start))
Решил ускорить вычисленния. ProcessPoolExecutor у меня не пошел (пишет “A process in the process pool was terminated abruptly while the future was running or pending”. Может кто занчет в чем причина?)
А вот ThreadPoolExecutor пошел. Код выглядит так:

 %%time
pd.options.display.precision = 3
data_f['res_profit'] = data_f['Profit %_1030']
data_f['res_profit_PM'] = np.where(data_f['res_profit']>0,1,0)
column = 'Year'
#n_est = 1
# for анализ фичей 3
data_f.dropna(0, inplace=True)
data_f_with2010 = pd.DataFrame()
data_f_analiz_year = pd.DataFrame()
data_f_analiz_2010 = pd.DataFrame()
data_f_with2010 = data_f[data_f[column] >= 2010]
y_data_f_train = data_f_with2010['res_profit_PM']
start = time.time()
print(start)
for deep__ in [3]:    
    for n_est in [5]:
               
        executor = concurrent.futures.ThreadPoolExecutor()
        executor.submit(my_func, 'Min10')
        executor.submit(my_func, 'wdif')
        
stop = time.time()
print(stop)
print('spend time {}'.format(stop-start))

Только вот если глянуть в файлы, то видно что как то потоки перемешиваются и результаты по “МИн10” записываются как в файл 2c_f_35wdifwdifdddd.csv так в файл 2c_f_35Min10wdifdddd.csv. Хотя должны только в первый.
Это вообще законно? Так и предусмотренна при работе с потоками? Иля я некоректно что то написал? Спасибо
rumactep
может быть, функция не потокобезопасна? может быть затираются данные в global data_f_analiz_2010 одновременно выполняющимися потоками? (это предположение)
AD0DE412
 with ThreadPoolExecutor(max_workers=2) as executor:
	executor.submit(my_func, 'Min10')
	executor.submit(my_func, 'wdif')
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB