您的位置:首页 > 健康 > 美食 > 广州最新动态_品牌查询网站_百度搜索引擎使用技巧_北京优化网站推广

广州最新动态_品牌查询网站_百度搜索引擎使用技巧_北京优化网站推广

2025/2/26 15:21:24 来源:https://blog.csdn.net/X_StarX/article/details/144690385  浏览:    关键词:广州最新动态_品牌查询网站_百度搜索引擎使用技巧_北京优化网站推广
广州最新动态_品牌查询网站_百度搜索引擎使用技巧_北京优化网站推广

RDD:

// 导入SparkConf和SparkContext类,用于配置和创建Spark上下文
import org.apache.spark.{SparkConf, SparkContext}// 定义一个名为TopN的对象
object TopN {def main(args: Array[String]): Unit = {// 创建一个新的SparkConf对象,并设置应用程序名称为"TopN",主节点为"local"val conf = new SparkConf().setAppName("TopN").setMaster("local")val sc = new SparkContext(conf)// 设置日志级别为ERROR,以减少输出的信息量sc.setLogLevel("ERROR")// 从HDFS读取数据,使用2个分区val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples", 2)  // 初始化计数器var num = 0// 对读取的数据进行一系列转换操作:// 1. 过滤掉空行或长度为0的行// 2. 过滤掉不能被逗号分割成4部分的行// 3. 将每一行按逗号分割成两部分// 4. 将分割后的第二部分转换为整数// 5. 按照第二部分(整数)降序排序// 6. 取排序后的前5个元素// 7. 遍历这5个元素,打印其索引和值val result = lines.filter(line => (line.trim.length > 0) && (line.split(",").length == 4)).map(_.split(",")(2)).map(x => (x.toInt,"")).sortByKey(false).map(x => x._1).take(5).foreach(x => {num = num + 1println(num + "\t" + x)})}
}


import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)// 对读取的数据进行一系列转换操作:// 1. 过滤掉空行或长度为0的行// 2. 将每一行按逗号分割成两部分:键(key)和值(value)// 3. 按键分组// 4. 计算每个键对应的最大值和最小值// 5. 收集结果并打印val result = lines.filter(_.trim.length > 0).map(line => ("key", line.trim.toInt)).groupByKey().map(x => {var min = Integer.MAX_VALUEvar max = Integer.MIN_VALUEfor (num <- x._2) {if (num > max) {max = num}if (num < min) {min = num}}(max, min)}).collect().foreach(x => {println("max\t" + x._1)println("min\t" + x._2)})}
}

案例3:文件排序

任务描述:

有多个输入文件,每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容个数为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。

                                    


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitioner
object FileSort {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FileSort")val sc = new SparkContext(conf)// 设置数据文件路径val dataFile = "file:///usr/local/spark/mycode/rdd/data"// 从指定路径读取数据文件,使用3个分区val lines = sc.textFile(dataFile, 3)// 初始化索引变量var index = 0// 对读取的数据进行一系列转换操作:// 1. 过滤掉空行或长度为0的行// 2. 将每一行按逗号分割成两部分:键(key)和值(value)// 3. 使用HashPartitioner进行分区// 4. 按键排序// 5. 添加索引并重新组合结果val result = lines.filter(_.trim.length > 0).map(n => (n.trim.toInt, "")).partitionBy(new HashPartitioner(1)).sortByKey().map(t => {index += 1(index, t._1)})// 将处理后的结果保存到指定路径result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")}
}

案例4:二次排序

任务要求:

对于一个给定的文件(数据如file1.txt所示),请对数据进行排序,首先根据第1列数据降序排序,如果第1列数据相等,则根据第2列数据降序排序。

// 定义一个SecondarySortKey类,用于实现自定义排序逻辑
package cn.edu.xmu.sparkclass SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {// 实现compare方法,用于比较两个SecondarySortKey对象def compare(other: SecondarySortKey): Int = {if (this.first - other.first != 0) {this.first - other.first} else {this.second - other.second}}
}// 定义一个SecondarySortApp对象,用于执行主程序
object SecondarySortApp {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")// 创建SparkContext上下文val sc = new SparkContext(conf)    // 读取文件数据val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)  // 将每行数据转换为SecondarySortKey对象val pairWithSortKey = lines.map(line => new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt))// 按照SecondarySortKey进行排序val sorted = pairWithSortKey.sortByKey(false)// 提取排序后的结果val sortedResult = sorted.map(sortedLine => sortedLine._2)// 打印排序结果sortedResult.collect().foreach(println)}
}

案例五:连接操作

任务描述:在推荐领域有一个著名的开放测试集,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt。请编程实现:通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m

