0%

SPI指的是Service Provider Interface,服务提供接口

我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。

为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。Java spi就是提供这样的一个机制:为某个接口寻找服务实现的机制。有点类似IOC的思想,就是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要。

当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该jar包META-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。

基于这样一个约定就能很好的找到服务接口的实现类,而不需要再代码里制定。jdk提供服务实现查找的一个工具类:java.util.ServiceLoader.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package spi;  
public interface Search {
public void search();
}



package spi;
public class FileSearch implements Search {
@Override
public void search() {
System.out.println("哥是文件搜索");
}
}



package spi;
public class DataBaseSearch implements Search {
@Override
public void search() {
System.out.println("哥是database搜索");
}
}


public class DoSearch {
public static void main(String[] args) {
ServiceLoader<Search> sl = ServiceLoader.load(Search.class);
Iterator<Search> s = sl.iterator();
if (s.hasNext()) {
Search ss = s.next();
ss.search();
}
}
}

最后在META-INF/services目录下创建spi.Search(包名+接口名)文件,
当文件内容为spi.FileSearch(包名+实现类名)时,程序输出结果为:哥是文件搜索
当内容为spi.DataBaseSearch时,程序输出结果为:哥是database搜索.
由此可以看出DOSearch类中没有任何和具体实现有关的代码,而是基于spi的机制去查找服务的实现

dubbo中基于SPI思想的实现

Dubbo改进了JDK标准的SPI的以下问题:

1.JDK标准的SPI会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。

2.如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK标准的ScriptEngine,通过getName();获取脚本类型的名称,但如果RubyScriptEngine因为所依赖的jruby.jar不存在,导致RubyScriptEngine类加载失败,这个失败原因被吃掉了,和ruby对应不起来,当用户执行ruby脚本时,会报不支持ruby,而不是真正失败的原因。

3.增加了对扩展点IoC和AOP的支持,一个扩展点可以直接setter注入其它扩展点。

1
2
3
4
5
6
public @interface SPI {  
/**
* 缺省扩展点名。
*/
String value() default "";
}
1
2
3
@SPI("spring")   
public interface Container {
}

dubbo/dubbo-container/src/main/resources/META-INF/dubbo/internal

1
2
3
4
jetty=com.alibaba.dubbo.container.jetty.JettyContainer
log4j=com.alibaba.dubbo.container.log4j.Log4jContainer
logback=com.alibaba.dubbo.container.logback.LogbackContainer
spring=com.alibaba.dubbo.container.spring.SpringContainer

ExtensionLoader.getExtensionLoader(Container.class).getExtension(name)

ExtensionLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 此方法已经getExtensionClasses方法同步过。
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if(defaultAnnotation != null) {
String value = defaultAnnotation.value();
if(value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if(names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if(names.length == 1) cachedDefaultName = names[0];
}
}

Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}



private static final String SERVICES_DIRECTORY = "META-INF/services/";

private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";

private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

cachedAdaptiveClass : 当前Extension类型对应的AdaptiveExtension类型(只能一个)

cachedWrapperClasses : 当前Extension类型对应的所有Wrapper实现类型(无顺序)

cachedActivates : 当前Extension实现自动激活实现缓存(map,无序)

cachedNames : 扩展点实现类对应的名称(如配置多个名称则值为第一个)

cachedDefaultName : 当前扩展点的默认实现名称

cachedClasses : 扩展点实现名称对应的实现类(一个实现类可能有多个名称)

loadFile最后调用的是Class.forName(line, true, classLoader);

然后injectExtension 注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class SpiExtensionFactory implements ExtensionFactory {

public <T> T getExtension(Class<T> type, String name) {
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (loader.getSupportedExtensions().size() > 0) {
return loader.getAdaptiveExtension();
}
}
return null;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SpringExtensionFactory implements ExtensionFactory {

private static final Set<ApplicationContext> contexts = new ConcurrentHashSet<ApplicationContext>();

public static void addApplicationContext(ApplicationContext context) {
contexts.add(context);
}

public static void removeApplicationContext(ApplicationContext context) {
contexts.remove(context);
}

@SuppressWarnings("unchecked")
public <T> T getExtension(Class<T> type, String name) {
for (ApplicationContext context : contexts) {
if (context.containsBean(name)) {
Object bean = context.getBean(name);
if (type.isInstance(bean)) {
return (T) bean;
}
}
}
return null;
}

}

1

cglib(字节码生成库)是一个生成和转化Java字节码的高级api。被使用在AOP上。在实现内部,CGLIB库使用了ASM这一个轻量但高性能的字节码操作框架来转化字节码,产生新类

Enhancer是cglib一个很重要的类。Enhancer动态创建一个子类。

Enhancer只能在java字节码级别构造方法,但是不能构造static或者final类。

1
2
3
4
5
6
public Object createProxy(Class targetClass) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(targetClass);
enhancer.setCallback(NoOp.INSTANCE);
return enhancer.create();
}

在例子中,默认的无参构造方法被使用来创建目标对象。如果你希望CGLIB创建一个有参数的实例,你应该使用net.sf.cglib.proxy.Enhancer.create(Class[], Object[])。NoOp是内置的一个类,可以看下源码

