久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何使用Mars Remote API執行Python函數

219次閱讀
沒有評論

共計 9414 個字符,預計需要花費 24 分鐘才能閱讀完成。

本篇內容介紹了“如何使用 Mars Remote API 執行 Python 函數”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

Mars 是一個并行和分布式 Python 框架,能輕松把單機大家耳熟能詳的的 numpy、pandas、scikit-learn 等庫,以及 Python 函數利用多核或者多機加速。這其中,并行和分布式 Python 函數主要利用 Mars Remote API。

啟動 Mars 分布式環境可以參考:

命令行方式在集群中部署。

Kubernetes 中部署。

MaxCompute 開箱即用的環境,購買了 MaxCompute 服務的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常簡單,只需要對原有的代碼做少許改動,就可以分布式執行。

拿用蒙特卡洛方法計算 π 為例。代碼如下,我們編寫了兩個函數,calc_chunk  用來計算每個分片內落在圓內的點的個數,calc_pi  用來把多個分片  calc_chunk  計算的結果匯總最后得出 π 值。

from typing import Listimport numpy as npdef calc_chunk(n: int, i: int):#  計算 n 個隨機點(x 和 y 軸落在 - 1 到 1 之間)到原點距離小于 1 的點的個數 rs = np.random.RandomState(i)
 a = rs.uniform(-1, 1, size=(n, 2))
 d = np.linalg.norm(a, axis=1)return (d   1).sum()def calc_pi(fs: List[int], N: int):#  將若干次  calc_chunk  計算的結果匯總,計算  pi  的值 return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i) for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

%%time  下可以看到結果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

在單機需要 12.3 s。

要讓這個計算使用 Mars Remote API 并行起來,我們不需要對函數做任何改動,需要變動的僅僅是最后部分。

import mars.remote as mr#  函數調用改成  mars.remote.spawnfs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)]#  把  spawn  的列表傳入作為參數,再  spawn  新的函數 pi = mr.spawn(calc_pi, args=(fs, N))#  通過  execute()  觸發執行,fetch()  獲取結果 print(pi.execute().fetch())

%%time  下看到結果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

結果一模一樣,但是卻有數倍的性能提升。

可以看到,對已有的 Python 代碼,Mars remote API 幾乎不需要做多少改動,就能有效并行和分布式來加速執行過程。

一個例子

為了讓讀者理解 Mars Remote API 的作用,我們從另一個例子開始。現在我們有一個數據集,我們希望對它們做一個分類任務。要做分類,我們有很多算法和庫可以選擇,這里我們用 RandomForest、LogisticRegression,以及 XGBoost。

困難的地方是,除了有多個模型選擇,這些模型也會包含多個超參,那哪個超參效果最好呢?對于調參不那么有經驗的同學,跑過了才知道。所以,我們希望能生成一堆可選的超參,然后把他們都跑一遍,看看效果。

準備數據

這個例子里我們使用  otto 數據集。

首先,我們準備數據。讀取數據后,我們按 2:1 的比例把數據分成訓練集和測試集。

import pandas as pdfrom sklearn.preprocessing import LabelEncoderfrom sklearn.model_selection import train_test_splitdef gen_data():df = pd.read_csv( otto/train.csv)
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)
X_train, X_test, y_train, y_test = gen_data()

模型

接著,我們使用 scikit-learn 的 RandomForest 和 LogisticRegression 來處理分類。

RandomForest:

from sklearn.ensemble import RandomForestClassifierdef random_forest(X_train: pd.DataFrame, 
 y_train: pd.Series, 
 verbose: bool = False,
 **kw):model = RandomForestClassifier(verbose=verbose, **kw)
 model.fit(X_train, y_train)return model

接著,我們生成供 RandomForest 使用的超參,我們用 yield 的方式來迭代返回。

def gen_random_forest_parameters():for n_estimators in [50, 100, 600]:for max_depth in [None, 3, 15]:for criterion in [gini ,  entropy]:yield { n_estimators : n_estimators, max_depth : max_depth, criterion : criterion
 }

LogisticRegression 也是這個過程。我們先定義模型。

from sklearn.linear_model import LogisticRegressiondef logistic_regression(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):model = LogisticRegression(verbose=verbose, **kw)
 model.fit(X_train, y_train)return model

