草庐IT

YMatrix + PLPython替代Spark实现车联网算法

db_shidb 2024-07-23 原文

PySpark算法开发实战

一、PySpark介绍

Spark是一种快速、通用、可扩展的大数据分析引擎,PySpark是Spark为Python开发者提供的API。在有非常多可视化和机器学习算法需求的应用场景,使用PySpark比Spark-Scala可以更好地和python中丰富的库配合使用。

使用Python开发Spark需要使用到pyspark,pyspark是Spark为Python开发者提供的API。pyspark使用Py4J库,使得Python可以使用JVM对象。

二、运行环境搭建

操作系统 CentOS Linux release 7.8.2003 (Core)
Java 1.8.0_151
Python 3.6.13
Spark 2.4.0
Miniconda 4.5.4
pyspark 3.2.1   
pyarrow 6.0.1

Miniconda

  1. 安装Miniconda
    conda和virtualenv是Python的包管理与环境管理工具。conda的安装程序中包含conda软件包管理器和Python,不需要再单独安装Python,使用起来较为方便。Miniconda为conda精简版,大小约为50M。
    由于我们使用的Spark版本与Python版本为历史版本,需要用4.5.4版本的Miniconda(对应Python 3.6)进行安装。当前官网下载页的miniconda支持到最低3.7版本Python,需要在https://repo.anaconda.com/miniconda/上下载。根据机型选择Miniconda2-4.5.4-Linux-x86_64.sh下载。
    下载完成之后运行脚本Miniconda2-4.5.4-Linux-x86_64.sh进行安装。完成之后可以使用conda -V检查安装结果。
    conda -V conda 4.5.4
  2. 设置用于Spark的虚拟环境
    首先建立一个pyspark_env的环境
    conda create --name pyspark_env python=3.6
    新建完成之后可以从过conda activate进入虚拟环境
    conda activate pyspark_env
    进入环境之后命令行会有环境名的标识用于区分

创建好并进入pyspark_env的虚拟环境之后,我们需要安装两个Spark相关的库,pyspark和pyarrow。可以使用conda install安装或者也可使用pip,这里以使用pip安装为例:
pip3 install pyarrow pyspark
安装完毕之后可以使用conda list查看安装好的库

此,环境搭建中的conda部分已经完成。详细的操作可以参考Spark的最新文档,pyspark conda部署的部分是多版本通用的:Installation - PySpark 3.2.1 documentation

Spark

  1. 下载Spark
    我们下载Spark已经编译好的压缩包,所有的历史版本可以在这个链接中找到:https://archive.apache.org/dist/spark/,
    本文下载spark-2.4.0-bin-hadoop2.7.tgz
    下载完成之后解压文件。完成之后可以进入目录运行bin/spark-shell进行测试

  2. Standalone模式启动集群
    Spark的集群模式总共分为四种

  • Standalone
  • Apache Mesos
  • Hadoop YARN
  • Kubernetes
    2、3、4都比较好理解,Standalone模式是Spark自身实现的资源调度框架。
    复制spark根目录下的conf/spark-env.sh.template -> conf/spark-env.sh
    在其中添加
    SPARK_MASTER_HOST = [hostname] # master的主机名
    SPARK_MASTER_PORT= 7077
    在master节点上运行
    ./sbin/start-master.sh
    启动之后可以登录webui查看,地址为IP:8080

同样,在slave节点设置好环境变量之后运行
./sbin/start-slave.sh

三、Spark分布式运行算法

下面的代码是Spark 运行Pandas UDF的例子。

def scalar_pandas_udf_example(spark):
    import pandas as pd
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType
    def multiply_func(a, b):
        return a * b
    multiply = pandas_udf(multiply_func, returnType=LongType())
    x = pd.Series([1, 2, 3])
    print(multiply_func(x, x))
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df.select(multiply(col("x"), col("x"))).show()
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master('local')\
        .appName("UDFTest") \
        .getOrCreate()
    print("Running pandas_udf scalar example")
    scalar_pandas_udf_example(spark)
    spark.stop()

首先生成一个SparkSession对象,参数master->'local’指的是local模式运行,如果是集群的话这里local换成spark:\masterip:7077,appName->'UDFTest’定义了任务名称

spark = SparkSession \
        .builder \
        .master('local')\
        .appName("UDFTest") \
        .getOrCreate()

定义一个简单的函数

def multiply_func(a, b):
        return a * b

生成UDF对象

multiply = pandas_udf(multiply_func, returnType=LongType())

生成一个pandas的数据

x = pd.Series([1, 2, 3])

创建一个Spark dataframe对象并让spark执行UDF

df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()

该代码运行的结果为:

四、Spark数据处理的缺点

  1. 一般生产环境下的数据想要使用Spark做计算,数据需要从存储的数据库->落盘文件/消息队列->Spark集群数据链路需要建设和维护
  2. PySpark + Pandas UDF处理数据,尽管利用了Apache Arrow,核心还是需要JVM与Python之间数据传输,开销大,不适用于性能敏感的场景。
    YMatrix+ PLPython处理方式
    上文描述了Spark在车联网信号分析的实际使用案例。Spark的优点很明显,作为分布式的内存计算引擎,社区活跃、支持多语言开发、易于融合其他如Hadoop等框架。但使用上的缺点在上文中也有描述:需要搭建并维护一整条新的数据链路;并且除去Scala,使用Python等其他语言研发不适合在性能要求高的场景下使用。
    那么回到我们实现车联网信号分析这个案例上,实际上最终的需求是从一部分数据中取出想要的数据、并且经过一定的要求与计算,筛选出最终的数据结果——实际上就是简单的一条SQL+代码实现算法处理数据。上述例子中,“一条SQL”变成了整条数据链路,从数据库取出数据处理完丢进消息队列,“代码实现算法”的代码加载到了Spark分布式的消费数据和运行。如果能够把“代码实现算法”这部分与下沉到数据库这层,那么我们不仅减少了维护一整套数据链路的开销,还能利用数据库的算力提供性能,加速数据流转。想要利用数据库帮我们进行数据分析与计算,我们需要MPP架构的数据库。

