第八章:虚拟机模块的整合

news/2025/2/26 17:49:13

目录

第一节:代码实现

        1-1.前置代码

        1-2.成员变量

        1-3.构造函数

        1-4.交换机接口

        1-5.队列接口

        1-6.绑定接口

        1-7.消息接口

 

        1-8.其他功能接口

第二节:单元测试

下期预告: 


        虚拟机模块在mqserver目录下实现。

第一节:代码实现

        虚拟机的各种接口其实就是直接封装各个管理模块的接口。

        1-1.前置代码

        防重复包含、头文件、和命名空间:

#ifndef __M_VIRHOST_H__
#define __M_VIRHOST_H__

#include "mq_exchange.hpp"
#include "mq_msgqueue.hpp"
#include "mq_binding.hpp"
#include "mq_message.hpp" 

namespace zd
{};

#endif

        1-2.成员变量

        虚拟机由四个模块组合而成,它通过这4个模块的句柄管理交换机、队列、绑定和消息:

    class VirtualHost
    {
        public:
            using ptr = std::shared_ptr<VirtualHost>;    
        private:
            std::string _virhostname;   // 虚拟机的名字
            ExchangeManager::ptr _emp;  // 交换机管理句柄
            MsgQueueManager::ptr _qmp;  // 队列管理句柄
            BindingManager::ptr _bmp;   // 绑定管理句柄
            MessageManager::ptr _mmp;   // 消息管理句柄
    }

        1-3.构造函数

        virhostname:虚拟机的名字,无实际意义

        base_dir:存放持久化消息的目录

        dbfile:sqlite文件的路径,这个文件保存持久化的交换机、队列、绑定信息

        拉取历史队列信息之后,根据已经存在的队列拉取各自的历史消息。

            VirtualHost(const std::string& virhostname,const std::string& base_dir,const std::string& dbfile):
            _virhostname(virhostname),
            _emp(std::make_shared<ExchangeManager>(dbfile)),
            _qmp(std::make_shared<MsgQueueManager>(dbfile)),
            _bmp(std::make_shared<BindingManager>(dbfile)),
            _mmp(std::make_shared<MessageManager>(base_dir))
            {
                // 获取所有队列信息,通过队列名称恢复历史消息
                MsgQueueMap qm = _qmp->selectAll();
                for(const auto& it:qm)
                {
                    _mmp->initQueueMesssge(it.first);
                }
            }

        1-4.交换机接口

            //-----------------------------------------------------------------------
            // 声明交换机
            bool declareExchange(
                const std::string& name,
                zd::ExchangeType type,
                bool durable,
                bool autodelete,
                const google::protobuf::Map<std::string,std::string>& args)
            {
                return _emp->declareExchange(name,type,durable,autodelete,args);
            }
            // 移除交换机
            void deleteExchange(const std::string& name)
            {
                // 删除交换机时,还需要移除交换机的绑定
                _emp->deleteExchange(name);
                _bmp->removeExchangeBindings(name);
            }
            //-----------------------------------------------------------------------

        1-5.队列接口

        队列的接口比较特殊,它包含队列管理模块和消息管理模块:

            //-----------------------------------------------------------------------
            // 声明队列
            bool declareMsgQueue(
                const std::string& name,
                bool durable,
                bool exclusive,
                bool autodelete, 
                const google::protobuf::Map<std::string,std::string>& args)
            {
                // 先创建消息队列的消息存储管理
                _mmp->initQueueMesssge(name);
                // 再创建这个消息队列的管理
                return _qmp->declareMsgQueue(name,durable,exclusive,autodelete,args);
            }
            // 移除队列
            void deleteMsgQueue(const std::string& name)
            {
                // 先删除队列的消息,再删除队列的绑定,最后删除这个队列
                _mmp->destoryQueueMessage(name);
                _bmp->removeMsgQueueBindings(name);
                _qmp->deleteMsgQueue(name);
            }
            //-----------------------------------------------------------------------

        1-6.绑定接口

            //-----------------------------------------------------------------------
            // 绑定
            bool bind(const std::string& ename,const std::string& qname,const std::string& bid_ky)
            {
                // 如果 交换机 和 队列 都需要持久化,那么这条 绑定 也需要持久化
                Exchange::ptr ep = _emp->selectExchange(ename);
                if(ep.get() == nullptr)
                {
                    LOG("绑定失败,交换机 %s 不存在",ename.c_str())
                    return false;
                } 
                MsgQueue::ptr qp = _qmp->selectMsgQueue(qname);
                if(qp.get() == nullptr)
                {
                    LOG("绑定失败,队列 %s 不存在",qname.c_str())
                    return false;
                }
                return _bmp->bind(ename,qname,bid_ky,ep->durable && qp->durable);
            }
            // 解绑
            void unBind(const std::string& ename,const std::string& qname)
            {
                _bmp->unBind(ename,qname);
            }
            //-----------------------------------------------------------------------

        1-7.消息接口

 

            //-----------------------------------------------------------------------
            // 获得指定交换机的所有绑定关系
            MsgQueueBindingMap exchangeBindings(const std::string& ename)
            {
                return _bmp->getExchangeBindings(ename);
            }
            // 获得指定交换机的信息
            Exchange::ptr getOneExchange(const std::string& ename)
            {
                return _emp->selectExchange(ename);
            }
            // 获取所有队列信息
            MsgQueueMap getAllMsgQueue()
            {
                return _qmp->selectAll();
            }
            //-----------------------------------------------------------------------

        1-8.其他功能接口

        其他可能用到的接口

            //-----------------------------------------------------------------------
            // 获得指定交换机的所有绑定关系
            MsgQueueBindingMap exchangeBindings(const std::string& ename)
            {
                return _bmp->getExchangeBindings(ename);
            }
            // 获得指定交换机的信息
            Exchange::ptr getOneExchange(const std::string& ename)
            {
                return _emp->selectExchange(ename);
            }
            // 获取所有队列信息
            MsgQueueMap getAllMsgQueue()
            {
                return _qmp->selectAll();
            }
            //-----------------------------------------------------------------------
            // 判断存在
            bool exchangeExists(const std::string& ename)
            {
                return _emp->exists(ename);
            }

            bool msgqueueExists(const std::string& qname)
            {
                return _qmp->exists(qname);
            }

            bool bindingExists(const std::string& ename,const std::string& qname)
            {
                return _bmp->exists(ename,qname);
            }
            // 清理
            void clear()
            {
                _emp->clear();
                _qmp->clear();
                _bmp->clear();
                _mmp->clear();
            }
            //-----------------------------------------------------------------------

        虚拟机的实现就是如此简单,因为要用的接口都实现了,封装一下即可。

