如何为 MongoChangeStream 使用 SchedulerLock?或者有类似的替代方案吗?



我有一个 Spring MongoDB 应用程序,在其中使用 MongoChangeStream 处理一些业务逻辑。但如果应用有多个实例,每个实例都会把相同的业务逻辑执行一遍。所以我决定尝试使用 net.javacrumbs.shedlock 提供的 @SchedulerLock。但是这并没有起作用,shedLock 集合也没有被创建。应用程序的行为与没有添加 @SchedulerLock 时完全一样。

/**
 * Change-stream listener for [BookDocument] that registers an inline (anonymous) callback
 * handling insert events.
 *
 * NOTE(review): the callback is an anonymous object created here, not a Spring-managed bean,
 * so the `@Scheduled` / `@SchedulerLock` annotations on its `insert` method are presumably
 * never intercepted and no lock is acquired — confirm against the ShedLock documentation.
 *
 * @param mongoTemplate passed through to the [MongoChangeStreamListener] base class.
 */
@Component
class BookChangeStreamListener(
    mongoTemplate: MongoTemplate
) : MongoChangeStreamListener<BookDocument>(mongoTemplate) {
    override fun createMessageListener() =
        createMessageListener(
            object : CallbackChangeStream<BookDocument> {
                @Scheduled(fixedDelayString = "1s")
                @SchedulerLock(
                    name = "BookChangeStreamListenerInsert",
                    lockAtMostFor = "5m",
                    lockAtLeastFor = "5m"
                )
                override fun insert(raw: ChangeStreamDocument<Document>, body: BookDocument?) {
                    // Only act on events that carry a document body.
                    body?.let {
                        // business logic
                    }
                }
            }
        ) // closes the createMessageListener(...) call, which was left unclosed before
}

配置:

import com.mongodb.client.MongoClient
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.provider.mongo.MongoLockProvider
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock
import org.springframework.boot.autoconfigure.mongo.MongoProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableScheduling
/**
 * Spring configuration that enables scheduling and ShedLock's scheduler locking
 * (default `lockAtMostFor` of five minutes) and exposes the [LockProvider] bean.
 */
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "5m")
class LockConfig {

    /**
     * Builds a MongoDB-backed [LockProvider].
     *
     * @param mongo client used to obtain the database handle.
     * @param properties Mongo settings; its `database` value names the database to use.
     * @return a [MongoLockProvider] bound to that database.
     */
    @Bean
    fun lockProvider(mongo: MongoClient, properties: MongoProperties): LockProvider =
        MongoLockProvider(mongo.getDatabase(properties.database))
}

build.gradle.kts:

implementation("net.javacrumbs.shedlock:shedlock-provider-mongo:4.34.0")
implementation("net.javacrumbs.shedlock:shedlock-spring:4.34.0")

@SchedulerLock 不能用于匿名类(匿名对象)的方法:ShedLock 依靠 Spring 代理来拦截方法调用,因此被注解的方法必须属于由 Spring 管理的 bean。最终对我有效的解决方案:

/**
 * Change-stream listener for [BookDocument] that delegates event handling to an injected
 * [CallbackChangeStream] bean instead of an anonymous object.
 *
 * @param mongoTemplate passed through to the [MongoChangeStreamListener] base class.
 * @property callbackChangeStream callback handed to `createMessageListener`; selected by bean name.
 */
@Component
class BoookChangeStreamListener(
    mongoTemplate: MongoTemplate,
    // Qualifier matching is case-sensitive: the value must equal the bean name declared on the
    // callback component ("bookCallbackChangeStream"), otherwise injection fails at startup.
    @Qualifier("bookCallbackChangeStream")
    val callbackChangeStream: CallbackChangeStream<BookDocument>
) : MongoChangeStreamListener<BookDocument>(mongoTemplate) {
    override fun createMessageListener() = createMessageListener(callbackChangeStream)
}
/**
 * [CallbackChangeStream] implementation for [BookDocument], registered as the Spring bean
 * `bookCallbackChangeStream`.
 *
 * @property eventPublisher application event publisher made available to the business logic.
 * @property mapper document-to-domain mapper made available to the business logic.
 */
@Component("bookCallbackChangeStream")
class BookCallbackChangeStreamImpl(
    val eventPublisher: ApplicationEventPublisher,
    val mapper: BookToDomainMapper
) : CallbackChangeStream<BookDocument> {

    /**
     * Handles an insert event; annotated to run under the ShedLock lock named
     * `BookChangeStreamListenerInsert`.
     *
     * @param raw the raw change-stream document.
     * @param body the converted document, or `null` when none is available.
     */
    @SchedulerLock(name = "BookChangeStreamListenerInsert", lockAtMostFor = "10ms", lockAtLeastFor = "10ms")
    override fun insert(raw: ChangeStreamDocument<Document>, body: BookDocument?) {
        // business logic
    }
}

最新更新