久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

怎么用Mars Remote API執行Python函數

172次閱讀
沒有評論

共計 9341 個字符,預計需要花費 24 分鐘才能閱讀完成。

這篇文章主要講解了“怎么用 Mars Remote API 執行 Python 函數”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“怎么用 Mars Remote API 執行 Python 函數”吧!

Mars 是一個并行和分布式 Python 框架,能輕松把單機大家耳熟能詳的的 numpy、pandas、scikit-learn 等庫,以及 Python 函數利用多核或者多機加速。這其中,并行和分布式 Python 函數主要利用 Mars Remote API。

啟動 Mars 分布式環境可以參考:

命令行方式在集群中部署。

Kubernetes 中部署。

MaxCompute 開箱即用的環境,購買了 MaxCompute 服務的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常簡單,只需要對原有的代碼做少許改動,就可以分布式執行。

采用蒙特卡洛方法計算 π 為例。代碼如下,我們編寫了兩個函數,calc_chunk 用來計算每個分片內落在圓內的點的個數,calc_pi 用來把多個分片 calc_chunk 計算的結果匯總最后得出 π 值。

from typing import List
import numpy as np
def calc_chunk(n: int, i: int):
 #  計算 n 個隨機點(x 和 y 軸落在 - 1 到 1 之間)到原點距離小于 1 的點的個數
 rs = np.random.RandomState(i)
 a = rs.uniform(-1, 1, size=(n, 2))
 d = np.linalg.norm(a, axis=1)
 return (d   1).sum()
def calc_pi(fs: List[int], N: int):
 #  將若干次  calc_chunk  計算的結果匯總,計算  pi  的值
 return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i)
 for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

%%time 下可以看到結果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

在單機需要 12.3 s。

要讓這個計算使用 Mars Remote API 并行起來,我們不需要對函數做任何改動,需要變動的僅僅是最后部分。

import mars.remote as mr
#  函數調用改成  mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i))
 for i in range(N // n)]
#  把  spawn  的列表傳入作為參數,再  spawn  新的函數
pi = mr.spawn(calc_pi, args=(fs, N))
#  通過  execute()  觸發執行,fetch()  獲取結果
print(pi.execute().fetch())

%%time 下看到結果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

結果一模一樣,但是卻有數倍的性能提升。

可以看到,對已有的 Python 代碼,Mars remote API 幾乎不需要做多少改動,就能有效并行和分布式來加速執行過程。

一個例子

為了讓讀者理解 Mars Remote API 的作用,我們從另一個例子開始?,F在我們有一個數據集,我們希望對它們做一個分類任務。要做分類,我們有很多算法和庫可以選擇,這里我們用 RandomForest、LogisticRegression,以及 XGBoost。

困難的地方是,除了有多個模型選擇,這些模型也會包含多個超參,那哪個超參效果最好呢?對于調參不那么有經驗的同學,跑過了才知道。所以,我們希望能生成一堆可選的超參,然后把他們都跑一遍,看看效果。

準備數據

這個例子里我們使用 otto 數據集。

首先,我們準備數據。讀取數據后,我們按 2:1 的比例把數據分成訓練集和測試集。

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
def gen_data():
 df = pd.read_csv(otto/train.csv)
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y)
 
 return train_test_split(X, y, test_size=0.33, random_state=123)
X_train, X_test, y_train, y_test = gen_data()

模型

接著,我們使用 scikit-learn 的 RandomForest 和 LogisticRegression 來處理分類。

RandomForest:

from sklearn.ensemble import RandomForestClassifier
def random_forest(X_train: pd.DataFrame, 
 y_train: pd.Series, 
 verbose: bool = False,
 **kw):
 model = RandomForestClassifier(verbose=verbose, **kw)
 model.fit(X_train, y_train)
 return model

接著,我們生成供 RandomForest 使用的超參,我們用 yield 的方式來迭代返回。

def gen_random_forest_parameters():
 for n_estimators in [50, 100, 600]:
 for max_depth in [None, 3, 15]:
 for criterion in [gini ,  entropy]:
 yield {
  n_estimators : n_estimators,
  max_depth : max_depth,
  criterion : criterion
 }

LogisticRegression 也是這個過程。我們先定義模型。

