C++亚、消息队列之如何当C#中采用RabbitMQ

1、什么是RabbitMQ。详见 http://www.rabbitmq.com/。

   
作用就是增高系统的并发性,将一部分勿需这响应客户端且占用比较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可大幅度的增进系统的起能力。

    MQ全称为Message Queue,
消息队列(MQ)是如出一辙种应用程序对应用程序的通信方式。应用程序通过描写及搜索出入列队的针对性应用程序的数码(消息)来通信,而无需专用连接来链接它们。消息传递指的凡程序中通过当信息遭发送数据进行通信,而非是经过一直调用彼此来通信,直接调用通常是用于诸如远程过程调用的技能。排队靠的是应用程序通过队来通信。队列的采用除了接受和发送应用程序同时推行的求。其中较成熟之MQ产品产生MSMQ,ActiveMQ,RabbitMQ,IBM
WEBSPHERE MQ 等等。

    
RabbitMQ是采用Erlang开发的,开源之,一个于高档消息队列协议(AMQP)基础及完的,可复用的店铺信息网。他准Mozilla
Public
License开源商谈。

    
RabbitMQ支持您能体悟的大多数出语言如Java,Ruby,Python,.Net,C/C++,Erlang等等。各种语言的客户端你得自她的官方网站上下载。

2. RabbitMQ的特点

    相较让任何组成部分MQ产品,RabbitMQ的有的优势还是比较强烈。

    你可以当下面有链接中找到有于

   
RabbitMQ,ActiveMQ,qpid:http://bhavin.directi.com/rabbitmq-vs-apache-activemq-vs-apache-qpid/

   
RabbitMQ和ZeroMQ:http://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or

   
还有四款消息队列大比并:http://www.oschina.net/news/17973/message-queue-shootout

 

    RabbitMQ的一部分长

    安装配置好:

       RabbitMQ安装好,有详细的装置文档。

    可靠性:

        RabbitMQ
提供了各式各样的特点深受您以可靠性,可用性和性之间作出权衡,包括持久化,发送应答,发布确认等。

    强大地路由于功能:

       
所有的信都见面经路由器转发到各个消息队列中,RabbitMQ内修了几只常因此底路由器,并且可以经过路由器的组成和由定义路由器插件来就复杂的路由功能。

    Clustering:

       RabbitMQ服务端可以当一个局域网中集群部署,作为一个逻辑服务器。

    Federation:

      
Federration模式可以让RabbitMQ服务器热备部署,当系统遭到中一个服务器失效而无法运转时,另一个服务器即可自行接手原失效系统所执行的行事。

    Completing Consumer:

      内置的竞争的消费者模式可兑现消费者之负载均衡。

   管理工具:

     
RabbitMQ提供了一个简练容易用的管住界面,让用户可通过浏览器监控及控制而的音队列的合。

   跟踪:

     
如果你当采用RabbitMQ过程遭到起了问题,或者抱了若无希望的结果,你得经打开内置Tracer功能进行跟踪,当然这性能就有下降。

   插件系统:

     
提供了强劲地插件系统,让用户会有利于之针对性MQ进行扩展。上面所说到的管理工具和Federation组件都是作为一个插件发布的。你可选取设置或不安装。

   客户端支持:

    
RabbitMQ支持而会想到的多数开发语言如Java,Ruby,Python,.Net,C/C++,Erlang等等。

   强大的社区与小买卖支持:

    
强大的社区可帮忙而快的解决有关RabbitMQ的各种题材,如果要无克迎刃而解您的问题,VMWare还提供买卖技术支持。

3、安装

    RabbitMQ服务:http://www.rabbitmq.com/download.html。
   
