scala 编写 hdfs 工具类
scala 创建 删除 hdfs 文件或目录
scala 上传 下载 hdfs 文件
scala 读取 写入 hdfs 文件
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- FIX: the original <project> tag had no whitespace between its xmlns /
     xsi attributes, which is not well-formed XML and fails to parse. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.lihaozhe</groupId>
    <artifactId>scala-code</artifactId>
    <version>1.0.0</version>
    <name>${project.artifactId}</name>
    <properties>
        <jdk.version>21</jdk.version>
        <!-- common compiler configuration -->
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <maven.compiler.compilerVersion>21</maven.compiler.compilerVersion>
        <maven.compiler.encoding>utf-8</maven.compiler.encoding>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.test.failure.ignore>true</maven.test.failure.ignore>
        <maven.test.skip>true</maven.test.skip>
        <commons-io.version>2.18.0</commons-io.version>
        <commons-lang3.version>3.17.0</commons-lang3.version>
        <druid.version>1.2.24</druid.version>
        <fastjson.version>2.0.53</fastjson.version>
        <fastjson2.version>2.0.53</fastjson2.version>
        <gson.version>2.11.0</gson.version>
        <hutool.version>5.8.34</hutool.version>
        <jackson.version>2.18.2</jackson.version>
        <junit.version>5.11.3</junit.version>
        <lombok.version>1.18.36</lombok.version>
        <mysql.version>9.1.0</mysql.version>
        <scala.version>2.13.15</scala.version>
    </properties>
    <dependencies>
        <!-- NOTE(review): specs_2.10 is built for Scala 2.10 while this project
             uses Scala ${scala.version} (2.13.x); the two binary versions are
             incompatible on one classpath. Confirm whether this test-scoped
             dependency is still needed. -->
        <dependency>
            <groupId>org.scala-tools.testing</groupId>
            <artifactId>specs_2.10</artifactId>
            <version>1.6.9</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.13</artifactId>
            <version>3.2.19</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <!-- scope -->
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons-io.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!-- java test-data utilities -->
        <dependency>
            <groupId>com.github.binarywang</groupId>
            <artifactId>java-testdata-generator</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <!--<outputDirectory>../package</outputDirectory>-->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <!-- compile-time character encoding -->
                    <encoding>UTF-8</encoding>
                    <!-- JDK version used for compilation -->
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-clean-plugin</artifactId>
                <version>3.4.0</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.3.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.4.2</version>
            </plugin>
            <!-- skip JUnit tests during packaging -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.5.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- compiles Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.9.2</version>
                <configuration>
                    <scalaCompatVersion>${scala.version}</scalaCompatVersion>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
                <!-- NOTE(review): the four executions below overlap (compile and
                     add-source+compile are bound twice); harmless but redundant —
                     confirm before pruning. -->
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.7.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
HDFS工具类
package cn.lhz.util.hadoop

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem

import java.io.IOException
import java.net.URI
import scala.io.{Codec, Source}
import scala.util.Using

/**
 * HDFS utility class (Scala version).
 *
 * @author 李昊哲
 * @version 1.0.0
 */
object HdfsUtil {

  /** Default HDFS user used when the caller does not supply one. */
  private val username = "root"

  /**
   * Obtains a [[FileSystem]] for a pseudo-distributed cluster with the default user.
   *
   * @param url NameNode host name or IP; port 8020 is appended
   * @return connected file system
   * @throws IOException IOException
   */
  @throws[IOException]
  def getFs(url: String): FileSystem = getFs(username, url)

  /**
   * Obtains a [[FileSystem]] for a pseudo-distributed cluster.
   *
   * @param username user the client acts as
   * @param url      NameNode host name or IP; port 8020 is appended
   * @return connected file system
   * @throws IOException IOException
   */
  @throws[IOException]
  def getFs(username: String, url: String): FileSystem = {
    // Identify the client to Hadoop via the system property.
    System.setProperty("HADOOP_USER_NAME", username)
    val conf = new Configuration()
    // Point the client at the NameNode RPC endpoint.
    conf.set("fs.defaultFS", "hdfs://" + url + ":8020")
    FileSystem.get(conf)
  }

  /**
   * Disconnects a pseudo-distributed [[FileSystem]].
   *
   * @param fs HDFS file system
   * @throws IOException IOException
   */
  @throws[IOException]
  def close(fs: FileSystem): Unit = {
    fs.close()
  }