from sklearn.linear_model import LogisticRegression
def logistic_regression(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):
 model = LogisticRegression(verbose=verbose, **kw)
 model.fit(X_train, y_train)
 return model

接著生成供 LogisticRegression 使用的超參。

def gen_lr_parameters():
 for penalty in [l2 ,  none]:
 for tol in [0.1, 0.01, 1e-4]:
 yield {
  penalty : penalty,
  tol : tol
 }

XGBoost 也是一樣,我們用 XGBClassifier 來執行分類任務。

from xgboost import XGBClassifier
def xgb(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):
 model = XGBClassifier(verbosity=int(verbose), **kw)
 model.fit(X_train, y_train)
 return model

生成一系列超參。

def gen_xgb_parameters():
 for n_estimators in [100, 600]:
 for criterion in [gini ,  entropy]:
 for learning_rate in [0.001, 0.1, 0.5]:
 yield {
  n_estimators : n_estimators,
  criterion : criterion,
  learning_rate : learning_rate
 }

驗證

接著我們編寫驗證邏輯,這里我們使用 log_loss 來作為評價函數。

from sklearn.metrics import log_loss
def metric_model(model, 
 X_test: pd.DataFrame,
 y_test: pd.Series) -  float:
 if isinstance(model, bytes):
 model = pickle.loads(model)
 y_pred = model.predict_proba(X_test)
 return log_loss(y_test, y_pred)

 #  把訓練和驗證封裝到一起  model = train_func(X_train, y_train, verbose=verbose, **train_params)  metric = metric_model(model, X_test, y_test)  return model, metric

找出最好的模型

做好準備工作后,我們就開始來跑模型了。針對每個模型,我們把每次生成的超參們送進去訓練,除了這些超參,我們還把 n_jobs 設成 -1,這樣能更好利用單機的多核。