1
2
3
4
public interface NoOp extends Callback {
NoOp INSTANCE = new NoOp() {
};
}
1
2
3
4
5
public class SampleClass {
public String test(String input) {
return "Hello world!";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testFixedValue() throws Exception {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(SampleClass.class);
enhancer.setCallback(new FixedValue() {
@Override
public Object loadObject() throws Exception {
return "Hello cglib!";
}
});
SampleClass proxy = (SampleClass) enhancer.create();
assertEquals("Hello cglib!", proxy.test(null));
}

上面的方式,任何方法都会被代理。比如proxy.toString()会返回”Hello cglib!” ,proxy.hashCode()则会抛出ClassCastException异常因为hashcode需要整数。此外final方法不会被拦截。Object#getClass 会返回类似 “SampleClass$$EnhancerByCGLIB$$e277c63c”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testInvocationHandler() throws Exception {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(SampleClass.class);
enhancer.setCallback(new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
if(method.getDeclaringClass() != Object.class && method.getReturnType() == String.class) {
return "Hello cglib!";
} else {
throw new RuntimeException("Do not know what to do.");
}
}
});
SampleClass proxy = (SampleClass) enhancer.create();
assertEquals("Hello cglib!", proxy.test(null));
assertNotEquals("Hello cglib!", proxy.toString());
}

所有调用方法将会分发到相同的InvocationHandler可能会导致死循环.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testMethodInterceptor() throws Exception {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(SampleClass.class);
enhancer.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy)
throws Throwable {
if(method.getDeclaringClass() != Object.class && method.getReturnType() == String.class) {
return "Hello cglib!";
} else {
return proxy.invokeSuper(obj, args);
}
}
});
SampleClass proxy = (SampleClass) enhancer.create();
assertEquals("Hello cglib!", proxy.test(null));
assertNotEquals("Hello cglib!", proxy.toString());
proxy.hashCode(); // Does not throw an exception or result in an endless loop.
}

MethodInterceptor的创建和链接需要生成不同类型的字节码和一些不需要Invocationhandler就能产生的运行对象

LazyLoader: Even though the LazyLoader’s only method has the same method signature as FixedValue, the LazyLoader is fundamentally different to the FixedValue interceptor. The LazyLoader is actually supposed to return an instance of a subclass of the enhanced class. This instance is requested only when a method is called on the enhanced object and then stored for future invocations of the generated proxy. This makes sense if your object is expensive in its creation without knowing if the object will ever be used. Be aware that some constructor of the enhanced class must be called both for the proxy object and for the lazily loaded object. Thus, make sure that there is another cheap (maybe protected) constructor available or use an interface type for the proxy. You can choose the invoked constructed by supplying arguments to Enhancer#create(Object…).在被代理对象需要懒加载场景下非常有用,如果被代理对象加载完成,那么在以后的代理调用时会重复使用。

Dispatcher: The Dispatcher is like the LazyLoader but will be invoked on every method call without storing the loaded object. This allows to change the implementation of a class without changing the reference to it. Again, be aware that some constructor must be called for both the proxy and the generated objects.与net.sf.cglib.proxy.LazyLoader差不多,但每次调用代理方法时都会调用loadObject方法来加载被代理对象。

ProxyRefDispatcher: This class carries a reference to the proxy object it is invoked from in its signature. This allows for example to delegate method calls to another method of this proxy. Be aware that this can easily cause an endless loop and will always cause an endless loop if the same method is called from within ProxyRefDispatcher#loadObject(Object).与Dispatcher相同,但它的loadObject方法支持传入代理对象。

NoOp: The NoOp class does not what its name suggests. Instead, it delegates each method call to the enhanced class’s method implementation.

net.sf.cglib.proxy.CallbackFilter允许你在方法级别设置回调。

1
2
3
4
5
6
7
8
9
10
public class PersistenceServiceImpl implements PersistenceService {

public void save(long id, String data) {
System.out.println(data + " has been saved successfully.");
}

public String load(long id) {
return "Jason Zhicheng Li";
}
}

