Spring AOP 源码解析
首先通过继承BeanDefinitionParser的触发过程看http://www1350.github.io/#post/84
通过继承NamespaceHandlerSupport来注册,最核心的类就是AnnotationAwareAspectJAutoProxyCreator
首先通过继承BeanDefinitionParser的触发过程看http://www1350.github.io/#post/84
通过继承NamespaceHandlerSupport来注册,最核心的类就是AnnotationAwareAspectJAutoProxyCreator
我们要使用的时候通常
1 | <bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate"> |
1 | public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { |
我们可以注意到这里是使用代理模式,SqlSessionInterceptor实现了InvocationHandler
1 | private class SqlSessionInterceptor implements InvocationHandler { |
SqlSessionManager如何保证线程安全?
他的构造方法是私有的,只能通过newInstance构造
1
2
3
4
5
6
7private SqlSessionManager(SqlSessionFactory sqlSessionFactory) {
this.sqlSessionFactory = sqlSessionFactory;
this.sqlSessionProxy = (SqlSession) Proxy.newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[]{SqlSession.class},
new SqlSessionInterceptor());
}
这里又用了代理
1 | private class SqlSessionInterceptor implements InvocationHandler { |
一般我们这么写
1 | <bean id="mybatisSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> |
我们可以看到SqlSessionFactoryBean的构建,另外通过解析configLocation配置的xml获取Configuration配置
1 | @Override |
所以我们来分析下DefaultSqlSessionFactory
我们知道SqlSessionTemplate是通过DefaultSqlSessionFactory的openSession获取SqlSession
1 | @Override |
1 | <settings> |
Configuration的defaultExecutorType配置,我们回顾下:1.SIMPLE是普通的执行器2.REUSE执行器会重用预处理语句3.BATCH会重用预处理并执行批量更新,我们来看下是如何实现的
1 | public Executor newExecutor(Transaction transaction, ExecutorType executorType) { |
看下SqlSessionTemplate在执行update的时候
1 | @Override |
先看下BATCH,对应BatchExecutor
1 | @Override |
为什么这么写呢。我们回顾下jdbc的用法
1 | //获取连接 |
其中里面事务部分是托管给spring代理的
我们通常会这么使用
1 | <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> |
或
1 | <bean id="baseMapper" class="org.mybatis.spring.mapper.MapperFactoryBean" abstract="true" lazy-init="true"> |
MapperFactoryBean,作用就是把接口进行代理切入
1 | public abstract class DaoSupport implements InitializingBean { |
1 | @Override |
1 | public class MapperProxyFactory<T> { |
最后我们知道方法都通过MapperMethod 了
1 | public Object execute(SqlSession sqlSession, Object[] args) { |
整个过程如下
让我们回顾下http://www1350.github.io/#post/88 的插件写法
1 | <plugins> |
源码如下
1 | //编程api方式加入 |
//责任链模式
1 | public interface Interceptor { |
1 |
|
最后我们知道,入口都在pluginAll,我们看下哪里调用了
1 | public ParameterHandler newParameterHandler(MappedStatement mappedStatement, Object parameterObject, BoundSql boundSql) { |
而newStatementHandler等方法则用在执行器上,所以一路就通了。
SimpleExecutor
1 | @Override |
插件后源码如下:
先看下缓存Cache 类接口
实现类:
我们知道每个sqlSession都有自己的BaseExecutor,每个BaseExecutor都有自己的Local Cache。所以一级缓存是基于sqlSession级别的
一级缓存的配置,共有两个选项,SESSION或者STATEMENT,默认是SESSION级别。
<setting name="localCacheScope" value="SESSION"/>
1 | //BaseExecutor 的query 构建CacheKey |
查询的时候先生成cacheKey,然后使用CacheKey先去缓存查找
1 | @SuppressWarnings("unchecked") |
我们看到BaseExecutor类里有
1 | protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads; |
https://tech.meituan.com/mybatis_cache.html
1 | public ResultSetHandler newResultSetHandler(Executor executor, MappedStatement mappedStatement, RowBounds rowBounds, ParameterHandler parameterHandler, |
最后一般是
1 | @Override |
1 | @Override |
机器码 机器码(machine code)是CPU可直接解读的指令。机器码与硬件等有关,不同的CPU架构支持的硬件码也不相同。
字节码 字节码(bytecode)是一种包含执行程序、由一序列 op 代码/数据对 组成的二进制文件。字节码是一种中间码,它比机器码更抽象,需要直译器转译后才能成为机器码的中间代码。通常情况下它是已经经过编译,但与特定机器码无关。字节码主要为了实现特定软件运行和软件环境、与硬件环境无关。 字节码的实现方式是通过编译器和虚拟机器。编译器将源码编译成字节码,特定平台上的虚拟机器将字节码转译为可以直接执行的指令。 而在Java里,通过类加载器把字节码读入加载并转换成 java.lang.Class类的一个实例。
一个编译后的类文件包含下面的结构:
1 | ClassFile { |
字符 | 描述 |
---|---|
magic, minor_version, major_version | 类文件的版本信息和用于编译这个类的 JDK 版本。 |
constant_pool | 类似于符号表,尽管它包含更多数据。下面有更多的详细描述。 |
access_flags | 提供这个类的描述符列表。 |
this_class | 提供这个类全名的常量池(constant_pool)索引,比如org/jamesdbloom/foo/Bar。 |
super_class | 提供这个类的父类符号引用的常量池索引。 |
interfaces | 指向常量池的索引数组,提供那些被实现的接口的符号引用。 |
fields | 提供每个字段完整描述的常量池索引数组。 |
methods | 指向constant_pool的索引数组,用于表示每个方法签名的完整描述。如果这个方法不是抽象方法也不是 native 方法,那么就会显示这个函数的字节码。 |
attributes | 不同值的数组,表示这个类的附加信息,包括 RetentionPolicy.CLASS 和 RetentionPolicy.RUNTIME 注解。 |
举个简单的例子(例1.1):
1 | public class HelloWorld { |
对应的字节码如下:
1 | public class com.souche.rick.HelloWorld |
注:编译的时候选用-g参数,否则LineNumberTable参数默认是不显示的,maven编译的时候默认是使用-g参数所以不用担心
描述符标识字符含义
标识字符 | 含义 | 标识字符 | 含义 |
---|---|---|---|
B | 基本类型byte | J | 基本类型long |
C | 基本类型char | S | 基本类型short |
D | 基本类型double | Z | 基本类型boolean |
F | 基本类型float | V | 特殊类型void |
I | 基本类型int | L | 对象类型,如Ljava/lang/Object |
对于数组类型,每一唯独使用一个前置的“[”字符描述,如一个定义为“java.lang.String[][]”类型的二维数组,将被记录为“[[Ljava/lang/String”。
当描述符描述方法时,按照先参数列表后返回值的顺序描述,参数列表按照参数的严格顺序放在一组小括号“()”之内。如int getResult()方法的描述符为“()I”。
Method declaration in source file | Method descriptor |
---|---|
void m(int i, float f) | (IF)V |
int m(Object o) | (Ljava/lang/Object;)I |
int[] m(int i, String s) | (ILjava/lang/String;)[I |
Object m(int[] i) | ([I)Ljava/lang/Object; |
关于助记符https://www.cnblogs.com/anbylau2130/p/6078427.html
简单了解了一些基本概念以后,我们正式进入主题:字节码操纵
其实还有另外一种形式,就是直接换掉原来的字节码。一种是在JVM加载用户的Class时,拦截,返回修改后的字节码。另外一种在运行时,使用Instrumentation.redefineClasses方法来替换掉原来的字节码,和这个类相关的实例立即生效。不过这种形式需要采用agent形式启动。有兴趣的同学可以自行搜索下相关内容。
操纵字节码可以在字节码被载入类加载器之前修改字节码二进制文件,从而做到一些技术上难以实现的功能,如:AOP增强、性能分析、调试跟踪、日志记录、bug定位、混淆代码,甚至连Scala、Groovy和Grails等JVM语言都用到大量的操纵字节码技术。
ASM主要通过树这种数据结构来表示复杂的字节码结构,通过Visitor设计模式来实现(详细见手册)
1 | package com.souche.rick.asm; |
效果:
1 | public class com.souche.rick.HelloWorld |
1 | public class ClassPrinter extends ClassVisitor { |
3.转化一个类
spring里面有一个类叫LocalVariableTableParameterNameDiscoverer
,它可以用来获取参数名列表,其实内部就是使用了ASM,
1 | private static class ParameterNameDiscoveringVisitor extends ClassVisitor { |
1 | private static class LocalVariableTableVisitor extends MethodVisitor { |
注:一个局部变量表的占用了32位的存储空间(一个存储单位称之为slot,槽),所以可以存储一个boolean、byte、char、short、float、int、refrence和returnAdress数据,long和double需要2个连续的局部变量表来保存,通过较小位置的索引来获取。如果被调用的是实例方法,那么第0个位置存储“this”关键字代表当前实例对象的引用。
里面重点关注visitLocalVariable方法,其他的忽略,是不是就觉得简单多了?
Javassist和其他的类似库不同的是,Javassist并不要求开发者对字节码方面具有多么深入的了解,同样的,它也允许开发者忽略被修改的类本身的细节和结构。相对其他如ASM显得更为简单,当然性能也较之更低下。
ClassPool.getDefault()
只是搜索JVM的同路径下的classnew ClassPool(true)
当ClassPool没被引用的时候,JVM的垃圾收集会收集该类1 | ClassPool#insertClassPath(ClassPath) |
CtClass ct = mPool.get(name)
通过类池获取类 CtClass ct = mPool.makeClass(mClassName)
创建一个类CtMethod m = cc.getDeclaredMethod("say")
获取方法 Class c = cc.toClass()
转化为Class byte[] b = cc.toBytecode()
转化为字节码二进制还是那个例子,不过Javassist是不是简单多了
1 | public class HelloWorld { |
参数 | 含义 |
---|---|
$0, $1, $2, … | $0为this ,其他为参数 |
$args | 数组参数Object[] |
$$ | 所有参数m($$) 相当于m($1,$2,…) (除了this) |
$cflow(…) | cflow 变量 |
$r | 结果类型 |
$w | 包装类型 |
$_ | 结果值 |
$sig | java.lang.Class数组对象,用来表示每个参数的类型 |
$type | java.lang.Class 对象表示结果类型 |
$class | A java.lang.Class 表示现在这个类的类型 |
例子:
1 | class Point { |
结果
1 | class Point { |
dubbo的ReferenceBean(dubbo调用方)里面创建代理就运用了Javassist技术来获取接口生成代理类:
只看部分关键代码:
1 | ClassGenerator ccp = null, ccm = null; |
如果有一个接口,被dubbo:reference引用,假设包名是com.souche.rick
1 | public interface AuthService { |
1.委托类
1 | public class com.souche.rick.proxy1 implements AuthService{ |
1 | public class Proxy + id extends Proxy{ |
此外还有非常多的工具使用了字节码操纵技术,比如fastjson(ASM)、hibernate(cglib、javaassist)、zorka(ASM)、Btrace(ASM)
参考:
https://www.cnblogs.com/qiumingcheng/p/5400265.html
https://www.ibm.com/developerworks/cn/java/j-lo-classloader/index.html
地址:https://github.com/btraceio/btrace
BTrace 是基于动态字节码修改技术(Hotswap)来实现运行时 java 程序的跟踪和替换。大体的原理可以用下面的公式描述:Client(Java compile api + attach api) + Agent(脚本解析引擎 + ASM + JDK6 Instumentation) + Socket其实 BTrace 就是使用了 java attach api 附加 agent.jar ,然后使用脚本解析引擎+asm来重写指定类的字节码,再使用 instrument 实现对原有类的替换。
虽然BTrace很强大,但Btrace脚本就是一个普通的用@Btrace注解的Java类,其中包含一个或多个public static void修饰的方法,为了保证对目标程序不造成影响,Btrace脚本对其可以执行的动作做了很多限制
https://github.com/btraceio/btrace/releases/tag/v1.3.10.1 下载到包解压
1 | export JAVA_HOME=/home/wenwei/jdk1.8.0_111 |
执行之前可以用预编译命令检查脚本的正确性,预编译命令为 btracec,它是一个 javac-like 命令
btracec JStack.java
1. btrace [-I <include-path>] [-p <port>] [-cp <classpath>] <pid> <btrace-script> [<args>]
例如:btrace -cp lib/servlet-api.jar -p 2021 53523 JStack.java
2.java -javaagent:btrace-agent.jar=[<agent-arg>[,<agent-arg>]*]? <launch-args>
参数:
1 | package com.sun.btrace.samples; |
可以用表达式,批量定义需要监控的类与方法。正则表达式需要写在两个 “/“ 中间。
下例监控javax.swing下的所有类的所有方法….可能会非常慢,建议范围还是窄些。
1 | @OnMethod(clazz="/javax\\.swing\\..*/", method="/.*/") |
通过在拦截函数的定义里注入@ProbeClassName String probeClass, @ProbeMethodName String probeMethod 参数,告诉脚本实际匹配到的类和方法名。
另一个例子,监控Statement的executeUpdate(), executeQuery() 和 executeBatch() 三个方法,见JdbcQueries.java
比如我想匹配所有的Filter类,在接口或基类的名称前面,加个+ 就行@OnMethod(clazz="+com.vip.demo.Filter", method="doFilter")
也可以按类或方法上的annotaiton匹配,前面加上@就行@OnMethod(clazz="@javax.jws.WebService", method="@javax.jws.WebMethod")
构造函数的名字是 @OnMethod(clazz="java.net.ServerSocket", method="<init>")
静态内部类的写法,是在类与内部类之间加上”$”@OnMethod(clazz="com.vip.MyServer$MyInnerClass", method="hello")
如果有多个同名的函数,想区分开来,可以在拦截函数上定义不同的参数列表(见4.1)。
可以为同一个函数的不同的Location,分别定义多个拦截函数。
@OnMethod( clazz="java.net.ServerSocket", method="bind" )
不写Location,默认就是刚进入函数的时候(Kind.ENTRY)。
但如果你想获得函数的返回结果或执行时间,则必须把切入点定在返回(Kind.RETURN)时。
1 | OnMethod(clazz = "java.net.ServerSocket", method = "getLocalPort", location = @Location(Kind.RETURN)) |
duration的单位是纳秒,要除以 1,000,000 才是毫秒。
异常抛出(Throw),异常被捕获(Catch),异常没被捕获被抛出函数之外(Error),主要用于对某些异常情况的跟踪。
在拦截函数的参数定义里注入一个Throwable的参数,代表异常。
1 | @OnMethod(clazz = "java.net.ServerSocket", method = "bind", location = @Location(Kind.ERROR)) |
下例定义监控bind()函数里调用的所有其他函数:
1 | @OnMethod(clazz = "java.net.ServerSocket", method = "bind", location = @Location(value = Kind.CALL, clazz = "/.*/", method = "/.*/", where = Where.AFTER)) |
所调用的类及方法名所注入到@TargetInstance与 @TargetMethodOrField中。
静态函数中,instance的值为空。如果想获得执行时间,必须把Where定义成AFTER。
如果想获得执行时间,必须 把Where定义成AFTER。
注意这里,一定不要像下面这样大范围的匹配,否则这性能是神仙也没法救了:
@OnMethod(clazz = "/javax\\.swing\\..*/", method = "/.*/", location = @Location(value = Kind.CALL, clazz = "/.*/", method = "/.*/"))
下例监控代码是否到达了Socket类的第363行。
1 | @OnMethod(clazz = "java.net.ServerSocket", location = @Location(value = Kind.LINE, line = 363)) |
line还可以为-1,然后每行都会打印出来,加参数int line 获得的当前行数。此时会显示函数里完整的执行路径,但肯定又非常慢。
1 | import com.sun.btrace.AnyType; |
如果想打印它们,首先按顺序定义用@Self 注释的this, 完整的参数列表,以及用@Return 注释的返回值。
需要打印哪个就定义哪个,不需要的就不要定义。但定义一定要按顺序,比如参数列表不能跑到返回值的后面。
Self:
如果是静态函数, self为空。
前面提到,如果上述使用了非JDK的类,命令行里要指定classpath。不过,如前所述,因为BTrace里不允许调用类的方法,所以定义具体类很多时候也没意思,所以self定义为Object就够了。
参数:
参数数列表要么不要定义,要定义就要定义完整,否则BTrace无法处理不同参数的同名函数。
如果有些参数你实在不想引入非JDK类,又不会造成同名函数不可区分,可以用AnyType来定义(不能用Object)。
如果拦截点用正则表达式中匹配了多个函数,函数之间的参数个数不一样,你又还是想把参数打印出来时,可以用AnyType[] args来定义。
但不知道是不是当前版本的bug,AnyType[] args 不能和 location=Kind.RETURN 同用,否则会进入一种奇怪的静默状态,只要有一个函数定义错了,整个Btrace就什么都打印不出来。
结果:
同理,结果也可以用AnyType来定义,特别是用正则表达式匹配多个函数的时候,连void都可以表示。
再次强调,为了保证性能不受影响,Btrace不允许调用任何实例方法。
比如不能调用getter方法(怕在getter里有复杂的计算),只会通过直接反射来读取属性名。
又比如,除了JDK类,其他类toString时只会打印其类名+System.IdentityHashCode。
println, printArray,都按上面的规律进行,所以只能打打基本类型。
如果想打印一个Object的属性,用printFields()来反射。
如果只想反射某个属性,参照下面打印Port属性的写法。从性能考虑,应把field用静态变量缓存起来。
注意JDK类与非JDK类的区别:
1 | import java.lang.reflect.Field; |
如果要多个拦截函数之间要通信,可以使用@TLS定义 ThreadLocal的变量来共享
1 | @TLS |
下例打印所有用时超过1毫秒的filter。
1 | @OnMethod(clazz = "+com.vip.demo.Filter", method = "doFilter", location = @Location(Kind.RETURN)) |
最好能抽取了打印耗时的函数,减少代码重复度。
定位到某一个Filter慢了之后,可以直接用Location(Kind.CALL),进一步找出它里面的哪一步慢了。
比如,谁调用了System.gc() ?
1 | @OnMethod(clazz = "java.lang.System", method = "gc") |
按之前的提示,自己组合一下即可。
如果你已经看到了这里,那基本也不用我再啰嗦了,自己看Samples的Histogram.java, HistoOnEvent.java
可以用AtomicInteger构造计数器,然后定时(@OnTimer),或根据事件(@OnEvent)输出结果(ctrl+c后选择发送事件)。
TreeNode类似一颗红黑树
看下普通treeMap
https://juejin.im/post/5a0658f76fb9a04523415a8d
基本结构:
1 | static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> { |
1 | //返回根结点 |
1 | static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) { |
1 | final TreeNode<K,V> find(int h, Object k, Class<?> kc) { |
1 | final TreeNode<K,V> getTreeNode(int h, Object k) { |
1 | static int tieBreakOrder(Object a, Object b) { |
hd.treeify(tab);
1 | final void treeify(Node<K,V>[] tab) { |
1 | static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, |
1 | final Node<K,V> untreeify(HashMap<K,V> map) { |
1 | final TreeNode<K,V> putTreeVal(HashMap<K,V> map, Node<K,V>[] tab, |
1 | final void removeTreeNode(HashMap<K,V> map, Node<K,V>[] tab, |
当扩容的时候,节点使用split,((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
1 | final void split(HashMap<K,V> map, Node<K,V>[] tab, int index, int bit) { |
1 | static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, |
1 | static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, |
1 |
|
1 | static <K,V> boolean checkInvariants(TreeNode<K,V> t) { |
treeifyBin
1 | /** |
入门使用 :http://rocketmq.apache.org/docs/quick-start/
下载
1 | unzip rocketmq-all-4.2.0-source-release.zip |
Start Name Server
1 | nohup sh bin/mqnamesrv & |
The Name Server boot success…
Start Broker
1 | nohup sh bin/mqbroker -n localhost:9876 & |
The broker[%s, 172.30.30.233:10911] boot success…
1 | public class SyncProducer { |
1 | public class AsyncProducer { |
1 | public class Consumer { |
弱可靠
1 | public class OnewayProducer { |
给所有订阅者广播
1 | public class BroadcastProducer { |
1 | public class BroadcastConsumer { |
1.订阅等待消息
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
2.发送
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
1 | String topic = "BatchTest"; |
!!因为最大只能发4M消息,最好每个不要超过1M
1 | public class ListSplitter implements Iterator<List<Message>> { |
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); |
1 | DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); |
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); |
1 | public class TransactionProducer { |
1 | public interface TransactionCheckListener { |
1 | public class TransactionCheckListenerImpl implements TransactionCheckListener { |
log配置http://rocketmq.apache.org/docs/logappender-example/
http://rocketmq.apache.org/docs/openmessaging-example/
影响消息可靠性的几种情况:
(1)、(2)、(3)、(4)四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
(5)、(6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。
RocketMQ从3.0版本开始支持同步双写。
命名服务器提供了轻量级服务发现和路由。每台命名服务器记录了完整的路由信息。提供了一致性读写服务,支持快速存储扩展。
Brokers专注于消息存储,提供轻量级的TOPIC和QUEUE机制。支持“推”和“拉”模式,包含容错机制(2或3份副本),并提供强大的峰值填充和按原始时间顺序累积数千亿条消息的能力。 此外,Brokers还提供灾难恢复,丰富的指标统计和警报机制。
mqnamesrv.sh
1 | #!/bin/sh |
runserver.sh
1 | #!/bin/sh |
脚本最后就是通过 java org.apache.rocketmq.namesrv.NamesrvStartup 运行nameserver
$@指的脚本传入的参数
1 | public class NamesrvStartup { |
1 | public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { |
initialize
1 | public boolean initialize() { |
start && shutdown
1 | public void start() throws Exception { |
1 | public NettyRemotingServer(final NettyServerConfig nettyServerConfig, |
start
1 | @Override |
CAP
CAP由Eric Brewer在2000年PODC会议上提出[1][2],是Eric Brewer在Inktomi[3]期间研发搜索引擎、分布式web缓存时得出的关于数据一致性(consistency)、服务可用性(availability)、分区容错性(partition-tolerance)的猜想:
在某时刻如果满足AP,分隔的节点同时对外服务但不能相互通信,将导致状态不一致,即不能满足C;如果满足CP,网络分区的情况下为达成C,请求只能一直等待,即不满足A;如果要满足CA,在一定时间内要达到节点状态一致,要求不能出现网络分区,则不能满足P。
C、A、P三者最多只能满足其中两个,和FLP定理一样,CAP定理也指示了一个不可达的结果(impossibility result)。
在满足分区容错的前提下,没有算法能同时满足数据一致性和服务可用性[11]:
P 是必选项,那3选2的选择题不就变成数据一致性(consistency)、服务可用性(availability) 2选1?工程实践中一致性有不同程度,可用性也有不同等级,在保证分区容错性的前提下,放宽约束后可以兼顾一致性和可用性,两者不是非此即彼[12]。
一般要让springmvc生效,我们都在web.xml配上这么一段
1 | <servlet> |
记得以前很早的时候刚开始学java web的时候都是直接继承HttpServlet然后写的,这个当然也不例外,
Servlet接口 包括 public void init(ServletConfig config) throws ServletException;
、 public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException;
、public void destroy();
在DispatcherServlet父类FrameworkServlet
1 | @Override |
最后根据请求调用的又是FrameworkServlet的
1 | @Override |
接下来
1 | protected final void processRequest(HttpServletRequest request, HttpServletResponse response) |
DispatcherServlet
1 | @Override |
doDispatch
1 | protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception { |
getHandler
1 | protected HandlerExecutionChain getHandler(HttpServletRequest request) throws Exception { |
getHandlerAdapter
1 | protected HandlerAdapter getHandlerAdapter(Object handler) throws ServletException { |
processDispatchResult
1 | private void processDispatchResult(HttpServletRequest request, HttpServletResponse response, |
1请求对应的控制器
2拦截器
1 | boolean applyPreHandle(HttpServletRequest request, HttpServletResponse response) throws Exception { |
初始化:
1 | private void initHandlerAdapters(ApplicationContext context) { |
AbstractHandlerMethodAdapter#handle
1 | @Override |
ServletInvocableHandlerMethod
1 | public void invokeAndHandle(ServletWebRequest webRequest, |
题外,这个是如何初始化的:
1 | private void initHandlerMappings(ApplicationContext context) { |
1.redis数据结构
redis专门封装了一个叫SDS的数据结构
1 | struct sdshdr { |
优点:
char *strcat(char *dest, const char *src);
因为 C 字符串不记录自身的长度, 所以 strcat 假定用户在执行这个函数时, 已经为 dest 分配了足够多的内存, 可以容纳 src 字符串中的所有内容, 而一旦这个假定不成立时, 就会产生缓冲区溢出。
举个例子, 假设程序里有两个在内存中紧邻着的 C 字符串 s1 和 s2 , 其中 s1 保存了字符串 “Redis” , 而 s2 则保存了字符串 “MongoDB”, 如图 2-7 所示。
strcat(s1, " Cluster");
将 s1 的内容修改为 “Redis Cluster” , 但粗心的他却忘了在执行 strcat 之前为 s1 分配足够的空间, 那么在 strcat 函数执行之后, s1 的数据将溢出到 s2 所在的空间中, 导致 s2 保存的内容被意外地修改, 如图 2-8 所示。
与 C 字符串不同, SDS 的空间分配策略完全杜绝了发生缓冲区溢出的可能性: 当 SDS API 需要对 SDS 进行修改时, API 会先检查 SDS 的空间是否满足修改所需的要求, 如果不满足的话, API 会自动将 SDS 的空间扩展至执行修改所需的大小, 然后才执行实际的修改操作, 所以使用 SDS 既不需要手动修改 SDS 的空间大小, 也不会出现前面所说的缓冲区溢出问题。
举个例子, SDS 的 API 里面也有一个用于执行拼接操作的 sdscat 函数, 它可以将一个 C 字符串拼接到给定 SDS 所保存的字符串的后面, 但是在执行拼接操作之前, sdscat 会先检查给定 SDS 的空间是否足够, 如果不够的话, sdscat 就会先扩展 SDS 的空间, 然后才执行拼接操作。
如果程序执行的是增长字符串的操作, 比如拼接操作(append), 那么在执行这个操作之前, 程序需要先通过内存重分配来扩展底层数组的空间大小 —— 如果忘了这一步就会产生缓冲区溢出。
如果程序执行的是缩短字符串的操作, 比如截断操作(trim), 那么在执行这个操作之后, 程序需要通过内存重分配来释放字符串不再使用的那部分空间 —— 如果忘了这一步就会产生内存泄漏。
举个例子, 如果我们持有一个值为 “Redis” 的 C 字符串 s , 那么为了将 s 的值改为 “Redis Cluster” , 在执行:strcat(s, " Cluster");
为了避免 C 字符串的这种缺陷, SDS 通过未使用空间解除了字符串长度和底层数组长度之间的关联: 在 SDS 中, buf 数组的长度不一定就是字符数量加一, 数组里面可以包含未使用的字节, 而这些字节的数量就由 SDS 的 free 属性记录。
通过未使用空间, SDS 实现了空间预分配和惰性空间释放两种优化策略。
其中, 额外分配的未使用空间数量由以下公式决定:
如果对 SDS 进行修改之后, SDS 的长度(也即是 len 属性的值)将小于 1 MB , 那么程序分配和 len 属性同样大小的未使用空间, 这时 SDS len 属性的值将和 free 属性的值相同。 举个例子, 如果进行修改之后, SDS 的 len 将变成 13 字节, 那么程序也会分配13 字节的未使用空间, SDS 的 buf 数组的实际长度将变成 13 + 13 + 1 = 27 字节(额外的一字节用于保存空字符)。
如果对 SDS 进行修改之后, SDS 的长度将大于等于 1 MB , 那么程序会分配 1 MB 的未使用空间。 举个例子, 如果进行修改之后, SDS 的 len 将变成 30 MB , 那么程序会分配 1 MB 的未使用空间, SDS 的 buf 数组的实际长度将为 30 MB + 1 MB + 1 byte 。
sdscat(s, " Cluster");
那么 sdscat 将执行一次内存重分配操作, 将 SDS 的长度修改为 13 字节, 并将 SDS 的未使用空间同样修改为 13 字节, 如图 2-12 所示。
如果这时, 我们再次对 s 执行:sdscat(s, " Tutorial");
那么这次 sdscat 将不需要执行内存重分配: 因为未使用空间里面的 13 字节足以保存 9 字节的 “ Tutorial” , 执行 sdscat 之后的 SDS 如图 2-13 所示。
在扩展 SDS 空间之前, SDS API 会先检查未使用空间是否足够, 如果足够的话, API 就会直接使用未使用空间, 而无须执行内存重分配。
通过这种预分配策略, SDS 将连续增长 N 次字符串所需的内存重分配次数从必定 N 次降低为最多 N 次。
惰性空间释放用于优化 SDS 的字符串缩短操作: 当 SDS 的 API 需要缩短 SDS 保存的字符串时, 程序并不立即使用内存重分配来回收缩短后多出来的字节, 而是使用 free 属性将这些字节的数量记录起来, 并等待将来使用。
举个例子, sdstrim 函数接受一个 SDS 和一个 C 字符串作为参数, 从 SDS 左右两端分别移除所有在 C 字符串中出现过的字符。
sdstrim(s, “XY”); // 移除 SDS 字符串中的所有 ‘X’ 和 ‘Y’
会将 SDS 修改成图 2-15 所示的样子。
注意执行 sdstrim 之后的 SDS 并没有释放多出来的 8 字节空间, 而是将这 8 字节空间作为未使用空间保留在了 SDS 里面, 如果将来要对 SDS 进行增长操作的话, 这些未使用空间就可能会派上用场。
1 | redis> LLEN integers |
1 | typedef struct listNode { |
多个 listNode 可以通过 prev 和 next 指针组成双端链表, 如图 3-1 所示。
1 | typedef struct list { |
list 结构为链表提供了表头指针 head 、表尾指针 tail , 以及链表长度计数器 len , 而 dup 、 free 和 match 成员则是用于实现多态链表所需的类型特定函数:
dup 函数用于复制链表节点所保存的值;
free 函数用于释放链表节点所保存的值;
match 函数则用于对比链表节点所保存的值和另一个输入值是否相等。
Redis 的链表实现的特性可以总结如下:
字典, 又称符号表(symbol table)、关联数组(associative array)或者映射(map), 是一种用于保存键值对(key-value pair)的抽象数据结构。
1 | redis> HLEN website |
其中一个键值对的键为 “Redis” , 值为 “Redis.io” 。
另一个键值对的键为 “MariaDB” , 值为 “MariaDB.org” ;
还有一个键值对的键为 “MongoDB” , 值为 “MongoDB.org” ;
1 | typedef struct dictht { |
table 属性是一个数组, 数组中的每个元素都是一个指向 dict.h/dictEntry 结构的指针, 每个 dictEntry 结构保存着一个键值对。
size 属性记录了哈希表的大小, 也即是 table 数组的大小, 而 used 属性则记录了哈希表目前已有节点(键值对)的数量。
sizemask 属性的值总是等于 size - 1 , 这个属性和哈希值一起决定一个键应该被放到 table 数组的哪个索引上面。
1 | typedef struct dictEntry { |
key 属性保存着键值对中的键, 而 v 属性则保存着键值对中的值, 其中键值对的值可以是一个指针, 或者是一个 uint64_t 整数, 又或者是一个 int64_t 整数。
next 属性是指向另一个哈希表节点的指针, 这个指针可以将多个哈希值相同的键值对连接在一次, 以此来解决键冲突(collision)的问题。
举个例子, 图 4-2 就展示了如何通过 next 指针, 将两个索引值相同的键 k1 和 k0 连接在一起。
1 | typedef struct dict { |
type 属性和 privdata 属性是针对不同类型的键值对, 为创建多态字典而设置的:
type 属性是一个指向 dictType 结构的指针, 每个 dictType 结构保存了一簇用于操作特定类型键值对的函数, Redis 会为用途不同的字典设置不同的类型特定函数。
而 privdata 属性则保存了需要传给那些类型特定函数的可选参数。
1 | typedef struct dictType { |
ht 属性是一个包含两个项的数组, 数组中的每个项都是一个 dictht 哈希表, 一般情况下, 字典只使用 ht[0] 哈希表, ht[1] 哈希表只会在对 ht[0] 哈希表进行 rehash 时使用。
除了 ht[1] 之外, 另一个和 rehash 有关的属性就是 rehashidx : 它记录了 rehash 目前的进度, 如果目前没有在进行 rehash , 那么它的值为 -1 。
图 4-3 展示了一个普通状态下(没有进行 rehash)的字典:
当要将一个新的键值对添加到字典里面时, 程序需要先根据键值对的键计算出哈希值和索引值, 然后再根据索引值, 将包含新键值对的哈希表节点放到哈希表数组的指定索引上面。
Redis 计算哈希值和索引值的方法如下:
1 | # 使用字典设置的哈希函数,计算键 key 的哈希值 |
举个例子, 对于图 4-4 所示的字典来说, 如果我们要将一个键值对 k0 和 v0 添加到字典里面, 那么程序会先使用语句:hash = dict->type->hashFunction(k0);
计算键 k0 的哈希值。
假设计算得出的哈希值为 8 , 那么程序会继续使用语句:index = hash & dict->ht[0].sizemask = 8 & 3 = 0;
计算出键 k0 的索引值 0 , 这表示包含键值对 k0 和 v0 的节点应该被放置到哈希表数组的索引 0 位置上, 如图 4-5 所示。
当字典被用作数据库的底层实现, 或者哈希键的底层实现时, Redis 使用 MurmurHash2 算法来计算键的哈希值。
MurmurHash 算法最初由 Austin Appleby 于 2008 年发明, 这种算法的优点在于, 即使输入的键是有规律的, 算法仍能给出一个很好的随机分布性, 并且算法的计算速度也非常快。
MurmurHash 算法目前的最新版本为 MurmurHash3 , 而 Redis 使用的是 MurmurHash2 , 关于 MurmurHash 算法的更多信息可以参考该算法的主页: http://code.google.com/p/smhasher/
当有两个或以上数量的键被分配到了哈希表数组的同一个索引上面时, 我们称这些键发生了冲突(collision)。
Redis 的哈希表使用链地址法(separate chaining)来解决键冲突: 每个哈希表节点都有一个 next 指针, 多个哈希表节点可以用 next 指针构成一个单向链表, 被分配到同一个索引上的多个节点可以用这个单向链表连接起来, 这就解决了键冲突的问题。
举个例子, 假设程序要将键值对 k2 和 v2 添加到图 4-6 所示的哈希表里面, 并且计算得出 k2 的索引值为 2 , 那么键 k1 和 k2 将产生冲突, 而解决冲突的办法就是使用 next 指针将键 k2 和 k1 所在的节点连接起来, 如图 4-7 所示。
因为 dictEntry 节点组成的链表没有指向链表表尾的指针, 所以为了速度考虑, 程序总是将新节点添加到链表的表头位置(复杂度为 O(1)), 排在其他已有节点的前面。
扩展和收缩哈希表的工作可以通过执行 rehash (重新散列)操作来完成, Redis 对字典的哈希表执行 rehash 的步骤如下:
为字典的 ht[1] 哈希表分配空间, 这个哈希表的空间大小取决于要执行的操作, 以及 ht[0] 当前包含的键值对数量 (也即是ht[0].used 属性的值):
如果执行的是扩展操作, 那么 ht[1] 的大小为第一个大于等于 ht[0].used * 2 的 2^n (2 的 n 次方幂);
如果执行的是收缩操作, 那么 ht[1] 的大小为第一个大于等于 ht[0].used 的 2^n 。
将保存在 ht[0] 中的所有键值对 rehash 到 ht[1] 上面: rehash 指的是重新计算键的哈希值和索引值, 然后将键值对放置到 ht[1] 哈希表的指定位置上。
当 ht[0] 包含的所有键值对都迁移到了 ht[1] 之后 (ht[0] 变为空表), 释放 ht[0] , 将 ht[1] 设置为 ht[0] , 并在 ht[1] 新创建一个空白哈希表, 为下一次 rehash 做准备。
举个例子, 假设程序要对图 4-8 所示字典的 ht[0] 进行扩展操作, 那么程序将执行以下步骤:
ht[0].used 当前的值为 4 , 4 * 2 = 8 , 而 8 (2^3)恰好是第一个大于等于 4 的 2 的 n 次方, 所以程序会将 ht[1] 哈希表的大小设置为 8 。 图 4-9 展示了 ht[1] 在分配空间之后, 字典的样子。
将 ht[0] 包含的四个键值对都 rehash 到 ht[1] , 如图 4-10 所示。
释放 ht[0] ,并将 ht[1] 设置为 ht[0] ,然后为 ht[1] 分配一个空白哈希表,如图 4-11 所示。
至此, 对哈希表的扩展操作执行完毕, 程序成功将哈希表的大小从原来的 4 改为了现在的 8 。
当以下条件中的任意一个被满足时, 程序会自动开始对哈希表执行扩展操作:
服务器目前没有在执行 BGSAVE 命令或者 BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1 ;
服务器目前正在执行 BGSAVE 命令或者 BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5 ;
其中哈希表的负载因子可以通过公式:
1 | # 负载因子 = 哈希表已保存节点数量 / 哈希表大小 |
计算得出。
比如说, 对于一个大小为 4 , 包含 4 个键值对的哈希表来说, 这个哈希表的负载因子为:load_factor = 4 / 4 = 1
又比如说, 对于一个大小为 512 , 包含 256 个键值对的哈希表来说, 这个哈希表的负载因子为:load_factor = 256 / 512 = 0.5
根据 BGSAVE 命令或 BGREWRITEAOF 命令是否正在执行, 服务器执行扩展操作所需的负载因子并不相同, 这是因为在执行 BGSAVE 命令或BGREWRITEAOF 命令的过程中, Redis 需要创建当前服务器进程的子进程, 而大多数操作系统都采用写时复制(copy-on-write)技术来优化子进程的使用效率, 所以在子进程存在期间, 服务器会提高执行扩展操作所需的负载因子, 从而尽可能地避免在子进程存在期间进行哈希表扩展操作, 这可以避免不必要的内存写入操作, 最大限度地节约内存。
另一方面, 当哈希表的负载因子小于 0.1 时, 程序自动开始对哈希表执行收缩操作。
这样做的原因在于, 如果 ht[0] 里只保存着四个键值对, 那么服务器可以在瞬间就将这些键值对全部 rehash 到 ht[1] ; 但是, 如果哈希表里保存的键值对数量不是四个, 而是四百万、四千万甚至四亿个键值对, 那么要一次性将这些键值对全部 rehash 到 ht[1] 的话, 庞大的计算量可能会导致服务器在一段时间内停止服务。
因此, 为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。
以下是哈希表渐进式 rehash 的详细步骤:
https://www.kancloud.cn/kancloud/redisbook/63842
跳跃表(skiplist)是一种有序数据结构, 它通过在每个节点中维持多个指向其他节点的指针, 从而达到快速访问节点的目的。
跳跃表支持平均 O(\log N) 最坏 O(N) 复杂度的节点查找, 还可以通过顺序性操作来批量处理节点。
在大部分情况下, 跳跃表的效率可以和平衡树相媲美, 并且因为跳跃表的实现比平衡树要来得更为简单, 所以有不少程序都使用跳跃表来代替平衡树。
Redis 使用跳跃表作为有序集合键的底层实现之一: 如果一个有序集合包含的元素数量比较多, 又或者有序集合中元素的成员(member)是比较长的字符串时, Redis 就会使用跳跃表来作为有序集合键的底层实现。
举个例子, fruit-price 是一个有序集合键, 这个有序集合以水果名为成员, 水果价钱为分值, 保存了 130 款水果的价钱:
1 | redis> ZRANGE fruit-price 0 2 WITHSCORES |
fruit-price 有序集合的所有数据都保存在一个跳跃表里面, 其中每个跳跃表节点(node)都保存了一款水果的价钱信息, 所有水果按价钱的高低从低到高在跳跃表里面排序:
和链表、字典等数据结构被广泛地应用在 Redis 内部不同, Redis 只在两个地方用到了跳跃表, 一个是实现有序集合键, 另一个是在集群节点中用作内部数据结构, 除此之外, 跳跃表在 Redis 里面没有其他用途。
Redis 的跳跃表由 redis.h/zskiplistNode 和 redis.h/zskiplist 两个结构定义, 其中 zskiplistNode 结构用于表示跳跃表节点, 而 zskiplist结构则用于保存跳跃表节点的相关信息, 比如节点的数量, 以及指向表头节点和表尾节点的指针, 等等。
图 5-1 展示了一个跳跃表示例, 位于图片最左边的是 zskiplist 结构, 该结构包含以下属性:
header :指向跳跃表的表头节点。
tail :指向跳跃表的表尾节点。
level :记录目前跳跃表内,层数最大的那个节点的层数(表头节点的层数不计算在内)。
length :记录跳跃表的长度,也即是,跳跃表目前包含节点的数量(表头节点不计算在内)。
位于 zskiplist 结构右方的是四个 zskiplistNode 结构, 该结构包含以下属性:
层(level):节点中用 L1 、 L2 、 L3 等字样标记节点的各个层, L1 代表第一层, L2 代表第二层,以此类推。每个层都带有两个属性:前进指针和跨度。前进指针用于访问位于表尾方向的其他节点,而跨度则记录了前进指针所指向节点和当前节点的距离。在上面的图片中,连线上带有数字的箭头就代表前进指针,而那个数字就是跨度。当程序从表头向表尾进行遍历时,访问会沿着层的前进指针进行。
后退(backward)指针:节点中用 BW 字样标记节点的后退指针,它指向位于当前节点的前一个节点。后退指针在程序从表尾向表头遍历时使用。
分值(score):各个节点中的 1.0 、 2.0 和 3.0 是节点所保存的分值。在跳跃表中,节点按各自所保存的分值从小到大排列。
成员对象(obj):各个节点中的 o1 、 o2 和 o3 是节点所保存的成员对象。
注意表头节点和其他节点的构造是一样的: 表头节点也有后退指针、分值和成员对象, 不过表头节点的这些属性都不会被用到, 所以图中省略了这些部分, 只显示了表头节点的各个层。
1 | typedef struct zskiplistNode { |
每次创建一个新跳跃表节点的时候, 程序都根据幂次定律 (power law,越大的数出现的概率越小) 随机生成一个介于 1 和 32 之间的值作为 level 数组的大小, 这个大小就是层的“高度”。
每个层都有一个指向表尾方向的前进指针(level[i].forward 属性), 用于从表头向表尾方向访问节点。
图 5-3 用虚线表示出了程序从表头向表尾方向, 遍历跳跃表中所有节点的路径:
1.迭代程序首先访问跳跃表的第一个节点(表头), 然后从第四层的前进指针移动到表中的第二个节点。
2.在第二个节点时, 程序沿着第二层的前进指针移动到表中的第三个节点。
3.在第三个节点时, 程序同样沿着第二层的前进指针移动到表中的第四个节点。
4.当程序再次沿着第四个节点的前进指针移动时, 它碰到一个 NULL , 程序知道这时已经到达了跳跃表的表尾, 于是结束这次遍历。
举个例子, 图 5-4 用虚线标记了在跳跃表中查找分值为 3.0 、 成员对象为 o3 的节点时, 沿途经历的层: 查找的过程只经过了一个层, 并且层的跨度为 3 , 所以目标节点在跳跃表中的排位为 3 。
节点的后退指针(backward 属性)用于从表尾向表头方向访问节点: 跟可以一次跳过多个节点的前进指针不同, 因为每个节点只有一个后退指针, 所以每次只能后退至前一个节点。
图 5-6 用虚线展示了如果从表尾向表头遍历跳跃表中的所有节点: 程序首先通过跳跃表的 tail 指针访问表尾节点, 然后通过后退指针访问倒数第二个节点, 之后再沿着后退指针访问倒数第三个节点, 再之后遇到指向 NULL 的后退指针, 于是访问结束。
举个例子, 在图 5-7 所示的跳跃表中, 三个跳跃表节点都保存了相同的分值 10086.0 , 但保存成员对象 o1 的节点却排在保存成员对象 o2和 o3 的节点之前, 而保存成员对象 o2 的节点又排在保存成员对象 o3 的节点之前, 由此可见, o1 、 o2 、 o3 三个成员对象在字典中的排序为 o1 <= o2 <= o3 。
1 | typedef struct zskiplist { |
整数集合(intset)是集合键的底层实现之一: 当一个集合只包含整数值元素, 并且这个集合的元素数量不多时, Redis 就会使用整数集合作为集合键的底层实现。
1 | redis> SADD numbers 1 3 5 7 9 |
1 | typedef struct intset { |
contents 数组是整数集合的底层实现: 整数集合的每个元素都是 contents 数组的一个数组项(item), 各个项在数组中按值的大小从小到大有序地排列, 并且数组中不包含任何重复项。
length 属性记录了整数集合包含的元素数量, 也即是 contents 数组的长度。
虽然 intset 结构将 contents 属性声明为 int8_t 类型的数组, 但实际上 contents 数组并不保存任何 int8_t 类型的值 —— contents 数组的真正类型取决于 encoding 属性的值:
如果 encoding 属性的值为 INTSET_ENC_INT16 , 那么 contents 就是一个 int16_t 类型的数组, 数组里的每个项都是一个 int16_t 类型的整数值 (最小值为 -32,768 ,最大值为 32,767 )。
如果 encoding 属性的值为 INTSET_ENC_INT32 , 那么 contents 就是一个 int32_t 类型的数组, 数组里的每个项都是一个 int32_t 类型的整数值 (最小值为 -2,147,483,648 ,最大值为 2,147,483,647 )。
如果 encoding 属性的值为 INTSET_ENC_INT64 , 那么 contents 就是一个 int64_t 类型的数组, 数组里的每个项都是一个 int64_t 类型的整数值 (最小值为 -9,223,372,036,854,775,808 ,最大值为 9,223,372,036,854,775,807 )。
encoding 属性的值为 INTSET_ENC_INT16 , 表示整数集合的底层实现为 int16_t 类型的数组, 而集合保存的都是 int16_t 类型的整数值。
length 属性的值为 5 , 表示整数集合包含五个元素。
contents 数组按从小到大的顺序保存着集合中的五个元素。
因为每个集合元素都是 int16_t 类型的整数值, 所以 contents 数组的大小等于 sizeof(int16_t) * 5 = 16 * 5 = 80 位。
虽然 contents 数组保存的四个整数值中, 只有 -2675256175807981027 是真正需要用 int64_t 类型来保存的, 而其他的 1 、 3 、 5 三个值都可以用 int16_t 类型来保存, 不过根据整数集合的升级规则, 当向一个底层为 int16_t 数组的整数集合添加一个 int64_t 类型的整数值时, 整数集合已有的所有元素都会被转换成 int64_t 类型, 所以 contents 数组保存的四个整数值都是 int64_t 类型的, 不仅仅是-2675256175807981027 。
升级整数集合并添加新元素共分为三步进行:
举个例子, 假设现在有一个 INTSET_ENC_INT16 编码的整数集合, 集合中包含三个 int16_t 类型的元素, 如图 6-3 所示。
因为每个元素都占用 16 位空间, 所以整数集合底层数组的大小为 3 * 16 = 48 位, 图 6-4 展示了整数集合的三个元素在这 48 位里的位置。
现在, 假设我们要将类型为 int32_t 的整数值 65535 添加到整数集合里面, 因为 65535 的类型 int32_t 比整数集合当前所有元素的类型都要长, 所以在将 65535 添加到整数集合之前, 程序需要先对整数集合进行升级。
升级首先要做的是, 根据新类型的长度, 以及集合元素的数量(包括要添加的新元素在内), 对底层数组进行空间重分配。
整数集合目前有三个元素, 再加上新元素 65535 , 整数集合需要分配四个元素的空间, 因为每个 int32_t 整数值需要占用 32 位空间, 所以在空间重分配之后, 底层数组的大小将是 32 * 4 = 128 位, 如图 6-5 所示
虽然程序对底层数组进行了空间重分配, 但数组原有的三个元素 1 、 2 、 3 仍然是 int16_t 类型, 这些元素还保存在数组的前 48 位里面, 所以程序接下来要做的就是将这三个元素转换成 int32_t 类型, 并将转换后的元素放置到正确的位上面, 而且在放置元素的过程中, 需要维持底层数组的有序性质不变。
首先, 因为元素 3 在 1 、 2 、 3 、 65535 四个元素中排名第三, 所以它将被移动到 contents 数组的索引 2 位置上, 也即是数组 64 位至 95 位的空间内, 如图 6-6 所示。
接着, 因为元素 2 在 1 、 2 、 3 、 65535 四个元素中排名第二, 所以它将被移动到 contents 数组的索引 1 位置上, 也即是数组的 32位至 63 位的空间内, 如图 6-7 所示。
之后, 因为元素 1 在 1 、 2 、 3 、 65535 四个元素中排名第一, 所以它将被移动到 contents 数组的索引 0 位置上, 也即是数组的 0 位至 31 位的空间内, 如图 6-8 所示。
然后, 因为元素 65535 在 1 、 2 、 3 、 65535 四个元素中排名第四, 所以它将被添加到 contents 数组的索引 3 位置上, 也即是数组的96 位至 127 位的空间内, 如图 6-9 所示。
最后, 程序将整数集合 encoding 属性的值从 INTSET_ENC_INT16 改为 INTSET_ENC_INT32 , 并将 length 属性的值从 3 改为 4 , 设置完成之后的整数集合如图 6-10 所示。
因为每次向整数集合添加新元素都可能会引起升级, 而每次升级都需要对底层数组中已有的所有元素进行类型转换, 所以向整数集合添加新元素的时间复杂度为 O(N)
其他类型的升级操作, 比如从 INTSET_ENC_INT16 编码升级为 INTSET_ENC_INT64 编码, 或者从 INTSET_ENC_INT32 编码升级为 INTSET_ENC_INT64 编码, 升级的过程都和上面展示的升级过程类似。
升级之后新元素的摆放位置
因为引发升级的新元素的长度总是比整数集合现有所有元素的长度都大, 所以这个新元素的值要么就大于所有现有元素, 要么就小于所有现有元素:
在新元素小于所有现有元素的情况下, 新元素会被放置在底层数组的最开头(索引 0 );
在新元素大于所有现有元素的情况下, 新元素会被放置在底层数组的最末尾(索引 length-1 )。
当一个列表键只包含少量列表项, 并且每个列表项要么就是小整数值, 要么就是长度比较短的字符串, 那么 Redis 就会使用压缩列表来做列表键的底层实现。
1 | redis> RPUSH lst 1 3 5 10086 "hello" "world" |
另外, 当一个哈希键只包含少量键值对, 并且每个键值对的键和值要么就是小整数值, 要么就是长度比较短的字符串, 那么 Redis 就会使用压缩列表来做哈希键的底层实现。
举个例子, 执行以下命令将创建一个压缩列表实现的哈希键:
1 | redis> HMSET profile "name" "Jack" "age" 28 "job" "Programmer" |
每个压缩列表节点可以保存一个字节数组或者一个整数值, 其中, 字节数组可以是以下三种长度的其中一种:
而整数值则可以是以下六种长度的其中一种:
每个压缩列表节点都由 previous_entry_length 、 encoding 、 content 三个部分组成
节点的 previous_entry_length 属性以字节为单位, 记录了压缩列表中前一个节点的长度。
previous_entry_length 属性的长度可以是 1 字节或者 5 字节:
因为节点的 previous_entry_length 属性记录了前一个节点的长度, 所以程序可以通过指针运算, 根据当前节点的起始地址来计算出前一个节点的起始地址。
压缩列表的从表尾向表头遍历操作就是使用这一原理实现的: 只要我们拥有了一个指向某个节点起始地址的指针, 那么通过这个指针以及这个节点的 previous_entry_length 属性, 程序就可以一直向前一个节点回溯, 最终到达压缩列表的表头节点。
图 7-8 展示了一个从表尾节点向表头节点进行遍历的完整过程:
节点的 encoding 属性记录了节点的 content 属性所保存数据的类型以及长度:
编码的最高两位 00 表示节点保存的是一个字节数组;
编码的后六位 001011 记录了字节数组的长度 11 ;
content 属性保存着节点的值 “hello world” 。
编码 11000000 表示节点保存的是一个 int16_t 类型的整数值;
content 属性保存着节点的值 10086 。
1 | typedef struct redisObject { |
类型常量 | 对象的名称 |
---|---|
REDIS_STRING | 字符串对象 |
REDIS_LIST | 列表对象 |
REDIS_HASH | 哈希对象 |
REDIS_SET | 集合对象 |
REDIS_ZSET | 有序集合对象 |
1 | # 键为字符串对象,值为字符串对象 |
对象 | 对象 type 属性的值 | TYPE 命令的输出 |
---|---|---|
字符串对象 | REDIS_STRING | “string” |
列表对象 | REDIS_LIST | “list” |
哈希对象 | REDIS_HASH | “hash” |
集合对象 | REDIS_SET | “set” |
有序集合对象 | REDIS_ZSET | “zset” |
编码常量 | 编码所对应的底层数据结构 |
---|---|
REDIS_ENCODING_INT | long 类型的整数 |
REDIS_ENCODING_EMBSTR | embstr 编码的简单动态字符串 |
REDIS_ENCODING_RAW | 简单动态字符串 |
REDIS_ENCODING_HT | 字典 |
REDIS_ENCODING_LINKEDLIST | 双端链表 |
REDIS_ENCODING_ZIPLIST | 压缩列表 |
REDIS_ENCODING_INTSET | 整数集合 |
REDIS_ENCODING_SKIPLIST | 跳跃表和字典 |
类型 | 编码 | 对象 |
---|---|---|
REDIS_STRING | REDIS_ENCODING_INT | 使用整数值实现的字符串对象。 |
REDIS_STRING | REDIS_ENCODING_EMBSTR | 使用 embstr 编码的简单动态字符串实现的字符串对象。 |
REDIS_STRING | REDIS_ENCODING_RAW | 使用简单动态字符串实现的字符串对象。 |
REDIS_LIST | REDIS_ENCODING_ZIPLIST | 使用压缩列表实现的列表对象。 |
REDIS_LIST | REDIS_ENCODING_LINKEDLIST | 使用双端链表实现的列表对象。 |
REDIS_HASH | REDIS_ENCODING_ZIPLIST | 使用压缩列表实现的哈希对象。 |
REDIS_HASH | REDIS_ENCODING_HT | 使用字典实现的哈希对象。 |
REDIS_SET | REDIS_ENCODING_INTSET | 使用整数集合实现的集合对象。 |
REDIS_SET | REDIS_ENCODING_HT | 使用字典实现的集合对象。 |
REDIS_ZSET | REDIS_ENCODING_ZIPLIST | 使用压缩列表实现的有序集合对象。 |
REDIS_ZSET | REDIS_ENCODING_SKIPLIST | 使用跳跃表和字典实现的有序集合对象。 |
1 | redis> SET msg "hello wrold" |
对象所使用的底层数据结构 | 编码常量 | OBJECT ENCODING 命令输出 |
---|---|---|
整数 | REDIS_ENCODING_INT | “int” |
embstr 编码的简单动态字符串(SDS) | REDIS_ENCODING_EMBSTR | “embstr” |
简单动态字符串 | REDIS_ENCODING_RAW | “raw” |
字典 | REDIS_ENCODING_HT | “hashtable” |
双端链表 | REDIS_ENCODING_LINKEDLIST | “linkedlist” |
压缩列表 | REDIS_ENCODING_ZIPLIST | “ziplist” |
整数集合 | REDIS_ENCODING_INTSET | “intset” |
跳跃表和字典 | REDIS_ENCODING_SKIPLIST | “skiplist” |
https://www.kancloud.cn/kancloud/redisbook/63862
1 | typedef struct redisObject { |
函数 | 作用 |
---|---|
incrRefCount | 将对象的引用计数值增一。 |
decrRefCount | 将对象的引用计数值减一, 当对象的引用计数值等于 0 时, 释放对象。 |
resetRefCount | 将对象的引用计数值设置为 0 , 但并不释放对象, 这个函数通常在需要重新设置对象的引用计数值时使用。 |
Redis 只对包含整数值的字符串对象进行共享。
除了前面介绍过的 type 、 encoding 、 ptr 和 refcount 四个属性之外, redisObject 结构包含的最后一个属性为 lru 属性, 该属性记录了对象最后一次被命令程序访问的时间:
1 | typedef struct redisObject { |
1 | typedef struct redisDb { |
1 | redis> SET message "hello world" |
1 | redis> SET date "2013.12.1" |
删除键
删除数据库中的一个键, 实际上就是在键空间里面删除键所对应的键值对对象。
更新键
对一个数据库键进行更新, 实际上就是对键空间里面键所对应的值对象进行更新, 根据值对象的类型不同, 更新的具体方法也会有所不同。
对键取值
读写键空间时的维护操作
当使用 Redis 命令对数据库进行读写时, 服务器不仅会对键空间执行指定的读写操作, 还会执行一些额外的维护操作, 其中包括:
在读取一个键之后(读操作和写操作都要对键进行读取), 服务器会根据键是否存在, 以此来更新服务器的键空间命中(hit)次数或键空间不命中(miss)次数, 这两个值可以在 INFO stats 命令的 keyspace_hits 属性和 keyspace_misses 属性中查看。
在读取一个键之后, 服务器会更新键的 LRU (最后一次使用)时间, 这个值可以用于计算键的闲置时间, 使用命令 OBJECT idletime 命令可以查看键 key 的闲置时间。
如果服务器在读取一个键时, 发现该键已经过期, 那么服务器会先删除这个过期键, 然后才执行余下的其他操作, 本章稍后对过期键的讨论会详细说明这一点。
如果有客户端使用 WATCH 命令监视了某个键, 那么服务器在对被监视的键进行修改之后, 会将这个键标记为脏(dirty), 从而让事务程序注意到这个键已经被修改过, 《事务》一章会详细说明这一点。
服务器每次修改一个键之后, 都会对脏(dirty)键计数器的值增一, 这个计数器会触发服务器的持久化以及复制操作执行, 《RDB 持久化》、《AOF 持久化》和《复制》这三章都会说到这一点。
如果服务器开启了数据库通知功能, 那么在对键进行修改之后, 服务器将按配置发送相应的数据库通知, 本章稍后讨论数据库通知功能的实现时会详细说明这一点。
RDB 文件的最开头是 REDIS 部分, 这个部分的长度为 5 字节, 保存着 “REDIS” 五个字符。 通过这五个字符, 程序可以在载入文件时, 快速检查所载入的文件是否 RDB 文件。
db_version 长度为 4 字节, 它的值是一个字符串表示的整数, 这个整数记录了 RDB 文件的版本号, 比如 “0006” 就代表 RDB 文件的版本为第六版。
databases 部分包含着零个或任意多个数据库, 以及各个数据库中的键值对数据:
EOF 常量的长度为 1 字节, 这个常量标志着 RDB 文件正文内容的结束, 当读入程序遇到这个值的时候, 它知道所有数据库的所有键值对都已经载入完毕了
check_sum 是一个 8 字节长的无符号整数, 保存着一个校验和, 这个校验和是程序通过对 REDIS 、 db_version 、 databases 、 EOF 四个部分的内容进行计算得出的。 服务器在载入 RDB 文件时, 会将载入数据所计算出的校验和与 check_sum 所记录的校验和进行对比, 以此来检查 RDB 文件是否有出错或者损坏的情况出现。
每个非空数据库在 RDB 文件中都可以保存为 SELECTDB 、 db_number 、 key_value_pairs 三个部分
SELECTDB 常量的长度为 1 字节, 当读入程序遇到这个值的时候, 它知道接下来要读入的将是一个数据库号码。
db_number 保存着一个数据库号码, 根据号码的大小不同, 这个部分的长度可以是 1 字节、 2 字节或者 5 字节。 当程序读入 db_number 部分之后, 服务器会调用 SELECT 命令, 根据读入的数据库号码进行数据库切换, 使得之后读入的键值对可以载入到正确的数据库中。
key_value_pairs 部分保存了数据库中的所有键值对数据, 如果键值对带有过期时间, 那么过期时间也会和键值对保存在一起。 根据键值对的数量、类型、内容、以及是否有过期时间等条件的不同, key_value_pairs 部分的长度也会有所不同。
不带过期时间的键值对在 RDB 文件中对由 TYPE 、 key 、 value 三部分组成, 如图 IMAGE_KEY_WITHOUT_EXPIRE_TIME 所示
以上列出的每个 TYPE 常量都代表了一种对象类型或者底层编码, 当服务器读入 RDB 文件中的键值对数据时, 程序会根据 TYPE 的值来决定如何读入和解释 value 的数据。
key 和 value 分别保存了键值对的键对象和值对象:
https://www.kancloud.cn/kancloud/redisbook/63879
AOF 持久化功能的实现可以分为命令追加(append)、文件写入、文件同步(sync)三个步骤。
当 AOF 持久化功能处于打开状态时, 服务器在执行完一个写命令之后, 会以协议格式将被执行的写命令追加到服务器状态的 aof_buf 缓冲区的末尾:
1 | struct redisServer { |
因为服务器在处理文件事件时可能会执行写命令, 使得一些内容被追加到 aof_buf 缓冲区里面, 所以在服务器每次结束一个事件循环之前, 它都会调用 flushAppendOnlyFile 函数, 考虑是否需要将 aof_buf 缓冲区中的内容写入和保存到 AOF 文件里面, 这个过程可以用以下伪代码表示:
1 | def eventLoop(): |
appendfsync 选项的值 | flushAppendOnlyFile 函数的行为 |
---|---|
always | 将 aof_buf 缓冲区中的所有内容写入并同步到 AOF 文件。 |
everysec | 将 aof_buf 缓冲区中的所有内容写入到 AOF 文件, 如果上次同步 AOF 文件的时间距离现在超过一秒钟, 那么再次对 AOF 文件进行同步, 并且这个同步操作是由一个线程专门负责执行的。 |
no | 将 aof_buf 缓冲区中的所有内容写入到 AOF 文件, 但并不对 AOF 文件进行同步, 何时同步由操作系统来决定。 |
如果用户没有主动为 appendfsync 选项设置值, 那么 appendfsync 选项的默认值为 everysec , 关于 appendfsync 选项的更多信息, 请参考 Redis 项目附带的示例配置文件 redis.conf 。
为了提高文件的写入效率, 在现代操作系统中, 当用户调用 write 函数, 将一些数据写入到文件的时候, 操作系统通常会将写入数据暂时保存在一个内存缓冲区里面, 等到缓冲区的空间被填满、或者超过了指定的时限之后, 才真正地将缓冲区中的数据写入到磁盘里面。
这种做法虽然提高了效率, 但也为写入数据带来了安全问题, 因为如果计算机发生停机, 那么保存在内存缓冲区里面的写入数据将会丢失。为此, 系统提供了 fsync 和 fdatasync 两个同步函数, 它们可以强制让操作系统立即将缓冲区中的数据写入到硬盘里面, 从而确保写入数据的安全性。
如果这时 flushAppendOnlyFile 函数被调用, 假设服务器当前 appendfsync 选项的值为 everysec , 并且根据 server.aof_last_fsync 属性显示, 距离上次同步 AOF 文件已经超过一秒钟, 那么服务器会先将 aof_buf 中的内容写入到 AOF 文件中, 然后再对 AOF 文件进行同步。
当 appendfsync 的值为 always 时, 服务器在每个事件循环都要将 aof_buf 缓冲区中的所有内容写入到 AOF 文件, 并且同步 AOF 文件, 所以 always 的效率是 appendfsync 选项三个值当中最慢的一个, 但从安全性来说, always 也是最安全的, 因为即使出现故障停机, AOF 持久化也只会丢失一个事件循环中所产生的命令数据。
当 appendfsync 的值为 everysec 时, 服务器在每个事件循环都要将 aof_buf 缓冲区中的所有内容写入到 AOF 文件, 并且每隔超过一秒就要在子线程中对 AOF 文件进行一次同步: 从效率上来讲, everysec 模式足够快, 并且就算出现故障停机, 数据库也只丢失一秒钟的命令数据。
Redis 基于 Reactor 模式开发了自己的网络事件处理器: 这个处理器被称为文件事件处理器(file event handler):
虽然文件事件处理器以单线程方式运行, 但通过使用 I/O 多路复用程序来监听多个套接字, 文件事件处理器既实现了高性能的网络通信模型, 又可以很好地与 Redis 服务器中其他同样以单线程方式运行的模块进行对接, 这保持了 Redis 内部单线程设计的简单性。
文件事件是对套接字操作的抽象, 每当一个套接字准备好执行连接应答(accept)、写入、读取、关闭等操作时, 就会产生一个文件事件。 因为一个服务器通常会连接多个套接字, 所以多个文件事件有可能会并发地出现。
I/O 多路复用程序负责监听多个套接字, 并向文件事件分派器传送那些产生了事件的套接字。
尽管多个文件事件可能会并发地出现, 但 I/O 多路复用程序总是会将所有产生事件的套接字都入队到一个队列里面, 然后通过这个队列, 以有序(sequentially)、同步(synchronously)、每次一个套接字的方式向文件事件分派器传送套接字: 当上一个套接字产生的事件被处理完毕之后(该套接字为事件所关联的事件处理器执行完毕), I/O 多路复用程序才会继续向文件事件分派器传送下一个套接字, 如图 IMAGE_DISPATCH_EVENT_VIA_QUEUE 。
文件事件分派器接收 I/O 多路复用程序传来的套接字, 并根据套接字产生的事件的类型, 调用相应的事件处理器。
服务器会为执行不同任务的套接字关联不同的事件处理器, 这些处理器是一个个函数, 它们定义了某个事件发生时, 服务器应该执行的动作。
Redis 在 I/O 多路复用程序的实现源码中用 #include 宏定义了相应的规则, 程序会在编译时自动选择系统中性能最高的 I/O 多路复用函数库来作为 Redis 的 I/O 多路复用程序的底层实现:
https://www.kancloud.cn/kancloud/redisbook/63885
假设一个 Redis 服务器正在运作, 那么这个服务器的监听套接字的 AE_READABLE 事件应该正处于监听状态之下, 而该事件所对应的处理器为连接应答处理器。
如果这时有一个 Redis 客户端向服务器发起连接, 那么监听套接字将产生 AE_READABLE 事件, 触发连接应答处理器执行: 处理器会对客户端的连接请求进行应答, 然后创建客户端套接字, 以及客户端状态, 并将客户端套接字的 AE_READABLE 事件与命令请求处理器进行关联, 使得客户端可以向主服务器发送命令请求。
之后, 假设客户端向主服务器发送一个命令请求, 那么客户端套接字将产生 AE_READABLE 事件, 引发命令请求处理器执行, 处理器读取客户端的命令内容, 然后传给相关程序去执行。
执行命令将产生相应的命令回复, 为了将这些命令回复传送回客户端, 服务器会将客户端套接字的 AE_WRITABLE 事件与命令回复处理器进行关联: 当客户端尝试读取命令回复的时候, 客户端套接字将产生 AE_WRITABLE 事件, 触发命令回复处理器执行, 当命令回复处理器将命令回复全部写入到套接字之后, 服务器就会解除客户端套接字的 AE_WRITABLE 事件与命令回复处理器之间的关联。
图 IMAGE_COMMAND_PROGRESS 总结了上面描述的整个通讯过程, 以及通讯时用到的事件处理器。
1 | typedef struct redisClient { |
根据客户端类型的不同, fd 属性的值可以是 -1 或者是大于 -1 的整数:
伪客户端(fake client)的 fd 属性的值为 -1 : 伪客户端处理的命令请求来源于 AOF 文件或者 Lua 脚本, 而不是网络, 所以这种客户端不需要套接字连接, 自然也不需要记录套接字描述符。 目前 Redis 服务器会在两个地方用到伪客户端, 一个用于载入 AOF 文件并还原数据库状态, 而另一个则用于执行 Lua 脚本中包含的 Redis 命令。
普通客户端的 fd 属性的值为大于 -1 的整数: 普通客户端使用套接字来与服务器进行通讯, 所以服务器会用 fd 属性来记录客户端套接字的描述符。 因为合法的套接字描述符不能是 -1 , 所以普通客户端的套接字描述符的值必然是大于 -1 的整数。
执行 CLIENT_LIST 命令可以列出目前所有连接到服务器的普通客户端, 命令输出中的 fd 域显示了服务器连接客户端所使用的套接字描述符:
1 | redis> CLIENT list |
使用 CLIENT_SETNAME 命令可以为客户端设置一个名字, 让客户端的身份变得更清晰。
1 | typedef struct redisClient { |
1 | typedef struct redisClient { |
可以是多个标志的二进制或, 比如:
flags = <flag1> | <flag2> | ...
1 | # 客户端是一个主服务器 |
1 | typedef struct redisClient { |
输入缓冲区的大小会根据输入内容动态地缩小或者扩大, 但它的最大大小不能超过 1 GB , 否则服务器将关闭这个客户端。
1 | typedef struct redisClient { |
1 | typedef struct redisClient { |
如果 authenticated 的值为 0 , 那么表示客户端未通过身份验证; 如果 authenticated 的值为 1 , 那么表示客户端已经通过了身份验证。
1 | # authenticated 属性的值从 0 变为 1 |
1 | typedef struct redisClient { |
那么从客户端发送 SET KEY VALUE 命令到获得回复 OK 期间, 客户端和服务器共需要执行以下操作:
当客户端与服务器之间的连接套接字因为客户端的写入而变得可读时, 服务器将调用命令请求处理器来执行以下操作:
https://www.kancloud.cn/kancloud/redisbook/63892
Redis 的复制功能分为同步(sync)和命令传播(command propagate)两个操作:
同步操作用于将从服务器的数据库状态更新至主服务器当前所处的数据库状态。
而命令传播操作则用于在主服务器的数据库状态被修改, 导致主从服务器的数据库状态出现不一致时, 让主从服务器的数据库重新回到一致状态。
同步
当客户端向从服务器发送 SLAVEOF 命令, 要求从服务器复制主服务器时, 从服务器首先需要执行同步操作, 也即是, 将从服务器的数据库状态更新至主服务器当前所处的数据库状态。
从服务器对主服务器的同步操作需要通过向主服务器发送 SYNC 命令来完成, 以下是 SYNC 命令的执行步骤:
时间 | 主服务器 | 从服务器 |
---|---|---|
T0 | 服务器启动。 | 服务器启动。 |
T1 | 执行 SET k1 v1 。 | |
T2 | 执行 SET k2 v2 。 | |
T3 | 执行 SET k3 v3 。 | |
T4 | 向主服务器发送 SYNC 命令。 | |
T5 | 接收到从服务器发来的 SYNC 命令, 执行 BGSAVE 命令, 创建包含键 k1 、 k2 、 k3 的 RDB 文件, 并使用缓冲区记录接下来执行的所有写命令。 | |
T6 | 执行 SET k4 v4 , 并将这个命令记录到缓冲区里面。 | |
T7 | 执行 SET k5 v5 , 并将这个命令记录到缓冲区里面。 | |
T8 | BGSAVE 命令执行完毕, 向从服务器发送 RDB 文件。 | |
T9 | 接收并载入主服务器发来的 RDB 文件 , 获得 k1 、 k2 、 k3 三个键。 | |
T10 | 向从服务器发送缓冲区中保存的写命令 SET k4 v4 和 SET k5v5 。 | |
T11 | 接收并执行主服务器发来的两个 SET 命令, 得到 k4 和 k5 两个键。 | |
T12 | 同步完成, 现在主从服务器两者的数据库都包含了键 k1 、k2 、 k3 、 k4 和 k5 。 | 同步完成, 现在主从服务器两者的数据库都包含了键 k1 、 k2 、k3 、 k4 和 k5 。 |
举个例子, 假设一个主服务器和一个从服务器刚刚完成同步操作, 它们的数据库都保存了相同的五个键 k1 至 k5 , 如图 IMAGE_CONSISTENT 所示。
如果这时, 客户端向主服务器发送命令 DEL k3 , 那么主服务器在执行完这个 DEL 命令之后, 主从服务器的数据库将出现不一致: 主服务器的数据库已经不再包含键 k3 , 但这个键却仍然包含在从服务器的数据库里面, 如图 IMAGE_INCONSISTENT 所示。
为了让主从服务器再次回到一致状态, 主服务器需要对从服务器执行命令传播操作: 主服务器会将自己执行的写命令 —— 也即是造成主从服务器不一致的那条写命令 —— 发送给从服务器执行, 当从服务器执行了相同的写命令之后, 主从服务器将再次回到一致状态。
在上面的例子中, 主服务器因为执行了命令 DEL k3 而导致主从服务器不一致, 所以主服务器将向从服务器发送相同的命令 DEL k3 : 当从服务器执行完这个命令之后, 主从服务器将再次回到一致状态 —— 现在主从服务器两者的数据库都不再包含键 k3 了, 如图 IMAGE_PROPAGATE_DEL_k3 所示。
$ redis-sentinel /path/to/your/sentinel.conf
$ redis-server /path/to/your/sentinel.conf --sentinel
当一个 Sentinel 启动时, 它需要执行以下步骤:
功能 | 使用情况 |
---|---|
数据库和键值对方面的命令, 比如 SET 、 DEL 、FLUSHDB 。 | 不使用。 |
事务命令, 比如 MULTI 和 WATCH 。 | 不使用。 |
脚本命令,比如 EVAL 。 | 不使用。 |
RDB 持久化命令, 比如 SAVE 和 BGSAVE 。 | 不使用。 |
AOF 持久化命令, 比如 BGREWRITEAOF 。 | 不使用。 |
复制命令,比如 SLAVEOF 。 | Sentinel 内部可以使用,但客户端不可以使用。 |
发布与订阅命令, 比如 PUBLISH 和 SUBSCRIBE 。 | SUBSCRIBE 、 PSUBSCRIBE 、 UNSUBSCRIBE PUNSUBSCRIBE 四个命令在 Sentinel 内部和客户端都可以使用, 但 PUBLISH 命令只能在 Sentinel 内部使用。 |
文件事件处理器(负责发送命令请求、处理命令回复)。 | Sentinel 内部使用, 但关联的文件事件处理器和普通 Redis 服务器不同。 |
时间事件处理器(负责执行 serverCron 函数)。 | Sentinel 内部使用, 时间事件的处理器仍然是 serverCron 函数, serverCron函数会调用 sentinel.c/sentinelTimer 函数, 后者包含了 Sentinel 要执行的所有操作。 |
比如说, 普通 Redis 服务器使用 redis.h/REDIS_SERVERPORT 常量的值作为服务器端口:#define REDIS_SERVERPORT 6379
而 Sentinel 则使用 sentinel.c/REDIS_SENTINEL_PORT 常量的值作为服务器端口:#define REDIS_SENTINEL_PORT 26379
除此之外, 普通 Redis 服务器使用 redis.c/redisCommandTable 作为服务器的命令表:
1 | struct redisCommand redisCommandTable[] = { |
1 | struct sentinelState { |
1 | typedef struct sentinelRedisInstance { |
sentinelRedisInstance.addr 属性是一个指向 sentinel.c/sentinelAddr 结构的指针, 这个结构保存着实例的 IP 地址和端口号:
1 | typedef struct sentinelAddr { |
对 Sentinel 状态的初始化将引发对 masters 字典的初始化, 而 masters 字典的初始化是根据被载入的 Sentinel 配置文件来进行的。
对于每个被 Sentinel 监视的主服务器来说, Sentinel 会创建两个连向主服务器的异步网络连接:
一个是命令连接, 这个连接专门用于向主服务器发送命令, 并接收命令回复。
另一个是订阅连接, 这个连接专门用于订阅主服务器的 sentinel:hello 频道。
为什么有两个连接?
在 Redis 目前的发布与订阅功能中, 被发送的信息都不会保存在 Redis 服务器里面, 如果在信息发送时, 想要接收信息的客户端不在线或者断线, 那么这个客户端就会丢失这条信息。
因此, 为了不丢失 sentinel:hello 频道的任何信息, Sentinel 必须专门用一个订阅连接来接收该频道的信息。
而另一方面, 除了订阅频道之外, Sentinel 还又必须向主服务器发送命令, 以此来与主服务器进行通讯, 所以 Sentinel 还必须向主服务器创建命令连接。
并且因为 Sentinel 需要与多个实例创建多个网络连接, 所以 Sentinel 使用的是异步连接。
图 IMAGE_SENTINEL_CONNECT_SERVER 展示了一个 Sentinel 向被它监视的两个主服务器 master1 和 master2 创建命令连接和订阅连接的例子。
Sentinel 系统选举领头 Sentinel 的方法是对 Raft 算法的领头选举方法的实现
一个 Redis 集群通常由多个节点(node)组成, 在刚开始的时候, 每个节点都是相互独立的, 它们都处于一个只包含自己的集群当中, 要组建一个真正可工作的集群, 我们必须将各个独立的节点连接起来, 构成一个包含多个节点的集群。
连接各个节点的工作可以使用 CLUSTER MEET 命令来完成, 该命令的格式如下:
CLUSTER MEET <ip> <port>
向一个节点 node 发送 CLUSTER MEET 命令, 可以让 node 节点与 ip 和 port 所指定的节点进行握手(handshake), 当握手成功时, node节点就会将 ip 和 port 所指定的节点添加到 node 节点当前所在的集群中。
举个例子, 假设现在有三个独立的节点 127.0.0.1:7000 、 127.0.0.1:7001 、 127.0.0.1:7002 (下文省略 IP 地址,直接使用端口号来区分各个节点), 我们首先使用客户端连上节点 7000 , 通过发送 CLUSTER NODE 命令可以看到, 集群目前只包含 7000 自己一个节点:
1 | $ redis-cli -c -p 7000 |
通过向节点 7000 发送以下命令, 我们可以将节点 7001 添加到节点 7000 所在的集群里面:
1 | 127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7001 |
继续向节点 7000 发送以下命令, 我们可以将节点 7002 也添加到节点 7000 和节点 7001 所在的集群里面:
1 | 127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7002 |
现在, 这个集群里面包含了 7000 、 7001 和 7002 三个节点, 图 IMAGE_CONNECT_NODES_1 至 IMAGE_CONNECT_NODES_5 展示了这三个节点进行握手的整个过程。
节点(运行在集群模式下的 Redis 服务器)会继续使用所有在单机模式中使用的服务器组件, 比如说:
节点会继续使用文件事件处理器来处理命令请求和返回命令回复。
节点会继续使用时间事件处理器来执行 serverCron 函数, 而 serverCron 函数又会调用集群模式特有的 clusterCron 函数: clusterCron函数负责执行在集群模式下需要执行的常规操作, 比如向集群中的其他节点发送 Gossip 消息, 检查节点是否断线; 又或者检查是否需要对下线节点进行自动故障转移, 等等。
节点会继续使用数据库来保存键值对数据,键值对依然会是各种不同类型的对象。
节点会继续使用 RDB 持久化模块和 AOF 持久化模块来执行持久化工作。
节点会继续使用发布与订阅模块来执行 PUBLISH 、 SUBSCRIBE 等命令。
节点会继续使用复制模块来进行节点的复制工作。
节点会继续使用 Lua 脚本环境来执行客户端输入的 Lua 脚本。
集群数据结构
clusterNode 结构保存了一个节点的当前状态, 比如节点的创建时间, 节点的名字, 节点当前的配置纪元, 节点的 IP 和地址, 等等。
每个节点都会使用一个 clusterNode 结构来记录自己的状态, 并为集群中的所有其他节点(包括主节点和从节点)都创建一个相应的clusterNode 结构, 以此来记录其他节点的状态:
1 | struct clusterNode { |
clusterNode 结构的 link 属性是一个 clusterLink 结构, 该结构保存了连接节点所需的有关信息, 比如套接字描述符, 输入缓冲区和输出缓冲区:
1 | typedef struct clusterLink { |
redisClient 结构和 clusterLink 结构的相同和不同之处
redisClient 结构和 clusterLink 结构都有自己的套接字描述符和输入、输出缓冲区, 这两个结构的区别在于, redisClient 结构中的套接字和缓冲区是用于连接客户端的, 而 clusterLink 结构中的套接字和缓冲区则是用于连接节点的。
最后, 每个节点都保存着一个 clusterState 结构, 这个结构记录了在当前节点的视角下, 集群目前所处的状态 —— 比如集群是在线还是下线, 集群包含多少个节点, 集群当前的配置纪元, 诸如此类:
1 | typedef struct clusterState { |
以前面介绍的 7000 、 7001 、 7002 三个节点为例, 图 IMAGE_CLUSTER_STATE_OF_7000 展示了节点 7000 创建的 clusterState 结构, 这个结构从节点 7000 的角度记录了集群、以及集群包含的三个节点的当前状态 (为了空间考虑,图中省略了 clusterNode 结构的一部分属性):
节点 7001 和节点 7002 也会创建类似的 clusterState 结构:
不过在节点 7001 创建的 clusterState 结构中, myself 指针将指向代表节点 7001 的 clusterNode 结构, 而节点 7000 和节点 7002 则是集群中的其他节点。
而在节点 7002 创建的 clusterState 结构中, myself 指针将指向代表节点 7002 的 clusterNode 结构, 而节点 7000 和节点 7001 则是集群中的其他节点。
CLUSTER MEET 命令的实现
通过向节点 A 发送 CLUSTER MEET 命令, 客户端可以让接收命令的节点 A 将另一个节点 B 添加到节点 A 当前所在的集群里面:
CLUSTER MEET <ip> <port>
收到命令的节点 A 将与节点 B 进行握手(handshake), 以此来确认彼此的存在, 并为将来的进一步通信打好基础:
之后, 节点 A 会将节点 B 的信息通过 Gossip 协议传播给集群中的其他节点, 让其他节点也与节点 B 进行握手, 最终, 经过一段时间之后, 节点 B 会被集群中的所有节点认识。
假设有一个管道,进程A为管道的写入方,B为管道的读出方。
假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。
假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。
阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create)
1 | while true { |
如果所有的流都没有数据,那么只会白白浪费CPU
1 | while true { |
为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流。
使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,每一次无差别轮询时间就越长。
epoll会把哪个流发生了怎样的I/O事件通知我们。(复杂度降低到了O(1))
1 | while true { |
在内核的最底层是中断,类似系统回调的机制。网卡设备对应一个中断号, 当网卡收到网络端的消息的时候会向CPU发起中断请求, 然后CPU处理该请求. 通过驱动程序 进而操作系统得到通知, 系统然后通知epoll, epoll通知用户代码。epoll在被内核初始化时(操作系统启动),同时会开辟出epoll自己的内核高速cache区,用于安置每一个我们想监控的socket,这些socket会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。这个内核高速cache区,就是建立连续的物理内存页,然后在之上建立slab层,简单的说,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的对象。
进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select;这样select/poll可以帮我们侦测许多fd是否就绪;但是select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限。linux还提供了一个epoll系统调用,epoll是基于事件驱动方式,而不是顺序扫描,当有fd就绪时,立即回调函数rollback
传统的BIO里面socket.read(),如果TCP RecvBuffer里没有数据,函数会一直阻塞,直到收到数据,返回读到的数据。
对于NIO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。
最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。
换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。
NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。
NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。
下面具体看下如何利用事件模型单线程处理所有I/O请求:
NIO的主要事件有几个:读就绪、写就绪、有新连接到来。
我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。
其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。
注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以你可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。
Java NIO 由以下几个核心部分组成:
当我们需要与 NIO Channel 进行交互时, 我们就需要使用到 NIO Buffer, 即数据从 Buffer读取到 Channel 中, 并且从 Channel 中写入到 Buffer 中.
实际上, 一个 Buffer 其实就是一块内存区域, 我们可以在这个内存区域中进行数据的读写. NIO Buffer 其实是这样的内存块的一个封装, 并提供了一些操作方法让我们能够方便地进行数据的读写.
Buffer 类型有:
1 | public static ByteBuffer allocate(int capacity) { |
HeapByteBuffer 通过初始化字节数组hd,在虚拟机堆上申请内存空间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
/*
hb = new byte[cap];
offset = 0;
*/
}
ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
final byte[] hb;
1 | public static ByteBuffer allocateDirect(int capacity) { |
DirectByteBuffer 通过unsafe.allocateMemory在物理内存中申请地址空间(非jvm堆内存),并在ByteBuffer的address变量中维护指向该内存的地址。
unsafe.setMemory(base, size, (byte) 0)方法把新申请的内存数据清零。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
//-Dsun.nio.PageAlignDirectMemory=true 判断是否开启按页分配对齐
boolean pa = VM.isDirectMemoryPageAligned();
//默认4k
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// 如果是按页分配对齐的,对齐到地址的页首,为什么要使用页首呢?
//CPU不会一次读取或写入一个字节。相反,CPU一次访问2、4、8、16或32字节块中的内存。这样做的原因是性能 —在4字节或16字节边界上访问地址要比在1字节边界上访问地址快得多。
//如果数据没有对齐为4字节的边界,CPU必须执行额外的工作来访问数据:加载2个数据块,转移不需要的字节,然后将它们组合在一起。这个过程肯定会降低性能,浪费CPU周期,只是为了从内存中获得正确的数据。
//http://www.songho.ca/misc/alignment/dataalign.html
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
使用Buffer读写数据一般遵循以下四个步骤:
当我们将数据写入到 Buffer 中时, Buffer 会记录我们已经写了多少的数据, 当我们需要从 Buffer 中读取数据时, 必须调用 Buffer.flip()将 Buffer 切换为读模式.
一旦读取了所有的 Buffer 数据, 那么我们必须清理 Buffer, 让其从新可写, 清理 Buffer 可以调用 Buffer.clear() 或 Buffer.compact().
例如:
1 | IntBuffer intBuffer = IntBuffer.allocate(2); |
mark():把当前的position赋值给mark
1 | public final Buffer mark() { |
reset():把mark值还原给position
1 | public final Buffer reset() { |
clear():一旦读完Buffer中的数据,需要让Buffer准备好再次被写入,clear会恢复状态值,但不会擦除数据。
1 | public final Buffer clear() { |
flip():Buffer有两种模式,写模式和读模式,flip后Buffer从写模式变成读模式。
1 | public final Buffer flip() { |
rewind():重置position为0,从头读写数据。
1 | public final Buffer rewind() { |
目前已知Channel的实现类有:
从Channel写到Buffer的例子
int bytesRead = inChannel.read(buf); //read into buffer.
从Buffer读取数据到Channel的例子:
int bytesWritten = inChannel.write(buf);
FileChannel的read、write和map通过其实现类FileChannelImpl实现。
1 | File file = new RandomAccessFile("data.txt", "rw"); |
1 | public int read(ByteBuffer dst) throws IOException { |
IOUtil
1 | static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException { |
通过上述实现可以看出,基于channel的文件数据读取步骤如下:
1、申请一块和缓存同大小的DirectByteBuffer bb。
2、读取数据到缓存bb,底层由NativeDispatcher的read实现。
3、把bb的数据读取到dst(用户定义的缓存,在jvm中分配内存)。
read方法导致数据复制了两次。
1 | public int write(ByteBuffer src) throws IOException { |
1 | static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException { |
通过上述实现可以看出,基于channel的文件数据写入步骤如下:
1、申请一块DirectByteBuffer,bb大小为byteBuffer中的limit - position。
2、复制byteBuffer中的数据到bb中。
3、把数据从bb中写入到文件,底层由NativeDispatcher的write实现,具体如下:
1 | private static int writeFromNativeBuffer(FileDescriptor fd, |
transferFrom()
FileChannel的transferFrom()方法可以将数据从源通道传输到FileChannel中。在SoketChannel的实现中,SocketChannel只会传输此刻准备好的数据(可能不足count字节)。因此,SocketChannel可能不会将请求的所有数据(count个字节)全部传输到FileChannel中。
toChannel.transferFrom(0, fromChannel.size(), fromChannel);
transferTo()
transferTo()方法将数据从FileChannel传输到其他的channel中
fromChannel.transferTo(position, count, toChannel);
Scattering Reads是指数据从一个channel读取到多个buffer中
1 | ByteBuffer header = ByteBuffer.allocate(128); |
read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,channel紧接着向另一个buffer中写
Gathering Writes是指数据从多个buffer写入到同一个channel
write()方法会按照buffer在数组中的顺序,将数据写入到channel,注意只有position和limit之间的数据才会被写入
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
Selector的创建Selector selector = Selector.open();
向Selector注册通道
1 | channel.configureBlocking(false); |
可以用“位或”操作符将常量连接
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
1 | int interestSet = selectionKey.interestOps(); |
ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。可以这样访问ready集合:
1 | int readySet = selectionKey.readyOps(); |
1 | Set selectedKeys = selector.selectedKeys(); |
注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。
1 | selector = Selector.open();//创建多路复用器 |
非阻塞的IO管道(Non-blocking IO Pipelines)可以看做是整个非阻塞IO处理过程的链条。包括在以非阻塞形式进行的读与写操作
我们的组件(Component)通过Selector检查当前Channel是否有数据需要写入。此时component读入数据,并且根据输入的数据input对外提供数据输出output。这个对外的数据输出output被写到了另一个Channel中。
一个非阻塞的IO管道不必同时需要读和写数据,通常来说有些管道只需要读数据,而另一些管道则只需写数据。当然一个非阻塞的IO管道他也可以同时从多个Channel中读取数据,例如同时冲多个SocketChannel中读取数据;
非阻塞和阻塞通道比较(Non-blocking vs. Blocking IO Pipelines)
非阻塞IO管道和阻塞IO管道之间最大的区别是他们各自如何从Channel(套接字socket或文件file)读写数据。
IO管道通常直接从流中读取数据,然后把数据分割为连续的消息。这个处理与我们读取流信息,用tokenizer进行解析非常相似。不同的是我们在这里会把数据流分割为更大一些的消息块。我把这个过程叫做Message Reader.下面是一张说明的插图:
一个阻塞IO管道的使用可以和输入流一样调用,每次从Channel中读取一个字节的数据,阻塞自身直到有数据可读。这个流程就是一个阻塞的Messsage Reader实现。
使用阻塞IO大大简化了Message Reader的实现成本。阻塞的Message Reader无需关注没有数据返回的情形,无需关注返回部分数据或者数据解析需要被复用的问题。
相似的,一个阻塞的Message Writer也不需要关注写入部分数据,和数据复用的问题。
基础的非阻塞通道设计(Basic Non-blocking IO Pipeline Design)
一个非阻塞的IO通道可以用单线程读取多个数据流。这个前提是相关的流可以切换为非阻塞模式(并不是所有流都可以以非阻塞形式操作)。在非阻塞模式下,读取一个流可能返回0个或多个字节。如果流还没有可供读取的数据那么就会返回0,其他大于1的返回都表明这是实际读取到的数据;
为了避开没有数据可读的流,我们结合Java NIO中的Selector。一个Selector可以注册多个SelectableChannel实例。当我们调用select()或selectorNow()方法时Selector会返回一个有数据可读的SelectableChannel实例。这个设计可以如下插图:
读取部分信息(Reading Partial Messages)
当我们冲SelectableChannel中读取一段数据后,我们并不知道这段数据是否是完整的一个message。因为一个数据段可能包含部分message,也就是说即可能少于一个message,也可能多一个message,正如下面这张插图所示意的那样:
要处理这种截断的message,我们会遇到两个问题:
检测完整message要求Message Reader查看数据段中的数据是否至少包含一个完整的message。如果包含一个或多个完整message,这些message可以被下发到通道中处理。查找完整message的过程是个大量重复的操作,所以这个操作必须是越快越好的。
当数据段中有一个不完整的message时,无论不完整消息是整个数据段还是说在完整message前后,这个不完整的message数据都需要在剩余部分获得前存储起来。
检查message完整性和存储不完整message都是Message Reader的职责。为了避免混淆来自不同Channel的数据,我们为每一个Channel分配一个Message Reader。整个设计大概是这样的:
当我们通过Selector获取到一个有数据可以读取的Channel之后,改Channel关联的Message Reader会读取数据,并且把数据打断为Message块。得到完整的message后就可以通过通道下发到其他组件进行处理。
一个Message Reader自然是协议相关的。他需要知道message的格式以便读取。如果我们的服务器是跨协议复用的,那他必须实现Message Reader的协议-大致类似于接收一个Message Reader工厂作为配置参数。
存储不完整的Message(Storing Partial Messages)
现在我们已经明确了由Message Reader负责不完整消息的存储直到接收到完整的消息。闲杂我们还需要知道这个存储过程需要如何来实现。
在设计的时候我们需要考虑两个关键因素:
为每个Message Reade分配Buffer(A Buffer Per Message Reader)
显然不完整的消息数据需要存储在某种buffer中。比较直接的办法是我们为每个Message Reader都分配一个内部的buffer成员。但是,多大的buffer才合适呢?这个buffer必须能存储下一个message最大的大小。如果一个message最大是1MB,那每个Message Reader内部的buffer就至少有1MB大小。
在百万级别的并发链接数下,1MB的buffer基本没法正常工作。举例来说,1,000,000 x 1MB就是1TB的内存大小!如果消息的最大数据量是16MB又需要多少内存呢?128MB呢?
可伸缩Buffer(Resizable Buffers)
另一个方案是在每个Message Reader内部维护一个容量可变的buffer。一个可变的buffer在初始化时占用较少控件,在消息变得很大超出容量时自动扩容。这样每个链接就不需要都占用比如1MB的空间。每个链接只使用承载下一个消息所必须的内存大小。
拷贝扩容(Resize by Copy)
第一种实现可伸缩buffer的办法是初始化buffer的时候只申请较少的空间,比如4KB。如果消息超出了4KB的大小那么开赔一个更大的空间,比如8KB,然后把4KB中的数据拷贝纸8KB的内存块中。
以拷贝方式扩容的优点是一个消息的全部数据都被保存在了一个连续的字节数组中。这使得数据解析变得更加容易。
同时它的缺点是会增加大量的数据拷贝操作。
为了减少数据的拷贝操作,你可以分析整个消息流中的消息大小,一次来找到最适合当前机器的可以减少拷贝操作的buffer大小。例如,你可能会注意到觉大多数的消息都是小于4KB的,因为他们仅仅包含了一个非常请求和响应。这意味着消息的处所荣校应该设置为4KB。
同时,你可能会发现如果一个消息大于4KB,很可能是因为他包含了一个文件。你会可能注意到 大多数通过系统的数据都是小于128KB的。所以我们可以在第一次扩容设置为128KB。
最后你可能会发现当一个消息大于128KB后,没有什么规律可循来确定下次分配的空间大小,这意味着最后的buffer容量应该设置为消息最大的可能数据量。
结合这三次扩容时的大小设置,可以一定程度上减少数据拷贝。4KB以下的数据无需拷贝。在1百万的连接下需要的空间例如1,000,000x4KB=4GB,目前(2015)大多数服务器都扛得住。4KB到128KB会仅需拷贝一次,即拷贝4KB数据到128KB的里面。消息大小介于128KB和最大容量的时需要拷贝两次。首先4KB数据被拷贝第二次是拷贝128KB的数据,所以总共需要拷贝132KB数据。假设没有很多的消息会超过128KB,那么这个方案还是可以接受的。
当一个消息被完整的处理完毕后,它占用的内容应当即刻被释放。这样下一个来自东一个链接通道的消息可以从最小的buffer大小重新开始。这个操作是必须的如果我们需要尽可能高效地复用不同链接之间的内存。大多数情况下并不是所有的链接都会在同一时刻需要大容量的buffer。
笔者写了一个完整的教程阐述了如何实现一个内存buffer使其支持扩容:Resizable Arrays 。这个教程也附带了一个指向GitHub上的源码仓地址,里面有实现方案的具体代码。
追加扩容(Resize by Append)
另一种实现buffer扩容的方案是让buffer包含几个数组。当需要扩容的时候只需要在开辟一个新的字节数组,然后把内容写到里面去。
这种扩容也有两个具体的办法。一中是开辟单独的字节数组,然后用一个列表把这些独立数组关联起来。另一种是开辟一些更大的,相互共享的字节数组切片,然后用列表把这些切片和buffer关联起来。个人而言,笔者认为第二种切片方案更好一点点,但是它们之前的差异比较小。
这种追加扩容的方案不管是用独立数组还是切片都有一个优点,那就是写数据的时候不需要二外的拷贝操作。所有的数据可以直接从socket(Channel)中拷贝至数组活切片当中。
这种方案的缺点也很明显,就是数据不是存储在一个连续的数组中。这会使得数据的解析变得更加复杂,因为解析器不得不同时查找每一个独立数组的结尾和所有数组的结尾。正因为我们需要在写数据时查找消息的结尾,这个模型在设计实现时会相对不那么容易。
TLV编码消息(TLV Encoded Messages)
有些协议的消息消失采用的是一种TLV格式(Type, Length, Value)。这意味着当消息到达时,消息的完整大小存储在了消息的开始部分。我们可以立刻判断为消息开辟多少内存空间。
TLV编码是的内存管理变得更加简单。我们可以立刻知道为消息分配多少内存。即便是不完整的消息,buffer结尾后面也不会有浪费的内存。
TLV编码的一个缺点是我们需要在消息的全部数据接收到之前就开辟好需要用的所有内存。因此少量链接慢,但发送了大块数据的链接会占用较多内存,导致服务器无响应。
解决上诉问题的一个变通办法是使用一种内部包含多个TLV的消息格式。这样我们为每个TLV段分配内存而不是为整个的消息分配,并且只在消息的片段到达时才分配内存。但是消息片段很大时,任然会出现一样的问题。
另一个办法是为消息设置超时,如果长时间未接收到的消息(比如10-15秒)。这可以让服务器从偶发的并发处理大块消息恢复过来,不过还是会让服务器有一段时间无响应。另外恶意的DoS攻击会导致服务器开辟大量内存。
TLV编码有不同的变种。有多少字节使用这样确切的类型和字段长度取决于每个独立的TLV编码。有的TLV编码吧字段长度放在前面,接着放类型,最后放值。尽管字段的顺序不同,但他任然是一个TLV的类型。
TLV编码使得内存管理更加简单,这也是HTTP1.1协议让人觉得是一个不太优良的的协议的原因。正因如此,HTTP2.0协议在设计中也利用TLV编码来传输数据帧。也是因为这个原因我们设计了自己的利用TLV编码的网络协议VStack.co。
写不完整的消息(Writing Partial Messages)
在非阻塞IO管道中,写数据也是一个不小的挑战。当你调用一个非阻塞模式Channel的write()方法时,无法保证有多少机字节被写入了ByteBuffer中。write方法返回了实际写入的字节数,所以跟踪记录已被写入的字节数也是可行的。这就是我们遇到的问题:持续记录被写入的不完整的小树知道一个消息中所有的数据都发送完毕。
为了管理不完整消息的写操作,我们需要创建一个Message Writer。正如前面的Message Reader,我们也需要每个Channel配备一个Message Writer来写数据。在每个Message Writer中我们记录准确的已经写入的字节数。
为了避免多个消息传递到Message Writer超出他所能处理到Channel的量,我们需要让到达的消息进入队列。Message Writer则尽可能快的将数据写到Channel里。
下面是一个流程图,展示的是不完整消息被写入的过程:
为了使Message Writer能够持续发送刚才已经发送了一部分的消息,Message Writer需要被移植调用,这样他就可以发送更多数据。
如果你有大量的链接,你会持有大量的Message Writer实例。检查比如1百万的Message Writer实例是来确定他们是否处于可写状态是很慢的操作。首先,许多Message Writer可能根本就没有数据需要发送。我们不想检查这些实例。其次,不是所有的Channel都处于可写状态。我们不想浪费时间在这些非写入状态的Channel。
为了检查一个Channel是否可写,可以把它注册到Selector上。但是我们不希望把所有的Channel实例都注册到Selector。试想一下,如果你有1百万的链接,这里面大部分是空闲的,把1百万链接都祖册到Selector上。然后调用select方法的时候就会有很多的Channel处于可写状态。你需要检查所有这些链接中的Message Writer以确认是否有数据可写。
为了避免检查所有的这些Message Writer,以及那些根本没有消息需要发送给他们的Channel实例,我么可以采用入校两步策略:
这两个小步骤确保只有有数据要写的Channel才会被注册到Selector。
集成(Putting it All Together)
正如你所知到的,一个被阻塞的服务器需要时刻检查当前是否有显得完整消息抵达。在一个消息被完整的收到前,服务器可能需要检查多次。检查一次是不够的。
类似的,服务器也需要时刻检查当前是否有任何可写的数据。如果有的话,服务器需要检查相应的链接看他们是否处于可写状态。仅仅在消息第一次进入队列时检查是不够的,因为一个消息可能被部分写入。
总而言之,一个非阻塞的服务器要三个管道,并且经常执行:
假如你还是感觉这比较复杂难懂,可以去clone我们的源码仓: https://github.com/jjenkov/java-nio-server 也许亲眼看到了代码会帮助你理解这一块是如何实现的。
服务器线程模型(Server Thread Model)
我们在GitHub上的源码中实现的非阻塞IO服务使用了一个包含两条线程的线程模型。第一个线程负责从ServerSocketChannel接收到达的链接。另一个线程负责处理这些链接,包括读消息,处理消息,把响应写回到链接。这个双线程模型如下:
DatagramChannel数据报通道
1 | DatagramChannel channel = DatagramChannel.open(); |
receive()方法会把接收到的数据包中的数据拷贝至给定的Buffer中。如果数据包的内容超过了Buffer的大小,剩余的数据会被直接丢弃。
1 | String newData = "New String to wrte to file..."+System.currentTimeMillis(); |
上述示例会把一个字符串发送到“jenkov.com”服务器的UDP端口80.目前这个端口没有被任何程序监听,所以什么都不会发生。当发送了数据后,我们不会收到数据包是否被接收的的通知,这是由于UDP本身不保证任何数据的发送问题。
链接特定机器地址(Connecting to a Specific Address)
DatagramChannel实际上是可以指定到网络中的特定地址的。由于UDP是面向无连接的,这种链接方式并不会创建实际的连接,这和TCP通道类似。确切的说,他会锁定DatagramChannel,这样我们就只能通过特定的地址来收发数据包。
1 | channel.connect(new InetSocketAddress("jenkov.com"), 80)); |
NIO Pipe管道
一个Java NIO的管道是两个线程间单向传输数据的连接。一个管道(Pipe)有一个source channel和一个sink channel(没想到合适的中文名)。我们把数据写到sink channel中,这些数据可以同过source channel再读取出来
创建管道(Creating a Pipe)
打开一个管道通过调用Pipe.open()工厂方法,如下:Pipe pipe = Pipe.open();
向管道写入数据(Writing to a Pipe)
向管道写入数据需要访问他的sink channel,接下来就是调用write()方法写入数据了:
1 | Pipe.SinkChannel sinkChannel = pipe.sink(); |
从管道读取数据(Reading from a Pipe)
类似的从管道中读取数据需要访问他的source channel,接下来调用read()方法读取数据:
1 | Pipe.SourceChannel sourceChannel = pipe.source(); |
NIO AsynchronousFileChannel异步文件通道
1 | Path path = Paths.get("data/test.xml"); |
通过CompletionHandler读取数据(Reading Data Via a CompletionHandler)
1 | fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() { |
一旦读取完成,将会触发CompletionHandler的completed()方法,并传入一个Integer和ByteBuffer。前面的整形表示的是读取到的字节数大小。第二个ByteBuffer也可以换成其他合适的对象方便数据写入。 如果读取操作失败了,那么会触发failed()方法。
参考:http://www.jianshu.com/p/052035037297
http://ifeve.com/java-nio-scattergather/
https://java-nio.avenwu.net/java-nio-channel.html
http://www.importnew.com/24794.html