使用MLflow扩展Spark

你好,哈布罗夫派。正如我们已经写的那样,本月OTUS将同时启动两个机器学习课程,即基础课程高级课程在这方面,我们继续分享有用的材料。








本文的目的是分享我们对MLflow的初次经验



我们将从其跟踪服务器开始对MLflow进行审查,并继续进行研究的所有迭代。然后,我们将分享使用UDF将Spark连接到MLflow的经验。



语境



Alpha Health,我们使用机器学习和人工智能使人们能够照顾自己的健康和福祉。这就是为什么机器学习模型是我们开发的数据产品的核心的原因,这就是为什么我们吸引人们关注MLflow,它是涵盖机器学习生命周期各个方面的开源平台。



流量



MLflow的主要目标是在机器学习之上提供额外的一层,使数据科学家可以使用几乎任何机器学习库(h2okerasmleappytorchsklearntensorflow),将其提升到一个新的水平。



MLflow提供了三个组件:



  • 跟踪-记录和查询实验:代码,数据,配置和结果。遵循创建模型的过程非常重要。
  • 项目-可在任何平台上运行的打包格式(例如SageMaker
  • 模型是用于将模型提交给各种部署工具的一种通用格式。


MLflow(在撰写本文时为alpha版本)是一个开放源代码平台,可让您管理机器学习生命周期,包括实验,重用和部署。


配置MLflow



要使用MLflow,首先需要设置整个Python环境,为此,我们将使用PyEnv(要在Mac上安装Python,请在此处查看)。因此,我们可以创建一个虚拟环境,在其中安装运行所需的所有库。



```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```


安装所需的库。



```
pip install mlflow==0.7.0 \
            Cython==0.29 \ 
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```


注意:我们正在使用PyArrow运行诸如UDF之类的模型。由于最新版本存在冲突,因此需要修复PyArrow和Numpy版本。

启动跟踪界面



MLflow Tracking允许我们使用Python和REST API记录并向实验提出请求。此外,您可以定义存储模型工件的位置(本地主机,Amazon S3Azure Blob存储Google Cloud StorageSFTP服务器)。由于我们在Alpha Health使用AWS,因此S3将用作工件的存储。



# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost
    --port 5000


MLflow建议使用持久性文件存储。服务器将在其中存储运行和实验元数据的文件存储。启动服务器时,请确保它指向持久性文件存储。在这里,我们仅将其用于实验/tmp



请记住,如果我们要使用mlflow服务器运行旧实验,则这些实验必须存在于文件存储中。但是,即使没有这个,我们也可以在UDF中使用它们,因为我们只需要模型的路径。

注意:请记住,跟踪UI和模型客户端必须有权访问工件的位置。也就是说,无论跟踪UI位于EC2实例中的事实如何,当在本地启动MLflow时,机器都必须直接访问S3才能编写工件模型。




跟踪UI将工件存储在S3存储桶中



运行模型



Tracking Server运行之后,就可以开始训练模型。



作为一个例子,我们将使用葡萄酒修改从MLflow例子Sklearn



MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv


就像我们已经说过的那样,MLflow允许您记录模型的参数,度量和工件,以便您可以在迭代时跟踪它们的发展方式。此功能非常有用,因为通过这种方式,我们可以通过与跟踪服务器联系或通过git hash提交日志了解哪个代码执行了所需的迭代来重现最佳模型。



with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")




葡萄酒迭代



模型的服务器部分



使用“ mlflow server”命令启动的MLflow跟踪服务器具有REST API,用于跟踪启动并将数据写入本地文件系统。您可以使用环境变量“ MLFLOW_TRACKING_URI”指定跟踪服务器地址,并且MLflow跟踪API会自动在此地址与跟踪服务器联系,以创建/获取启动信息,日志指标等。



来源:文档//运行跟踪服务器
要为模型提供服务器,我们需要一个正在运行的跟踪服务器(请参阅启动界面)和模型的运行ID。





运行编号



# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
  --port 5005  \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model


要使用MLflow服务功能为模型提供服务,我们需要访问Tracking UI,只需指定即可获取有关模型的信息--run_id



模型与Tracking Server通信后,我们可以获取新的模型端点。



# Query Tracking Server Endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}


从Spark运行模型



尽管Tracking Server足够强大,可以实时提供模型,对其进行训练并使用服务功能(来源:mlflow // docs // models#local),但使用Spark(批量或流式传输)是一种更强大的解决方案分配帐户。



想象一下,您刚刚进行了离线培训,然后将输出模型应用于所有数据。这是Spark和MLflow发挥其最佳性能的地方。



安装PySpark + Jupyter + Spark



资料来源:PySpark-Jupyter入门


为了展示我们如何将MLflow模型应用于Spark数据帧,我们需要设置Jupyter笔记本与PySpark一起使用。



首先安装Apache Spark的最新稳定版本



cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀


在虚拟环境中安装PySpark和Jupyter:



pip install pyspark jupyter


设置环境变量:



export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"


确定后notebook-dir,我们可以将笔记本存储在所需的文件夹中。



从PySpark启动Jupyter



由于我们能够将Jupiter设置为PySpark驱动程序,因此我们现在可以在PySpark上下文中运行Jupyter笔记本。



(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745






如上所述,MLflow提供了在S3中记录模型工件的功能。一旦掌握了所选模型,我们就有机会使用模块将其作为UDF导入mlflow.pyfunc



import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)




PySpark-输出葡萄酒质量预测



到目前为止,我们已经讨论了如何通过在整个葡萄酒数据集上运行葡萄酒质量预测来将PySpark与MLflow结合使用。但是,如果您需要使用Scala Spark中的Python MLflow模块怎么办?



我们也通过在Scala和Python之间拆分Spark上下文进行了测试。也就是说,我们在Python中注册了MLflow UDF,并从Scala中使用了它(是的,也许不是最好的解决方案,但是我们拥有的是)。



Scala Spark + MLflow



对于此示例,我们将Toree内核添加到现有的木星中。



安装Spark + Toree + Jupyter



pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```


从随附的笔记本中可以看到,SDF和PySpark之间共享UDF。我们希望这一部分对喜欢Scala并希望将机器学习模型部署到生产环境的人有所帮助。



import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [\s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[\s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+


下一步



即使在撰写本文时MLflow位于Alpha中,它看起来也很有前途。运行多个机器学习框架并从单个端点使用它们的能力将推荐器系统提升到了一个新水平。



此外,MLflow通过在数据工程师和数据科学家之间创建一个公共层,使他们之间更加紧密。



在对MLflow进行了研究之后,我们相信我们会继续并将其用于我们的Spark管道和推荐系统。



最好将文件存储与数据库而不是文件系统同步。这样,我们需要获得可以使用相同文件存储的多个端点。例如,使用多个Presto实例雅典娜使用相同的Glue Metastore。



总而言之,我要感谢MLFlow社区使我们的数据工作更加有趣。



如果您正在玩MLflow,请随时给我们写信并告诉我们如何使用它,如果在生产中使用它,甚至更多。






了解有关课程的更多信息:

机器学习。基础机器学习课程

高级课程






阅读更多:






All Articles