accept方法将代理方法映射到回调。方法返回值是一个回调对象数组中的下标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PersistenceServiceCallbackFilter implements CallbackFilter {

//callback index for save method
private static final int SAVE = 0;

//callback index for load method
private static final int LOAD = 1;

/**
* Specify which callback to use for the method being invoked.
* @method the method being invoked.
* @return the callback index in the callback array for this method
*/
public int accept(Method method) {
String name = method.getName();
if ("save".equals(name)) {
return SAVE;
}
// for other methods, including the load method, use the
// second callback
return LOAD;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(PersistenceServiceImpl.class);

CallbackFilter callbackFilter = new PersistenceServiceCallbackFilter();
enhancer.setCallbackFilter(callbackFilter);

AuthorizationService authorizationService = ...
Callback saveCallback = new AuthorizationInterceptor(authorizationService);
Callback loadCallback = NoOp.INSTANCE;
Callback[] callbacks = new Callback[]{saveCallback, loadCallback };
enhancer.setCallbacks(callbacks);
...
return (PersistenceServiceImpl)enhancer.create();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void testCallbackFilter() throws Exception {
Enhancer enhancer = new Enhancer();
CallbackHelper callbackHelper = new CallbackHelper(SampleClass.class, new Class[0]) {
@Override
protected Object getCallback(Method method) {
if(method.getDeclaringClass() != Object.class && method.getReturnType() == String.class) {
return new FixedValue() {
@Override
public Object loadObject() throws Exception {
return "Hello cglib!";
};
}
} else {
return NoOp.INSTANCE; // A singleton provided by NoOp.
}
}
};
enhancer.setSuperclass(MyClass.class);
enhancer.setCallbackFilter(callbackHelper);
enhancer.setCallbacks(callbackHelper.getCallbacks());
SampleClass proxy = (SampleClass) enhancer.create();
assertEquals("Hello cglib!", proxy.test(null));
assertNotEquals("Hello cglib!", proxy.toString());
proxy.hashCode(); // Does not throw an exception or result in an endless loop.
}

Bean generator

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testBeanGenerator() throws Exception {
BeanGenerator beanGenerator = new BeanGenerator();
beanGenerator.addProperty("value", String.class);
Object myBean = beanGenerator.create();

Method setter = myBean.getClass().getMethod("setValue", String.class);
setter.invoke(myBean, "Hello cglib!");
Method getter = myBean.getClass().getMethod("getValue");
assertEquals("Hello cglib!", getter.invoke(myBean));
}

Bean copier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class OtherSampleBean {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}


@Test
public void testBeanCopier() throws Exception {
BeanCopier copier = BeanCopier.create(SampleBean.class, OtherSampleBean.class, false);
SampleBean bean = new SampleBean();
bean.setValue("Hello cglib!");
OtherSampleBean otherBean = new OtherSampleBean();
copier.copy(bean, otherBean, null);
assertEquals("Hello cglib!", otherBean.getValue());
}

Bulk bean

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testBulkBean() throws Exception {
BulkBean bulkBean = BulkBean.create(SampleBean.class,
new String[]{"getValue"},
new String[]{"setValue"},
new Class[]{String.class});
SampleBean bean = new SampleBean();
bean.setValue("Hello world!");
assertEquals(1, bulkBean.getPropertyValues(bean).length);
assertEquals("Hello world!", bulkBean.getPropertyValues(bean)[0]);
bulkBean.setPropertyValues(bean, new Object[] {"Hello cglib!"});
assertEquals("Hello cglib!", bean.getValue());
}

Bean map

1
2
3
4
5
6
7
@Test
public void testBeanGenerator() throws Exception {
SampleBean bean = new SampleBean();
BeanMap map = BeanMap.create(bean);
bean.setValue("Hello cglib!");
assertEquals("Hello cglib", map.get("value"));
}

https://github.com/cglib/cglib/wiki/Tutorial

AOP源码如下~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Configure CGLIB Enhancer...
Enhancer enhancer = createEnhancer();
if (classLoader != null) {
enhancer.setClassLoader(classLoader);
if (classLoader instanceof SmartClassLoader &&
((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
enhancer.setUseCache(false);
}
}
enhancer.setSuperclass(proxySuperClass);
enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));

Callback[] callbacks = getCallbacks(rootClass);
Class<?>[] types = new Class<?>[callbacks.length];
for (int x = 0; x < types.length; x++) {
types[x] = callbacks[x].getClass();
}
// fixedInterceptorMap only populated at this point, after getCallbacks call above
enhancer.setCallbackFilter(new ProxyCallbackFilter(
this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
enhancer.setCallbackTypes(types);

protected Object createProxyClassAndInstance(Enhancer enhancer, Callback[] callbacks) {
enhancer.setInterceptDuringConstruction(false);
enhancer.setCallbacks(callbacks);
return (this.constructorArgs != null ?
enhancer.create(this.constructorArgTypes, this.constructorArgs) :
enhancer.create());
}

使用动态代理的五大步骤
1.通过实现InvocationHandler接口来自定义自己的InvocationHandler;

2.通过Proxy.getProxyClass获得动态代理类

3.通过反射机制获得代理类的构造方法,方法签名为getConstructor(InvocationHandler.class)

4.通过构造函数获得代理对象并将自定义的InvocationHandler实例对象传为参数传入

5.通过代理对象调用目标方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface HelloWorld {  
void sayHello(String name);
}


public class HelloWorldImpl implements HelloWorld {
@Override
public void sayHello(String name) {
System.out.println("Hello " + name);
}
}


public class CustomInvocationHandler implements InvocationHandler {
private Object target;

public CustomInvocationHandler(Object target) {
this.target = target;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("Before invocation");
Object retVal = method.invoke(target, args);
System.out.println("After invocation");
return retVal;
}
}

方法1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ProxyTest {  
public static void main(String[] args) throws Exception {
//生成$Proxy0的class文件
System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true");
//获取动态代理类
Class proxyClazz = Proxy.getProxyClass(HelloWorld.class.getClassLoader(),HelloWorld.class);

//获得代理类的构造函数,并传入参数类型InvocationHandler.class
Constructor constructor = proxyClazz.getConstructor(InvocationHandler.class);
//通过构造函数来创建动态代理对象,将自定义的InvocationHandler实例传入
HelloWorld proxy = (HelloWorld) constructor.newInstance(new CustomInvocationHandler(new HelloWorldImpl()));
proxy.sayHello("Mikan");
}
}

方法2

1
2
3
4
5
6
7
8
9
10
11
12
public class ProxyTest {  
public static void main(String[] args) throws Exception {
//生成$Proxy0的class文件
System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true");
CustomInvocationHandler handler = new CustomInvocationHandler(new HelloWorldImpl());
HelloWorld proxy = (HelloWorld) Proxy.newProxyInstance(
ProxyTest.class.getClassLoader(),
new Class[]{HelloWorld.class},
handler);
proxy.sayHello("Mikan");
}
}

http://www.cnblogs.com/MOBIN/p/5597215.html

假设有如下业务:有一堆有颜色和重量的苹果,我需要通过颜色和重量取出相应苹果
定义苹果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Apple {
private int weight = 0;
private String color = "";

public Apple(int weight, String color){
this.weight = weight;
this.color = color;
}

public Integer getWeight() {
return weight;
}

public void setWeight(Integer weight) {
this.weight = weight;
}

public String getColor() {
return color;
}

public void setColor(String color) {
this.color = color;
}

public String toString() {
return "Apple{" +
"color='" + color + '\'' +
", weight=" + weight +
'}';
}
}

假设

1
2
inventory = Arrays.asList(new Apple(80,"green"), new Apple(155, "green"), new Apple(120, "red"));

解决方案1:

1
2
3
4
5
6
List<Apple> result = new ArrayList<>();
for(Apple apple: inventory){
if("green".equals(apple.getColor())){
result.add(apple);
}
}

这是最常见的方法。但是这样的结构很难复用。比如我颜色不确定呢?

解决方案2:

1
2
3
4
5
6
List<Apple> result = new ArrayList<>();
for(Apple apple: inventory){
if(apple.getColor().equals(color)){
result.add(apple);
}
}

如果我需要100g以上的且红色的苹果我就需要

1
2
3
4
5
6
7
List<Apple> result = new ArrayList<>();
for(Apple apple: inventory){
if(apple.getColor().equals(color)
&& apple.getWeight() > weight){
result.add(apple);
}
}

如果我需要100g以上或者红色的苹果

1
2
3
4
5
6
7
List<Apple> result = new ArrayList<>();
for(Apple apple: inventory){
if(apple.getColor().equals(color)
|| apple.getWeight() > weight){
result.add(apple);
}
}

是不是变得没完没了了?
解决方案3:

1
2
3
4
5
6
7
8
9
public static List<Apple> filterApples(List<Apple> inventory, ApplePredicate p){
List<Apple> result = new ArrayList<>();
for(Apple apple : inventory){
if(p.test(apple)){
result.add(apple);
}
}
return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
interface ApplePredicate{
boolean test(Apple a);
}
class AppleWeightPredicate implements ApplePredicate{
public boolean test(Apple apple){
return apple.getWeight() > 150;
}
}
class AppleColorPredicate implements ApplePredicate{
public boolean test(Apple apple){
return "green".equals(apple.getColor());
}
}

class AppleRedAndHeavyPredicate implements ApplePredicate{
public boolean test(Apple apple){
return "red".equals(apple.getColor())
&& apple.getWeight() > 150;
}
}
1
List<Apple> greenApples2 = filterApples(inventory, new AppleColorPredicate());

这种方法和合适。不过如果规则也是不确定的呢?

解决方案4:

1
2
3
4
5
List<Apple> redApples2 = filterApples(inventory, new ApplePredicate() {
public boolean test(Apple a){
return a.getColor().equals("red");
}
});

Good!这样就能做到定制化了。不过通过lambda写起来更加优美

解决方案5:

1
2
List<Apple> redApples2 = filterApples(inventory, (Apple a)-> a.getColor().equals("red"));

如果我们要推广。不只是苹果而是所有的判断规则?

解决方案6:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
interface Predicate<T>{
boolean test(T t);
}

public static <T> List<T> filter(List<T> inventory, Predicate<T> p){
List<T> result = new ArrayList<>();
for(T apple : inventory){
if(p.test(apple)){
result.add(apple);
}
}
return result;
}

1
List<Apple> redApples2 = filter(inventory, (Apple a)-> a.getColor().equals("red"));

其实java 8 的思路也是这样的
解决方案7:

1
2
3
4
List<Apple> redApples2 = inventory
.stream()
.filter((Apple a)-> a.getColor().equals("red"))
.collect(Collectors.toList());

方案一:数据库乐观锁

乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

异常实现流程

1
2
3
4
5
6
7
8
9
10
11
12
-- 可能会发生的异常情况
-- 线程1查询,当前left_count为1,则有记录
select * from t_bonus where id = 10001 and left_count > 0

-- 线程2查询,当前left_count为1,也有记录
select * from t_bonus where id = 10001 and left_count > 0

-- 线程1完成领取记录,修改left_count为0,
update t_bonus set left_count = left_count - 1 where id = 10001

-- 线程2完成领取记录,修改left_count为-1,产生脏数据
update t_bonus set left_count = left_count - 1 where id = 10001

通过乐观锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 添加版本号控制字段
ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;

-- 线程1查询,当前left_count为1,则有记录,当前版本号为1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 线程2查询,当前left_count为1,有记录,当前版本号为1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 线程1,更新完成后当前的version为1235,update状态为1,更新成功
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

-- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

方案二:基于Redis的分布式锁

SETNX命令(SET if Not eXists)
语法:SETNX key value
功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

Expire命令
语法:expire(key, expireTime)
功能:key设置过期时间

GETSET命令
语法:GETSET key value
功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。

GET命令
语法:GET key
功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。

DEL命令
语法:DEL key [KEY …]
功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。

SET命令
语法:SET key value [expiration EX seconds |PX milliseconds][NX|XX]
功能:如果key已经存在,则覆盖
解释:EX seconds – 指定到期时间,秒
PX milliseconds – 指定到期时间,毫秒
NX – 不存在才设置
XX – 只有存在才设置

第一种:使用redis的setnx()、expire()方法,用于分布式锁

  1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
  2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
  3. 执行完业务代码后,可以通过delete命令删除key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题。这个问题可以通过使用Lua脚本(包含SETNX和EXPIRE两条命令),但是如果Redis仅执行了一条命令后crash或者发生主从切换,依然会出现锁没有过期时间,最终导致无法释放。

1
2
3
4
5
local r = tonumber(redis.call('SETNX', KEYS[1],ARGV[1]));
if (r == 1) then
redis.call('PEXPIRE',KEYS[1],ARGV[2]);
end
return r

第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题

  1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
  2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
  3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
  4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

注意:这个版本去掉了EXPIRE命令,改为通过Value时间戳值来判断过期

问题:

1. 在锁竞争较高的情况下,会出现Value不断被覆盖,但是没有一个Client获取到锁。
2. 在获取锁的过程中不断的修改原有锁的数据,设想一种场景C1,C2竞争锁,C1获取到了锁,C2锁执行了GETSET操作修改了C1锁的过期时间,如果C1没有正确释放锁,锁的过期时间被延长,其它Client需要等待更久的时间。

第三种:使用 SET resource_name my_random_value NX PX 30000

redis 2.6.12版本以后支持了set带过期时间的写法,官方的意思是以后会逐步用setnx取代set

  1. Redis客户端为了获取锁,向Redis节点发送如下命令:

    1
    SET resource_name my_random_value NX PX 30000

    上面的命令如果执行成功,则客户端成功获取到了锁,接下来就可以访问共享资源了;而如果上面的命令执行失败,则说明获取锁失败。

    注意,在上面的SET命令中:

    • my_random_value是由客户端生成的一个随机字符串,它要保证在足够长的一段时间内在所有客户端的所有获取锁的请求中都是唯一的。
    • NX表示只有当resource_name对应的key值不存在的时候才能SET成功。这保证了只有第一个请求的客户端才能获得锁,而其它客户端在锁被释放之前都无法获得锁。
    • PX 30000表示这个锁有一个30秒的自动过期时间。当然,这里30秒只是一个例子,客户端可以选择合适的过期时间。
  2. 当客户端完成了对共享资源的操作之后,执行下面的Redis Lua脚本来释放锁

    1
    2
    3
    4
    5
    if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
    else
    return 0
    end

这段Lua脚本在执行的时候要把前面的my_random_value作为ARGV[1]的值传进去,把resource_name作为KEYS[1]的值传进去。

问题:

  1. 这个锁必须要设置一个过期时间。否则的话,当一个客户端获取锁成功之后,假如它崩溃了,或者由于发生了网络分割(network partition)导致它再也无法和Redis节点通信了,那么它就会一直持有这个锁,而其它客户端永远无法获得锁了。antirez在后面的分析中也特别强调了这一点,而且把这个过期时间称为锁的有效时间(lock validity time)。获得锁的客户端必须在这个时间之内完成对共享资源的访问。

  2. 第一步获取锁的操作,网上不少文章把它实现成了两个Redis命令:

    1
    2
    SETNX resource_name my_random_value
    EXPIRE resource_name 30

    虽然这两个命令和前面算法描述中的一个SET命令执行效果相同,但却不是原子的。如果客户端在执行完SETNX后崩溃了,那么就没有机会执行EXPIRE了,导致它一直持有这个锁。

  3. 设置一个随机字符串my_random_value是很有必要的,它保证了一个客户端释放的锁必须是自己持有的那个锁。假如获取锁时SET的不是一个随机字符串,而是一个固定值,那么可能会发生下面的执行序列:

    1. 客户端1获取锁成功。
    2. 客户端1在某个操作上阻塞了很长时间。
    3. 过期时间到了,锁自动释放了。
    4. 客户端2获取到了对应同一个资源的锁。
    5. 客户端1从阻塞中恢复过来,释放掉了客户端2持有的锁。

    之后,客户端2在访问共享资源的时候,就没有锁为它提供保护了。

  4. 释放锁的操作必须使用Lua脚本来实现。释放锁其实包含三步操作:’GET’、判断和’DEL’,用Lua脚本来实现能保证这三步的原子性。否则,如果把这三步操作放到客户端逻辑中去执行的话,就有可能发生与前面第三个问题类似的执行序列:

    1. 客户端1获取锁成功。
    2. 客户端1访问共享资源。
    3. 客户端1为了释放锁,先执行’GET’操作获取随机字符串的值。
    4. 客户端1判断随机字符串的值,与预期的值相等。
    5. 客户端1由于某个原因阻塞住了很长时间。
    6. 过期时间到了,锁自动释放了。
    7. 客户端2获取到了对应同一个资源的锁。
    8. 客户端1从阻塞中恢复过来,执行DEL操纵,释放掉了客户端2持有的锁。

    实际上,在上述第三个问题和第四个问题的分析中,如果不是客户端阻塞住了,而是出现了大的网络延迟,也有可能导致类似的执行序列发生。

  5. 假如Redis节点宕机了,那么所有客户端就都无法获得锁了,服务变得不可用。为了提高可用性,我们可以给这个Redis节点挂一个Slave,当Master节点不可用的时候,系统自动切到Slave上(failover)。但由于Redis的主从复制(replication)是异步的,这可能导致在failover过程中丧失锁的安全性。考虑下面的执行序列:

    1. 客户端1从Master获取了锁。
    2. Master宕机了,存储锁的key还没有来得及同步到Slave上。
    3. Slave升级为Master。
    4. 客户端2从新的Master获取到了对应同一个资源的锁。

    于是,客户端1和客户端2同时持有了同一个资源的锁。

第四种:Redlock算法

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。

  1. 获取当前Unix时间,以毫秒为单位。

  2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。

  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。

  4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

  5. 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

https://github.com/www1350/redislock

方案三:基于Zookeeper的分布式锁

利用节点名称的唯一性来实现独占锁

ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。

利用临时顺序节点控制时序实现

/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。
算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。 对于解锁操作,只需要将自身创建的节点删除即可。

  1. 客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
  4. 如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。

问题所在

上面这个分布式锁的实现中,大体能够满足了一般的分布式集群竞争锁的需求。这里说的一般性场景是指集群规模不大,一般在10台机器以内。

不过,细想上面的实现逻辑,我们很容易会发现一个问题,步骤4,“即获取所有的子点,判断自己创建的节点是否已经是序号最小的节点”,这个过程,在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知——这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。而这个问题的根源在于,没有找准客户端真正的关注点。

我们再来回顾一下上面的分布式锁竞争过程,它的核心逻辑在于:判断自己是否是所有节点中序号最小的。于是,很容易可以联想的到的是,每个节点的创建者只需要关注比自己序号小的那个节点。

改进后的分布式锁实现

下面是改进后的分布式锁实现,和之前的实现方式唯一不同之处在于,这里设计成每个锁竞争者,只需要关注”_locknode_”节点下序号比自己小的那个节点是否存在即可。实现如下:

  1. 客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。
  4. 如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。
  5. 之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。

方案四:基于memcached的分布式锁

利用add操作实现独占锁

memcache中**Memcache::add()**方法在缓存服务器之前不存在key时, 以key作为key存储一个变量var到缓存服务器,该操作是原子操作,可以设置一个超时时间。del用来解锁。

参考:

http://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

https://mp.weixin.qq.com/s/JTsJCDuasgIJ0j95K8Ay8w

https://mp.weixin.qq.com/s/4CUe7OpM6y1kQRK8TOC_qQ

http://redis.cn/topics/distlock.html

http://tech.dianwoda.com/2018/04/11/redisfen-bu-shi-suo-jin-hua-shi/

方案1

通过MyBatis配置文件创建读写分离两个DataSource,每个SqlSessionFactoryBean对象的mapperLocations属性制定两个读写数据源的配置文件。将所有读的操作配置在读文件中,所有写的操作配置在写文件中。

  • 优点:实现简单
  • 缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展
  • 实现方式
Read more »

假如配置了两个数据源dataSourceOne、dataSourceTwo

1
2
3
4
5
6
7
8
9
10
<bean id="dynamicDataSource" class="com.core.DynamicDataSource">  
<property name="targetDataSources">
<map key-type="java.lang.String">
<entry value-ref="dataSourceOne" key="dataSourceOne"></entry>
<entry value-ref="dataSourceTwo" key="dataSourceTwo"></entry>
</map>
</property>
<property name="defaultTargetDataSource" ref="dataSourceOne">
</property>
</bean>

DynamicDataSource.class

1
2
3
4
5
6
7
8
9
10
11
12
package com.core;  

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource{

@Override
protected Object determineCurrentLookupKey() {
return DatabaseContextHolder.getCustomerType();
}

}

DatabaseContextHolder.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.core;  

public class DatabaseContextHolder {

private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();

public static void setCustomerType(String customerType) {
contextHolder.set(customerType);
}

public static String getCustomerType() {
return contextHolder.get();
}

public static void clearCustomerType() {
contextHolder.remove();
}
}

DataSourceInterceptor.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.core;  

import org.aspectj.lang.JoinPoint;
import org.springframework.stereotype.Component;

@Component
public class DataSourceInterceptor {

public void setdataSourceOne(JoinPoint jp) {
DatabaseContextHolder.setCustomerType("dataSourceOne");
}

public void setdataSourceTwo(JoinPoint jp) {
DatabaseContextHolder.setCustomerType("dataSourceTwo");
}
}

aop配置

1
2
3
4
5
6
7
<aop:config>  
<aop:aspect id="dataSourceAspect" ref="dataSourceInterceptor">
<aop:pointcut id="daoOne" expression="execution(* com.dao.one.*.*(..))" />
<aop:pointcut id="daoTwo" expression="execution(* com.dao.two.*.*(..))" />
<aop:before pointcut-ref="daoOne" method="setdataSourceOne" />
<aop:before pointcut-ref="daoTwo" method="setdataSourceTwo" />
</aop:aspect>

事务传播属性

1
2
3
4
5
6
7
8
9
10
@Transactional(propagation=Propagation.REQUIRED) //如果有事务,那么加入事务,没有的话新建一个(不写的情况下)  **默认**
@Transactional(propagation=Propagation.NOT_SUPPORTED) //容器不为这个方法开启事务
@Transactional(propagation=Propagation.REQUIRES_NEW) //不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务
@Transactional(propagation=Propagation.MANDATORY) //必须在一个已有的事务中执行,否则抛出异常
@Transactional(propagation=Propagation.NEVER) //必须在一个没有的事务中执行,否则抛出异常(与Propagation.MANDATORY相反)
@Transactional(propagation=Propagation.SUPPORTS) //如果其他bean调用这个方法,在其他bean中声明事务,那就用事务.如果其他bean没有声明事务,那就不用事务.
@Transactional(propagation=Propagation.NESTED) //如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。
@Transactional (propagation = Propagation.REQUIRED,readOnly=true) //readOnly=true只读,不能更新,删除
@Transactional (propagation = Propagation.REQUIRED,timeout=30)//设置超时时间
@Transactional (propagation = Propagation.REQUIRED,isolation=Isolation.DEFAULT)//设置数据库隔离级别

xml配置:

1
2
3
4
5
6
7
8
9
10
  <!-- hibernate事务管理器 -->  
<bean id="transactionManager"
class="org.springframework.orm.hibernate3.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory" />
</bean>

<!-- mybatis配置事务管理 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"></property>
</bean>

1.直接配置

1
2
3
4
5
6
7
8
9
10
11
12
13
<bean id="userDao"  
class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<!-- 配置事务管理器 -->
<property name="transactionManager" ref="transactionManager" />
<property name="target" ref="userDaoTarget" />
<property name="proxyInterfaces" value="com.absurd.xxxDao" />
<!-- 配置事务属性 -->
<property name="transactionAttributes">
<props>
<prop key="*">PROPAGATION_REQUIRED</prop>
</props>
</property>
</bean>

2.共享一个代理基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<bean id="transactionBase"  
class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"
lazy-init="true" abstract="true">
<!-- 配置事务管理器 -->
<property name="transactionManager" ref="transactionManager" />
<!-- 配置事务属性 -->
<property name="transactionAttributes">
<props>
<prop key="*">PROPAGATION_REQUIRED</prop>
</props>
</property>
</bean>


<bean id="userDao" parent="transactionBase" >
<property name="target" ref="userDaoTarget" />
</bean>

3.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<bean id="transactionInterceptor"  
class="org.springframework.transaction.interceptor.TransactionInterceptor">
<property name="transactionManager" ref="transactionManager" />
<!-- 配置事务属性 -->
<property name="transactionAttributes">
<props>
<prop key="*">PROPAGATION_REQUIRED</prop>
</props>
</property>
</bean>

<bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
<property name="beanNames">
<list>
<value>*Dao</value>
</list>
</property>
<property name="interceptorNames">
<list>
<value>transactionInterceptor</value>
</list>
</property>
</bean>

4.使用tx标签配置的拦截器

1
2
3
4
5
6
7
8
9
10
11
12
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*" propagation="REQUIRED" />
</tx:attributes>
</tx:advice>

<aop:config>
<aop:pointcut id="interceptorPointCuts"
expression="execution(* com.absurd.*.*(..))" />
<aop:advisor advice-ref="txAdvice"
pointcut-ref="interceptorPointCuts" />
</aop:config>

5.全注解

<tx:annotation-driven transaction-manager="transactionManager"/>

6.编程:

1
2
3
4
5
6
7
8
9
10
11
12
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

TransactionStatus status = txManager.getTransaction(def);
try {
userMapper.insertUser(user);
}
catch (MyException ex) {
txManager.rollback(status);
throw ex;
}
txManager.commit(status);

TransactionStatus

1
2
3
4
5
6
7
8
9
10
11
boolean isNewTransaction();

boolean hasSavepoint();

void setRollbackOnly();

boolean isRollbackOnly();

void flush();

boolean isCompleted();

PlatformTransactionManager

1
2
3
4
5
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

void commit(TransactionStatus status) throws TransactionException;

void rollback(TransactionStatus status) throws TransactionException;

源码解析:
从TxNamespaceHandler入手

1
2
3
4
5
6
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}

可以看到分别用不同的解析器去解析xml,
TxAdviceBeanDefinitionParser用来解析<tx:advice>并被解析为RuleBasedTransactionAttribute。

parse(Element element, ParserContext parserContext)->parseInternal(Element element, ParserContext parserContext)->getBeanClass(Element element)->TransactionInterceptor
解析到的bean被设置为TransactionInterceptor统一拦截

image

AnnotationDrivenBeanDefinitionParser是用来解析<annotation-driven>为RootBeanDefinition

image

DataSourceTransactionManager 和DataSourceTransactionManager
image

TransactionInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
	@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}



protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//选择事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
//切面方法标识
final String joinpointIdentification = methodIdentification(method, targetClass);

if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
//原有逻辑执行
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清理TransactionInfo信息
cleanupTransactionInfo(txInfo);
}
//提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}

else {
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});

// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}


protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//获取事务状态
status = tm.getTransaction(txAttr);
}
else {
//debug..
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

接下来看下:AbstractPlatformTransactionManager#getTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();

//...
//如果存在事务
if (isExistingTransaction(transaction)) {

return handleExistingTransaction(definition, transaction, debugEnabled);
}

// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}

//不存在事务,Propagation.MANDATORY,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//不用挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
//debug...
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
//创建空事务,同步
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//PROPAGATION_NEVER抛出异常(Propagation.NEVER)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//PROPAGATION_NOT_SUPPORTED不为这个方法开启事务(Propagation.NOT_SUPPORTED)
//线程存在事务则挂起(挂起实现其实就是断开连接下次进行重连)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
//debug...
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//创建非事务的事务状态,让方法非事务地执行
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//PROPAGATION_REQUIRES_NEW不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务(Propagation.REQUIRES_NEW )
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
//...
//挂起原来事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
//只要不是never就执行完毕新的
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
//异常了恢复原来事务
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
//NESTED如果是嵌套事务(TransactionDefinition.PROPAGATION_NESTED)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//如果不允许事务嵌套,则抛出异常
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");
}
//debug...
}
//如果允许使用savepoint保存点保存嵌套事务
if (useSavepointForNestedTransaction()) {
//为当前事务创建一个保存点
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
//如果不允许使用savepoint保存点保存嵌套事务 ,使用JTA的嵌套commit/rollback调用
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}

