草庐IT

分布式机器学习:逻辑回归的并行化实现(PySpark)

Orion's Blog 2023-03-28 原文

算法的完整实现代码我已经上传到了GitHub仓库:Distributed-ML-PySpark(包括其它分布式机器学习算法),感兴趣的童鞋可以前往查看。

1 梯度计算式导出

我们在博客《统计学习:逻辑回归与交叉熵损失(Pytorch实现)》中提到,设\(w\)为权值(最后一维为偏置),样本总数为\(N\)\(\{(x_i, y_i)\}_{i=1}^N\)为训练样本集。样本维度为\(D\)\(x_i\in \mathbb{R}^{D+1}\)(最后一维扩充),\(y_i\in\{0, 1\}\)。则逻辑回归的损失函数为:

\[\mathcal{l}(w) = \sum_{i=1}^{N}\left[y_{i} \log \pi_{w}\left(x_{i}\right)+\left(1-y_{i}\right) \log \left(1-\pi_w\left(x_{i}\right)\right)\right] \]

这里

\[\begin{aligned} \pi_w(x) = p(y=1 \mid x; w) =\frac{1}{1+\exp \left(-w^{T} x\right)} \end{aligned} \]

写成这个形式就已经可以用诸如Pytorch这类工具来进行自动求导然后采用梯度下降法求解了。不过若需要用表达式直接计算出梯度,我们还需要将损失函数继续化简为:

\[\mathcal{l}(w) = -\sum_{i=1}^N(y_i w^T x_i - \log(1 + \exp(w^T x_i))) \]

可将梯度表示如下:

\[\nabla_w{\mathcal{l}(w)} = -\sum_{i=1}^N(y_i - \frac{1}{\exp(-w^Tx)+1})x_i \]

2 基于Spark的并行化实现

逻辑回归的目标函数常采用梯度下降法求解,该算法的并行化可以采用如下的Map-Reduce架构(图片来自王树森老师的YouTube课程并行计算与机器学习(1/3)[2]):

先将第\(t\)轮迭代的权重广播到各worker,各worker计算一个局部梯度(map过程),然后再将每个节点的梯度聚合(reduce过程),最终对参数进行更新。

在Spark中每个task对应一个分区,决定了计算的并行度(分区的概念详间我们上一篇博客Spark: 单词计数(Word Count)的MapReduce实现(Java/Python) )。我们假设共有3个分区,则在Spark的实现过程如下:

  • map阶段: 各task运行map()函数对每个样本\((x_i, y_i)\)计算梯度\(g_i\), 然后对每个样本对应的梯度运行进行本地聚合,以减少后面的数据传输量。如第1个task执行reduce()操作得到\(\widetilde{g}_1 = \sum_{i=1}^3 g_i\) 如下图所示:

  • reduce阶段:使用reduce()将所有task的计算结果收集到Driver端进行聚合,然后进行参数更新。

在上图中,训练数据用points:PrallelCollectionRDD来表示,参数向量用\(w\)来表示,注意参数向量不是RDD,只是一个单独的参与运算的变量。

此外需要注意一点,虽然每个task在本地进行了局部聚合,但如果task过多且每个task本地聚合后的结果(单个gradient)过大那么统一传递到Driver端仍然会造成单点的网络平均等问题。为了解决这个问题,Spark设计了性能更好的treeAggregate()操作,使用树形聚合方法来减少网络和计算延迟,我们在第5部分会详细叙述。

3 PySpark实现代码

PySpark的完整实现代码如下:

'''
Descripttion: 
Version: 1.0
Author: ZhangHongYu
Date: 2022-05-26 21:02:38
LastEditors: ZhangHongYu 
LastEditTime: 2022-07-01 16:22:53
'''
from sklearn.datasets import load_breast_cancer
import numpy as np
from pyspark.sql import SparkSession
from operator import add
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import sys
import os

os.environ['PYSPARK_PYTHON'] = sys.executable

n_threads = 3  # Number of local threads
n_iterations = 1500  # Number of iterations
eta = 0.1  # iteration step_size

def logistic_f(x, w):
    return 1 / (np.exp(-x.dot(w)) + 1)


def gradient(point: np.ndarray, w: np.ndarray) -> np.ndarray:
    """ Compute linear regression gradient for a matrix of data points
    """
    y = point[-1]    # point label
    x = point[:-1]   # point coordinate
    # For each point (x, y), compute gradient function, then sum these up
    return - (y - logistic_f(x, w)) * x