  /**
   * Obtains a [[DistributedFileSystem]] for a fully distributed cluster
   * with the default user.
   *
   * @throws IOException IOException
   */
  @throws[IOException]
  def getDfs: DistributedFileSystem = getDfs(username)

  /**
   * Obtains a [[DistributedFileSystem]] for a fully distributed cluster.
   * The name service is read from `dfs.nameservices`, so the cluster client
   * configuration must be on the classpath.
   *
   * @param username user the client acts as
   * @throws IOException IOException
   */
  @throws[IOException]
  def getDfs(username: String): DistributedFileSystem = {
    // Identify the client to Hadoop via the system property.
    System.setProperty("HADOOP_USER_NAME", username)
    val conf = new Configuration()
    val dfs = new DistributedFileSystem()
    val nameService = conf.get("dfs.nameservices")
    val hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020
    // Initialize the distributed file system against the name service.
    dfs.initialize(URI.create(hdfsRPCUrl), conf)
    dfs
  }

  /**
   * Disconnects a fully distributed [[DistributedFileSystem]].
   *
   * @param dfs HDFS file system
   * @throws IOException IOException
   */
  @throws[IOException]
  def close(dfs: DistributedFileSystem): Unit = {
    dfs.close()
  }

  /**
   * Creates a directory (fully distributed cluster); the connection is
   * opened and closed internally.
   *
   * @param path directory path
   * @return whether creation succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def mkdirs(path: String): Boolean = {
    val dfs = getDfs
    // try/finally so the connection is released even when mkdirs throws.
    try mkdirs(dfs, path)
    finally dfs.close()
  }

  /**
   * Creates a directory (fully distributed cluster).
   *
   * @param dfs  HDFS file system
   * @param path directory path
   * @return whether creation succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def mkdirs(dfs: DistributedFileSystem, path: String): Boolean = dfs.mkdirs(new Path(path))

  /**
   * Creates a directory (pseudo-distributed cluster).
   *
   * NOTE(review): `path` is forwarded to `getFs` as the NameNode host *and*
   * used as the directory path; an HDFS path such as "/a/b" cannot be a valid
   * host — confirm the intended contract of this overload.
   *
   * @param path directory path
   * @return whether creation succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def mkdir(path: String): Boolean = {
    val fs = getFs(path)
    try mkdir(fs, path)
    finally fs.close()
  }

  /**
   * Creates a directory (pseudo-distributed cluster).
   *
   * @param fs   HDFS file system
   * @param path directory path
   * @return whether creation succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def mkdir(fs: FileSystem, path: String): Boolean = fs.mkdirs(new Path(path))

  /**
   * Renames a file or directory (fully distributed cluster); the connection
   * is opened and closed internally.
   *
   * @param src original path
   * @param dst new path
   * @return whether the rename succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def rename(src: Path, dst: Path): Boolean = {
    val dfs = getDfs
    try rename(dfs, src, dst)
    finally dfs.close()
  }

  /**
   * Renames a file or directory (fully distributed cluster).
   *
   * @param dfs HDFS file system
   * @param src original path
   * @param dst new path
   * @return whether the rename succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def rename(dfs: DistributedFileSystem, src: Path, dst: Path): Boolean = dfs.rename(src, dst)

  /**
   * Renames a file or directory (pseudo-distributed cluster).
   *
   * @param fs  HDFS file system
   * @param src original path
   * @param dst new path
   * @return whether the rename succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def rename(fs: FileSystem, src: Path, dst: Path): Boolean = fs.rename(src, dst)

  /**
   * Deletes a file or directory recursively (fully distributed cluster);
   * the connection is opened and closed internally.
   *
   * @param path path to delete
   * @return whether the delete succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def delete(path: String): Boolean = {
    val dfs = getDfs
    try delete(dfs, path)
    finally dfs.close()
  }

  /**
   * Deletes a file or directory recursively (fully distributed cluster).
   *
   * @param dfs  HDFS file system
   * @param path path to delete
   * @return whether the delete succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def delete(dfs: DistributedFileSystem, path: String): Boolean = dfs.delete(new Path(path), true)

  /**
   * Deletes a file or directory recursively (pseudo-distributed cluster).
   *
   * @param fs   HDFS file system
   * @param path path to delete
   * @return whether the delete succeeded
   * @throws IOException IOException
   */
  @throws[IOException]
  def delete(fs: FileSystem, path: String): Boolean = fs.delete(new Path(path), true)

