文章目录
- 基础设施
- 依赖插件配置
- 编写 proto 文件
- 编译 proto 文件
- Armeria 集成 gRPC,启动服务
- 开发
- 基础设施
- 创建操作
- 读取操作
- 修改操作
- 删除操作
基础设施
依赖插件配置
Note:JDK 需要 11 及以上,Protobuf3.
import com.google.protobuf.gradle.idplugins {applicationkotlin("jvm") version "1.9.24"id("com.google.protobuf") version "0.9.3"
}group = "org.cyk"
version = "1.0-SNAPSHOT"repositories {mavenLocal()mavenCentral()
}protobuf {protoc {artifact = "com.google.protobuf:protoc:3.25.1"}plugins {id("grpc") {artifact = "io.grpc:protoc-gen-grpc-java:1.64.0"}}generateProtoTasks {all().forEach { task ->task.plugins {id("grpc")}}}
}dependencies {implementation ("com.linecorp.armeria:armeria:1.30.1")implementation ("com.linecorp.armeria:armeria-grpc:1.30.1")compileOnly("javax.annotation:javax.annotation-api:1.3.2")runtimeOnly ("ch.qos.logback:logback-classic:1.4.14")testImplementation ("org.junit.jupiter:junit-jupiter:5.10.3")testImplementation ("com.linecorp.armeria:armeria-junit5:1.30.1")testImplementation(kotlin("test"))
}tasks.test {useJUnitPlatform()
}kotlin {jvmToolchain(17)
}tasks.withType<JavaCompile> {sourceCompatibility = "17"targetCompatibility = "17"options.encoding = "UTF-8"options.isDebug = trueoptions.compilerArgs.add("-parameters")
}application {mainClass.set("MainKt")
}
编写 proto 文件
在 {project_root}/src/main/proto
下创建 blog.proto 文件,如下:
syntax = "proto3";package org.cyk.armeria.grpc.blog;
option java_package = "org.cyk.armeria.grpc.blog";
option java_multiple_files = true;import "google/protobuf/empty.proto";service BlogService {rpc CreateBlog (CreateBlogReq) returns (CreateBlogResp) {}rpc QueryBlogById (QueryBlogByIdReq) returns (QueryBlogByIdResp) {}rpc QueryBlogByIds (QueryBlogByIdsReq) returns (QueryBlogByIdsResp) {}rpc UpdateBlogById (UpdateBlogByIdReq) returns (UpdateBlogResp) {}rpc DeleteBlogById (DeleteBlogByIdReq) returns (google.protobuf.Empty) {}
}message CreateBlogReq {string title = 1;string content = 2;
}
message CreateBlogResp {Blog blog = 1;optional string errorMsg = 2;
}message QueryBlogByIdReq { // For retrieving a single postint32 id = 1;
}
message QueryBlogByIdResp {optional Blog blog = 1;
}message QueryBlogByIdsReq { // For retrieving multiple postsrepeated int32 ids = 1;
}
message QueryBlogByIdsResp {repeated Blog blogs = 1;
}message UpdateBlogByIdReq {int32 id = 1;string title = 2;string content = 3;
}
message UpdateBlogResp {Blog blog = 1;optional string errorMsg = 2;
}message DeleteBlogByIdReq {int32 id = 1;
}message Blog {int32 id = 1;string title = 2;string content = 3;int64 createdAt = 4;int64 modifiedAt = 5;
}
编译 proto 文件
Note:
protobuf-gradle-plugin
-> https://github.com/google/protobuf-gradle-plugin
gradlew 跳过测试并构建,protobuf-gradle-plugin
插件也会随之编译 proto 文件,如下命令:
gradlew build -x test
之后就可以在如下位置看到编译得到的文件:
- { project_root }/build/generated/source/proto/main/grpc
- { project_root }/build/generated/source/proto/main/java
Armeria 集成 gRPC,启动服务
1)创建 BlogServiceGrpcFacade 类,继承 BlogServiceImplBase,表示将来需要远程调用的对象,如下:
import example.armeria.blog.grpc.BlogServiceGrpc.BlogServiceImplBaseclass BlogServiceGrpcFacade: BlogServiceGrpc.BlogServiceImplBase()
2)配置 Armeria-gRPC 服务(这里没有使用 SpringBoot,默认构建 Bean)
import org.slf4j.LoggerFactory
import com.linecorp.armeria.server.Server
import com.linecorp.armeria.server.grpc.GrpcService
import service.BlogServiceGrpcFacadeobject ArmeriaGrpcBean {fun newServer(port: Int): Server {return Server.builder().http(port) // 1.配置端口号.service(GrpcService.builder().addService(BlogServiceGrpcFacade()) // 2.添加服务示例.build()).build()}
}fun main(args: Array<String>) {val log = LoggerFactory.getLogger("MainLogger")val server = ArmeriaGrpcBean.newServer(9000)server.closeOnJvmShutdown().thenRun {log.info("Server is closed ...")}server.start().join()
}
如果看到如下日志,表明服务正在运行:
22:27:51.746 [armeria-boss-http-*:9000] INFO com.linecorp.armeria.server.Server -- Serving HTTP at /[0:0:0:0:0:0:0:0]:9000 - http://127.0.0.1:9000/
开发
基础设施
1)客户端
companion object {private lateinit var stub: BlogServiceBlockingStubprivate lateinit var server: Server@JvmStatic@BeforeAllfun beforeAll() {server = ArmeriaGrpcBean.newServer(9000)server.start()//这里启动不是异步的,所以不用 Thread.sleep 等待stub = GrpcClients.newClient("http://127.0.0.1:9000/",BlogServiceBlockingStub::class.java,)}}
2)服务端
这里为了专注 Armeria-gRPC 的处理,使用 map 来替代数据库
class BlogServiceGrpcFacade: BlogServiceGrpc.BlogServiceImplBase() {// ID 生成器private val idGenerator = AtomicInteger()// 数据库private val blogRepo = ConcurrentHashMap<Int, Blog>()}
创建操作
1)服务端
override fun createBlog(request: CreateBlogReq,responseObserver: StreamObserver<CreateBlogResp>) {val id = idGenerator.getAndIncrement()val now = Instant.now()val blog = Blog.newBuilder().setId(id).setTitle(request.title).setContent(request.content).setModifiedAt(now.toEpochMilli()).setCreatedAt(now.toEpochMilli()).build()blogRepo[id] = blogval resp = CreateBlogResp.newBuilder().setBlog(blog).build()responseObserver.onNext(resp)responseObserver.onCompleted()}
2)客户端
@Testfun createBlogTest() {val req = CreateBlogReq.newBuilder().setTitle("我的博客1").setContent("今天天气真不错~").build()println("================= req send ... =================")val resp = stub.createBlog(req)println(resp.blog.title)println(resp.blog.content)println("================= resp received ... =================")}
3)效果如下:
================= req send ... =================
22:14:03.812 [Test worker] INFO com.linecorp.armeria.internal.common.JavaVersionSpecific -- Using the APIs optimized for: Java 12+
22:14:03.923 [armeria-common-worker-nio-3-3] DEBUG com.linecorp.armeria.server.HttpServerHandler -- [id: 0xd94afbdc, L:/127.0.0.1:9000 - R:/127.0.0.1:55237] HTTP/2 settings: {ENABLE_PUSH=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
22:14:03.929 [armeria-common-worker-nio-3-2] DEBUG com.linecorp.armeria.client.Http2ResponseDecoder -- [id: 0xfd4c7207, L:/127.0.0.1:55237 - R:/127.0.0.1:9000] HTTP/2 settings: {MAX_CONCURRENT_STREAMS=2147483647, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
我的博客1
今天天气真不错~
================= resp received ... =================
读取操作
单个读取:
1)服务端
override fun queryBlogById(request: QueryBlogByIdReq,responseObserver: StreamObserver<QueryBlogByIdResp>) {val resp = QueryBlogByIdResp.newBuilder()blogRepo[request.id]?.let {//这里的 it 不能为 null (proto 编译出的文件,只要 set,就不能为 null,除非你不 set)resp.setBlog(it)}// 如果不习惯, 可以对可能为空的字段滞后处理
// val blog = blogRepo[request.id]
// val resp = QueryBlogByIdResp.newBuilder().apply {
// blog?.let { setBlog(it) }
// }responseObserver.onNext(resp.build())responseObserver.onCompleted()}
2)客户端
@Test@Order(1)fun createBlogTest() {//...}@Test@Order(2)fun queryBlogByIdTest() {val req = QueryBlogByIdReq.newBuilder().setId(0).build()println("================= req send ... =================")val resp = stub.queryBlogById(req)if (resp.hasBlog()) {println(resp.blog.title)println(resp.blog.content)}println("================= req received ... =================")}
3)效果如下:
================= req send ... =================
我的博客1
今天天气真不错~
================= req received ... =================
多个读取
1)服务端
override fun queryBlogByIds(request: QueryBlogByIdsReq,responseObserver: StreamObserver<QueryBlogByIdsResp>,) {val blogs = blogRepo.filter {return@filter request.idsList.contains(it.key)}.map { it.value }val resp = QueryBlogByIdsResp.newBuilder().addAllBlogs(blogs).build()responseObserver.onNext(resp)responseObserver.onCompleted()}
2)客户端
@Testfun queryBlogByIdsTest() {// init startval q1 = CreateBlogReq.newBuilder().setTitle("blog 1").setContent("balabala").build()stub.createBlog(q1)val q2 = CreateBlogReq.newBuilder().setTitle("blog 2").setContent("balabala").build()stub.createBlog(q2)val q3 = CreateBlogReq.newBuilder().setTitle("blog 3").setContent("balabala").build()stub.createBlog(q3)// init endval req = QueryBlogByIdsReq.newBuilder().addAllIds(listOf(0,1,2)).build()println("================= req send ... =================")val resp = stub.queryBlogByIds(req)resp.blogsList.forEach {println(it.title)}println("================= req received ... =================")}
3)效果如下:
================= req send ... =================
blog 1
blog 2
blog 3
================= req received ... =================
修改操作
1)服务端
override fun updateBlogById(request: UpdateBlogByIdReq,responseObserver: StreamObserver<UpdateBlogResp>) {//这里的校验一般不再这一层做(还会有 Handler 读写分离类)val (errorMsg, beforeBlog) = checkAndGetPair(request)if (errorMsg != null) {responseObserver.onNext(UpdateBlogResp.newBuilder().setErrorMsg(errorMsg).build())responseObserver.onCompleted()return}val afterBlog = Blog.newBuilder().apply {id = beforeBlog!!.idtitle = request.titlecontent = request.content}.build()blogRepo[afterBlog.id] = afterBlogval resp = UpdateBlogResp.newBuilder().setBlog(afterBlog).build()responseObserver.onNext(resp)responseObserver.onCompleted()}private fun checkAndGetPair(req: UpdateBlogByIdReq): Pair<String?, Blog?> {val blog = blogRepo[req.id]?: return "文章不存在" to null// 如果还需要其他校验// ...return null to blog}
2)客户端
@Testfun updateBlogTest() {// init startval q1 = CreateBlogReq.newBuilder().setTitle("blog 1").setContent("balabala").build()val blogBefore = stub.createBlog(q1)// init endprintln("update before =========================>")println("title: " + blogBefore.blog.title)println("update after =========================>")val updateReq = UpdateBlogByIdReq.newBuilder().setId(0).setTitle(q1.title + " update...").setContent(q1.content + " update...").build()val blogAfter = stub.updateBlogById(updateReq)println("title: " + blogAfter.blog.title)}
3)效果如下:
update before =========================>
title: blog 1
update after =========================>
title: blog 1 update...
删除操作
1)服务端
override fun deleteBlogById(request: DeleteBlogByIdReq,responseObserver: StreamObserver<Empty>) {blogRepo.remove(request.id)responseObserver.onNext(Empty.getDefaultInstance())responseObserver.onCompleted()}
2)客户端
@Testfun deleteByIdTest() {val cq = CreateBlogReq.newBuilder().setTitle("blog").setContent("balabala ...").build()stub.createBlog(cq)val qq = QueryBlogByIdReq.newBuilder().setId(0).build()stub.queryBlogById(qq).also {assertTrue { it.hasBlog() }}val dq = DeleteBlogByIdReq.newBuilder().setId(0).build()stub.deleteBlogById(dq)stub.queryBlogById(qq).also {assertTrue { !it.hasBlog() }}}