入门使用 :http://rocketmq.apache.org/docs/quick-start/
下载
1 2 3 4 unzip rocketmq-all-4.2.0-source-release.zip cd rocketmq-all-4.2.0/ mvn -Prelease-all -DskipTests clean install -U cd distribution/target/apache-rocketmq
Start Name Server
1 2 nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success…
Start Broker
1 2 nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success…
发送消息可靠同步的 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class SyncProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.start(); for (int i = 0 ; i < 100 ; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n" , sendResult); } producer.shutdown(); } }
可靠的异步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class AsyncProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup" ); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0 ); for (int i = 0 ; i < 100 ; i++) { final int index = i; Message msg = new Message("TopicTest" , "TagA" , "OrderID188" , "Hello world" .getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.printf("%-10d OK %s %n" , index, sendResult.getMsgId()); } @Override public void onException (Throwable e) { System.out.printf("%-10d Exception %s %n" , index, e); e.printStackTrace(); } }); } producer.shutdown(); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); } }
单向传播 弱可靠
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class OnewayProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup" ); producer.start(); for (int i = 0 ; i < 100 ; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); producer.sendOneway(msg); } producer.shutdown(); } }
广播 给所有订阅者广播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class BroadcastProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName" ); producer.start(); for (int i = 0 ; i < 100 ; i++){ Message msg = new Message("TopicTest" , "TagA" , "OrderID188" , "Hello world" .getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n" , sendResult); } producer.shutdown(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class BroadcastConsumer { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest" , "TagA || TagC || TagD" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n" ); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n" ); } }
定时消息 1.订阅等待消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List; public class ScheduledMessageConsumer { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer" ); consumer.subscribe("TestTopic" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later" ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
2.发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class ScheduledMessageProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup" ); producer.start(); int totalMessagesToSend = 100 ; for (int i = 0 ; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic" , ("Hello scheduled message " + i).getBytes()); message.setDelayTimeLevel(3 ); producer.send(message); } producer.shutdown(); } }
批量消息 1 2 3 4 5 6 7 8 9 10 11 String topic = "BatchTest" ; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA" , "OrderID001" , "Hello world 0" .getBytes())); messages.add(new Message(topic, "TagA" , "OrderID002" , "Hello world 1" .getBytes())); messages.add(new Message(topic, "TagA" , "OrderID003" , "Hello world 2" .getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); }
!!因为最大只能发4M消息,最好每个不要超过1M
消息切分 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class ListSplitter implements Iterator <List <Message >> { private final int SIZE_LIMIT = 1000 * 1000 ; private final List<Message> messages; private int currIndex; public ListSplitter (List<Message> messages) { this .messages = messages; } @Override public boolean hasNext () { return currIndex < messages.size(); } @Override public List<Message> next () { int nextIndex = currIndex; int totalSize = 0 ; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20 ; if (tmpSize > SIZE_LIMIT) { if (nextIndex - currIndex == 0 ) { nextIndex++; } break ; } if (tmpSize + totalSize > SIZE_LIMIT) { break ; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); } }
过滤消息 1 2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE" ); consumer.subscribe("TOPIC" , "TAGA || TAGB || TAGC" );
1 2 3 4 5 6 7 8 9 10 11 12 13 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.start(); Message msg = new Message("TopicTest" , tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.putUserProperty("a" , String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown();
1 2 3 4 5 6 7 8 9 10 11 12 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4" ); consumer.subscribe("TopicTest" , MessageSelector.bySql("a between 0 and 3" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
事务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class TransactionProducer { public static void main (String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name" ); producer.setCheckThreadPoolMinSize(2 ); producer.setCheckThreadPoolMaxSize(2 ); producer.setCheckRequestHoldMax(2000 ); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); String[] tags = new String[] {"TagA" , "TagB" , "TagC" , "TagD" , "TagE" }; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 0 ; i < 100 ; i++) { try { Message msg = new Message("TopicTest" , tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null ); System.out.printf("%s%n" , sendResult); Thread.sleep(10 ); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0 ; i < 100000 ; i++) { Thread.sleep(1000 ); } producer.shutdown(); } }
1 2 3 public interface TransactionCheckListener { LocalTransactionState checkLocalTransactionState (final MessageExt msg) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class TransactionCheckListenerImpl implements TransactionCheckListener { private AtomicInteger transactionIndex = new AtomicInteger(0 ); @Override public LocalTransactionState checkLocalTransactionState (MessageExt msg) { System.out.printf("server checking TrMsg %s%n" , msg); int value = transactionIndex.getAndIncrement(); if ((value % 6 ) == 0 ) { throw new RuntimeException("Could not find db" ); } else if ((value % 5 ) == 0 ) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if ((value % 4 ) == 0 ) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.UNKNOW; } }
log配置http://rocketmq.apache.org/docs/logappender-example/
OpenMessaging http://rocketmq.apache.org/docs/openmessaging-example/
Message Reliablity 影响消息可靠性的几种情况:
Broker正常关闭
Broker异常Crash
OS Crash
机器掉电,但是能立即恢复供电情况。
机器无法开机(可能是cpu、主板、内存等关键设备损坏)
磁盘设备损坏。
(1)、(2)、(3)、(4)四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
(5)、(6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。
RocketMQ从3.0版本开始支持同步双写。
架构
NameServer Cluster 命名服务器提供了轻量级服务发现和路由。每台命名服务器记录了完整的路由信息。提供了一致性读写服务,支持快速存储扩展。
Broker Cluster Brokers专注于消息存储,提供轻量级的TOPIC和QUEUE机制。支持“推”和“拉”模式,包含容错机制(2或3份副本),并提供强大的峰值填充和按原始时间顺序累积数千亿条消息的能力。 此外,Brokers还提供灾难恢复,丰富的指标统计和警报机制。
源码分析 代码分层
脚本源码分析 mqnamesrv.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 # !/bin/sh if [ -z "$ROCKETMQ_HOME" ] ; then # PRG="$0" # need this for relative symlinks while [ -h "$PRG" ] ; do ls=`ls -ld "$PRG"` link=`expr "$ls" : '.*-> \(.*\)$'` if expr "$link" : '/.*' > /dev/null; then PRG="$link" else PRG="`dirname "$PRG"`/$link" fi done saveddir=`pwd` ROCKETMQ_HOME=`dirname "$PRG"`/.. # make it fully qualified ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd` cd "$saveddir" fi export ROCKETMQ_HOME sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
runserver.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # !/bin/sh error_exit () { echo "ERROR: $1 !!" exit 1 } [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!" export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export BASE_DIR=$(dirname $0)/.. export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} # =========================================================================================== # JVM Configuration # =========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m" JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -XX:-UseParNewGC" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib" # JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" $ JAVA ${JAVA_OPT} $@
脚本最后就是通过 java org.apache.rocketmq.namesrv.NamesrvStartup 运行nameserver
$@指的脚本传入的参数
NamesrvStartup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 public class NamesrvStartup { public static Properties properties = null ; public static CommandLine commandLine = null ; public static void main (String[] args) { main0(args); } public static NamesrvController main0 (String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 4096 ; } if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 4096 ; } try { Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv" , args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1 ); return null ; } final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876 ); if (commandLine.hasOption('c' )) { String file = commandLine.getOptionValue('c' ); if (file != null ) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, " + file + "%n" ); in.close(); } } if (commandLine.hasOption('p' )) { MixAll.printObjectProperties(null , namesrvConfig); MixAll.printObjectProperties(null , nettyServerConfig); System.exit(0 ); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation%n" ); System.exit(-2 ); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml" ); final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); controller.getConfiguration().registerConfig(properties); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3 ); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call () throws Exception { controller.shutdown(); return null ; } })); controller.start(); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf(tip + "%n" ); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1 ); } return null ; } }
NamesrvController
namesrvConfig:nameServer的配置
nettyServerConfig:NameServer的netty配置
remotingServer:NameServer 的netty服务器
scheduledExecutorService:routeInfoManager和kvConfigManager使用的定时线程池
remotingExecutor:netty使用的线程池
brokerHosekeppingService:
kvConfigManager:kv配置管理
routeInfoManager:包含broker的ip和对应的队列信息,说明producer可以往哪一个broker发送消息,consumer从哪一个broker pull消息
1 2 3 4 5 6 7 8 9 10 11 12 public NamesrvController (NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this .namesrvConfig = namesrvConfig; this .nettyServerConfig = nettyServerConfig; this .kvConfigManager = new KVConfigManager(this ); this .routeInfoManager = new RouteInfoManager(); this .brokerHousekeepingService = new BrokerHousekeepingService(this ); this .configuration = new Configuration( log, this .namesrvConfig, this .nettyServerConfig ); this .configuration.setStorePathFromConfig(this .namesrvConfig, "configStorePath" ); }
initialize
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public boolean initialize () { this .kvConfigManager.load(); this .remotingServer = new NettyRemotingServer(this .nettyServerConfig, this .brokerHousekeepingService); this .remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_" )); this .registerProcessor(); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { NamesrvController.this .routeInfoManager.scanNotActiveBroker(); } }, 5 , 10 , TimeUnit.SECONDS); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { NamesrvController.this .kvConfigManager.printAllPeriodically(); } }, 1 , 10 , TimeUnit.MINUTES); return true ; } private void registerProcessor () { if (namesrvConfig.isClusterTest()) { this .remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this , namesrvConfig.getProductEnvName()), this .remotingExecutor); } else { this .remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this ), this .remotingExecutor); } }
start && shutdown
1 2 3 4 5 6 7 8 9 public void start () throws Exception { this .remotingServer.start(); } public void shutdown () { this .remotingServer.shutdown(); this .remotingExecutor.shutdown(); this .scheduledExecutorService.shutdown(); }
NettyRemotingServer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public NettyRemotingServer (final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { super (nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this .serverBootstrap = new ServerBootstrap(); this .nettyServerConfig = nettyServerConfig; this .channelEventListener = channelEventListener; int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0 ) { publicThreadNums = 4 ; } this .publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0 ); @Override public Thread newThread (Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this .threadIndex.incrementAndGet()); } }); this .eventLoopGroupBoss = new NioEventLoopGroup(1 , new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0 ); @Override public Thread newThread (Runnable r) { return new Thread(r, String.format("NettyBoss_%d" , this .threadIndex.incrementAndGet())); } }); if (useEpoll()) { this .eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0 ); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread (Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d" , threadTotal, this .threadIndex.incrementAndGet())); } }); } else { this .eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0 ); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread (Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d" , threadTotal, this .threadIndex.incrementAndGet())); } }); } }
start
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Override public void start () { this .defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0 ); @Override public Thread newThread (Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this .threadIndex.incrementAndGet()); } }); ServerBootstrap childHandler = this .serverBootstrap.group(this .eventLoopGroupBoss, this .eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024 ) .option(ChannelOption.SO_REUSEADDR, true ) .option(ChannelOption.SO_KEEPALIVE, false ) .childOption(ChannelOption.TCP_NODELAY, true ) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this .nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0 , 0 , nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this .serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this .port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException" , e1); } if (this .channelEventListener != null ) { this .nettyEventExecutor.start(); } this .timer.scheduleAtFixedRate(new TimerTask() { @Override public void run () { try { NettyRemotingServer.this .scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception" , e); } } }, 1000 * 3 , 1000 ); }