Skip to content

Commit

Permalink
rocketmq
Browse files Browse the repository at this point in the history
  • Loading branch information
penglingfeng committed Feb 15, 2020
1 parent 672f648 commit 092ee35
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 0 deletions.
10 changes: 10 additions & 0 deletions MQ/RocketMQ/01.概述.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,13 @@ nohup sh bin/mqbroker -n 192.168.80.128:9876 &jps
bin/mqshutdown broker
```

# 高可用

## name server 集群

直接部署多态,每个broker,producer,consumer注册多个即可。

## broker 主从

设置 brokerId ,为 0 则为 master,大于0开始为 slave

128 changes: 128 additions & 0 deletions MQ/RocketMQ/02.java调用.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,114 @@ public class Provider {
}
```

## SendResult

发送会产生一个发送结果对象,可以得到 SendStatus ,判断是否成功。

默认是同步发送消息,等待服务器返回发送结果。

```java
for (int i = 0; i < 10; i++) {
String hello = "hello rocket" + i + ",time:" + LocalDateTime.now().toString();
Message msg = new Message("student", hello.getBytes());
SendResult result = producer.send(msg);
if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println("发生成功,msgId:" + result.getMsgId());
} else {
System.out.println("发生失败");
}
TimeUnit.SECONDS.sleep(1);
}
```

## SendCallback

如果想要异步发送消息,不等待发送结果,而是得到结果执行回调函数。

使用 SendCallback 对象,作为 send 的第二个参数。

```java
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:"+sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
System.out.println("发送失败:"+e.getMessage());
}
});
```

## 单向消息

如果不需要发送结果,则调用 sendOneway 方法,该方法没有返回值。

```java
producer.sendOneway(msg);
```

## 顺序消息

想要控制一组消息的顺序,发送时通过选择队列`MessageQueueSelector`,消费时使用顺序消费 `MessageListenerOrderly`

## 延时消息

消息发送前可以设置一个延迟级别。控制 broker 服务器发送给消费者的时间。

```java
Message msg = new Message("student", hello.getBytes());
msg.setDelayTimeLevel(3);
```

设置为 0 ,为不延迟。延迟级别从 1 开始。目前仅支持以下级别。

```java
//位于服务器 rocketmq-store-4.6.0.jar 包下的 org.apache.rocketmq.store.config.MessageStoreConfig 中
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
```

## 批量发送

批量发送只需把消息对象放进集合,send 即可。

```java
List<Message> msgs = new ArrayList<>();
msgs.add(new Message(topic,"Hello 1".getBytes()));
msgs.add(new Message(topic,"Hello 2".getBytes()));
msgs.add(new Message(topic,"Hello 3".getBytes()));

producer.send(msgs);
```

## 过滤消息

### tag

创建 消息对象时,还可以指定一个 tag 参数。

```java
List<Message> msgs = new ArrayList<>();
msgs.add(new Message(topic,"tag1","Hello 1".getBytes()));
msgs.add(new Message(topic,"tag2","Hello 2".getBytes()));
msgs.add(new Message(topic,"tag3","Hello 3".getBytes()));

producer.send(msgs);
```

而消费的时候可以指定一个表达式,指定过滤添加。指定 `*` 则消费所有类型。

例如,只消费 tag1 或 tag3

```java
consumer.subscribe("student", "tag1 || tag3");
```

## 事物消息

可以控制消息是否能被消费。

# 消费者

直接创建消费者,指定消费者组,订阅一个topic。
Expand Down Expand Up @@ -63,3 +171,23 @@ public class Consumer {
}
```

## 负载均衡

当启动多个同**消费者组**下的消费者时,默认会采用负载均衡的方式,分发到这些消费者。

消费者1

![](img/m1.png)

消费者2

![](img/m2.png)

## 广播模式

可以通过设置消息模式,设置为广播模式后的消费者,将得到所有消息。

```java
consumer.setMessageModel(MessageModel.BROADCASTING);
```

51 changes: 51 additions & 0 deletions MQ/RocketMQ/03.整合springboot.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# 生产者

引入 starter 依赖

```xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
```

基本配置

```yaml
rocketmq:
name-server: 192.168.80.128:9876
producer:
group: demo-pro-group
```
发送消息只需注入 RocketMQTemplate ,调用 相关 send 方法即可。
```java
@Autowired
private RocketMQTemplate rocketMQTemplate;

@RequestMapping("getStudent")
public String getStudent(){
SendResult result = rocketMQTemplate.syncSend("student", "hello spring");
return "success";
}
```

# 消费者

实现一个 RocketMQListener 的接口,重写 onMessage 方法。

使用 `@RocketMQMessageListener` 注解定义 topic 和消费者组。同时声明为bean即可。

```java
@Component
@RocketMQMessageListener(topic = "student",consumerGroup = "demo-con-group")
public class StudentConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
```

Binary file added MQ/RocketMQ/img/m1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added MQ/RocketMQ/img/m2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 092ee35

Please sign in to comment.