博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 集群之镜像同步
阅读量:6210 次
发布时间:2019-06-21

本文共 2607 字,大约阅读时间需要 8 分钟。

mirrored

在上个博文中讲到了如果做集群,那么集群是成功了,但是queue是如何存放的呢?消息又是怎么同步呢。

默认的,也就是什么也不配置,直接在某个节点中添加一个queue,那么它仅仅是属于这个节点的。其它节点有的只是它的影子。所以像断线重连、操作恢复是无法做到的,实验证明确实是这样的。声明queue的节点关闭那么是无法再进行发布消息与消费的。这自然失去了集群的意义

 

所以default模式一般是不会进行使用的,我们的选择是 镜像节点

queue mirror 也就是说,也就是 会有一个 master 对应零个或多个 slave

也可以说成主从复制,每个节点都有着相同的数据,某个节点挂掉了,另一个可以立即顶上

 

官方的文档也可以看到,镜像的配置是通过 policy 策略的方式

我们有三种同步的方式,一般来说  all 是我们的最佳选择

all 所有的节点都将被同步

exactly 指定个数的节点被同步

nodes 指定的名称的节点被同步

一个简单的示例,下面使用的是ha-all 是我们的策略名称,后面的是^ha\. 则是一个正则,queue名满足这个规则的,将被做镜像。方式是所有的节点都被同步

这里需要说明的是 策略 可以被应用到exchange与queue上面,也可以选择只被应用到哪个上面,下面的命令则是exchange与queue都被应用到了

rabbitmqctl set_policy ha-all "^ha\." "{
""ha-mode"":""all""}"

翻看文档,似乎没有提供apply to 的参数,所以下面的操作都将使用WEB UI可以看到WEB UI的创建策略给我们提供了丰富的选项,比如Apply to,优先级,等其它设置

 

我们需要关注提上图中 关于 HA的参数

Ha mode 同步方式

Ha params 选择其它同步方式的参数 比如节点名

Ha async mode 同步方式,这个需要详细说明的,默认的情况下。当节点断线后那么这个节点就已经是落后的版本,当我们再去启动节点的时候数据我们需要去手动的同步,这自然是不好的,做到自动化是最完美的,所以设置成automatic是最佳选择

下面设置一个作用与queue,集群中的所有节点将被同步,并且断线重启后自动同步的的策略

 

设置完毕后,就可以通过代码来看看效果了,因为使用了集群,所以创建connection的时候需要做些变化了,创建连接的时候把节点的ip放了进去。

AutomaticRecoveryEnabled  断线重连,也就是如果当前的连接断开了,将会尝试重连

TopologyRecoveryEnabled 重连后恢复当前的工作进程,比如channel、queue、发布的消息进度等

var factory = new ConnectionFactory()                {                    UserName = "admin",                    Password = "admin",                    AutomaticRecoveryEnabled = true,                    TopologyRecoveryEnabled = true                };                Connection = factory.CreateConnection(new string[3] { "192.168.1.115", "192.168.1.113", "192.168.1.123" });

这里声明了一个test1queue,然后进行发布消息,当节点崩溃,代码自然也会异常。所以try catch起来,但是没关系。因为我们已经设置了断线重连,所以一会儿,就会恢复如常

//创建返回一个新的频道            using (var channel = RabbitMqHelper.GetConnection().CreateModel())            {                channel.QueueDeclare("test1", true, false, false, null);                var properties = channel.CreateBasicProperties();                properties.Persistent = true;                for (var i = 0; i < 10000; i++)                {                    try                    {                        channel.BasicPublish(string.Empty, "test1", properties, Encoding.UTF8.GetBytes($"这是{i}个消息"));                        Thread.Sleep(1000);                        Console.WriteLine($"发布消息 {i}");                    }                    catch (Exception ex)                    {                        Console.WriteLine(ex.Message);                        i--;
}                }            }
程序运行后,在WEB UI中已经可以看到了,主节点是rabbimq1,剩下的是从节点。

可以看到现在连接到的节点是rabbitmq2,现在我们可以手动的把它挂掉

可以看到应用程序一直在报错,就这么报了一会,然后又继续发布消息了。这样就保证了我们消息队列的高可用、高可靠

转载于:https://www.cnblogs.com/LiangSW/p/6242280.html

你可能感兴趣的文章
影响微信公众号排名的因素
查看>>
探析软件项目管理十大原则
查看>>
Mybatis update delete 操作的返回值
查看>>
【iOS Tips】模态跳转的透明效果
查看>>
<![CDATA[文本内容]]>代表的意思
查看>>
macbook 网络及使用技巧
查看>>
程序员学习网站
查看>>
银行业务调度系统
查看>>
长轮询解决方案
查看>>
Linux硬盘的检测(原创)
查看>>
remoteapp深度体验
查看>>
Eclipse是如何连接Oracle数据库的
查看>>
MapReduce 中文版论文(转载)
查看>>
MongoDB学习笔记(四) 用MongoDB的文档结构描述数据关系
查看>>
ios 调用应用 调用服务:电话、浏览器 Email SMS URL Schemes
查看>>
FTP命令详解
查看>>
软件工程师所需掌握的“终极技术”是什么?
查看>>
我的友情链接
查看>>
magento给后台表单添加新的元素
查看>>
myeclipse2015 git 操作
查看>>