//debug...
//PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
//校验已存在的事务,如果已有事务与事务属性配置不一致,则抛出异常
if (isValidateExistingTransaction()) {
//如果事务隔离级别不是默认隔离级别 ,获取到的当前事务隔离级别为null 或 获取不等于事务属性配置的隔离级别
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
//如果当前已有事务是只读 ,抛出异常
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
	@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
//debug...
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);


if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
//debug...
con.setAutoCommit(false);
}
txObject.getConnectionHolder().setTransactionActive(true);

int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// Bind the session holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}

catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
//如果事务是激活的,且当前线程事务同步机制也是激活状态
if (TransactionSynchronizationManager.isSynchronizationActive()) {
//挂起当前线程中所有同步的事务 TransactionSynchronization#suspend 还有 TransactionSynchronizationManager#unbindResource
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}



//doSuspendSynchronization方法将逐个挂起当前线程中的TransactionSynchronization
private List<TransactionSynchronization> doSuspendSynchronization() {
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.suspend();
}
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}

//ConnectionSynchronization#suspend
public void suspend() {
if (this.holderActive) {
//解绑数据源
TransactionSynchronizationManager.unbindResource(this.dataSource);
// 如果存在连接,且处于打开状态
if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) {
// 当挂起的时候如果没有句柄连接到该connection,将释放该连接
// 当resume的时候 会重开打开一个连接参与到原来的事务中
releaseConnection(this.connectionHolder.getConnection(), this.dataSource);
this.connectionHolder.setConnection(null);
}
}
}