第二节:单元测试

        在mqtest目录下创建mq_virtualhost_test.cc文件,因为封装比较简单,底层的模块也已经测试过来,所以只简单测试一下即可。

        虚拟机使用独立测试套件,它的特征是对于每个单元测试,环境初始化函数和清理函数都会调用一次,保证每个单元测试的独立性。

#include "../mqserver/mq_virtualhost.hpp"

#include <gtest/gtest.h>
#include <iostream>
#include <unordered_set>

zd::VirtualHost::ptr vhp;
// 独立测试套件------------------------------------------------
// 与全局测试套件相比,可以定义成员变量,且成员变量仅相关单元测试可见
// 不需要注册测试单元
class HostTest:public testing::Test
{
public:
    // 每个单元测试之前都会调用一次
    virtual void SetUp() override
    {
        std::cout << "每个单元测试执行前的环境初始化" << std::endl;
        vhp = std::make_shared<zd::VirtualHost>("host1","./data/host1/","./data/host1/meta.bd");

        // 测试声明
        std::unordered_map<std::string,std::string> m;
        vhp->declareExchange("e1",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e2",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e3",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e4",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e5",zd::ExchangeType::DIRECT,true,false,m);

        vhp->declareMsgQueue("q1",true,false,false,m);
        vhp->declareMsgQueue("q2",true,false,false,m);
        vhp->declareMsgQueue("q3",true,false,false,m);
        vhp->declareMsgQueue("q4",true,false,false,m); 
        vhp->declareMsgQueue("q5",true,false,false,m);

        // 测试绑定
        vhp->bind("e1","q1","");   
        vhp->bind("e2","q2","");
        vhp->bind("e3","q3",""); 
        vhp->bind("e4","q4","");
        vhp->bind("e5","q5","");
        vhp->bind("e1","q5","");
    }

    // 每个单元测试之后都会调用一次
    virtual void TearDown() override
    {
        std::cout << "每个单元测试执行后的环境清理" << std::endl;
        vhp->clear();
    }
};

// 测试声明和绑定
TEST_F(HostTest,HostTest_test1_Test)
{ 
    std::cout << "单元测试-1" << std::endl;
    ASSERT_EQ(vhp->exchangeExists("e1"),true);
    ASSERT_EQ(vhp->exchangeExists("e2"),true);
    ASSERT_EQ(vhp->exchangeExists("e3"),true);
    ASSERT_EQ(vhp->exchangeExists("e4"),true);
    ASSERT_EQ(vhp->exchangeExists("e5"),true);

    ASSERT_EQ(vhp->msgqueueExists("q1"),true);
    ASSERT_EQ(vhp->msgqueueExists("q2"),true);
    ASSERT_EQ(vhp->msgqueueExists("q3"),true);
    ASSERT_EQ(vhp->msgqueueExists("q4"),true);
    ASSERT_EQ(vhp->msgqueueExists("q5"),true);

    ASSERT_EQ(vhp->bindingExists("e1","q1"),true);
    ASSERT_EQ(vhp->bindingExists("e2","q2"),true);
    ASSERT_EQ(vhp->bindingExists("e3","q3"),true);
    ASSERT_EQ(vhp->bindingExists("e4","q4"),true);
    ASSERT_EQ(vhp->bindingExists("e5","q5"),true);
    ASSERT_EQ(vhp->bindingExists("e1","q5"),true);
}