  /**
   * Uploads a local file (fully distributed cluster).
   *
   * @param src local file system path
   * @param dst HDFS path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyFromLocalFile(src: String, dst: String): Unit = {
    copyFromLocalFile(new Path(src), new Path(dst))
  }

  /**
   * Uploads a local file (fully distributed cluster); the connection is
   * opened and closed internally.
   *
   * @param src local file system path
   * @param dst HDFS path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyFromLocalFile(src: Path, dst: Path): Unit = {
    val dfs = getDfs
    try dfs.copyFromLocalFile(src, dst)
    finally dfs.close()
  }

  /**
   * Uploads a local file (fully distributed cluster).
   *
   * @param dfs HDFS file system
   * @param src local file system path
   * @param dst HDFS path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyFromLocalFile(dfs: DistributedFileSystem, src: Path, dst: Path): Unit = {
    dfs.copyFromLocalFile(src, dst)
  }

  /**
   * Downloads an HDFS file (fully distributed cluster).
   *
   * @param src HDFS path
   * @param dst local file system path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyToLocalFile(src: String, dst: String): Unit = {
    copyToLocalFile(new Path(src), new Path(dst))
  }

  /**
   * Downloads an HDFS file (fully distributed cluster); the connection is
   * opened and closed internally.
   *
   * @param src HDFS path
   * @param dst local file system path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyToLocalFile(src: Path, dst: Path): Unit = {
    val dfs = getDfs
    try dfs.copyToLocalFile(src, dst)
    finally dfs.close()
  }

  /**
   * Downloads an HDFS file (fully distributed cluster).
   *
   * @param dfs HDFS file system
   * @param src HDFS path
   * @param dst local file system path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyToLocalFile(dfs: DistributedFileSystem, src: Path, dst: Path): Unit = {
    dfs.copyToLocalFile(src, dst)
  }

  /**
   * Uploads a local file (pseudo-distributed cluster).
   *
   * @param fs  HDFS file system
   * @param src local file system path
   * @param dst HDFS path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyFromLocalFile(fs: FileSystem, src: Path, dst: Path): Unit = {
    fs.copyFromLocalFile(src, dst)
  }

  /**
   * Downloads an HDFS file (pseudo-distributed cluster).
   *
   * @param fs  HDFS file system
   * @param src HDFS path
   * @param dst local file system path
   * @throws IOException IOException
   */
  @throws[IOException]
  def copyToLocalFile(fs: FileSystem, src: Path, dst: Path): Unit = {
    fs.copyToLocalFile(src, dst)
  }

  /**
   * Lists a directory (fully distributed cluster); the connection is opened
   * and closed internally.
   *
   * @param path HDFS path
   * @return file statuses of the direct children
   * @throws IOException IOException
   */
  @throws[IOException]
  def getList(path: String): Array[FileStatus] = {
    val dfs = getDfs
    // listStatus materializes the array, so closing afterwards is safe.
    try getList(dfs, path)
    finally dfs.close()
  }

  /**
   * Lists a directory (fully distributed cluster).
   *
   * @param dfs  HDFS file system
   * @param path HDFS path
   * @return file statuses of the direct children
   * @throws IOException IOException
   */
  @throws[IOException]
  def getList(dfs: DistributedFileSystem, path: String): Array[FileStatus] = dfs.listStatus(new Path(path))

  /**
   * Lists a directory (pseudo-distributed cluster).
   *
   * @param fs   HDFS file system
   * @param path HDFS path
   * @return file statuses of the direct children
   * @throws IOException IOException
   */
  @throws[IOException]
  def getList(fs: FileSystem, path: String): Array[FileStatus] = fs.listStatus(new Path(path))

  /**
   * Reads a file from the fully distributed cluster; the connection is
   * opened and closed internally.
   *
   * @param path HDFS path
   * @return file content (lines joined with '\n', no trailing newline)
   * @throws IOException IOException
   */
  @throws[IOException]
  def readFile(path: String): String = {
    // FIX: the original called getDfs(path), passing the HDFS *path* as the
    // *username*; the no-arg getDfs (default user) is clearly intended here,
    // matching writeFile(path, content) below.
    val dfs = getDfs
    try readFile(dfs, path)
    finally dfs.close()
  }