import org.apache.spark._
import org.apache.spark.SparkContext._object SparkJoin {def main(args: Array[String]): Unit = {// 检查命令行参数的数量是否正确,确保提供三个参数:评分文件路径、电影文件路径、输出路径if (args.length != 3) {println("usage is SparkJoin <rating> <movie> <output>")return}val conf = new SparkConf().setAppName("SparkJoin").setMaster("local")// 创建Spark上下文对象val sc = new SparkContext(conf)try {// 从HDFS文件系统读取评分数据val textFile = sc.textFile(args(0))// 提取(movieId, rating)键值对val ratings = textFile.map(line => {val fields = line.split("::")(fields(1).toInt, fields(2).toDouble)})// 计算每个电影的平均评分val movieScores = ratings.groupByKey() // 将相同电影ID的评分组合在一起.map(data => { // 对每个电影ID的评分组计算平均值val avg = data._2.sum / data._2.size(data._1, avg)})// 从HDFS文件系统读取电影数据val movies = sc.textFile(args(1))// 提取(MovieID, MovieName)键值对,并基于MovieID创建键值对val moviesKey = movies.map(line => {val fields = line.split("::")(fields(0).toInt, fields(1)) // (MovieID, MovieName)}).keyBy(tup => tup._1)// 通过join操作合并电影评分和电影信息,过滤出平均评分大于4.0的电影,并格式化输出val result = movieScores.keyBy(tup => tup._1) // 基于电影ID创建键值对.join(moviesKey) // 将评分与电影信息进行连接.filter(f => f._2._1._2 > 4.0) // 过滤出平均评分大于4.0的电影.map(f => (f._1, f._2._1._2, f._2._2._2)) // 格式化为 (MovieID, AverageRating, MovieName)// 将结果保存到指定的输出路径result.saveAsTextFile(args(2))} finally {// 确保在程序结束时停止Spark上下文sc.stop()}}
}

wordcount两道:

MapReduce实现wordcount

package org.apache.hadoop.examples;import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {// 默认构造函数,无参数。public WordCount() {}public static void main(String[] args) throws Exception {// 创建一个配置对象,用于读取命令行参数和配置文件。Configuration conf = new Configuration();// 解析命令行参数,并将非Hadoop通用选项的参数分离出来。String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();// 检查输入参数是否正确。至少需要两个参数:一个或多个输入路径和一个输出路径。if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}// 初始化一个新的MapReduce作业,并设置其名称为"word count"。Job job = Job.getInstance(conf, "word count");// 设置包含main方法的类作为作业的主类,以便找到相关的Mapper、Reducer和其他资源。job.setJarByClass(WordCount.class);// 设置Mapper类,它负责处理输入数据并生成中间键值对。job.setMapperClass(TokenizerMapper.class);// 设置Combiner类(可选),它在映射阶段后立即对中间结果进行局部聚合,以减少传输的数据量。job.setCombinerClass(IntSumReducer.class);// 设置Reducer类,它负责接收来自Mapper的中间键值对,并执行最终的聚合操作。job.setReducerClass(IntSumReducer.class);// 定义作业的输出格式,指定键和值的类型。job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 添加所有提供的输入路径到作业中。最后一个参数总是作为输出路径。for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}// 设置输出目录,该目录必须不存在。FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 提交作业并等待完成,成功返回0,失败返回1。System.exit(job.waitForCompletion(true) ? 0 : 1);}// Reducer类用于汇总每个单词出现的次数。public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;// 遍历所有的IntWritable值,累加它们以计算总和。for (IntWritable val : values) {sum += val.get(); // 将IntWritable转换为原始int类型}// 设置sum到result中,以便可以序列化。result.set(sum);// 输出<key, result>对到context,即单词及其出现的次数。context.write(key, result);}}// Mapper类负责将输入文本拆分为单词,并为每个单词生成一个计数为1的键值对。public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1); // 代表每个单词出现一次private Text word = new Text(); // 用于存储当前处理的单词@Overrideprotected void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 使用StringTokenizer来分割文本行中的单词。StringTokenizer itr = new StringTokenizer(value.toString());// 对于每一个单词,创建一个<单词, 1>键值对并写入到context中。while (itr.hasMoreTokens()) {word.set(itr.nextToken()); // 设置当前单词context.write(word, one); // 写入键值对到context中}}}
}

Spark SQL实现wordcount

package com.ht.final.wordcountimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConfobject WordCountSparkSQL {def main(args: Array[String]): Unit = {// 初始化Spark配置,并创建一个本地模式的SparkSession。// local[*]表示使用所有可用的处理器核心来运行任务。val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")val spark = SparkSession.builder.config(sparkConf).getOrCreate()// 设置日志级别为警告,减少控制台输出的日志信息量。spark.sparkContext.setLogLevel("WARN")import spark.implicits._ // 导入隐式转换,用于支持DataFrame/Dataset操作。try {// 读取文本文件内容到Dataset中,每行作为一个字符串元素。val fileDS: Dataset[String] = spark.read.textFile("D:\\Document\\temp\\wordcount\\input.txt")// 将每一行按照制表符('\t')分割成多个单词,形成一个新的包含单个单词的Dataset。val wordDS: Dataset[String] = fileDS.flatMap(_.split("\t"))// 注册临时视图(类似数据库表),以便能够通过SQL查询访问数据。wordDS.createOrReplaceTempView("word_count")// 定义SQL查询语句,计算每个单词出现的次数,并按出现次数降序排列。val sqlQuery = "SELECT value AS word, COUNT(*) AS counts FROM word_count GROUP BY word ORDER BY counts DESC"// 执行SQL查询并获取结果作为DataFrame。val resultDF: DataFrame = spark.sql(sqlQuery)// 展示查询结果的前20行,默认情况下。resultDF.show()} finally {// 确保在程序结束时关闭资源。spark.stop()}}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com