文章目录
Spark SQL基本操作
将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id字段;
(4) 筛选出age>30的记录;
(5) 将数据按age分组;
(6) 将数据按name升序排列;
(7) 取出前3行数据;
(8) 查询所有记录的name列,并为其取别名为username;
(9) 查询年龄age的平均值;
(10) 查询年龄age的最小值。
编程实现将RDD转换为DataFrame
源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。
编程实现利用DataFrame读写MySQL的数据
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如下表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

我们在之前创建的sparkdata目录下创建该json文件并将上面信息复制进去并保存命名为employee.json:
cd /usr/local/spark/sparkdata
vim employee.json

然后我们进入到pyspark中,开始做题。
首先我们创建一个DataFrame:
>>> sp=SparkSession.builder.getOrCreate()
>>> df=sp.read.json("file:///usr/local/spark/sparkdata/employee.json")
(1)查询DataFrame的所有数据
>>> df.show()

(2)查询所有数据,并去除重复的数据
>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段
>>> df.drop("id").show()

(4)筛选age>30的记录
df.filter(df.age>30).show()

(5) 将数据按age分组
>>> df.groupBy("age").count().show()

(6) 将数据按name升序排列
>>> df.sort(df.name.asc()).show()

(7) 取出前3行数据
>>> df.take(3)

(8) 查询所有记录的name列,并为其取别名为username
>>> df.select(df.name.alias("username")).show()

(9) 查询年龄age的平均值
>>> df.agg({"age":"mean"}).show()

(10) 查询年龄age的最小值
>>> df.agg({"age":"min"}).show()

首先我们仍然在sparkdata目录下创建我们需要的文件并命令为employee.txt,然后写入信息:
vim employee.txt

然后我们还是在该目录下新建一个py文件命名为rddTodf.py,然后写入如下py程序:
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
sc = SparkContext("local","Simple App")
spark=SparkSession(sc)
peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/sparkdata/employee.txt")
rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
rowRDD.createOrReplaceTempView("employee")
personsDF = spark.sql("select * from employee")
personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

然后我们运行该程序:
python3 rddTodf.py

出现这个结果证明成功。
我们首先启动mysql服务并进入到mysql数据库中:
systemctl start mysqld.service
mysql -u root -p
然后开始接下来的操作。
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,并写入题目中的原始数据
mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4),name char(20),gender char(4),age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,最后打印出age的最大值和age的总和
我们仍然在sparkdata目录下面新建一个py程序并命名为mysqlTest.py:
cd /usr/local/spark/sparkdata
vim mysqlTest.py
然后写入如下py程序:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
if __name__ == "__main__":
sc = SparkContext( 'local', 'test')
spark=SQLContext(sc)
jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user", "root").option("password", "MYsql123!").load()
jdbcDF.filter(jdbcDF.age>20).collect() # 检测是否连接成功
studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" "))
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3])))
employeeDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest",'employee','append', prop)
jdbcDF.collect()
jdbcDF.agg({"age": "max"}).show()
jdbcDF.agg({"age": "sum"}).show()

然后直接运行该py程序即可得到结果:
python3 mysqlTest.py


本次实验的话,难度主要在后面两个题目中,在第二题中我遇见了两个错误:
第一个错误我是通过如下解决的:
spark = SparkSession(sc)
解决第一个错误之后,我再次运行的时候就开始报第二个错误了,第二个错误我是这样解决的:
from pyspark.sql import SQLContext
spark.sparkContext.textFile('filepath')
具体可以看我们上面对于的代码就可以明白了。
另外,很明显可以看见第三题第二问后面抛出了异常:
** BEGIN NESTED EXCEPTION **
javax.net.ssl.SSLException
MESSAGE: closing inbound before receiving peer's close_notify
STACKTRACE:
javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
at sun.security.ssl.Alert.createSSLException(Alert.java:133)
at sun.security.ssl.Alert.createSSLException(Alert.java:117)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:340)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:296)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:737)
at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:716)
at com.mysql.jdbc.MysqlIO.quit(MysqlIO.java:2239)
at com.mysql.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:4267)
at com.mysql.jdbc.ConnectionImpl.close(ConnectionImpl.java:1531)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$close$1(JDBCRDD.scala:259)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$compute$1.apply$mcV$sp(JDBCRDD.scala:308)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
这是因为与MySQL数据库的SSL连接失败了,我们只需要将数据源的URL后面添加**?useSSL=false**就可以解决,也就是禁用SSL:


但是它还是抛出了异常,只是异常没有之前那么多了,我上网查阅了一下相关错误,好像这样添加不能完全禁用SSL,具体原因我也不知道,可能跟底层C语言有关,这个我不了解,所以就先这样了。
本次实验到这里就结束了,谢谢你们的阅读!
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
几个月前,我读了一篇关于rubygem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01 客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02 数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
网络编程套接字网络编程基础知识理解源`IP`地址和目的`IP`地址理解源MAC地址和目的MAC地址认识端口号理解端口号和进程ID理解源端口号和目的端口号认识`TCP`协议认识`UDP`协议网络字节序socket编程接口`sockaddr``UDP`网络程序服务器端代码逻辑:需要用到的接口服务器端代码`udp`客户端代码逻辑`udp`客户端代码`TCP`网络程序服务器代码逻辑多个版本服务器单进程版本多进程版本多线程版本线程池版本服务器端代码客户端代码逻辑客户端代码TCP协议通讯流程TCP协议的客户端/服务器程序流程三次握手(建立连接)数据传输四次挥手(断开连接)TCP和UDP对比网络编程基础知识
我正在尝试在Rails上安装ruby,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf
文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,