您的位置:首页 > 健康 > 养生 > 软件培训公司排名_深圳龙岗横岗疫情通报_广告服务平台_珠海网站建设制作

软件培训公司排名_深圳龙岗横岗疫情通报_广告服务平台_珠海网站建设制作

2024/12/22 22:25:32 来源:https://blog.csdn.net/yang731227/article/details/144440313  浏览:    关键词:软件培训公司排名_深圳龙岗横岗疫情通报_广告服务平台_珠海网站建设制作
软件培训公司排名_深圳龙岗横岗疫情通报_广告服务平台_珠海网站建设制作

go zero 使用MapReduce并发

一、MapReduce 介绍

MapReduce 是一种用于并行计算的编程模型,特别适合在大规模数据处理场景中简化逻辑代码。

官方文档:

https://go-zero.dev/docs/components/mr

1. MapReduce 的核心概念

在 MapReduce 中,主要有以下三个核心步骤:
a. Generate (生成数据)

  • 数据的初始输入阶段。可以是一个简单的循环,也可以是从数据库、文件或其他来源加载数据。
    b. Mapper (映射)
  • 将输入数据映射为中间结果。通常用来过滤、转换、查询或处理数据。
    c. Reducer (归约)
  • 对映射后的数据进行汇总处理,生成最终的结果。

在 go zero 中,mr.MapReduce 的具体代码如下:


func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],opts ...Option) (V, error) {panicChan := &onceChan{channel: make(chan any)}source := buildSource(generate, panicChan)return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}

2. 为什么需要 MapReduce

在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。

比如要查询商品详情:

  • 商品服务-查询商品属性
  • 库存服务-查询库存属性
  • 价格服务-查询价格属性
  • 营销服务-查询营销属性

如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。

简单的场景下使用 WaitGroup 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 WaitGroup 就有点力不从心了.。

二、项目构建

接下来我们使用一个文章列表功能简单的演示下

1. article数据表

这是存储文章信息的表,包含标题、内容、作者、评论数等字段。

CREATE TABLE `article` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',`title` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '标题' COLLATE 'utf8mb4_bin',`content` TEXT NOT NULL COMMENT '内容' COLLATE 'utf8_unicode_ci',`cover` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '封面' COLLATE 'utf8mb4_bin',`description` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '描述' COLLATE 'utf8mb4_bin',PRIMARY KEY (`id`)
);

2.article.api

在实际开发中,应该传入作者ID、游标、页码、排序方法等信息,这里为了方便演示就不传入参数了,API 定义如下:

syntax = "v1"type (ArticleInfo {ArticleId   int64  `json:"article_id"`Title       string `json:"title"`Content     string `json:"content"`Description string `json:"description"`Cover       string `json:"cover"`}ArticleListResponse {Articles []ArticleInfo `json:"articles"`}
)@server (prefix: /v1/article
)
service article-api {@handler Articlelisthandlerpost /list returns (ArticleListResponse)
}

三、使用 MapReduce

拉取库

go get github.com/zeromicro/go-zero/core/mr

1.实现文章列表

我们使用 MapReduce 来并行处理文章数据,

func (l *ArticlelistLogic) Articlelist() (resp *types.ArticleListResponse, err error) {// Step 1: Generate 数据//这里为了方便我使用了简单for循环产生文档IDgenerateFunc := func(source chan<- int) {for id := 1; id < 50; id++ { // 模拟文章 ID 数据source <- id}}articleModel := l.svcCtx.ArticleModel// Step 2: Mapper 映射处理mapperFunc := func(id int, writer mr.Writer[*types.ArticleInfo], cancel func(error)) {//使用产生id,查询文章详情one, err := articleModel.FindOne(l.ctx, uint64(id)) // 查找单篇文章if err != nil {return // 跳过错误}//FindOne返回的是 *model.Article类型,Mapper映射的类型为*types.ArticleInfo//所以需要转换一下articleInfo := &types.ArticleInfo{ArticleId:   int64(one.Id),Title:       one.Title,Content:     one.Content,Description: one.Description,Cover:       one.Cover,}writer.Write(articleInfo) // 写入中间结果}// Step 3: Reducer 汇总处理reduceFunc := func(pipe <-chan *types.ArticleInfo, writer mr.Writer[[]types.ArticleInfo], cancel func(error)) {var articleList []types.ArticleInfofor article := range pipe {articleList = append(articleList, *article) }writer.Write(articleList) // 写入最终结果}// 调用 MapReduce//mr.WithWorkers(5)  允许调用者自定义并发工作线程数。//如果不传入mr.WithWorkers ,默认Workers为16个reduce, err := mr.MapReduce(generateFunc, mapperFunc, reduceFunc, mr.WithWorkers(5))   if err != nil {return nil, err // 处理错误}// 返回结果return &types.ArticleListResponse{Articles: reduce,}, nil
}

在这里插入图片描述

2. 详细讲解

Step 1: Generate 数据
generateFunc 的作用是提供初始数据。在本例中,我们通过一个循环生成了文章的 ID:

generateFunc := func(source chan<- int) {for id := 1; id < 50; id++ {source <- id}
}

Step 2: Mapper 映射处理
mapperFunc 用于处理每一个文章 ID,并将其转换为 ArticleInfo

  • 使用 articleModel.FindOne 从数据库中获取文章数据。
  • 如果获取失败,跳过该 ID。
  • 将结果通过 writer.Write 写入到下一步。
mapperFunc := func(id int, writer mr.Writer[*types.ArticleInfo], cancel func(error)) {one, err := articleModel.FindOne(l.ctx, uint64(id))if err != nil {return}articleInfo := &types.ArticleInfo{ArticleId:   int64(one.Id),Title:       one.Title,Content:     one.Content,Description: one.Description,Cover:       one.Cover,}writer.Write(articleInfo)
}

Step 3: Reducer 汇总处理
reduceFuncmapperFunc 的结果汇总为最终的 []types.ArticleInfo

  • 遍历管道中的每个 *types.ArticleInfo
  • 将解引用后的 ArticleInfo 添加到结果列表。
reduceFunc := func(pipe <-chan *types.ArticleInfo, writer mr.Writer[[]types.ArticleInfo], cancel func(error)) {var articleList []types.ArticleInfofor article := range pipe {articleList = append(articleList, *article)}writer.Write(articleList)
}

3. 测试运行

/v1/article/list 发送 POST 请求:

curl -X POST http://localhost:8888/v1/article/list

运行结果如下:

{"articles": [{"article_id": 1,"title": "标题1","content": "这是内容1","description": "描述1","cover": "封面1.jpg"},...]
}

4.效率对比

普通循环

为了更直观的对比效率,我们使用普通循环再次实现下文章列表:

func (l *ArticlelistLogic) Articlelist() (resp *types.ArticleListResponse, err error) {// todo: add your logic here and delete this linetime1 := time.Now()var articleList []types.ArticleInfoarticleModel := l.svcCtx.ArticleModelfor id := 1; id < 50; id++ {article, _ := articleModel.FindOne(l.ctx, uint64(id))articleInfo := types.ArticleInfo{ArticleId:   int64(article.Id),Title:       article.Title,Content:     article.Content,Description: article.Description,Cover:       article.Cover,}articleList = append(articleList, articleInfo)}time2 := time.Now()logx.Info("执行时间为:", time2.Sub(time1))return &types.ArticleListResponse{Articles: articleList,}, nil}

效率对比

这个执行时间可能每次都不一样,但是进过多次对比, 使用mapreduce 效率是高于普通方法的

使用串行调用时间:
在这里插入图片描述

使用MapReduce消耗时间:

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com