Skip to content

Commit

Permalink
add: 线程数配置项
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizzercn committed May 23, 2022
1 parent d7f7c87 commit f9808bb
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public class BrokerProperties {
private boolean kafkaBrokerEnabled;
public static final String PROP_KAFKA_BROKER_ENABLED = PRE + "kafka.broker-enabled";

private int bossGroup_nThreads;
@PropDoc(group = "broker", value = "bossGroup线程数", type = "int", defaultValue = "4")
public static final String PROP_BOSSGROUP_NTHREADS = PRE + "bossGroup-nThreads";
private int workerGroup_nThreads;
@PropDoc(group = "broker", value = "bossGroup线程数", type = "int", defaultValue = "20")
public static final String PROP_WORKERGROUP_NTHREADS = PRE + "workerGroup-nThreads";

public void init() {
this.id = conf.get(_id, "mqttwk");
Expand All @@ -133,6 +139,8 @@ public void init() {
this.useEpoll = conf.getBoolean(PROP_USEEPOLL, false);
this.soBacklog = conf.getInt(PROP_SOBACKLOG, 511);
this.soKeepAlive = conf.getBoolean(PROP_SOKEEPALIVE, true);
this.bossGroup_nThreads = conf.getInt(PROP_BOSSGROUP_NTHREADS, 4);
this.workerGroup_nThreads = conf.getInt(PROP_WORKERGROUP_NTHREADS, 20);
}

public String getId() {
Expand Down Expand Up @@ -279,4 +287,21 @@ public BrokerProperties setSoKeepAlive(boolean soKeepAlive) {
return this;
}

public int getBossGroup_nThreads() {
return bossGroup_nThreads;
}

public BrokerProperties setBossGroup_nThreads(int bossGroup_nThreads) {
this.bossGroup_nThreads = bossGroup_nThreads;
return this;
}

public int getWorkerGroup_nThreads() {
return workerGroup_nThreads;
}

public BrokerProperties setWorkerGroup_nThreads(int workerGroup_nThreads) {
this.workerGroup_nThreads = workerGroup_nThreads;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import java.util.Map;

/**
* t-io启动Broker
* 启动Broker
*/
@IocBean
public class BrokerServer implements ServerFace {
Expand All @@ -66,8 +66,8 @@ public void start() throws Exception {
LOGGER.info("Initializing {} MQTT Broker ...", "[" + brokerProperties.getId() + "]");
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
channelIdMap = new HashMap<>();
bossGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
workerGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
bossGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup(brokerProperties.getBossGroup_nThreads()) : new NioEventLoopGroup(brokerProperties.getBossGroup_nThreads());
workerGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup(brokerProperties.getWorkerGroup_nThreads()) : new NioEventLoopGroup(brokerProperties.getWorkerGroup_nThreads());
if (brokerProperties.getSslEnabled()) {
KeyStore keyStore = KeyStore.getInstance("PKCS12");
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("keystore/server.pfx");
Expand Down
4 changes: 4 additions & 0 deletions mqtt-broker/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ mqttwk:
websocket-port: 9995
# websocket 是否启用
websocket-enabled: true
# bossGroup线程数
bossGroup-nThreads: 4
# workerGroup线程数
workerGroup-nThreads: 200
# 是否开启集群功能
cluster-on: false
# 启用ssl验证(含websocket)
Expand Down

0 comments on commit f9808bb

Please sign in to comment.