在Dropwizard/JPA/Hibernate中自动重试事务/请求



我目前正在使用Dropwizard框架和Dropwizard - Hibernate分别实现JPA/Hibernate的REST API web服务(使用PostgreSQL数据库)。我在一个资源中有一个方法,我用@UnitOfWork注释了这个方法,以便为整个请求获得一个事务。资源方法调用我的一个dao的方法,该dao扩展了AbstractDAO<MyEntity>,用于与数据库通信我的实体(类型为MyEntity)的检索或修改。

这个DAO方法做了以下事情:首先,它从数据库中选择一个实体实例,也就是一行。之后,检查实体实例,并根据其属性更改其某些属性。在这种情况下,应该更新数据库中的行。我没有指定任何关于缓存、锁定或事务的其他内容,所以我假设默认是Hibernate强制的某种乐观锁定机制。因此(我认为),当在当前线程中从数据库中选择实体实例后,在另一个线程中删除实体实例时,在试图提交事务时抛出StaleStateException,因为应该更新的实体实例之前已被另一个线程删除。

当使用@UnitOfWork注释时,我的理解是我无法捕获此异常,无论是在DAO方法中还是在资源方法中。我现在可以实现一个ExceptionMapper<StaleStateException>为Jersey提供一个HTTP 503响应与Retry-After头或类似的东西给客户端,告诉它重试它的请求。但我宁愿先尝试请求/事务(这基本上是相同的,因为这里的@UnitOfWork注释),而仍然在服务器上。

在使用Dropwizard时,是否有任何服务器端事务重试机制的示例实现?比如重试可配置的次数(例如3次),然后失败,出现异常/HTTP 503响应。你将如何实现这一点?我首先想到的是另一个注释,比如@Retry(exception = StaleStateException.class, count = 3),我可以把它添加到我的资源中。对此有什么建议吗?或者是否有另一种解决方案来解决我的问题,考虑不同的锁定/事务相关的事情?

另一种方法是使用注入框架——在我的例子中是向导——并为此使用方法拦截器。这是一个更通用的解决方案。

DW通过https://github.com/xvik/dropwizard-guicey非常顺利地与guice集成

我有一个可以重试任何异常的通用实现。它在注释上起作用,如下所示:

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Retry {
}

拦截器然后(使用docs):

 /**
 * Abstract interceptor to catch exceptions and retry the method automatically.
 * Things to note:
 * 
 * 1. Method must be idempotent (you can invoke it x times without alterint the result) 
 * 2. Method MUST re-open a connection to the DB if that is what is retried. Connections are in an undefined state after a rollback/deadlock. 
 *    You can try and reuse them, however the result will likely not be what you expected 
 * 3. Implement the retry logic inteligently. You may need to unpack the exception to get to the original.
 * 
 * @author artur
 *
 */
public abstract class RetryInterceptor implements MethodInterceptor {
    private static final Logger log = Logger.getLogger(RetryInterceptor.class);
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        if(invocation.getMethod().isAnnotationPresent(Retry.class)) {
            int retryCount = 0;
            boolean retry = true;
            while(retry && retryCount < maxRetries()) {
                try {
                    return invocation.proceed();
                } catch(Exception e) {
                    log.warn("Exception occured while trying to executed method", e);
                    if(!retry(e)) {
                        retry = false;
                    } {
                        retryCount++;
                    }
                }
            }
        }
        throw new IllegalStateException("All retries if invocation failed");
    }
    protected boolean retry(Exception e) {
        return false;
    }
    protected int maxRetries() {
        return 0;
    }
}

关于这种方法有几点需要注意。

  • 重试的方法必须被设计成可以多次调用而不改变任何结果(例如,如果方法以增量的形式存储临时结果,那么执行两次可能会增加两次)

  • 数据库异常通常不保存以备重试。他们必须打开一个新的连接(特别是当重试死锁时,这是我的情况)

除此之外,这个基本实现只是捕获任何东西,然后将重试计数和检测委托给实现类。例如,我的特定死锁重试拦截器:

public class DeadlockRetryInterceptor extends RetryInterceptor {
    private static final Logger log = Logger.getLogger(MsRetryInterceptor.class);
    @Override
    protected int maxRetries() {
        return 6;
    }
    @Override
    protected boolean retry(Exception e) {
        SQLException ex = unpack(e);
        if(ex == null) {
            return false;
        }
        int errorCode = ex.getErrorCode();
        log.info("Found exception: " + ex.getClass().getSimpleName() + " With error code: " + errorCode, ex);
        return errorCode == 1205;
    }
    private SQLException unpack(final Throwable t) {
        if(t == null) {
            return null;
        }
        if(t instanceof SQLException) {
            return (SQLException) t;
        }
        return unpack(t.getCause());
    }
}

最后,我可以这样做:

bindInterceptor(Matchers.any(), Matchers.annotatedWith(Retry.class), new MsRetryInterceptor());

检查任何类和任何带有重试注释的方法。

重试的示例方法如下:

    @Override
    @Retry
    public List<MyObject> getSomething(int count, String property) {
        try(Connection con = datasource.getConnection();
                Context c = metrics.timer(TIMER_NAME).time()) 
        {
            // do some work
            // return some stuff
        } catch (SQLException e) {
            // catches exception and throws it out
            throw new RuntimeException("Some more specific thing",e);
        }
    }

我需要解包的原因是旧的遗留案例,比如这个DAO impl,已经捕获了它们自己的异常。

还请注意方法(a get)如何从我的数据源池中调用两次时检索新连接,以及如何在其中不进行任何修改(因此:重试是安全的)

我希望这对你有帮助。

你可以通过实现ApplicationListeners或RequestFilters或类似的方法来做类似的事情,但是我认为这是一种更通用的方法,可以在任何方法上重试任何类型的失败。

还要注意,向导只能在构造类(注入带注释的构造函数等)时拦截方法

希望有帮助,

阿图尔

我在Dropwizard存储库中找到了一个帮助我的pull request。它基本上允许在资源方法之外的其他方法上使用@UnitOfWork注释。

使用这个,我能够通过将@UnitOfWork注释从资源方法移动到负责导致StaleStateException的数据操作的DAO方法,将会话打开/关闭和事务创建/提交生命周期从资源方法分离出来。然后我就可以围绕这个DAO方法构建一个重试机制。

榜样的解释:

// class MyEntityDAO extends AbstractDAO<MyEntity>
@UnitOfWork
void tryManipulateData() {
    // Due to optimistic locking, this operations cause a StaleStateException when
    // committed "by the @UnitOfWork annotation" after returning from this method.
}
// Retry mechanism, implemented wheresoever.
void manipulateData() {
    while (true) {
        try {
            retryManipulateData();
        } catch (StaleStateException e) {
            continue; // Retry.
        }
        return;
    }
}
// class MyEntityResource
@POST
// ...
// @UnitOfWork can also be used here if nested transactions are desired.
public Response someResourceMethod() {
    // Call manipulateData() somehow.
}

当然,也可以将@UnitOfWork注释附加在使用DAO的服务类中的方法上,而不是直接将其应用于DAO方法。在任何使用注释的类中,请记住使用pull请求中描述的UnitOfWorkAwareProxyFactory创建实例的代理。

最新更新