4. The Hadoop Distributed File System (HDFS)


Table of Contents

Preface
Introduction to HDFS
HDFS Architecture
Programs Using the Java Interface
    Setup
    Reading Data from a Hadoop URL
    Reading Data with the FileSystem API
    Displaying the Data Twice with seek()
    Copying a Local File to the Hadoop File System
    Displaying File Status Information
    Listing File Information for a Set of Paths in a Hadoop File System

Preface

All of the following examples are taken from "Hadoop: The Definitive Guide".

Introduction to HDFS

HDFS (Hadoop Distributed File System) is one implementation of Hadoop's abstract file system. The Hadoop file system abstraction can also be backed by the local file system, Amazon S3, and others, and can even be accessed over a web protocol (WebHDFS). HDFS spreads a file's data across the machines of a cluster and keeps replicas for fault tolerance and reliability. Client reads and writes go directly to the machines that hold the data, so there is no single point of performance pressure.
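
To make the abstraction concrete, here is a minimal sketch of my own (the class name FileSystemSchemes is made up, and the hdfs://localhost:9000 address is an assumption matching the setup used later in this post): the scheme of the URI decides which FileSystem implementation you get back.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FileSystemSchemes {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The URI scheme selects the concrete implementation:
        FileSystem local = FileSystem.get(URI.create("file:///"), conf);               // local file system
        FileSystem hdfs  = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf); // HDFS
        System.out.println(local.getClass().getName()); // e.g. LocalFileSystem
        System.out.println(hdfs.getClass().getName());  // e.g. DistributedFileSystem
    }
}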

HDFS Architecture

HDFS uses a master/slave architecture: an HDFS cluster consists of a single NameNode and a number of DataNodes. The NameNode, as the master server, manages the file system namespace and client access to files; the DataNodes manage the data stored on the machines they run on.
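
As a rough illustration of this split (a sketch of my own, not from the book; the ListDataNodes class name and the address are assumptions), a FileSystem handle backed by HDFS can be cast to DistributedFileSystem to ask the NameNode which DataNodes it manages:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class ListDataNodes {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // the NameNode reports every DataNode it is managing
            for (DatanodeInfo node : dfs.getDataNodeStats()) {
                System.out.println(node.getHostName());
            }
        }
    }
}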

Programs Using the Java Interface

Setup

For these examples I created a people.txt file in the hdfs://localhost:9000/input/docs directory of the Hadoop file system beforehand and put the text "123456" in it. Unless stated otherwise, this file is the input (i.e. args[0]) for every program below. If you have not added the file yet, you can do so through the web GUI at http://localhost:9870/, as shown in the figure. With that in place, we can start writing code.
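
If you would rather create the sample file from code than through the GUI, a minimal sketch like the following should work (the class name CreateSampleFile is my own, and the port 9000 matches the URI above):

import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, not part of the book's examples.
public class CreateSampleFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
        // create() makes missing parent directories and overwrites by default
        OutputStream out = fs.create(new Path("/input/docs/people.txt"));
        out.write("123456".getBytes("UTF-8"));
        out.close();
    }
}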

Note: the code below was written and run with IntelliJ IDEA on Windows. The args[0] and args[1] values would normally be passed on the command line as java XXX arg0 arg1, but in IDEA you can set them by hand in the run configuration, as shown in the figure. Also, before running any of the code, remember to start all the services with start-all.cmd in the hadoop/sbin directory, and confirm that jps lists all of the expected processes; only then are the services fully up. Finally, don't forget to pull in the Hadoop jars through Maven. The required dependencies are listed here so you can add them directly:

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.3.0</version>
</dependency>

Reading Data from a Hadoop URL

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class URLCat {

    static {
        // setURLStreamHandlerFactory can only be called once per JVM,
        // which is why it lives in a static initializer
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        InputStream in = null;
        try {
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

The result is shown in the figure (logging was turned off so the output is easier to read):

Reading Data with the FileSystem API

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        // FileSystem.get picks the right implementation from the URI scheme
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

The result is shown in the figure:

Displaying the Data Twice with seek()

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemDoubleCat {

    public static void main(String[] args) throws IOException {
        String url = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(url), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(url));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // rewind to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

The result is shown in the figure:
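
Besides seek(), FSDataInputStream also implements the PositionedReadable interface, which reads at a given offset without moving the current file position. Here is a minimal sketch of my own (the class name PositionedReadCat is made up; the input is assumed to be the same people.txt file):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionedReadCat {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        FSDataInputStream in = fs.open(new Path(args[0]));
        byte[] buffer = new byte[3];
        // read 3 bytes starting at offset 2, leaving the stream position untouched
        in.readFully(2, buffer);
        System.out.println(new String(buffer, "UTF-8")); // prints "345" for the "123456" sample
        in.close();
    }
}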

Copying a Local File to the Hadoop File System

import java.io.*;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileCopyWithProgress {

    public static void main(String[] args) throws IOException {
        String localSrc = args[0];
        String dst = args[1]; // args[1] is the output, i.e. the destination file
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            @Override
            public void progress() {
                // called periodically as data is written out
                System.out.println(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
}

The result is shown in the figure (here the source file has the same contents as people.txt, and the destination file is people2.txt):

Displaying File Status Information

Note: this code has no main() method, so before running it you need to add the JUnit dependency from Maven in order to run it as tests:

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

Once the dependency is in place, a green run icon appears to the left of every method annotated with @Test; click that icon to run the method on its own.

package URLReadTwo;

import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;

import java.io.*;
import java.net.URI;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class ShowFileStatusTest {

    private MiniDFSCluster cluster;
    private FileSystem fs;

    @Before
    public void setUp() throws IOException {
        Configuration conf = new Configuration();
        if (System.getProperty("test.build.data") == null) {
            System.setProperty("test.build.data", "/tmp");
        }
        cluster = new MiniDFSCluster(conf, 1, true, null);
        fs = cluster.getFileSystem();
        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
    }

    @After
    public void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    // Each @Test annotation marks the method that follows it as a test case,
    // so every @Test method is run as its own test.
    @Test(expected = FileNotFoundException.class)
    public void throwsFileNotFoundForNonExistentFile() throws IOException {
        fs.getFileStatus(new Path("no-such-file"));
    }

    @Test
    public void fileStatusForFile() throws IOException {
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
        assertThat(stat.isDir(), is(false));
        assertThat(stat.getLen(), is(7L));
        // assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 1));
        assertThat(stat.getBlockSize(), is(134217728L));
        assertThat(stat.getOwner(), is("lishengda"));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }

    @Test
    public void fileStatusForDirectory() throws IOException {
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat.isDir(), is(true));
        assertThat(stat.getLen(), is(0L));
        // assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 0));
        assertThat(stat.getBlockSize(), is(0L));
        assertThat(stat.getOwner(), is("lishengda"));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }
}
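
A side note: the four-argument MiniDFSCluster constructor used in setUp() is deprecated in Hadoop 2.x. If you see deprecation warnings, the Builder-style equivalent should look roughly like this (a sketch; I am assuming the behavior is the same as the constructor above):

@Before
public void setUp() throws IOException {
    Configuration conf = new Configuration();
    if (System.getProperty("test.build.data") == null) {
        System.setProperty("test.build.data", "/tmp");
    }
    // Builder-based replacement for the deprecated MiniDFSCluster constructor
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    fs = cluster.getFileSystem();
    OutputStream out = fs.create(new Path("/dir/file"));
    out.write("content".getBytes("UTF-8"));
    out.close();
}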

Listing File Information for a Set of Paths in a Hadoop File System

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class ListStatus {

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // build a Path for every command-line argument
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        // convert the FileStatus array back into an array of Paths
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path listedPath : listedPaths) {
            System.out.println(listedPath);
        }
    }
}

The result is shown in the figure:
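
Related to listStatus(), FileSystem also offers globStatus() for wildcard matching. A minimal sketch of my own (the class name GlobStatusDemo is made up; the /input/docs directory and address are the ones assumed earlier in this post):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class GlobStatusDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
        // match every entry directly under /input/docs
        FileStatus[] status = fs.globStatus(new Path("/input/docs/*"));
        for (Path p : FileUtil.stat2Paths(status)) {
            System.out.println(p);
        }
    }
}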