(安装完RabbitMQ服务后,会于Windows服务遭遇视。如果没Erlang运行环境,在设置过程中会提示先安装Erlang环境。http://www.erlang.org/downloads)

    .net客户端类库:http://www.rabbitmq.com/dotnet.html

4、插件

     RabbitMQ提供了成百上千好用的插件,最常用的便是web管理工具,启动之插件。

     CMD中运行命令:rabbitmq-plugins enable rabbitmq_management

     注:rabbitmq-plugins 所在路径为:D:\Program Files\RabbitMQ
Server\rabbitmq_server-3.6.9\plugins

     web管理工具的地方是:http://localhost:15672,初始用户名:guest
初始密码:guest

5. RabbitMQ的部分定义

     – 连接(Connection),与RabbitMQ
Server建立的一个连接,由ConnectionFactory创建,每个connection只及一个大体的Server进行连接,此连续是根据Socket进行连续的,这个得相似的晓啊像一个DB
Connection。AMQP一般下TCP链接来保证信息传的可靠性。

     – 通道
(Channel),在C#客户端里应该是被Model,不亮堂为何这么取名字,其他客户端基本还让Channel。建立以Connection基础及之一个通路,相对于Connection来说,它是轻量级的。它便像是Hibernate里面的Session一样。Channel
主要进行相关定义,发送信息,获取信息,事务处理等。Channel可以在多线程中使用,但是于得保证其他时刻偏偏来一个线程执行命令。一个Connection可以生差不多个Channel。客户端程序有时候会是一个大多线程程序,每一个线程都想要同RabbitMQ进行连接,但是以非思量共享一个老是,这种求或比泛的。因为一个Connection就是一个TCP链接,RabbitMQ在设计之时段不指望和各个一个客户端保持多个TCP连接,但马上的确是发出把客户端的要求,所以在统筹被引入了Channel的定义,每一个Channel之间从来不外沟通,是一心分开之。多单Channel来共享一个Connection。

      – 交换器(Exchange),它是殡葬信息的实业。

      – 队列(Queue),这是收取信息之实业。

      – 绑定器(Bind),将交换器和排连接起来,并且封装消息之路由信息。

6、配置

    配置文件地点也:C:\Documents and
Settings\Administrator\Application
Data\RabbitMQ\rabbitmq.config,默认没有rabbit.config文件,需要手工新建(默认会时有发生rabbitmq.config.example
作为参照)。基于安全,做了少数独布局,如下:

    

[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 1234},
{"10.121.1.48", 8009}]}

]}
].

loopback_users:设置只能以跟RabbitMq服务一样台机器上看服务之用户。

tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止让侵犯攻击。

安完后,别忘了以下操作,否则配置不起作用。

  • 停止RabbitMQ服务;
  • 重新安装服务如配置生效:rabbitmq-service.bat install

        此命令要切换到路:D:\Program Files\RabbitMQ
Server\rabbitmq_server-3.4.0\sbin

  • 启动RabbitMQ服务;

6、Demo练习。

消息生产者:

class Program
    {
        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.MqUserName;
                factory.Password = Constants.MqPwd;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);
                        while (true)
                        {
                            string customStr = Console.ReadLine();
                            RequestMsg requestMsg = new RequestMsg();
                            requestMsg.Name = string.Format("Name_{0}", customStr);
                            requestMsg.Code = string.Format("Code_{0}", customStr);
                            string jsonStr = JsonConvert.SerializeObject(requestMsg);
                            byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);

                            //设置消息持久化
                            IBasicProperties properties = channel.CreateBasicProperties();
                            properties.DeliveryMode = 2;
                            channel.BasicPublish("", "MyFirstQueue", properties, bytes);

                            //channel.BasicPublish("", "MyFirstQueue", null, bytes);

                            Console.WriteLine("消息已发送:" + requestMsg.ToString());
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
        }
    }

 

 

class Program
    {
        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.MqUserName;
                factory.Password = Constants.MqPwd;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);

                        //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
                        channel.BasicQos(0, 1, false);

                        Console.WriteLine("Listening...");

                        //在队列上定义一个消费者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //消费队列,并设置应答模式为程序主动应答
                        channel.BasicConsume("MyFirstQueue", false, consumer);

                        while (true)
                        {
                            //阻塞函数,获取队列中的消息
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] bytes = ea.Body;
                            string str = Encoding.UTF8.GetString(bytes);
                            RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                            Console.WriteLine("HandleMsg:" + msg.ToString());
                            //回复确认
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
        }
    }

 

     C++ 1
     C++ 2

当下是私家的下结论,只是略的安装与动,积累了再次好的经验以记录下来。