接著生成供 LogisticRegression 使用的超參。

def gen_lr_parameters():for penalty in [ l2 ,  none]:for tol in [0.1, 0.01, 1e-4]:yield { penalty : penalty, tol : tol
 }

XGBoost 也是一樣,我們用  XGBClassifier  來執行分類任務。

from xgboost import XGBClassifierdef xgb(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):model = XGBClassifier(verbosity=int(verbose), **kw)
 model.fit(X_train, y_train)return model

生成一系列超參。

def gen_xgb_parameters():for n_estimators in [100, 600]:for criterion in [gini ,  entropy]:for learning_rate in [0.001, 0.1, 0.5]:yield { n_estimators : n_estimators, criterion : criterion, learning_rate : learning_rate
 }

驗證

接著我們編寫驗證邏輯,這里我們使用  log_loss  來作為評價函數。

from sklearn.metrics import log_lossdef metric_model(model, 
 X_test: pd.DataFrame,
 y_test: pd.Series) -  float:if isinstance(model, bytes):
 model = pickle.loads(model)
 y_pred = model.predict_proba(X_test)return log_loss(y_test, y_pred)def train_and_metric(train_func,
 train_params: dict,
 X_train: pd.DataFrame, 
 y_train: pd.Series, 
 X_test: pd.DataFrame, 
 y_test: pd.Series,
 verbose: bool = False
 ):#  把訓練和驗證封裝到一起 model = train_func(X_train, y_train, verbose=verbose, **train_params)
 metric = metric_model(model, X_test, y_test)return model, metric

找出最好的模型

做好準備工作后,我們就開始來跑模型了。針對每個模型,我們把每次生成的超參們送進去訓練,除了這些超參,我們還把  n_jobs  設成 -1,這樣能更好利用單機的多核。

results = []# -------------# Random Forest# -------------for params in gen_random_forest_parameters():
 print(f calculating on {params} )# fixed random_stateparams[random_state] = 123# use all CPU coresparams[n_jobs] = -1model, metric = train_and_metric(random_forest, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({model : model,  metric : metric}) 
# -------------------# Logistic Regression# -------------------for params in gen_lr_parameters():
 print(f calculating on {params} )# fixed random_stateparams[random_state] = 123# use all CPU coresparams[n_jobs] = -1model, metric = train_and_metric(logistic_regression, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({model : model,  metric : metric}) 
# -------# XGBoost# -------for params in gen_xgb_parameters():
 print(f calculating on {params} )# fixed random_stateparams[random_state] = 123# use all CPU coresparams[n_jobs] = -1model, metric = train_and_metric(xgb, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({model : model,  metric : metric})

運行一下,需要相當長時間,我們省略掉一部分輸出內容。

calculating on {n_estimators : 50,  max_depth : None,  criterion :  gini}
metric: 0.6964123781828575calculating on {n_estimators : 50,  max_depth : None,  criterion :  entropy}
metric: 0.6912312790832288#  省略其他模型的輸出結果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

從 CPU 時間和 Wall 時間,能看出來這些訓練還是充分利用了多核的性能。但整個過程還是花費了 31 分鐘。

使用 Remote API 分布式加速

現在我們嘗試使用 Remote API 通過分布式方式加速整個過程。

集群方面,我們使用最開始說的第三種方式,直接在 MaxCompute 上拉起一個集群。大家可以選擇其他方式,效果是一樣的。

n_cores = 8mem = 2 * n_cores # 16G# o  是  MaxCompute  入口,這里創建  10  個  worker  的集群,每個  worker 8 核 16Gcluster = o.create_mars_cluster(10, n_cores, mem, image= extended)

為了方便在分布式讀取數據,我們對數據處理稍作改動,把數據上傳到 MaxCompute 資源。對于其他環境,用戶可以考慮 HDFS、Aliyun OSS 或者 Amazon S3 等存儲。

if not o.exist_resource(otto_train.csv):with open(otto/train.csv) as f:#  上傳資源 o.create_resource(otto_train.csv ,  file , fileobj=f) 
def gen_data():#  改成從資源讀取 df = pd.read_csv(o.open_resource( otto_train.csv))
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)

稍作改動之后,我們使用  mars.remote.spawn  方法來讓  gen_data  調度到集群上運行。