// 测试消息的推送
TEST_F(HostTest,HostTest_test2_Test)
{
    std::cout << "单元测试-2" << std::endl;
    vhp->basicPublish("q1",nullptr,"Hello World-1");

    zd::MessagePtr msgp = vhp->basicConsume("q1");
    ASSERT_EQ(msgp->payload().body(),"Hello World-1");
    zd::MessagePtr msgp_ = vhp->basicConsume("q1");
    ASSERT_EQ(msgp_.get(),nullptr);

    zd::MessagePtr msgp__ = vhp->basicConsume("q6");

}

// 测试消息的确认
TEST_F(HostTest,HostTest_test3_Test)
{
    std::cout << "单元测试-3" << std::endl;
    zd::BasicProperties pb;
    std::string id = zd::UUIDHelper::uuid();
    pb.set_id(id);
    pb.set_delivery_mode(zd::DeliveryMode::DURABLE); 
    pb.set_routing_key("123");
    vhp->basicPublish("q1",&pb,"Hello World-1");
    vhp->basicConsume("q1");
    vhp->basicAck("q1",id);
    ASSERT_EQ(vhp->basicConsume("q1"),nullptr);    
}

// ----------------------------------------------------------
 
int main(int argc,char** argv)
{
    testing::InitGoogleTest(&argc,argv);

    if(RUN_ALL_TESTS() != 0) // 运行所有单元测试
    {
        printf("单元测试失败!\n");
    }
    return 0;
}

        编译:

mq_virhost_test:mq_virhost_test.cc ../mqcommon/mq_msg.pb.cc
	g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf -lsqlite3

        执行结果:

        符合预期 。

        那么虚拟机的"整合"也完成了。        

下期预告: 

        之后将完成与binding_key和routing_key有关的模块——Router,它提供3个功能:

        (1)routing_key的合法性检验

        (2)binding_key的合法性检验

        (3)匹配routing_key和binding_key,并返回匹配结果


http://www.niftyadmin.cn/n/5869015.html

相关文章

解决IDEA使用Ctrl + / 注释不规范问题

问题描述&#xff1a; ctrl/ 时&#xff0c;注释缩进和代码规范不一致问题 解决方式 设置->编辑器->代码样式->java->代码生成->注释代码

Android NFC功能开发指南

在 Android 平台上开发 NFC&#xff08;近场通信&#xff09;功能&#xff0c;主要涉及以下几个步骤&#xff1a; 1. 权限声明 首先&#xff0c;在 AndroidManifest.xml 文件中声明 NFC 权限&#xff1a; <uses-permission android:name"android.permission.NFC&quo…

数据分析七大步骤

在工作中&#xff0c;我们可能都遭遇过面对一堆数据&#xff0c;费尽心思进行分析&#xff0c;结果却惨不忍睹&#xff0c;仿佛“一顿操作猛如虎&#xff0c;一看结果0:5”。更糟糕的是&#xff0c;有时我们甚至完全找不到数据分析的头绪。 别急&#xff0c;朋友们&#xff01…

Docker小游戏 | 使用Docker部署star-battle太空飞船射击小游戏

Docker小游戏 | 使用Docker部署star-battle太空飞船射击小游戏 前言项目介绍项目简介项目预览二、系统要求环境要求环境检查Docker版本检查检查操作系统版本三、部署star-battle网页小游戏下载镜像创建容器检查容器状态检查服务端口安全设置四、访问star-battle网页小游戏五、总…

Python游戏编程之赛车游戏6-1

通过Python的pygame模块可以实现赛车游戏&#xff0c;如图1所示。 图1 赛车游戏 从图1中可以看出&#xff0c;玩家通过键盘的左右键操作蓝色汽车躲避红色汽车的撞击&#xff0c;每成功躲避过一辆红色汽车&#xff0c;则玩家得一分。当蓝色汽车被红色汽车撞击后&#xff0c;游戏…

es-head(es库-谷歌浏览器插件)

1.下载es-head插件压缩包&#xff0c;并解压缩 2.谷歌浏览器添加插件 3.使用

SV基础(二):数据类型

文章目录 **1. Verilog 的 4 值数据类型****硬件建模的必要性****2. Testbench 中的问题****Verilog 的局限性****3. SystemVerilog 的 2 值数据类型****示例:明确的 2 值操作****4. 何时使用 2 值 vs 4 值****5. 关键优势****6. 注意事项**7. 有符号数与无符号数详解**无符号…

PostgreSQL的学习心得和知识总结(一百七十)|深入理解PostgreSQL数据库之 处理HAVING子句 的使用和实现

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《PostgreSQL数据库内核分析》 2、参考书籍&#xff1a;《数据库事务处理的艺术&#xff1a;事务管理与并发控制》 3、PostgreSQL数据库仓库…