
Integrating Hadoop with Spring Boot, Part 1


I. HDFS operations are built around the following main classes (a minimal usage sketch follows the list):

Configuration: encapsulates the configuration of the client or the server.

FileSystem: a file-system object whose methods are used to operate on files. It is obtained through FileSystem's static get method, e.g. FileSystem hdfs = FileSystem.get(conf);

FSDataInputStream: the HDFS input stream, obtained from FileSystem's open method.

FSDataOutputStream: the HDFS output stream, obtained from FileSystem's create method.
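To see how these four classes fit together, here is a minimal, self-contained sketch. It is an illustration, not code from this project: the hdfs://localhost:9000 address and the linhaiy user match the configuration used later in this article, while the /tmp/demo.txt path and the HdfsQuickStart class name are assumptions.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsQuickStart {
    public static void main(String[] args) throws Exception {
        // Configuration holds client-side settings such as the NameNode address
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // FileSystem is the entry point for all file operations
        FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "linhaiy");

        Path file = new Path("/tmp/demo.txt"); // assumed demo path

        // FSDataOutputStream: write a file via FileSystem.create
        try (FSDataOutputStream out = hdfs.create(file, true)) {
            out.writeBytes("hello hdfs");
        }

        // FSDataInputStream: read it back via FileSystem.open
        try (FSDataInputStream in = hdfs.open(file)) {
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        hdfs.close();
    }
}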

II. Dependency configuration

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hdfs</groupId>
    <artifactId>HadoopTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>HadoopTest</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>cn.bestwu</groupId>
            <artifactId>ik-analyzers</artifactId>
            <version>5.1.0</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties:

# Tomcat max worker threads
server.tomcat.max-threads=1000
server.port=8900
# Session timeout (server.session-timeout was renamed in Spring Boot 2.x)
server.servlet.session.timeout=60
spring.application.name=hadoop
spring.servlet.multipart.max-file-size=50MB
spring.servlet.multipart.max-request-size=50MB
hdfs.path=hdfs://localhost:9000
hdfs.username=linhaiy
logging.config=classpath:logback.xml

III. Developing the HDFS file-operation API

package com.hadoop.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * HDFS configuration class
 * @author linhaiy
 * @date 2019.05.18
 */
@Configuration
public class HdfsConfig {

    @Value("${hdfs.path}")
    private String path;

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}

package com.hadoop.hdfs.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * User entity class
 * @author linhaiy
 * @date 2019.05.18
 */
public class User implements Writable {

    private String username;
    private Integer age;
    private String address;

    public User() {
        super();
    }

    public User(String username, Integer age, String address) {
        super();
        this.username = username;
        this.age = age;
        this.address = address;
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialize the object. Note: writeUTF must be used here (not writeChars
        // as in the original article), because readFields reads with readUTF.
        output.writeUTF(username);
        output.writeInt(age);
        output.writeUTF(address);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialize the object back into memory
        username = input.readUTF();
        age = input.readInt();
        address = input.readUTF();
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User [username=" + username + ", age=" + age + ", address=" + address + "]";
    }
}
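One detail worth flagging: the article as published serialized the strings with output.writeChars but deserialized with input.readUTF, which cannot round-trip; the version above uses writeUTF to match readUTF. A quick hedged check of the fix (this snippet and its sample values are not part of the original article):

package com.hadoop.hdfs.entity;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Assumed test snippet: round-trips a User through its Writable
// serialization to confirm write/readFields agree.
public class UserWritableCheck {
    public static void main(String[] args) throws IOException {
        User original = new User("linhaiy", 25, "Shenzhen"); // made-up values

        // serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize with readFields()
        User restored = new User();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored); // User [username=linhaiy, age=25, address=Shenzhen]
    }
}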
package com.hadoop.hdfs.service;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import com.hadoop.util.JsonUtil;

@Component
public class HdfsService {

    @Value("${hdfs.path}")
    private String path;
    @Value("${hdfs.username}")
    private String username;

    private static String hdfsPath;
    private static String hdfsName;
    private static final int bufferSize = 1024 * 1024 * 64;

    /**
     * Build the HDFS client configuration
     */
    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * Get an HDFS FileSystem object
     */
    public static FileSystem getFileSystem() throws Exception {
        // An HDFS client always operates under some user identity. By default
        // the client API reads it from the environment (HADOOP_USER_NAME=hadoop);
        // it can also be passed explicitly when constructing the FileSystem,
        // as done here.
        return FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
    }

    /**
     * Create a directory on HDFS
     */
    public static boolean mkdir(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        boolean isOk = fs.mkdirs(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Check whether a file exists on HDFS
     */
    public static boolean existFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        return fs.exists(srcPath);
    }

    /**
     * Read directory information from HDFS
     */
    public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path newPath = new Path(path);
        FileStatus[] statusList = fs.listStatus(newPath);
        List<Map<String, Object>> list = new ArrayList<>();
        if (null != statusList && statusList.length > 0) {
            for (FileStatus fileStatus : statusList) {
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath());
                map.put("fileStatus", fileStatus.toString());
                list.add(map);
            }
            return list;
        }
        return null;
    }

    /**
     * Create a file on HDFS from an uploaded MultipartFile
     */
    public static void createFile(String path, MultipartFile file) throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return;
        }
        String fileName = file.getOriginalFilename();
        FileSystem fs = getFileSystem();
        // the original file name is appended to the target directory
        Path newPath = new Path(path + "/" + fileName);
        // open an output stream and write the upload
        FSDataOutputStream outputStream = fs.create(newPath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
    }

    /**
     * Read the content of an HDFS file as a string
     */
    public static String readFile(String path) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(srcPath);
            // wrap in a reader so multi-byte characters are not garbled
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String lineTxt = "";
            StringBuffer sb = new StringBuffer();
            while ((lineTxt = reader.readLine()) != null) {
                sb.append(lineTxt);
            }
            return sb.toString();
        } finally {
            if (inputStream != null) {
                inputStream.close();
            }
            fs.close();
        }
    }
    /**
     * List the files under an HDFS path
     */
    public static List<Map<String, String>> listFile(String path) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        // recursively find all files
        RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
        List<Map<String, String>> returnList = new ArrayList<>();
        while (filesList.hasNext()) {
            LocatedFileStatus next = filesList.next();
            String fileName = next.getPath().getName();
            Path filePath = next.getPath();
            Map<String, String> map = new HashMap<>();
            map.put("fileName", fileName);
            map.put("filePath", filePath.toString());
            returnList.add(map);
        }
        fs.close();
        return returnList;
    }

    /**
     * Rename an HDFS file
     */
    public static boolean renameFile(String oldName, String newName) throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        // original path
        Path oldPath = new Path(oldName);
        // new path
        Path newPath = new Path(newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        return isOk;
    }

    /**
     * Delete an HDFS file
     */
    public static boolean deleteFile(String path) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        // deleteOnExit removes the path when the FileSystem is closed below
        boolean isOk = fs.deleteOnExit(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Upload a local file to HDFS
     */
    public static void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // local source path
        Path clientPath = new Path(path);
        // HDFS target path
        Path serverPath = new Path(uploadPath);
        // copyFromLocalFile: the first argument controls whether the source is
        // deleted afterwards (true = delete; false, used here, keeps it)
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Download an HDFS file to the local file system
     */
    public static void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // HDFS source path
        Path clientPath = new Path(path);
        // local target path
        Path serverPath = new Path(downloadPath);
        // copyToLocalFile: the first argument controls whether the source is
        // deleted afterwards (true = delete; false, used here, keeps it)
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Copy a file between two HDFS paths
     */
    public static void copyFile(String sourcePath, String targetPath) throws Exception {
        if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // source path
        Path oldPath = new Path(sourcePath);
        // target path
        Path newPath = new Path(targetPath);
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            inputStream = fs.open(oldPath);
            outputStream = fs.create(newPath);
            IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
        } finally {
            // closeStream is null-safe, unlike the bare close() calls in the original
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
    }
    /**
     * Open an HDFS file and return its content as a byte array
     */
    public static byte[] openFileToBytes(String path) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        try {
            FSDataInputStream inputStream = fs.open(srcPath);
            return IOUtils.readFullyToByteArray(inputStream);
        } finally {
            fs.close();
        }
    }

    /**
     * Open an HDFS file and deserialize its JSON content into a Java object
     */
    public static <T> T openFileToObject(String path, Class<T> clazz) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return null;
        }
        String jsonStr = readFile(path);
        return JsonUtil.fromObject(jsonStr, clazz);
    }

    /**
     * Get the block locations of a file in the HDFS cluster
     */
    public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (StringUtils.isEmpty(path) || !existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        FileStatus fileStatus = fs.getFileStatus(srcPath);
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    @PostConstruct
    public void getName() {
        hdfsName = this.username;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }

    public String getUsername() {
        return username;
    }
}
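The service above and the controller below reference two helper classes, com.hadoop.util.JsonUtil and com.hadoop.util.Result, that the article never shows. The following are minimal sketches of what they might look like; the field names, the constant values, and the Jackson-based implementation are assumptions, not the author's code.

package com.hadoop.util;

import com.fasterxml.jackson.databind.ObjectMapper;

// Assumed sketch of the JSON helper used by HdfsService.openFileToObject.
public class JsonUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static <T> T fromObject(String json, Class<T> clazz) throws Exception {
        return MAPPER.readValue(json, clazz);
    }
}

package com.hadoop.util;

// Assumed sketch of the response wrapper used by the controller below;
// only the constants and constructors actually referenced are included.
public class Result {

    public static final String SUCCESS = "success";
    public static final String FAILURE = "failure";

    private String code;
    private String message;
    private Object data;

    public Result(String code, String message) {
        this(code, message, null);
    }

    public Result(String code, String message, Object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    // getters needed so Spring can serialize the response to JSON
    public String getCode() { return code; }
    public String getMessage() { return message; }
    public Object getData() { return data; }
}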
package com.hadoop.hdfs.controller;

import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.hadoop.hdfs.entity.User;
import com.hadoop.hdfs.service.HdfsService;
import com.hadoop.util.Result;

@RestController
@RequestMapping("/hadoop/hdfs")
public class HdfsAction {

    private static Logger LOGGER = LoggerFactory.getLogger(HdfsAction.class);

    /**
     * Create a directory
     */
    @RequestMapping(value = "mkdir", method = RequestMethod.POST)
    @ResponseBody
    public Result mkdir(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            LOGGER.debug("request parameter is empty");
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        // create an empty directory
        boolean isOk = HdfsService.mkdir(path);
        if (isOk) {
            LOGGER.debug("directory created successfully");
            return new Result(Result.SUCCESS, "directory created successfully");
        } else {
            LOGGER.debug("failed to create directory");
            return new Result(Result.FAILURE, "failed to create directory");
        }
    }

    /**
     * Read HDFS directory information
     */
    @PostMapping("/readPathInfo")
    public Result readPathInfo(@RequestParam("path") String path) throws Exception {
        List<Map<String, Object>> list = HdfsService.readPathInfo(path);
        return new Result(Result.SUCCESS, "read HDFS directory info success", list);
    }

    /**
     * Get the block locations of an HDFS file in the cluster
     */
    @PostMapping("/getFileBlockLocations")
    public Result getFileBlockLocations(@RequestParam("path") String path) throws Exception {
        BlockLocation[] blockLocations = HdfsService.getFileBlockLocations(path);
        return new Result(Result.SUCCESS, "get file block locations success", blockLocations);
    }

    /**
     * Create a file
     */
    @PostMapping("/createFile")
    public Result createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file)
            throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        HdfsService.createFile(path, file);
        return new Result(Result.SUCCESS, "create file success");
    }

    /**
     * Read the content of an HDFS file
     */
    @PostMapping("/readFile")
    public Result readFile(@RequestParam("path") String path) throws Exception {
        String targetPath = HdfsService.readFile(path);
        return new Result(Result.SUCCESS, "read file content success", targetPath);
    }

    /**
     * Read an HDFS file as a byte array
     */
    @PostMapping("/openFileToBytes")
    public Result openFileToBytes(@RequestParam("path") String path) throws Exception {
        byte[] files = HdfsService.openFileToBytes(path);
        return new Result(Result.SUCCESS, "read file to bytes success", files);
    }

    /**
     * Read an HDFS file and convert it into a User object
     */
    @PostMapping("/openFileToUser")
    public Result openFileToUser(@RequestParam("path") String path) throws Exception {
        User user = HdfsService.openFileToObject(path, User.class);
        return new Result(Result.SUCCESS, "read file to User object success", user);
    }

    /**
     * List files
     */
    @PostMapping("/listFile")
    public Result listFile(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        List<Map<String, String>> returnList = HdfsService.listFile(path);
        return new Result(Result.SUCCESS, "list files success", returnList);
    }

    /**
     * Rename a file
     */
    @PostMapping("/renameFile")
    public Result renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName)
            throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        boolean isOk = HdfsService.renameFile(oldName, newName);
        if (isOk) {
            return new Result(Result.SUCCESS, "rename file success");
        } else {
            return new Result(Result.FAILURE, "rename file fail");
        }
    }

    /**
     * Delete a file
     */
    @PostMapping("/deleteFile")
    public Result deleteFile(@RequestParam("path") String path) throws Exception {
        boolean isOk = HdfsService.deleteFile(path);
        if (isOk) {
            return new Result(Result.SUCCESS, "delete file success");
        } else {
            return new Result(Result.FAILURE, "delete file fail");
        }
    }

    /**
     * Upload a file
     */
    @PostMapping("/uploadFile")
    public Result uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath)
            throws Exception {
        HdfsService.uploadFile(path, uploadPath);
        return new Result(Result.SUCCESS, "upload file success");
    }

    /**
     * Download a file
     */
    @PostMapping("/downloadFile")
    public Result downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath)
            throws Exception {
        HdfsService.downloadFile(path, downloadPath);
        return new Result(Result.SUCCESS, "download file success");
    }

    /**
     * Copy a file within HDFS
     */
    @PostMapping("/copyFile")
    public Result copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath)
            throws Exception {
        HdfsService.copyFile(sourcePath, targetPath);
        return new Result(Result.SUCCESS, "copy file success");
    }

    /**
     * Check whether a file already exists
     */
    @PostMapping("/existFile")
    public Result existFile(@RequestParam("path") String path) throws Exception {
        boolean isExist = HdfsService.existFile(path);
        return new Result(Result.SUCCESS, "file isExist: " + isExist);
    }
}

IV. Some test result screenshots
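The original post closes with screenshots of manual test results, which do not survive extraction. As a rough stand-in, this hedged sketch drives two of the endpoints with Spring's RestTemplate against the application running on port 8900; the /test path and the class name are assumptions.

package com.hadoop;

import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

// Assumed manual test, not from the original article: exercises the
// mkdir and existFile endpoints of the running application.
public class HdfsActionManualTest {
    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();

        // form parameters matching the @RequestParam bindings of HdfsAction
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        params.add("path", "/test"); // assumed HDFS path

        String mkdir = rest.postForObject("http://localhost:8900/hadoop/hdfs/mkdir", params, String.class);
        System.out.println(mkdir); // expected: a JSON Result with a success message

        String exist = rest.postForObject("http://localhost:8900/hadoop/hdfs/existFile", params, String.class);
        System.out.println(exist); // expected: "file isExist: true"
    }
}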