SpringBoot和RabbitMQ集成


步骤

AH1g2V.png

自动配置

    @Bean
        public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
                throws Exception {
            RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
            if (config.determineHost() != null) {
                //设置mq的host地址
                factory.setHost(config.determineHost());
            }
            factory.setPort(config.determinePort());
            if (config.determineUsername() != null) {
                //设置mq的username
                factory.setUsername(config.determineUsername());
            }
            if (config.determinePassword() != null) {
                //设置mq的密码
                factory.setPassword(config.determinePassword());
            }
            if (config.determineVirtualHost() != null) {
                //是指虚拟主机
                factory.setVirtualHost(config.determineVirtualHost());
            }
            if (config.getRequestedHeartbeat() != null) {
                //心跳
                factory.setRequestedHeartbeat(config.getRequestedHeartbeat());
            }
        .....
    }
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    //地址
   private String host = "localhost";
    //端口
   private int port = 5672;
    //账号
   private String username;
    //密码
   private String password;
    //SSL配置
   private final Ssl ssl = new Ssl();
    //虚拟主机
   private String virtualHost;
    //地址
   private String addresses;

    //请求心跳超时,以秒为单位; 零,没有。
   private Integer requestedHeartbeat;
    
    //Publisher Confirms and Returns机制
   private boolean publisherConfirms;
    
   private boolean publisherReturns;
    //连接超时时间
   private Integer connectionTimeout;
     //缓存
   private final Cache cache = new Cache();

     //监听容器配置
   private final Listener listener = new Listener();

   private final Template template = new Template();

   private List<Address> parsedAddresses;

   public String getHost() {
      return this.host;
   }

RabbitProperties封装了RabbitMQ发送和接收消息。

RabbitTemplate给RabbitMQ发送和接收消息。

AmqpAdmin,RabbitMQ系统管理功能组件。

@Bean
         @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean(RabbitTemplate.class)
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            //生成rabbitTemplate来操作rabbitmq
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            MessageConverter messageConverter = this.messageConverter.getIfUnique();
            //如果messageConverter不为空设置我们自己的messageConverter
            if (messageConverter != null) {
                rabbitTemplate.setMessageConverter(messageConverter);
            }
            rabbitTemplate.setMandatory(determineMandatoryFlag());
            RabbitProperties.Template templateProperties = this.properties.getTemplate();
            RabbitProperties.Retry retryProperties = templateProperties.getRetry();
            if (retryProperties.isEnabled()) {
                rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));
            }
            if (templateProperties.getReceiveTimeout() != null) {
                rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout());
            }
            if (templateProperties.getReplyTimeout() != null) {
                rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout());
            }
            return rabbitTemplate;
        }

        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic",
                matchIfMissing = true)
        @ConditionalOnMissingBean(AmqpAdmin.class)
        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }

    }

P2P发送

package com.hph.amqp;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootAmqpApplicationTests {


    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 单播 P2P
     */
    @Test
    public void p2p() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg","这是第1个消息");
        map.put("data", Arrays.asList("Hello Rabitmq",123456, true));
        //对象默认被序列化以后发送出去
        rabbitTemplate.convertAndSend("exchange.direct", "phh.news",map);
    }

}

AHdZeP.png

这是因为默认使用的是application/x-java-serialized-object的序列化

获取消息

    @Test
    public void receive() {
        Object o = rabbitTemplate.receiveAndConvert("hph.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }

转为Json

由于是RabbitTemplate操作Rabbit的在RabbitTemplate中RabbitTemplate为默认的序列化器

private volatile MessageConverter messageConverter = new SimpleMessageConverter();

MessageConverter又一下实现类我们使用的是Jackson2JsonMessageConverter的序列化器

AHwo34.png

在设置我们自己的MessageConverter

if (messageConverter != null) {
                rabbitTemplate.setMessageConverter(messageConverter);
            }
package com.hph.amqp.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

再次发送消息

AHB9iT.png

自定义发送

  @Test
    public void sendMessage() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "这是第1个消息");
        map.put("data", Arrays.asList("清风笑丶",123456,true));
        rabbitTemplate.convertAndSend("exchange.direct", "hph.news", new Person("小明",18));
    }
package com.hph.amqp.bean;

public class Person {
    private String name;
    private Integer age;

    public Person() {
    }

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

AHBqk6.png

反序列化

    @Test
    public void receive() {
        Object o = rabbitTemplate.receiveAndConvert("hph.news");

        System.out.println(o.getClass());
        System.out.println(o);
    }

AHcEOe.png

广播发送

    @Test
    public void sendMessages() {
        rabbitTemplate.convertAndSend("exchange.fanout", "hph.news", new Person("清风笑丶",18));
    }

AHrePK.png

监听消息队列

package com.hph.amqp;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit //开启基于注解的RabbitMQ的模式
@SpringBootApplication
public class SpringBootAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootAmqpApplication.class, args);
    }
}
package com.hph.amqp.service;

import com.hph.amqp.bean.Person;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class PersonService {

    @RabbitListener(queues = "hph.news")
    public void receive(Person person) {
        System.out.println("收到消息" + person+"上线");

    }
}

启动SpringBoot然后运行sendMessage任务。

AHc1l8.png

   @RabbitListener(queues = "hph")
    public void receive02(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }
}

AHcWfx.png

消息头信息。

管理

在SpringBoot中消息队列的管理使用到了amqpAdmin

    @ConditionalOnMissingBean(AmqpAdmin.class)
        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }

在RabbitAutoConfiguration

AHgiAs.png

public class DirectExchange extends AbstractExchange {

   public static final DirectExchange DEFAULT = new DirectExchange("");

    //设置名字
   public DirectExchange(String name) {
      super(name);
   }
    //名字  是否持久化 自动删除
   public DirectExchange(String name, boolean durable, boolean autoDelete) {
      super(name, durable, autoDelete);
   }

   public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
      super(name, durable, autoDelete, arguments);
   }

   @Override
   public final String getType() {
      return ExchangeTypes.DIRECT;
   }

}

![

AHg0UA.png](https://s2.ax1x.com/2019/04/11/AHg0UA.png)

   @Test
    public void createExchange(){
    amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        System.out.println("创建完成");
    }

运行该方法。

AH2Yin.png

创建exchange

    public Queue(String name, boolean durable) {
        this(name, durable, false, false, null);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, null);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
        Assert.notNull(name, "'name' cannot be null");
        this.name = name;
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
        this.arguments = arguments;
    }

创建Queue

  @Test
    public void createQueue() {
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
        System.out.println("创建队列成功");
    }

AHRUkd.png

绑定exchange

public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
            Map<String, Object> arguments) {
        this.destination = destination;
        this.destinationType = destinationType;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.arguments = arguments;
    }

之前尚未绑定

AHR5cV.png

    @Test
    public void bindExchange() {
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.bind", null));
    }

AHRqAJ.png

绑定成功


文章作者: 清风笑丶
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 清风笑丶 !
 上一篇
Elasticsearch简介 Elasticsearch简介
简介Elasticsearch (ES)是一个基于Lucene构建的开源、分布式、RESTful接口全文搜索引擎Elasticsearch还是一个分布式文档数据库,其中每个字段均是被索引的数据且可被搜索,它能够扩展至数以百计的服务器存储以及
2019-04-12
下一篇 
消息队列RabbitMQ 消息队列RabbitMQ
消息队列(Message Queue)消息: 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。 队列:一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素。入队、出队。 消
2019-04-11
  目录