//恢复事务
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.resume();
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}

//SqlSessionSynchronization#resume
@Override
public void resume() {
if (this.holderActive) {
//debug...
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
}
}


//DataSourceTransactionManager#doSuspend
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.unbindResource(this.dataSource);
return conHolder;
}

事务提交

AbstractPlatformTransactionManager##commit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
	@Override
public final void commit(TransactionStatus status) throws TransactionException {
//如果事务状态是已完成,再次提交会抛出“Transaction is already completed - do not call commit or rollback more than once per transaction”
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
//rollback only
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
//回滚
processRollback(defStatus);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus);
// Throw UnexpectedRollbackException only at outermost transaction boundary
// or if explicitly asked to.
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
return;
}

processCommit(defStatus);
}

private void processRollback(DefaultTransactionStatus status) {
try {
try {
//回调TransactionSynchronization对象的beforeCompletion方法。
triggerBeforeCompletion(status);
//有保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
//回滚到保存点
status.rollbackToHeldSavepoint();
}
//如果是一个新事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//rollback
doRollback(status);
}
else if (status.hasTransaction()) {
//如果RollbackOnly或者globalRollbackOnParticipationFailure(部分失败)
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
//TransactionSynchronization 的afterCompletion
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}


private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
//TransactionSynchronization 的beforeCommit 提交前提示
triggerBeforeCommit(status);
//TransactionSynchronization 的beforeCompletion 完成前提示
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
//提交事务
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}

// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}

}
finally {
cleanupAfterCompletion(status);
}
}

隔离级别(其实最后是通过jdbc的隔离级别做的):

Isolation.DEFAULT(TransactionDefinition.ISOLATION_DEFAULT)
使用数据库默认的事务隔离级别。

Isolation.READ_UNCOMMITTED(TransactionDefinition.ISOLATION_READ_UNCOMMITTED),
这是事务最低的隔离级别,它允许另外一个事务可以看到这个事务未提交的数据。这种隔离级别会产生脏读,不可重复读和幻像读。
实现:SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED

Isolation.READ_COMMITTED(TransactionDefinition.ISOLATION_READ_COMMITTED),
保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据。这种事务隔离级别可以避免脏读出现,但是可能会出现不可重复读和幻像读。
实现: SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED

Isolation.REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ),
这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。
实现:SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ

Isolation.SERIALIZABLE(TransactionDefinition.ISOLATION_SERIALIZABLE);
这是花费最高代价但是最可靠的事务隔离级别。事务被处理为顺序执行。除了防止脏读,不可重复读外,还避免了幻读。
实现:SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE

详细:http://www1350.github.io/#post/64

效果如下
image

关于表设计,网上大部分人的设计是储存一个父评论id,采用自反递归查询
http://www.tracefact.net/Software-Design/Unlimited-comment-quote-using-recursion.aspx

