根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。
数据说明如下: a.txt
数据切分方式:,
数据所在位置:/user/test/input/a.txt
15733218050,15778423030,1542457633,1542457678,450000,530000
| 15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000 |
|---|---|---|---|---|---|
| 呼叫者手机号 | 接受者手机号 | 开始时间戳(s) | 接受时间戳(s) | 呼叫者地址省份编码 | 接受者地址省份编码 |
Mysql数据库:
用户名:root 密码:123123
数据库名:mydb
用户表:userphone
| 列名 | 类型 | 非空 | 是否自增 | 介绍 |
|---|---|---|---|---|
| id | int(11) | √ | √ | 用户ID |
| phone | varchar(255) | 手机号 | ||
| trueName | varchar(255) | 真实姓名 |
地址省份表:allregion
| 列名 | 类型 | 非空 | 是否自增 | 介绍 |
|---|---|---|---|---|
| id | int(11) | √ | √ | 用户ID |
| CodeNum | varchar(255) | 编号 | ||
| Address | varchar(255) | 地址 |
清洗规则:
处理数据中的时间戳(秒级)将其转化为2017-06-21 07:01:58,年-月-日 时:分:秒 这种格式;
处理数据中的省份编码,结合mysql的表数据对应,将其转换成省份名称;
处理用户手机号,与mysql的表数据对应,关联用户的真实姓名;
处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);
设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为: /user/test/input/a.txt (HDFS); 清洗后的数据存放于:/user/test/output (HDFS)。
数据清洗后如下:
邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市
| 邓二 | 张倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龙江省 | 上海市 |
|---|---|---|---|---|---|---|---|---|
| 用户名A | 用户名B | 用户A的手机号 | 用户B的手机号 | 开始时间 | 结束时间 |
step/com/LogMR.java
package com;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogMR {
/********** begin **********/
static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
Map<String, String> userMap = new HashMap<>();
Map<String, String> addressMap = new HashMap<>();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
PhoneLog pl = new PhoneLog();
Text text = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Connection connection = DBHelper.getConnection();
try {
Statement statement = connection.createStatement();
String sql = "select * from userphone";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
String phone = resultSet.getString(2);
String trueName = resultSet.getString(3);
userMap.put(phone, trueName);
}
String sql2 = "select * from allregion";
ResultSet resultSetA = statement.executeQuery(sql2);
while (resultSetA.next()) {
String phone = resultSetA.getString(2);
String trueName = resultSetA.getString(3);
addressMap.put(phone, trueName);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] split = str.split(",");
if (split.length == 6) {
String trueName1 = userMap.get(split[0]);
String trueName2 = userMap.get(split[1]);
String address1 = addressMap.get(split[4]);
String address2 = addressMap.get(split[5]);
long startTimestamp = Long.parseLong(split[2]);
String startTime = sdf.format(startTimestamp * 1000);
long endTimestamp = Long.parseLong(split[3]);
String endTime = sdf.format(endTimestamp * 1000);
long timeLen = endTimestamp - startTimestamp;
pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
address2);
context.write(pl, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogMR.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(PhoneLog.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
Path inPath = new Path("/user/test/input/a.txt");
Path out = new Path("/user/test/output");
FileInputFormat.setInputPaths(job, inPath);
FileOutputFormat.setOutputPath(job, out);
job.waitForCompletion(true);
}
/********** end **********/
}
step/com/DBHelper.java
package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
/********** begin **********/
private static final String driver = "com.mysql.jdbc.Driver";
private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
private static final String username = "root";// 数据库的用户名
private static final String password = "123123";// 数据库的密码:这个是自己安装数据库的时候设置的,每个人不同。
private static Connection conn = null; // 声明数据库连接对象
static {
try {
Class.forName(driver);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static Connection getConnection() {
if (conn == null) {
try {
conn = DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
e.printStackTrace();
} // 连接数据库
return conn;
}
return conn;
}
/********** end **********/
}
step/com/phonelog.java
package com;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class PhoneLog implements WritableComparable<PhoneLog> {
private String userA;
private String userB;
private String userA_Phone;
private String userB_Phone;
private String startTime;
private String endTime;
private Long timeLen;
private String userA_Address;
private String userB_Address;
public PhoneLog() {
}
public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
String endTime, Long timeLen, String userA_Address, String userB_Address) {
this.userA = userA;
this.userB = userB;
this.userA_Phone = userA_Phone;
this.userB_Phone = userB_Phone;
this.startTime = startTime;
this.endTime = endTime;
this.timeLen = timeLen;
this.userA_Address = userA_Address;
this.userB_Address = userB_Address;
}
public String getUserA_Phone() {
return userA_Phone;
}
public void setUserA_Phone(String userA_Phone) {
this.userA_Phone = userA_Phone;
}
public String getUserB_Phone() {
return userB_Phone;
}
public void setUserB_Phone(String userB_Phone) {
this.userB_Phone = userB_Phone;
}
public String getUserA() {
return userA;
}
public void setUserA(String userA) {
this.userA = userA;
}
public String getUserB() {
return userB;
}
public void setUserB(String userB) {
this.userB = userB;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
public Long getTimeLen() {
return timeLen;
}
public void setTimeLen(Long timeLen) {
this.timeLen = timeLen;
}
public String getUserA_Address() {
return userA_Address;
}
public void setUserA_Address(String userA_Address) {
this.userA_Address = userA_Address;
}
public String getUserB_Address() {
return userB_Address;
}
public void setUserB_Address(String userB_Address) {
this.userB_Address = userB_Address;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(userA);
out.writeUTF(userB);
out.writeUTF(userA_Phone);
out.writeUTF(userB_Phone);
out.writeUTF(startTime);
out.writeUTF(endTime);
out.writeLong(timeLen);
out.writeUTF(userA_Address);
out.writeUTF(userB_Address);
}
@Override
public void readFields(DataInput in) throws IOException {
userA = in.readUTF();
userB = in.readUTF();
userA_Phone = in.readUTF();
userB_Phone = in.readUTF();
startTime = in.readUTF();
endTime = in.readUTF();
timeLen = in.readLong();
userA_Address = in.readUTF();
userB_Address = in.readUTF();
}
@Override
public String toString() {
return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
+ timeLen + "," + userA_Address + "," + userB_Address;
}
@Override
public int compareTo(PhoneLog pl) {
if(this.hashCode() == pl.hashCode()) {
return 0;
}
return -1;
}
}
最后重启hadoop#start-all.sh 完成评测
对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此
我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
是否可以在应用程序中包含的gem代码中知道应用程序的Rails文件系统根目录?这是gem来源的示例:moduleMyGemdefself.included(base)putsRails.root#returnnilendendActionController::Base.send:include,MyGem谢谢,抱歉我的英语不好 最佳答案 我发现解决类似问题的解决方案是使用railtie初始化程序包含我的模块。所以,在你的/lib/mygem/railtie.rbmoduleMyGemclassRailtie使用此代码,您的模块将在
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD