您的位置:首页 > 娱乐 > 明星 > 怎样搭建大型企业网络_excel小程序商店_交换友链是什么意思_网站制作流程和方法

怎样搭建大型企业网络_excel小程序商店_交换友链是什么意思_网站制作流程和方法

2025/2/25 9:41:09 来源:https://blog.csdn.net/weixin_74994990/article/details/144199373  浏览:    关键词:怎样搭建大型企业网络_excel小程序商店_交换友链是什么意思_网站制作流程和方法
怎样搭建大型企业网络_excel小程序商店_交换友链是什么意思_网站制作流程和方法
  1. 语音识别子服务(speech文件)

在七个模块中语音识别子服务最为独立,也最为简单,下面是他的接口,只有一个业务,就是将语音文件转换成文字消息,主要还是调用了阿里云的语音识别SDK进行

service SpeechService {

 rpc SpeechRecognition(SpeechRecognitionReq) returns 

(SpeechRecognitionRsp); }

语音识别接口:从request里面获取用户ID和语音文件,然后调用阿里云的语音识别接口,获得对应的文本信息,将文本消息放入request里面就结束

void SpeechRecognition(google::protobuf::RpcController* controller,

                       const ::zhou::SpeechRecognitionReq* request,

                       ::zhou::SpeechRecognitionRsp* response,

                       ::google::protobuf::Closure* done)

                       {

                        LOG_DEBUG("收到语音转文字请求!");

                       brpc::ClosureGuard rpc_guard(done);

                       std::string err ;

                       std::string res = _asr_client->recognize(request->speech_content(), err);

                       if(res.empty())

                       {

                        LOG_ERROR("{}语音识别失败", request->request_id());

                        response->set_request_id(request->request_id());

                        response->set_success(false);

                        response->set_errmsg("语音识别失败:"+err);

                        return ;

                       }

                       response->set_request_id(request->request_id());

                        response->set_success(true);

                        response->set_recognition_result(res);

                       }

  1. 消息转发子服务(transmite文件)

消息转发子服务就只有一个接口,但是却是项目中的核心功能,主要是利用rabbitmq将消息转发给消息管理子服务让其进行持久化

service MsgTransmitService {

 rpc GetTransmitTarget(NewMessageReq) returns 

(GetTransmitTargetRsp);

}

消息转发接口:从request里面获得用户ID,会话ID,获得消息的正文(采用MessageContent结构进行接收),通过用户子服务获取用户的个人信息,然后再新建一条message结构,将message的个人信息,content,时间,会话ID,进行添加好以后,再将消息放入rabbitmq的发布机里面,实现消息的一个生产(后面的消费是消息子服务的工作),再将消息内容和需要转发的人的uid放入response里面,结束

void GetTransmitTarget(google::protobuf::RpcController* controller,

                       const ::zhou::NewMessageReq* request,

                       ::zhou::GetTransmitTargetRsp* response,

                       ::google::protobuf::Closure* done) override {

            brpc::ClosureGuard rpc_guard(done);

            auto err_response = [this, response](const std::string &rid,

                const std::string &errmsg) -> void {

                response->set_request_id(rid);

                response->set_success(false);

                response->set_errmsg(errmsg);

                return;

            };

            //从请求中获取关键信息:用户ID,所属会话ID,消息内容

            std::string rid = request->request_id();

            std::string uid = request->user_id();

            std::string chat_ssid = request->chat_session_id();

            const MessageContent &content = request->message();

            // 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息ID

            auto channel = _mm_channels->choose(_user_service_name);

            if (!channel) {

                LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);

                return err_response(rid, "没有可供访问的用户子服务节点!");

            }

            UserService_Stub stub(channel.get());

            GetUserInfoReq req;

            GetUserInfoRsp rsp;

            req.set_request_id(rid);

            req.set_user_id(uid);

            brpc::Controller cntl;

            stub.GetUserInfo(&cntl, &req, &rsp, nullptr);

            if (cntl.Failed() == true || rsp.success() == false) {

                LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());

                return err_response(request->request_id(), "用户子服务调用失败!");

            }

            MessageInfo message;

            message.set_message_id(uuid());

            message.set_chat_session_id(chat_ssid);

            message.set_timestamp(time(nullptr));

            message.mutable_sender()->CopyFrom(rsp.user_info());

            message.mutable_message()->CopyFrom(content);

            // 获取消息转发客户端用户列表

            auto target_list = _mysql_session_member_table->members(chat_ssid);

            // 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化

            bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);

            if (ret == false) {

                LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());

                return err_response(request->request_id(), "持久化消息发布失败:!");

            }

            //组织响应

            response->set_request_id(rid);

            response->set_success(true);

            response->mutable_message()->CopyFrom(message);

            for (const auto &id : target_list) {

                response->add_target_id_list(id);

            }

        }

  1. 消息管理子服务(message文件)

消息管理子服务功能主要是包括消息的管理,查询和消费,虽然接口不多,但都是核心功能

service MsgStorageService {

 rpc GetHistoryMsg(GetHistoryMsgReq) returns 

(GetHistoryMsgRsp);  rpc GetRecentMsg(GetRecentMsgReq) returns (GetRecentMsgRsp);

 rpc MsgSearch(MsgSearchReq) returns (MsgSearchRsp);

 

在介绍三个接口之前,先介绍一个关键的函数(消息消费函数)onMessage:因为消息是四种类型,而我们对每种消息的存储处理不一样,并且在消息转发子服务中,我们只是将消息添加到rabbitmq发送队列,并没有对消息进行消费,现在要在消息子服务中对消息进行消费,所以存在这个函数,首先,我们从对从rabbitmq里面获取的消息进行序列化得到MessageInfo类型,然后通过message.message().message_type()函数获得该消息是什么类型的,若是普通文本类型,就将其存入ES搜索引擎内部,如果是图片类型,则将其上传到文件管理子服务处,并得到图片的文件ID,如果是文件类型,也上传到文字子服务处,获得对应的文件名,文件ID,文件大小,如果是语音文件就上传到文件子服务,得到对应的文件ID,

然后无论是什么类型的消息,最后都会被抽象化为message结构,填充文件ID,会话ID,文件类型,产生时间,消息内容(只有文本消息有),文件ID,文件名,文件大小(三者只有大类文件消息有),然后再将这个结构插入数据库,而这个函数的本质是一个回调函数,当消息转发服务器发送消息时,这边就会接收到发布的消息然后调用这个函数,让这个函数对消息进行持久化(存入数据库的搜索引擎),这这个函数是在初始化这个调用服务时被绑定

也就是这里

MessageServiceImpl *msg_service = new MessageServiceImpl(_es_client,

                _mysql_client, _mm_channels, _file_service_name, _user_service_name);

            int ret = _rpc_server->AddService(msg_service,

                brpc::ServiceOwnership::SERVER_OWNS_SERVICE);

            if (ret == -1) {

                LOG_ERROR("添加Rpc服务失败!");

                abort();

            }

            brpc::ServerOptions options;

            options.idle_timeout_sec = timeout;

            options.num_threads = num_threads;

            ret = _rpc_server->Start(port, &options);

            if (ret == -1) {

                LOG_ERROR("服务启动失败!");

                abort();

            }

           

            auto callback = std::bind(&MessageServiceImpl::onMessage, msg_service,

                std::placeholders::_1, std::placeholders::_2);

            _mq_client->consume(_queue_name, callback);

上面是绑定onmessage的关键代码,下面是onmessage的实现,二者并不连接

void onMessage(const char *body, size_t sz) {

            LOG_DEBUG("收到新消息,进行存储处理!");

            //1. 取出序列化的消息内容,进行反序列化

            zhou::MessageInfo message;

            bool ret = message.ParseFromArray(body, sz);

            if (ret == false) {

                LOG_ERROR("对消费到的消息进行反序列化失败!");

                return;

            }

            //2. 根据不同的消息类型进行不同的处理

            std::string file_id, file_name, content;

            int64_t file_size;

            switch(message.message().message_type()) {

                //  1. 如果是一个文本类型消息,取元信息存储到ES中

                case MessageType::STRING:

                    content = message.message().string_message().content();

                    ret = _es_message->appendData(

                        message.sender().user_id(),

                        message.message_id(),

                        message.timestamp(),

                        message.chat_session_id(),

                        content);

                    if (ret == false) {

                        LOG_ERROR("文本消息向存储引擎进行存储失败!");

                        return;

                    }

                    break;

                //  2. 如果是一个图片/语音/文件消息,则取出数据存储到文件子服务中,并获取文件ID

                case MessageType::IMAGE:

                    {

                        const auto &msg = message.message().image_message();

                        ret = _PutFile("", msg.image_content(), msg.image_content().size(), file_id);

                        if (ret == false) {

                            LOG_ERROR("上传图片到文件子服务失败!");

                            return ;

                        }

                    }

                    break;

                case MessageType::FILE:

                    {

                        const auto &msg = message.message().file_message();

                        file_name = msg.file_name();

                        file_size = msg.file_size();

                        ret = _PutFile(file_name, msg.file_contents(), file_size, file_id);

                        if (ret == false) {

                            LOG_ERROR("上传文件到文件子服务失败!");

                            return ;

                        }

                    }

                    break;

                case MessageType::SPEECH:

                    {

                        const auto &msg = message.message().speech_message();

                        ret = _PutFile("", msg.file_contents(), msg.file_contents().size(), file_id);

                        if (ret == false) {

                            LOG_ERROR("上传语音到文件子服务失败!");

                            return ;

                        }

                    }

                    break;

                default:

                    LOG_ERROR("消息类型错误!");

                    return;

            }

            //3. 提取消息的元信息,存储到mysql数据库中

            zhou::Message msg(message.message_id(),

                message.chat_session_id(),

                message.sender().user_id(),

                message.message().message_type(),

                boost::posix_time::from_time_t(message.timestamp()));

            msg.content(content);

            msg.file_id(file_id);

            msg.file_name(file_name);

            msg.file_size(file_size);

            ret = _mysql_message->insert(msg);

            if (ret == false) {

                LOG_ERROR("向数据库插入新消息失败!");

                return;

            }

        }

  1. 获取历史消息的接口设计:先从request里面获得会话ID,用户ID,起始时间,结束时间,然后再数据库中进行查询,得到对应的消息列表,在从消息列表中剥离文件消息,获取所有需要下载的文件消息的ID,再通过文件子服务获得所有文件ID对应的文件,然后再从消息的列表里面获取对应的userid,通过用户子服务来获取所有的用户信息,然后通过遍历消息列表,通过里面的message_id的索引方式来按照时间顺序将消息(无论是文件消息还是文本消息都按照顺序返回,还是由于因为我们获取批量文件是,使用map进行装载吗,而索引便是文件对应的消息ID)

 virtual void GetHistoryMsg(::google::protobuf::RpcController* controller,

            const ::zhou::GetHistoryMsgReq* request,

            ::zhou::GetHistoryMsgRsp* response,

            ::google::protobuf::Closure* done) {

            brpc::ClosureGuard rpc_guard(done);

            auto err_response = [this, response](const std::string &rid,

                const std::string &errmsg) -> void {

                response->set_request_id(rid);

                response->set_success(false);

                response->set_errmsg(errmsg);

                return;

            };

            //1. 提取关键要素:会话ID,起始时间,结束时间

            std::string rid = request->request_id();

            std::string chat_ssid = request->chat_session_id();

            boost::posix_time::ptime stime = boost::posix_time::from_time_t(request->start_time());

            boost::posix_time::ptime etime = boost::posix_time::from_time_t(request->over_time());

            //2. 从数据库中进行消息查询

            auto msg_lists = _mysql_message->range(chat_ssid, stime, etime);

            if (msg_lists.empty()) {

                response->set_request_id(rid);

                response->set_success(true);

                return ;

            }

            //3. 统计所有文件类型消息的文件ID,并从文件子服务进行批量文件下载

            std::unordered_set<std::string> file_id_lists;

            for (const auto &msg : msg_lists) {

                if (msg.file_id().empty()) continue;

                LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());

                file_id_lists.insert(msg.file_id());

            }

            std::unordered_map<std::string, std::string> file_data_lists;

            bool ret = _GetFile(rid, file_id_lists, file_data_lists);

            if (ret == false) {

                LOG_ERROR("{} 批量文件数据下载失败!", rid);

                return err_response(rid, "批量文件数据下载失败!");

            }

            //4. 统计所有消息的发送者用户ID,从用户子服务进行批量用户信息获取

            std::unordered_set<std::string> user_id_lists; //

            for (const auto &msg : msg_lists) {

                user_id_lists.insert(msg.user_id());

            }

            std::unordered_map<std::string, UserInfo> user_lists;

            ret = _GetUser(rid, user_id_lists, user_lists);

            if (ret == false) {

                LOG_ERROR("{} 批量用户数据获取失败!", rid);

                return err_response(rid, "批量用户数据获取失败!");

            }

            //5. 组织响应

            response->set_request_id(rid);

            response->set_success(true);

            for (const auto &msg : msg_lists) {

                auto message_info = response->add_msg_list();

                message_info->set_message_id(msg.message_id());

                message_info->set_chat_session_id(msg.session_id());

                message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));

                message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);

                switch(msg.message_type()) {

                    case MessageType::STRING:

                        message_info->mutable_message()->set_message_type(MessageType::STRING);

                        message_info->mutable_message()->mutable_string_message()->set_content(msg.content());

                        break;

                    case MessageType::IMAGE:

                        message_info->mutable_message()->set_message_type(MessageType::IMAGE);

                        message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());

                        message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);

                        break;

                    case MessageType::FILE:

                        message_info->mutable_message()->set_message_type(MessageType::FILE);

                        message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());

                        message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());

                        message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());

                        message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);

                        break;

                    case MessageType::SPEECH:

                        message_info->mutable_message()->set_message_type(MessageType::SPEECH);

                        message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());

                        message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);

                        break;

                    default:

                        LOG_ERROR("消息类型错误!!");

                        return;

                }

            }

            return;

        }

  1. 获取最近消息的接口:这个的逻辑和上面通过时间获取消息的接口逻辑大差不差,主要是这个接口是从最近开始的时间起算,然后你传入需要的消息条数,然后忘ES搜索引擎和数据库中进行查询,然后调用文件子服务和用户子服务,得到消息和用户信息以后填入response,进行返回

