package com.djm.hdfsclient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
public class HdfsClient {
FileSystem fileSystem = null;
@Before
public void init() {
try {
fileSystem = FileSystem.get(URI.create("hdfs://hadoop102:9000"), new Configuration(), "djm");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 上传文件
*/
@Test
public void put() {
try {
fileSystem.copyFromLocalFile(new Path("C:\\Users\\Administrator\\Desktop\\Hadoop 入门.md"), new Path("/"));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 下载文件
*/
@Test
public void download() {
try {
// useRawLocalFileSystem表示是否开启文件校验
fileSystem.copyToLocalFile(false, new Path("/Hadoop 入门.md"), new Path("C:\\Users\\Administrator\\Desktop\\Hadoop 入门1.md"), true);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 删除文件
*/
@Test
public void delete() {
try {
// recursive表示是否递归删除
fileSystem.delete(new Path("/Hadoop 入门.md"), true);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 文件重命名
*/
@Test
public void rename() {
try {
fileSystem.rename(new Path("/tmp"), new Path("/temp"));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 查看文件信息
*/
@Test
public void ls() {
try {
RemoteIterator<locatedfilestatus> listFiles = fileSystem.listFiles(new Path("/etc"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
if (fileStatus.isFile()) {
// 仅输出文件信息
System.out.print(fileStatus.getPath().getName() + " " + fileStatus.getLen() + " " + fileStatus.getPermission() + " " + fileStatus.getGroup() + " ");
// 获取文件块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取节点信息
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.print(host + " ");
}
}
System.out.println();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
@After
public void exit() {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
1、客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
2、NameNode 返回是否可以上传。
3、客户端请求第一个 Block 上传到哪几个 DataNode。
4、NameNode 返回三个节点,分别是 dn1、dn2、dn3。
5、客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。
6、按倒序逐级响应客户端。
7、客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。
8、当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。
1、客户端通过 Distributed FileSystem 向 NameNode 请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
2、根据就近原则挑选一台 DataNode,请求读取数据。
3、DataNode 开始传输数据给客户端。
4、客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
NameNode 工作:
1、第一次启动 NameNode格式化后,创建 Fsimage 和 Edits 文件,如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
2、客户端对元数据进行增删改操作。
3、NameNode 记录操作日志,更新滚动日志。
4、NameNode 在内存中对元数据进行增删改操作。
Secondary NameNode 工作:
1、Secondary NameNode 询问 NameNode 是否需要 CheckPoint,直接带回 NameNode 是否检查结果。
2、Secondary NameNode请求执行CheckPoint。
3、NameNode 滚动正在写的 Edits 日志。
4、将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
5、Secondary NameNode 加载编辑日志和镜像文件到内存合并。
6、生成新的镜像文件 fsimage.chkpoint。
7、拷贝 fsimage.chkpoint 到 NameNode。
8、NameNode 将 fsimage.chkpoint 重命名为 fsimage。
oiv 查看Fsimage 文件
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description> 1分钟检查一次操作次数</description>
</property>
基本命令:
hdfs dfsadmin -safemode get:查看安全模式状态
hdfs dfsadmin -safemode enter:进入安全模式状态
hdfs dfsadmin -safemode leave:离开安全模式状态
hdfs dfsadmin -safemode wait:等待安全模式状态
1、一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
2、DataNode 启动后向 NameNode 注册,通过后,周期性(1小时)的向 NameNode 上报所有的块信息。
3、心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块,如果超过 10 分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。
4、集群运行中可以安全加入和退出一些机器。
1、当 DataNode 读取 Block 的时候,它会计算 CheckSum。
2、如果计算后的 CheckSum,与 Block 创建时值不一样,说明 Block 已经损坏。
3、Client 读取其他 DataNode 上的 Block。
4、在其文件创建后周期验证。
[hdfs-site.xml]
<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>300000</value>
<description>毫秒</description>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>
<description>秒</description>
</property>
[djm@hadoop101 hadoop]$ touch blacklist
配置加入黑名单的主机
hadoop102
配置 hdfs-site.xml
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-2.7.2/etc/hadoop/blacklist</value>
</property>
刷新 namenodes
[djm@hadoop102 hadoop-2.7.2]$ hdfs dfsadmin -refreshNodes
更新 ResourceManager 节点
[djm@hadoop102 hadoop-2.7.2]$ yarn rmadmin -refreshNodes
如果数据不均衡,可以用命令实现集群的再平衡
[djm@hadoop102 hadoop-2.7.2]$ start-balancer.sh
[djm@hadoop101 hadoop]$ touch whitelist
配置加入黑名单的主机
hadoop102
hadoop103
hadoop104
配置 hdfs-site.xml
<property>
<name>dfs.hosts</name>
<value>/opt/module/hadoop-2.7.2/etc/hadoop/whitelist</value>
</property>
刷新 namenodes
[djm@hadoop102 hadoop-2.7.2]$ hdfs dfsadmin -refreshNodes
更新 ResourceManager 节点
[djm@hadoop102 hadoop-2.7.2]$ yarn rmadmin -refreshNodes
如果数据不均衡,可以用命令实现集群的再平衡
[djm@hadoop102 hadoop-2.7.2]$ start-balancer.sh
黑白名单的区别:
白名单比较严格,黑名单比较平缓,处于黑名单中的主机会同步数据结束后继续处于集群,只是不在处理请求而已,而不处于白名单中的主机会直接被干掉。
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
</property>
[djm@hadoop102 hadoop-2.7.2]$ hadoop distcp hdfs://haoop102:9000/user/djm/hello.txt hdfs://hadoop103:9000/user/djm/hello.txt
归档文件
[djm@hadoop102 hadoop-2.7.2]$ hadoop archive -archiveName input.har –p /user/djm/input /user/djm/output
查看归档
[djm@hadoop102 hadoop-2.7.2]$ hadoop fs -lsr har:///user/djm/output/input.har
解归档文件
[atguigu@djm hadoop-2.7.2]$ hadoop fs -cp har:/// user/djm/output/input.har/* /user/djm
</locatedfilestatus>
1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模
目录:一、简介二、HQL的执行流程三、索引四、索引案例五、Hive常用DDL操作六、Hive常用DML操作七、查询结果插入到表八、更新和删除操作九、查询结果写出到文件系统十、HiveCLI和Beeline命令行的基本使用十一、Hive配置一、简介Hive是一个构建在Hadoop之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类SQL查询功能,用于查询的SQL语句会被转化为MapReduce作业,然后提交到Hadoop上运行。特点:简单、容易上手(提供了类似sql的查询语言hql),使得精通sql但是不了解Java编程的人也能很好地进行大数据分析;灵活性高,可以自定义用户函数(UDF)和
云计算实验中要求我们在Linux系统安装Hadoop,故来做一个简单的记录。· 注:我的操作系统环境是Ubuntu-20.04.3,安装的JDK版本为jdk1.8.0_301,安装的Hadoop版本为hadoop2.7.1。(不确定其他版本是否会出现版本兼容问题)Hadoop安装步骤如下: 一、更新apt和安装vim编辑器 二、配置本机无密码登录SSH 三、安装JAVA环境 四、下载安装Hadoop 五、伪分布式搭建一、更新apt和安装vim编辑器1、更新aptsudoapt-getupdate2、安装vim
一、设置免密登录1、系统偏好设置-----共享----勾选远程登录,所有用户2、打开终端,输入命令ssh-keygen-trsa,一直回车即可2.查看生成的公钥和私钥 cd~/.ssh ls会看到~/.ssh目录下有两个文件:①私钥:id_rsa②公钥:id_rsa.pub3.将公钥内容写入到~/.ssh/authorized_keys中 cat~/.ssh/id_rsa.pub>>~/.ssh/authorized_keys4.测试在terminal终端输入 sshlocalhost如果出现以下询问输入yes,不需要输入密码就能登录,说明配置成功Areyousureyouw
文章目录实验二:HDFS+MapReduce数据处理与存储实验1.实验目的2.实验环境3.实验内容3.1HDFS部分3.1.1上传文件3.1.2下载文件3.1.3显示文件信息3.1.4显示目录信息3.1.5删除文件3.1.6移动文件3.2MapReduce部分3.2.0Mapreduce原理3.2.1合并和去重3.2.1.1编写Merge.java代码3.2.1.2编译执行3.2.2文件的排序3.2.2.1编写Sort.java代码3.2.2.2编译执行4.踩坑记录5.心得体会6.源码附录6.1Merge.java完整代码6.2Sort.java完整代码实验二:HDFS+MapReduce数据
博学之,审问之,慎思之,明辨之,笃行之🏂hiveonspark搭建好后,任务提交会有问题,因为通过hive会话提交的任务一直存在且不会结束(除非关掉这个hive会话),根本原因是这些任务提交到了Yarn的同一个队列中,前面的任务没有执行完毕后面的任务不会执行,所以解决办法是增加一个Yarn队列,指定任务提交的队列,这样就不会出现任务的阻塞。目录一、情景复现二、原因三、Yarn队列配置—增加队列1.情景复现:搭建好hiveonspark后,在命令行直接进入hive会话,提交任务后,在ResourceManager上jps查看进程可以看到有个进程ApplicationMaster一直存在,打开Re
目录SparkStreaming的核心是DStream一、DStream简介二.DStream编程模型三.DStream转换操作SparkStreaming的核心是DStream一、DStream简介1.Spark Streaming提供了一个高级抽象的流,即DStream(离散流)。2.DStream的内部结构是由一系列连续的RDD组成,每个RDD都是一小段由时间分隔开来的数据集。二.DStream编程模型三.DStream转换操作transform()1.在3个节点启动zookeeper集群服务$zkServer.shstart2.启动kafka(3个节点都要)$/opt/module/k
目录基本语法一、上传二、下载三、其他增删改查操作3.1增3.2删3.3改3.4查基本语法hadoopfs和 hdfsdfs(hadoopfs和hdfsdfs命令等效。)-hdfs dfs只能操作HDFS文件系统-hadoopfs可操作任意文件系统,不仅仅是hdfs文件系统,使用范围更广[root@hadoop102hadoop-3.1.3]$bin/hadoopfs[-appendToFile...][-cat[-ignoreCrc]...][-chgrp[-R]GROUPPATH...][-chmod[-R]PATH...][-chown[-R][OWNER][:[GROUP]]PATH..
packagemainimport("fmt""github.com/colinmarc/hdfs")funcmain(){client,err:=hdfs.New("192.168.0.38:50070")fs,err:=client.ReadDir("/")fmt.Println(err)fmt.Println(fs)}err是意外的EOF而且我发现错误发生在func(c*NamenodeConnection)readResponse(methodstring,respproto.Message)error{..._,err=io.ReadFull(c.conn,packet)..
我正在为HDFS中的写入实现一个数据节点故障转移,当block的第一个数据节点发生故障时,HDFS仍然可以写入一个block。算法是。首先,将识别故障节点。然后,请求一个新block。HDFSportapi提供了excludeNodes,我用它来告诉Namenode不要在那里分配新的block。failedDatanodes被识别为失败的数据节点,它们在日志中是正确的。req:=&hdfs.AddBlockRequestProto{Src:proto.String(bw.src),ClientName:proto.String(bw.clientName),ExcludeNodes:f