import mars.remote as mr# n_output  說明是  4  輸出 # execute()  執行后,數據會讀取到  Mars  集群內部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()# remote_  開頭的都是  Mars  對象,這時候數據在集群內,這些對象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正確序列化 numpy ndarray、pandas DataFrame 等,還不能序列化模型,所以,我們要對  train_and_metric  稍作改動,把模型 pickle 了之后再返回。

def distributed_train_and_metric(train_func,
 train_params: dict,
 X_train: pd.DataFrame, 
 y_train: pd.Series, 
 X_test: pd.DataFrame, 
 y_test: pd.Series,
 verbose: bool = False
 ):model, metric = train_and_metric(train_func, train_params,
 X_train, y_train, 
 X_test, y_test, verbose=verbose)return pickle.dumps(model), metric

后續 Mars 支持了序列化模型后可以直接 spawn 原本的函數。

接著我們就對前面的執行過程稍作改動,把函數調用全部都用  mars.remote.spawn  來改寫。

import numpy as np
tasks = []
models = []
metrics = []# -------------# Random Forest# -------------for params in gen_random_forest_parameters():# fixed random_stateparams[random_state] = 123task = mr.spawn(distributed_train_and_metric,
 args=(random_forest, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2)
 tasks.extend(task)#  把模型和評價分別存儲 models.append(task[0])
 metrics.append(task[1]) 
 
# -------------------# Logistic Regression# -------------------for params in gen_lr_parameters():# fixed random_stateparams[ random_state] = 123task = mr.spawn(distributed_train_and_metric,
 args=(logistic_regression, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2)
 tasks.extend(task)#  把模型和評價分別存儲 models.append(task[0])
 metrics.append(task[1])# -------# XGBoost# -------for params in gen_xgb_parameters():# fixed random_stateparams[ random_state] = 123#  再指定并發為核的個數 params[n_jobs] = n_cores
 task = mr.spawn(distributed_train_and_metric,
 args=(xgb, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2)
 tasks.extend(task)#  把模型和評價分別存儲 models.append(task[0])
 metrics.append(task[1])#  把順序打亂,目的是能分散到  worker  上平均一點 shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代碼幾乎一致。

運行查看結果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

時間一下子從 31 分鐘多來到了 2 分鐘,提升 15x+。但代碼修改的代價可以忽略不計。

細心的讀者可能注意到了,分布式運行的代碼中,我們把模型的 verbose 給打開了,在分布式環境下,因為這些函數遠程執行,打印的內容只會輸出到 worker 的標準輸出流,我們在客戶端不會看到打印的結果,但 Mars 提供了一個非常有用的接口來讓我們查看每個模型運行時的輸出。

以第 0 個模型為例,我們可以在 Mars 對象上直接調用  fetch_log  方法。

print(models[0].fetch_log())

輸出我們簡略一部分。

building tree 1 of 50building tree 2 of 50building tree 3 of 50building tree 4 of 50building tree 5 of 50building tree 6 of 50#  中間省略 building tree 49 of 50building tree 50 of 50

要看哪個模型都可以通過這種方式。試想下,如果沒有  fetch_log API,你確想看中間過程的輸出有多麻煩。首先這個函數在哪個 worker 上執行,不得而知;然后,即便知道是哪個 worker,因為每個 worker 上可能有多個函數執行,這些輸出就可能混雜在一起,甚至被龐大日志淹沒了。fetch_log  接口讓用戶不需要關心在哪個 worker 上執行,也不用擔心日志混合在一起。

想要了解  fetch_log  接口,可以查看   文檔。

“如何使用 Mars Remote API 執行 Python 函數”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計9414字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 普安县| 台中县| 阜阳市| 两当县| 和顺县| 邹城市| 阳江市| 禄丰县| 安仁县| 上栗县| 柞水县| 广饶县| 黔东| 岳阳县| 江城| 滦平县| 讷河市| 波密县| 陈巴尔虎旗| 麦盖提县| 黎城县| 宜章县| 海丰县| 东乡族自治县| 闽侯县| 建始县| 临清市| 汉川市| 威海市| 洪湖市| 赣州市| 顺昌县| 阿拉善右旗| 弥勒县| 明光市| 林西县| 铜梁县| 方正县| 水富县| 衢州市| 新闻|