def draw_acc_plot(accs, n_iterations):
    def ewma_smooth(accs, alpha=0.9):
        s_accs = np.zeros(n_iterations)
        for idx, acc in enumerate(accs):
            if idx == 0:
                s_accs[idx] = acc
            else:
                s_accs[idx] = alpha * s_accs[idx-1] + (1 - alpha) * acc
        return s_accs

    s_accs = ewma_smooth(accs, alpha=0.9)
    plt.plot(np.arange(1, n_iterations + 1), accs, color="C0", alpha=0.3)
    plt.plot(np.arange(1, n_iterations + 1), s_accs, color="C0")
    plt.title(label="Accuracy on test dataset")
    plt.xlabel("Round")
    plt.ylabel("Accuracy")
    plt.savefig("logistic_regression_acc_plot.png")


if __name__ == "__main__":

    X, y = load_breast_cancer(return_X_y=True)

    D = X.shape[1]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=0)
    n_train, n_test = X_train.shape[0], X_test.shape[0]

    spark = SparkSession\
        .builder\
        .appName("Logistic Regression")\
        .master("local[%d]" % n_threads)\
        .getOrCreate()

    matrix = np.concatenate(
        [X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)

    points = spark.sparkContext.parallelize(matrix).cache()

    # Initialize w to a random value
    w = 2 * np.random.ranf(size=D + 1) - 1
    print("Initial w: " + str(w))
    
    accs = []
    for t in range(n_iterations):
        print("On iteration %d" % (t + 1))
        
        g = points.map(lambda point: gradient(point, w)).reduce(add)

        w -= eta * g
        
        y_pred = logistic_f(np.concatenate(
            [X_test, np.ones((n_test, 1))], axis=1), w)
        pred_label = np.where(y_pred < 0.5, 0, 1)
        acc = accuracy_score(y_test, pred_label)
        accs.append(acc)
        print("iterations: %d, accuracy: %f" % (t, acc))

    print("Final w: %s " % w)
    print("Final acc: %f" % acc)

    spark.stop()
    
    draw_acc_plot(accs, n_iterations)

注意.master("local[%d]" % n_threads)中的n_threads是我们在本地单机多线程调试模式下所设置的线程数,也就是Spark中的默认分区数,我们此处将n_threads设置为3,则Spark就会默认划分出3个分区。我们在代码中采用breast cancer数据集进行训练和测试,该数据集是个二分类数据集。模型初始权重采用随机初始化。

最后,我们来看一下算法的输出结果。

初始权重如下:

Initial w: [ 0.20733249 -0.68270323 -0.23539134  0.46125717 -0.27736064 -0.36072597
  0.92549048 -0.18432978  0.77991313  0.54054734  0.48559498 -0.23031733
  0.67125099  0.57301018  0.69243332 -0.4791771  -0.76039149  0.15924619
  0.01321836 -0.19976038  0.576716    0.50379885  0.58670905 -0.38590575
 -0.48719581 -0.91967718  0.73359703  0.28669715  0.56688998  0.97444464
 -0.44361797]

最终的模型权重与在测试集上的准确率结果如下:

Final w: [ 1.15974825e+04  1.29973800e+04  6.52553107e+04  2.10241061e+04
  8.86514067e+01 -1.10587723e+02 -2.97300359e+02 -1.27131718e+02
  1.59369309e+02  7.84967515e+01 -4.03071071e+01  8.13799814e+02
 -1.30662140e+03 -4.04474691e+04  5.34490109e+00 -2.28709226e+01
 -4.24236287e+01 -8.04493849e+00  1.12580376e+01  7.93202730e-01
  1.25640151e+04  1.51951403e+04  6.46383775e+04 -3.18968898e+04
  9.95884228e+01 -4.01750499e+02 -6.93005010e+02 -1.78725566e+02
  1.93133380e+02  6.01062122e+01  1.52932953e+03] 
Final acc: 0.947368

4 关于冗余存储的反思

注意根据我们以上的代码实现中的

map(lambda point: gradient(point, w)).reduce(add)

这一行中,我们求梯度的函数gradient会根据w将每一个训练样本点map到其对应梯度值。w的拷贝会被发送给每个计算节点的每个CPU。比如,假设我们有一个4个CPU的计算节点。

默认当map过程发生时,所有被map过程需要的数据会被发往mapper,而此处每个CPU都有一个mapper,故如果该计算节点有4个CPU,我们实际上会发送4个w的拷贝到该节点,如下图所示:

之所以会这样,是因为此处假定w会被修改,必须为每个CPU单独存储w拷贝以解决并发写的问题。然而,当我们计算每一步的梯度时,w并未被修改,故此处不存在并发写的问题。这导致我们浪费了存储空间,因为本可将w存储在各个节点的共享内存中的。

为了解决此问题,我们可以将w进行广播,这样它只会被发到每个计算节点一次(而不是每个CPU一次)。为了实现这个想法,我们将w定义为一个广播变量来使用,如下面代码所示:

# Initialize w to a random value
w = 2 * np.random.ranf(size=D + 1) - 1
print("Initial w: " + str(w))

for t in range(n_iterations):
    print("On iteration %d" % (t + 1))
    w_br = spark.sparkContext.broadcast(w)
    
    g = points.map(lambda point: gradient(point, w_br.value)).reduce(add)

    w -= alpha * g

当我们初始化w时,我们首先将其声明为一个广播变量。在每一轮梯度下降的迭代中,我们需要引用w的值。最后,我们在w被更新后重新广播w。这样,w在每个机器上被高效地存储(每个机器一份,而不是多份)。

5 关于聚合效率的反思

正如我们前面所说,我们可以用性能更好的treeAggregate()操作来替代reduce()操作,该操作使用树形聚合方法来减少网络和计算延迟。
treeAggregate()函数原型如下:

RDD.treeAggregate(zeroValue, seqOp, combOp, depth=2)

其中zeroValue为聚合结果的初始值,seqOp函数用于定义单分区(partition)做聚合操作的方法,它第一个参数为聚合结果,第二个参数为分区中的数据变量。combOp定义对分区之间做聚合的方法,它第一个参数为第二个参数都为聚合结果。
depth为聚合树的深度。

此处我们的聚合操作比较简单,聚合结果初始值设置为0.0seqOpcombOp都设置为add算子即可:

g = points.map(lambda point: gradient(point, w_br.value))\
    .treeAggregate(0.0, add, add)

6 算法收敛性和复杂度分析

6.1 收敛性和计算复杂度

算法的ACC曲线如下图所示:

可见我们的算法总体呈现收敛。

这里的损失函数\(l( \space \cdot \space )\)是光滑凸函数(非强凸函数,它只在局部呈现强凸性),如我们在博客《数值优化:经典一阶确定性算法及其收敛性分析》中所分析,假设目标函数\(f: \mathbb{R}^d \rightarrow \mathbb{R}\)是凸函数,且\(\beta\)-光滑,当步长\(\eta = \frac{1}{\beta}\)时,梯度下降法具有\(\mathcal{O}(\frac{1}{t})\)次线性收敛速率

\[f(w^t) - f(w^*) \leqslant \frac{2\beta \lVert w^0 - w^*\rVert^2}{t} \]

这意味着在梯度下降求解逻辑回归问题的迭代次数复杂度为\(\mathcal{O}(\frac{1}{\varepsilon})\),也即\(\mathcal{O}(\frac{1}{\varepsilon})\)轮后会取得\(\varepsilon\)的精度。

尽管梯度的计算可以被分摊到个计算节点上,然而梯度下降的迭代是串行的。每轮迭代中,Spark会执行同步屏障(synchronization barrier)来确保在各worker开始下一轮迭代前\(w\)已被更新完毕。如果存在掉队者(stragglers),其它worker就会空闲(idle)等待,直到下一轮迭代。故相比梯度的计算,其迭代计算的“深度”(depth)是其计算瓶颈。

6.2 通信复杂度

map过程显然是并行的,并不需要通信。broadcast过程需要一对多通信,并且reduce过程需要多对一通信(都按照树形结构)。故对于每轮迭代,总通信时间按

\[2\text{log}_2(p)(L + \frac{m}{B}) \]

增长。
这里\(p\)为除去driver节点的运算节点个数,\(L\)是节点之间的通信延迟。\(B\)是节点之间的通信带宽。\(M\)是每轮通信中节点间传输的信息大小。故消息能够够以\(\mathcal{O}(\log p)\)的通信轮数在所有节点间传递。

参考

有关分布式机器学习:逻辑回归的并行化实现(PySpark)的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. ruby - 在 Windows 机器上使用 Ruby 进行开发是否会适得其反? - 2

    这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby​​-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub

  3. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  4. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  5. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  6. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  7. CAN协议的学习与理解 - 2

    最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总

  8. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  9. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  10. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

随机推荐