virtual void GetRecentMsg(::google::protobuf::RpcController* controller,

            const ::zhou::GetRecentMsgReq* request,

            ::zhou::GetRecentMsgRsp* response,

            ::google::protobuf::Closure* done) {

            brpc::ClosureGuard rpc_guard(done);

            auto err_response = [this, response](const std::string &rid,

                const std::string &errmsg) -> void {

                response->set_request_id(rid);

                response->set_success(false);

                response->set_errmsg(errmsg);

                return;

            };

            //1. 提取请求中的关键要素:请求ID,会话ID,要获取的消息数量

            std::string rid = request->request_id();

            std::string chat_ssid = request->chat_session_id();

            int msg_count = request->msg_count();

            //2. 从数据库,获取最近的消息元信息

            auto msg_lists = _mysql_message->recent(chat_ssid, msg_count);

            if (msg_lists.empty()) {

                response->set_request_id(rid);

                response->set_success(true);

                return ;

            }

            //3. 统计所有消息中文件类型消息的文件ID列表,从文件子服务下载文件

            std::unordered_set<std::string> file_id_lists;

            for (const auto &msg : msg_lists) {

                if (msg.file_id().empty()) continue;

                LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());

                file_id_lists.insert(msg.file_id());

            }

            std::unordered_map<std::string, std::string> file_data_lists;

            bool ret = _GetFile(rid, file_id_lists, file_data_lists);

            if (ret == false) {

                LOG_ERROR("{} 批量文件数据下载失败!", rid);

                return err_response(rid, "批量文件数据下载失败!");

            }

            //4. 统计所有消息的发送者用户ID,从用户子服务进行批量用户信息获取

            std::unordered_set<std::string> user_id_lists;

            for (const auto &msg : msg_lists) {

                user_id_lists.insert(msg.user_id());

            }

            std::unordered_map<std::string, UserInfo> user_lists;

            ret = _GetUser(rid, user_id_lists, user_lists);

            if (ret == false) {

                LOG_ERROR("{} 批量用户数据获取失败!", rid);

                return err_response(rid, "批量用户数据获取失败!");

            }

            //5. 组织响应

            response->set_request_id(rid);

            response->set_success(true);

            for (const auto &msg : msg_lists) {

                auto message_info = response->add_msg_list();

                message_info->set_message_id(msg.message_id());

                message_info->set_chat_session_id(msg.session_id());

                message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));

                message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);

                switch(msg.message_type()) {

                    case MessageType::STRING:

                        message_info->mutable_message()->set_message_type(MessageType::STRING);

                        message_info->mutable_message()->mutable_string_message()->set_content(msg.content());

                        break;

                    case MessageType::IMAGE:

                        message_info->mutable_message()->set_message_type(MessageType::IMAGE);

                        message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());

                        message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);

                        break;

                    case MessageType::FILE:

                        message_info->mutable_message()->set_message_type(MessageType::FILE);

                        message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());

                        message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());

                        message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());

                        message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);

                        break;

                    case MessageType::SPEECH:

                        message_info->mutable_message()->set_message_type(MessageType::SPEECH);

                        message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());

                        message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);

                        break;

                    default:

                        LOG_ERROR("消息类型错误!!");

                        return;

                }

            }

            return;

        }

  1. 消息查询接口:搜索消息接口和上面的两个接口也是差不多的,主要的区别是这次消息的搜索是在ES里面对文本消息进行搜索,得到以后通过用户子服务得到用户的个人信息,然后对response进行填充,完成