一、YMatrix与PLPython介绍

YMatrix
YMatrix是超融合数据库,将交易型数据库(OLTP)、分析型数据库(OLAP)和时序数据库能力融为一体的超融合型分布式数据库产品,具备严格分布式事务一致性、水平在线扩容、安全可靠、成熟稳定、兼容PostgreSQL/Greenplum协议和生态等重要特性。为万物互联的智能时代提供坚实、简洁的智能数据核心基础设施,为物联网应用、工业互联网、智能运维、智慧城市、实时数仓、智能家居、车联网等场景提供一站式高效解决方案,YMatrix为公司自主研发的国产数据库,公司拥有该产品全部知识产权。产品的架构如下。

YMatrix不但对经典的Greenplum数据仓库场景进行了大幅增强,而且可以极佳的支持大规模时序数据处理、支持时空数据、结构化数据和半结构化数据,一套数据库解决各种数据类型,避免为了处理不同类型数据引入不同类型的产品。实现提高开发运维效率、提升系统性能、降低整体成本的目标。

PLPython
PL/Python过程语言允许用Python编写 PostgreSQL函数。Python有非常多成熟的库能够提供给我们做数据分析,如numpy、pandas等。
使用PLPython方便数据分析的算法实现,可以充分利用YMatrix分布式储存和算力。

二、PLPython调用外部Python代码

上文中描述了如何使用Python开发Spark应用,让我们的算法能够使用Spark的分布式计算能力,整体的数据流程是从csv的数据文件中读取数据->Spark Arrow内存的数据类型中->Spark分布式计算输出结果。
相同的算法也可以通过PLPython,将数据转存到YMatrix查询计算实现
将数据导入YMatrix
使用Mxgate将csv中的数据导入数据库,在此之前需要新建表

        vin text,
        daq_time DATE,
        status INT,
        c_stat INT,
        mode INT,
        speed INT,
        mileage INT,
        t_volt INT,
        t_current INT,
        soc INT,
        dcdc_stat INT,
        isulate_r INT,
        lng BIGINT,
        lat BIGINT,
        max_volt_bat_id INT,
        max_volt_cell_id INT,
        max_cell_volt INT,
        min_volt_bat_id INT,
        min_cell_volt_id INT,
        min_cell_volt INT,
        max_temp_sys_id INT,
        max_temp_probe_id INT,
        max_temp INT,
        min_temp_sys_id INT,
        min_temp_probe_id INT,
        min_temp INT,
        max_alarm_lvl INT,
        genral_alarm INT,
        cell_volt_list text,
        cell_temp_list TEXT,
        pdate date) 
distributed BY (vin)

表格建好了之后导入数据

tail -n +1 data.csv | mxgate --source stdin --db-database test --db-master-host localhost.localdomain --db-master-port 5432 --db-user mxadmin --time-format raw --target suanfa_data  --delimiter ','

编写PLPython函数调用算法
首先我们需要把算法代码上传到服务器上,在本例中路径为/home/mxadmin/plpython/
我们需要查询suanfa_data表中的所有数据,并将结果转化成pandas的Dataframe格式,传递给算法函数去做处理

sql = "SELECT * FROM suanfa_data;"
df = psql.frame_query(sql, cnxn)


create function suanfa_detector() returns void as $$

import sys
sys.path.append('/home/mxadmin/plpython/')
from src.analyzer import analyzer

import pyodbc
import pandas.io.sql as psql
sql_result = plpy.execute("SELECT * FROM suanfa_data;")
df = pd.DataFrame.from_records(sql_result)

result = analyzer(df)
plpy.notice(result)

$$ language plpython3u;

下面是算法函数,输入是我们suanfa_detector()中sql的查询结果转换成的dataframe对象,经过数据处理,最后输出结果的dataframe

Def  analyzer(data: pandas.Dataframe)-> pandas.Dataframe:
    data_wash(data)
    sign_data_veh_state(data)
    detect_two_alarm_tuples = detect_two_analyze(data)
    return detect_two_alarm_tuples

有关YMatrix + PLPython替代Spark实现车联网算法的更多相关文章

  1. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. ruby-on-rails - 更好的替代方法 try( :output). try( :data). try( :name)? - 2

    “输出”是一个序列化的OpenStruct。定义标题try(:output).try(:data).try(:title)结束什么会更好?:) 最佳答案 或者只是这样:deftitleoutput.data.titlerescuenilend 关于ruby-on-rails-更好的替代方法try(:output).try(:data).try(:name)?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.c

  4. 区块链之加解密算法&数字证书 - 2

    目录一.加解密算法数字签名对称加密DES(DataEncryptionStandard)3DES(TripleDES)AES(AdvancedEncryptionStandard)RSA加密法DSA(DigitalSignatureAlgorithm)ECC(EllipticCurvesCryptography)非对称加密签名与加密过程非对称加密的应用对称加密与非对称加密的结合二.数字证书图解一.加解密算法加密简单而言就是通过一种算法将明文信息转换成密文信息,信息的的接收方能够通过密钥对密文信息进行解密获得明文信息的过程。根据加解密的密钥是否相同,算法可以分为对称加密、非对称加密、对称加密和非

  5. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  6. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  7. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  8. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  9. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

  10. ruby - "public/protected/private"方法是如何实现的,我该如何模拟它? - 2

    在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定

随机推荐