results = []
# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
 print(f calculating on {params} )
 # fixed random_state
 params[random_state] = 123
 # use all CPU cores
 params[n_jobs] = -1
 model, metric = train_and_metric(random_forest, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({ model : model, 
  metric : metric})
 
# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
 print(f calculating on {params} )
 # fixed random_state
 params[random_state] = 123
 # use all CPU cores
 params[n_jobs] = -1
 model, metric = train_and_metric(logistic_regression, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({ model : model, 
  metric : metric})
 
# -------
# XGBoost
# -------
 
for params in gen_xgb_parameters():
 print(f calculating on {params} )
 # fixed random_state
 params[random_state] = 123
 # use all CPU cores
 params[n_jobs] = -1
 model, metric = train_and_metric(xgb, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({ model : model, 
  metric : metric})

運行一下,需要相當長時間,我們省略掉一部分輸出內容。

calculating on {n_estimators : 50,  max_depth : None,  criterion :  gini}
metric: 0.6964123781828575
calculating on {n_estimators : 50,  max_depth : None,  criterion :  entropy}
metric: 0.6912312790832288
#  省略其他模型的輸出結果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

從 CPU 時間和 Wall 時間,能看出來這些訓練還是充分利用了多核的性能。但整個過程還是花費了 31 分鐘。

使用 Remote API 分布式加速

現在我們嘗試使用 Remote API 通過分布式方式加速整個過程。

集群方面,我們使用最開始說的第三種方式,直接在 MaxCompute 上拉起一個集群。大家可以選擇其他方式,效果是一樣的。

n_cores = 8
mem = 2 * n_cores # 16G
# o  是  MaxCompute  入口,這里創建  10  個  worker  的集群,每個  worker 8 核 16G
cluster = o.create_mars_cluster(10, n_cores, mem, image= extended)

為了方便在分布式讀取數據,我們對數據處理稍作改動,把數據上傳到 MaxCompute 資源。對于其他環境,用戶可以考慮 HDFS、Aliyun OSS 或者 Amazon S3 等存儲。

if not o.exist_resource(otto_train.csv):
 with open(otto/train.csv) as f:
 #  上傳資源
 o.create_resource(otto_train.csv ,  file , fileobj=f)
 
def gen_data():
 #  改成從資源讀取
 df = pd.read_csv(o.open_resource( otto_train.csv))
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y)
 
 return train_test_split(X, y, test_size=0.33, random_state=123)

稍作改動之后,我們使用 mars.remote.spawn 方法來讓 gen_data 調度到集群上運行。

import mars.remote as mr
# n_output  說明是  4  輸出
# execute()  執行后,數據會讀取到  Mars  集群內部
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# remote_  開頭的都是  Mars  對象,這時候數據在集群內,這些對象只是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正確序列化 numpy ndarray、pandas DataFrame 等,還不能序列化模型,所以,我們要對 train_and_metric 稍作改動,把模型 pickle 了之后再返回。

def distributed_train_and_metric(train_func,
 train_params: dict,
 X_train: pd.DataFrame, 
 y_train: pd.Series, 
 X_test: pd.DataFrame, 
 y_test: pd.Series,
 verbose: bool = False
 ):
 model, metric = train_and_metric(train_func, train_params,
 X_train, y_train, 
 X_test, y_test, verbose=verbose)
 return pickle.dumps(model), metric

后續 Mars 支持了序列化模型后可以直接 spawn 原本的函數。

接著我們就對前面的執行過程稍作改動,把函數調用全部都用 mars.remote.spawn 來改寫。

import numpy as np
tasks = []
models = []
metrics = []
# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
 # fixed random_state
 params[random_state] = 123
 task = mr.spawn(distributed_train_and_metric,
 args=(random_forest, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2
 )
 tasks.extend(task)
 #  把模型和評價分別存儲
 models.append(task[0])
 metrics.append(task[1])
 
 
# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
 # fixed random_state
 params[random_state] = 123
 task = mr.spawn(distributed_train_and_metric,
 args=(logistic_regression, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2
 )
 tasks.extend(task)
 #  把模型和評價分別存儲
 models.append(task[0])
 metrics.append(task[1])
# -------
# XGBoost
# -------
 
for params in gen_xgb_parameters():
 # fixed random_state
 params[random_state] = 123
 #  再指定并發為核的個數
 params[n_jobs] = n_cores
 task = mr.spawn(distributed_train_and_metric,
 args=(xgb, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2
 )
 tasks.extend(task)
 #  把模型和評價分別存儲
 models.append(task[0])
 metrics.append(task[1])

#  把順序打亂,目的是能分散到  worker  上平均一點 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代碼幾乎一致。

運行查看結果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

時間一下子從 31 分鐘多來到了 2 分鐘,提升 15x+。但代碼修改的代價可以忽略不計。

細心的讀者可能注意到了,分布式運行的代碼中,我們把模型的 verbose 給打開了,在分布式環境下,因為這些函數遠程執行,打印的內容只會輸出到 worker 的標準輸出流,我們在客戶端不會看到打印的結果,但 Mars 提供了一個非常有用的接口來讓我們查看每個模型運行時的輸出。

以第 0 個模型為例,我們可以在 Mars 對象上直接調用 fetch_log 方法。

print(models[0].fetch_log())

輸出我們簡略一部分。

building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
#  中間省略
building tree 49 of 50
building tree 50 of 50

要看哪個模型都可以通過這種方式。試想下,如果沒有 fetch_log API,你確想看中間過程的輸出有多麻煩。首先這個函數在哪個 worker 上執行,不得而知;然后,即便知道是哪個 worker,因為每個 worker 上可能有多個函數執行,這些輸出就可能混雜在一起,甚至被龐大日志淹沒了。fetch_log 接口讓用戶不需要關心在哪個 worker 上執行,也不用擔心日志混合在一起。

感謝各位的閱讀,以上就是“怎么用 Mars Remote API 執行 Python 函數”的內容了,經過本文的學習后,相信大家對怎么用 Mars Remote API 執行 Python 函數這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計9341字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 太谷县| 镇安县| 宁德市| 莒南县| 区。| 普定县| 富宁县| 伊宁县| 施甸县| 泌阳县| 太谷县| 天等县| 新民市| 高青县| 长阳| 天柱县| 湖北省| 康保县| 湄潭县| 江华| 山东省| 资中县| 西乌| 安泽县| 无极县| 明水县| 绥棱县| 沽源县| 河西区| 莎车县| 宽甸| 壤塘县| 孟津县| 邳州市| 襄汾县| 巴塘县| 蛟河市| 师宗县| 长丰县| 保山市| 开鲁县|