virtual void MsgSearch(::google::protobuf::RpcController* controller,

            const ::zhou::MsgSearchReq* request,

            ::zhou::MsgSearchRsp* response,

            ::google::protobuf::Closure* done) {

            brpc::ClosureGuard rpc_guard(done);

            auto err_response = [this, response](const std::string &rid,

                const std::string &errmsg) -> void {

                response->set_request_id(rid);

                response->set_success(false);

                response->set_errmsg(errmsg);

                return;

            };

            //关键字的消息搜索--只针对文本消息

            //1. 从请求中提取关键要素:请求ID,会话ID, 关键字

            std::string rid = request->request_id();

            std::string chat_ssid = request->chat_session_id();

            std::string skey = request->search_key();

            //2. 从ES搜索引擎中进行关键字消息搜索,得到消息列表

            auto msg_lists = _es_message->search(skey, chat_ssid);

            if (msg_lists.empty()) {

                response->set_request_id(rid);

                response->set_success(true);

                return ;

            }

            //3. 组织所有消息的用户ID,从用户子服务获取用户信息

            std::unordered_set<std::string> user_id_lists;

            for (const auto &msg : msg_lists) {

                user_id_lists.insert(msg.user_id());

            }

            std::unordered_map<std::string, UserInfo> user_lists;

            bool ret = _GetUser(rid, user_id_lists, user_lists);

            if (ret == false) {

                LOG_ERROR("{} 批量用户数据获取失败!", rid);

                return err_response(rid, "批量用户数据获取失败!");

            }

            //4. 组织响应

            response->set_request_id(rid);

            response->set_success(true);

            for (const auto &msg : msg_lists) {

                auto message_info = response->add_msg_list();

                message_info->set_message_id(msg.message_id());

                message_info->set_chat_session_id(msg.session_id());

                message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));

                message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);

                message_info->mutable_message()->set_message_type(MessageType::STRING);

                message_info->mutable_message()->mutable_string_message()->set_content(msg.content());

            }

            return;

        }

在消息管理子服务中,最重要的就是消息的消费和持久化,然后是对消息的获取和查询工作

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com