如何避免Kotlin协同程序的并发问题



我将在android应用程序中实现聊天功能。为了做到这一点,我每五秒钟通过协同程序流从服务器获取一次聊天消息。问题是,当我想发送消息时,有时服务器会收到两个并发请求,并返回一个错误。我应该如何确保这些请求在我的聊天存储库中按顺序运行?这是我的聊天库:

class ChatRepositoryImpl @Inject constructor(
private val api: ApolloApi,
private val checkTokenIsSetDataStore: CheckTokenIsSetDataStore
) : ChatRepository {
override fun chatMessages(
lastIndex: Int,
limit: Int,
offset: Int,
channelId: Int,
): Flow<Resource<ChatMessages>> = flow {
var token = ""
checkTokenIsSetDataStore.get.first {
token = it
true
}
while (true) {
val response = ChatMessagesQuery(
lastIndex = Input.fromNullable(lastIndex),
limit = Input.fromNullable(limit),
offset = Input.fromNullable(offset),
channelId
).let {
api.getApolloClient(token)
.query(it)
.await()
}
response.data?.let {
emit(
Resource.Success<ChatMessages>(
it.chatMessages
)
)
}
if (response.data == null)
emit(Resource.Error<ChatMessages>(message = response.errors?.get(0)?.message))
delay(5000L)
}
}.flowOn(Dispatchers.IO)
override fun chatSendText(channelId: Int, text: String): Flow<Resource<ChatSendText>> = flow {
var token = ""
checkTokenIsSetDataStore.get.first {
token = it
true
}
val response = ChatSendTextMutation(
channelId = channelId,
text = text
).let {
api.getApolloClient(token)
.mutate(it)
.await()
}
response.data?.let {
return@flow emit(
Resource.Success<ChatSendText>(
it.chatSendText
)
)
}
return@flow emit(Resource.Error<ChatSendText>(message = response.errors?.get(0)?.message))
}.flowOn(Dispatchers.IO)
}

限制并发的一种方法是使用诸如Mutex或Semaphore之类的utils。我们可以很容易地解决您的互斥问题:

class ChatRepositoryImpl ... {
private val apolloMutex = Mutex()
override fun chatMessages(...) {
...
apolloMutex.withLock {
api.getApolloClient(token)
.query(it)
.await()
}
...
}
override fun chatSendText(...) {
...
apolloMutex.withLock {
api.getApolloClient(token)
.mutate(it)
.await()
}
...
}

然而,这个问题不应该真正在客户端解决,而应该在服务器端解决。您尝试的解决方案并不能完全保护您免受并发请求的影响。如果由于某些原因,应用程序的两个实例具有相同的令牌,或者如果用户试图操作您的应用程序,它仍然可以发送并发请求。

如果您不能很容易地正确解决问题,您可以在服务器端应用与您打算在客户端应用的修复程序相同的修复程序。只需按顺序处理请求或部分请求。它更防错,也更具性能,因为这样只需要按顺序完成整个请求时间的一部分。

最新更新