我认为
1.这个父id可以按层次顺序存储所有父id,这样批量查询一次就行了。
2.可以在评论内容里面直接写入盖楼效果,展示的时候css控制就好了

3.闭包表
http://blog.jobbole.com/112315/

线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue

  • 单生产者,单消费者 用 LinkedBlockingqueue
  • 多生产者,单消费者 用 LinkedBlockingqueue
  • 单生产者 ,多消费者 用 ConcurrentLinkedQueue
  • 多生产者 ,多消费者 用 ConcurrentLinkedQueue

可能报异常 返回布尔值 可能阻塞 设定等待时间

  • 入队 add(e) offer(e) put(e) offer(e, timeout, unit)
  • 出队 remove() poll() take() poll(timeout, unit)
  • 查看 element() peek() 无 无
  • add(e) remove() element() 方法不会阻塞线程。当不满足约束条件时,会抛出IllegalStateException 异常。例如:当队列被元素填满后,再调用add(e),则会抛出异常。
  • offer(e) poll() peek() 方法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调用offer(e),则不会插入元素,函数返回false。
  • 要想要实现阻塞功能,需要调用put(e) take() 方法。当不满足约束条件时,会阻塞线程。

ConcurrentLinkedQueue内部有个匿名内部类Node
源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static class Node<E> {
volatile E item;
volatile Node<E> next;

/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

// Unsafe mechanics

private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}