Skip to content

Commit

Permalink
add initialization value of rocketmq consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ieven committed May 31, 2018
1 parent 0db55ea commit e5cda5b
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,19 @@ public static MessageProducer getMessageProducer(String producerId) {
* @return
*/
public static MessageConsumer createMessageConsumer(String name, String[] bizIDs, int maxConcurrent,
long stopInterval, MQFactory.QueueType queueType) {
int initConcurrent, long stopInterval, MQFactory.QueueType queueType) {

MessageConsumer consumer = new RMQMessageConsumer(name, bizIDs, maxConcurrent, stopInterval, queueType);
MessageConsumer consumer = new RMQMessageConsumer(name, bizIDs, maxConcurrent, initConcurrent, stopInterval,
queueType);

return consumer;
}

public static MessageConsumer createMessageConsumer(String name, String[] bizIDs, int maxConcurrent,
long stopInterval) {
int initConcurrent, long stopInterval) {

return createMessageConsumer(name, bizIDs, maxConcurrent, stopInterval, MQFactory.QueueType.QUEUE);
return createMessageConsumer(name, bizIDs, maxConcurrent, initConcurrent, stopInterval,
MQFactory.QueueType.QUEUE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public class RMQMessageConsumer extends MessageConsumer {
private long stopInterval = 0;

@SuppressWarnings("unchecked")
public RMQMessageConsumer(String name, String[] bizIds, int maxConcurrent, long stopInterval,
public RMQMessageConsumer(String name, String[] bizIds, int maxConcurrent, int initConcurrent, long stopInterval,
MQFactory.QueueType queueType) {

super(name, bizIds);

this.context = MessagingContext.instance();
Expand All @@ -61,7 +62,7 @@ public RMQMessageConsumer(String name, String[] bizIds, int maxConcurrent, long
// group不同的consumer不能重复
config.setComsumerGroup(MessagingContext.DefaultConsumerGroup + "-" + this.getName());
config.setConsumeThreadMax(maxConcurrent);
config.setConsumeThreadMin(1);
config.setConsumeThreadMin(initConcurrent);
// 暂时使用默认值,调优的时候可能需要调整这个参数
config.setPullBatchSize(0);
// get bizId2TopicMap
Expand All @@ -79,9 +80,9 @@ public RMQMessageConsumer(String name, String[] bizIds, int maxConcurrent, long
consumer = MQFactory.createMQConsumer(config, queueInfo);
}

public RMQMessageConsumer(String name, String[] bizIds, int maxConcurrent, long stopInterval) {
public RMQMessageConsumer(String name, String[] bizIds, int maxConcurrent, int initConcurrent, long stopInterval) {

this(name, bizIds, maxConcurrent, stopInterval, MQFactory.QueueType.QUEUE);
this(name, bizIds, maxConcurrent, initConcurrent, stopInterval, MQFactory.QueueType.QUEUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static void main(String[] args) {
bizIDs[0] = "test";
String[] handlerNames = new String[1];
handlerNames[0] = "test";
MessageConsumer consumer = MessagingFactory.createMessageConsumer("name", bizIDs, 10, 0);
MessageConsumer consumer = MessagingFactory.createMessageConsumer("name", bizIDs, 10, 10, 0);
consumer.start();
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class StandardMessagingBuilder extends AbstractComponent {

public StandardMessagingBuilder(String cName, String feature) {

super(cName, feature);
}

Expand Down Expand Up @@ -103,6 +104,8 @@ public MessageConsumer buildConsumer(String groupName, String msgType, MQFactory

String ConsumeThreadMax = ConfigurationManager.getInstance().getFeatureConfiguration(feature,
msgType + ".consumethreadmax");
String ConsumeThreadInit = ConfigurationManager.getInstance().getFeatureConfiguration(feature,
msgType + ".consumethreadinit");
String ConsumeStopInterval = ConfigurationManager.getInstance().getFeatureConfiguration(feature,
msgType + ".consumestopinterval");

Expand All @@ -117,6 +120,7 @@ public MessageConsumer buildConsumer(String groupName, String msgType, MQFactory
bizIDs[0] = msgType;
MessageConsumer consumer = MessagingFactory.createMessageConsumer(groupName, bizIDs,
ConsumeThreadMax == null ? 10 : Integer.parseInt(ConsumeThreadMax),
ConsumeThreadInit == null ? 10 : Integer.parseInt(ConsumeThreadInit),
ConsumeStopInterval == null ? 0 : Long.parseLong(ConsumeStopInterval), queueType);

return consumer;
Expand Down

0 comments on commit e5cda5b

Please sign in to comment.