  /**
   * Reads a file from the fully distributed cluster.
   *
   * @param dfs  HDFS file system
   * @param path HDFS path
   * @return file content (lines joined with '\n', no trailing newline)
   * @throws IOException IOException
   */
  @throws[IOException]
  def readFile(dfs: DistributedFileSystem, path: String): String = {
    // Using.resource closes the Source (and the underlying HDFS stream) even
    // on error; the original leaked the open stream.
    Using.resource(Source.fromInputStream(dfs.open(new Path(path)))(Codec.UTF8)) { source =>
      // Equivalent to the original accumulate-then-strip-last-newline logic.
      source.getLines().mkString("\n")
    }
  }

  /**
   * Writes content to a file on the fully distributed cluster, overwriting
   * any existing file; the connection is opened and closed internally.
   *
   * @param path    HDFS path
   * @param content file content
   * @throws IOException IOException
   */
  @throws[IOException]
  def writeFile(path: String, content: String): Unit = {
    val dfs = getDfs
    try writeFile(dfs, path, content)
    finally dfs.close()
  }

  /**
   * Writes content to a file on the fully distributed cluster, overwriting
   * any existing file.
   *
   * @param dfs     HDFS file system
   * @param path    HDFS path
   * @param content file content
   * @throws IOException IOException
   */
  @throws[IOException]
  def writeFile(dfs: DistributedFileSystem, path: String, content: String): Unit = {
    val out = dfs.create(new Path(path), true)
    // Ensure the stream is closed even when write throws.
    try out.write(content.getBytes)
    finally out.close()
  }

  /**
   * Reads a file from the pseudo-distributed cluster; the connection is
   * opened and closed internally.
   *
   * NOTE(review): `path` is forwarded to `getFs` as the NameNode host *and*
   * used as the file path — same questionable contract as mkdir(path).
   *
   * @param path HDFS path
   * @return file content (lines joined with '\n', no trailing newline)
   * @throws IOException IOException
   */
  @throws[IOException]
  def getString(path: String): String = {
    val fs = getFs(path)
    try getString(fs, path)
    finally fs.close()
  }

  /**
   * Reads a file from the pseudo-distributed cluster.
   *
   * @param fs   HDFS file system
   * @param path HDFS path
   * @return file content (lines joined with '\n', no trailing newline)
   * @throws IOException IOException
   */
  @throws[IOException]
  def getString(fs: FileSystem, path: String): String = {
    // Explicit return type added (public API); stream now closed via Using.
    Using.resource(Source.fromInputStream(fs.open(new Path(path)))(Codec.UTF8)) { source =>
      source.getLines().mkString("\n")
    }
  }

  /**
   * Writes content to a file on the pseudo-distributed cluster, overwriting
   * any existing file; the connection is opened and closed internally.
   *
   * NOTE(review): `path` is forwarded to `getFs` as the NameNode host *and*
   * used as the file path — same questionable contract as mkdir(path).
   *
   * @param path    HDFS path
   * @param content file content
   * @throws IOException IOException
   */
  @throws[IOException]
  def writeString(path: String, content: String): Unit = {
    val fs = getFs(path)
    try writeString(fs, path, content)
    finally fs.close()
  }

  /**
   * Writes content to a file on the pseudo-distributed cluster, overwriting
   * any existing file.
   *
   * @param fs      HDFS file system
   * @param path    HDFS path
   * @param content file content
   * @throws IOException IOException
   */
  @throws[IOException]
  def writeString(fs: FileSystem, path: String, content: String): Unit = {
    val out = fs.create(new Path(path), true)
    try out.write(content.getBytes)
    finally out.close()
  }
}
测试 HDFS 工具类
package cn.lhz.util.hadoop

/**
 * Smoke test for the HDFS utility (Scala version): creates directories,
 * uploads a file, reads it back, rewrites it, and downloads the result.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
object HdfsUtilTest {
  def main(args: Array[String]): Unit = {
    // Prepare the HDFS input and output directories.
    HdfsUtil.mkdirs("/scala/input")
    HdfsUtil.mkdirs("/scala/out")
    // Upload the local file into the HDFS input directory.
    HdfsUtil.copyFromLocalFile("course.txt", "/scala/input")
    // Read the uploaded file's content from HDFS.
    val content = HdfsUtil.readFile("/scala/input/course.txt")
    // Echo the content that was read.
    println(content)
    // Write the content just read back into a new HDFS file.
    HdfsUtil.writeFile("/scala/out/course.txt", content)
    // Download the freshly written HDFS file to the local file system.
    HdfsUtil.copyToLocalFile("/scala/out/course.txt", "course.log")
    println("success")
  }
}