我有一个聊天应用程序,它使用socket.io与nodejs服务器(由我编写(进行通信。多个人可以一对一地聊天。该应用程序的用户界面与WhatsApp类似。存在不同的Chat Threads
,其中Users
可以交换Chat Messages
。这些需要存储在SQLite数据库中。
为了确保应用程序即使在关闭时也能正常工作,我在服务中启动了socket.io连接。
当收到新消息时,订单即为
- 服务解析响应并将其写入SQLite数据库
- 如果应用程序关闭,则会创建一个通知并向用户显示
- 如果应用程序已打开,并且用户在发送新消息的同一聊天线程上,则必须更新
recyclerView
- 基本上感觉和whatsapp一样,只是我使用的是socket.io而不是XMPP
我想使用RxJava
在SQLite
上执行CRUD操作。
SQLiteHelper.Class
public List<ChatMessage> getChatMessagesForThread(String threadName){
List<ChatMessage>list = new ArrayList<>();
String query = "Some Query Here";
SQLiteDatabase db = this.getReadableDatabase();
Cursor c = db.rawQuery(query, null);
if (c.moveToFirst()){
do {
ChatMessage message = new ChatMessage();
message.setMessage(c.getString((c.getColumnIndex(CHAT_MESSAGES_KEY_MESSAGE))));
message.setChatThread(c.getInt(c.getColumnIndex(CHAT_MESSAGES_KEY_CHAT_THREAD)));
message.setUser(c.getString(c.getColumnIndex(CHAT_MESSAGES_KEY_USER)));
list.add(message);
} while (c.moveToNext());
}else {
//No such thread exists. Returns null
}
c.close();
return list;
}
Rx Java相关功能
public static <T> Observable<T> makeObservable(final Callable<T> func) {
return Observable.create(
new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
T observed = func.call();
if (observed != null) { // to make defaultIfEmpty work
subscriber.onNext(observed);
}
//TODO: DECIDE IF THIS STAYS OR NOT !!!
//subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}
}).subscribeOn(Schedulers.io());
}
@SuppressWarnings("unchecked")
private Callable<List<ChatMessage>> getData(String threadName) {
return new Callable() {
public List<ChatMessage> call() {
return getChatMessagesForThread(String threadName);
}
};
}
public Observable<List<ChatMessage>> getDataObservable(String threadName) {
return makeObservable(getData(String threadName));
}
在ChatActivity.class
中
databaseHelper.getDataObservable(String currentThreadName)
.subscribe(new Action1<List<ChatMessage>>() {
@Override
public void call(List<ChatMessage> chatMessages) {
for (ChatMessage message : chatMessages){
//Add to list and update recyclerView
mAdapter.notifyDatasetChanged();
}
}
});
问题是,上面的Rxjava调用只执行一次,而不是每次执行新的写操作时都执行。
这个函数在SQLiteHelper类中,是从我的服务调用的
public long writeMessageToDB(ChatMessage message){
ContentValues values = new ContentValues();
values.put(CHAT_MESSAGES_KEY_MESSAGE, message.getMessage());
values.put(CHAT_MESSAGES_KEY_CHAT_THREAD, message.getChatThread());
values.put(CHAT_MESSAGES_KEY_USER, message.getUser());
SQLiteDatabase db = this.getWritableDatabase();
return db.insert(CHAT_MESSAGES_TABLE_NAME, null, values);
}
我读过Square的SQL Delite,但想了解如何使上述功能发挥作用。请帮忙!
是的,如果您希望每次都创建observable,则应该使用.dedefe((运算符。
Observable.defer(()-> Observable.just("some arg"))
.subscribeOn(Schedulers.io))
.map(t -> getDataFromDb())
.map(v -> {
// handle your result
})
.subscribe(..);