RxJava与SQLite动态更新聊天界面



我有一个聊天应用程序,它使用socket.io与nodejs服务器(由我编写(进行通信。多个人可以一对一地聊天。该应用程序的用户界面与WhatsApp类似。存在不同的Chat Threads,其中Users可以交换Chat Messages。这些需要存储在SQLite数据库中。

为了确保应用程序即使在关闭时也能正常工作,我在服务中启动了socket.io连接。

当收到新消息时,订单即为

  • 服务解析响应并将其写入SQLite数据库
  • 如果应用程序关闭,则会创建一个通知并向用户显示
  • 如果应用程序已打开,并且用户在发送新消息的同一聊天线程上,则必须更新recyclerView
  • 基本上感觉和whatsapp一样,只是我使用的是socket.io而不是XMPP

我想使用RxJavaSQLite上执行CRUD操作。

SQLiteHelper.Class

public List<ChatMessage> getChatMessagesForThread(String threadName){
    List<ChatMessage>list = new ArrayList<>();
    String query = "Some Query Here";
    SQLiteDatabase db = this.getReadableDatabase();
    Cursor c = db.rawQuery(query, null);
    if (c.moveToFirst()){
        do {
            ChatMessage message = new ChatMessage();
            message.setMessage(c.getString((c.getColumnIndex(CHAT_MESSAGES_KEY_MESSAGE))));
            message.setChatThread(c.getInt(c.getColumnIndex(CHAT_MESSAGES_KEY_CHAT_THREAD)));
            message.setUser(c.getString(c.getColumnIndex(CHAT_MESSAGES_KEY_USER)));
            list.add(message);
        } while (c.moveToNext());
    }else {
        //No such thread exists. Returns null
    }
    c.close();
    return list;
}

Rx Java相关功能

    public static <T> Observable<T> makeObservable(final Callable<T> func) {
        return Observable.create(
                new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        try {
                            T observed = func.call();
                            if (observed != null) { // to make defaultIfEmpty work
                                subscriber.onNext(observed);
                            }
                            //TODO: DECIDE IF THIS STAYS OR NOT !!!
                            //subscriber.onCompleted();
                        } catch (Exception ex) {
                            subscriber.onError(ex);
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }
@SuppressWarnings("unchecked")
private Callable<List<ChatMessage>> getData(String threadName) {
    return new Callable() {
        public List<ChatMessage> call() {
            return getChatMessagesForThread(String threadName);
        }
    };
}

    public Observable<List<ChatMessage>> getDataObservable(String threadName) {
        return makeObservable(getData(String threadName));
    }

ChatActivity.class

    databaseHelper.getDataObservable(String currentThreadName)
            .subscribe(new Action1<List<ChatMessage>>() {
                @Override
                public void call(List<ChatMessage> chatMessages) {
                    for (ChatMessage message : chatMessages){
                        //Add to list and update recyclerView
                        mAdapter.notifyDatasetChanged();
                    }
                }
            });

问题是,上面的Rxjava调用只执行一次,而不是每次执行新的写操作时都执行。

这个函数在SQLiteHelper类中,是从我的服务调用的

public long writeMessageToDB(ChatMessage message){
    ContentValues values = new ContentValues();
    values.put(CHAT_MESSAGES_KEY_MESSAGE, message.getMessage());
    values.put(CHAT_MESSAGES_KEY_CHAT_THREAD, message.getChatThread());
    values.put(CHAT_MESSAGES_KEY_USER, message.getUser());
    SQLiteDatabase db = this.getWritableDatabase();
    return db.insert(CHAT_MESSAGES_TABLE_NAME, null, values);
}

我读过Square的SQL Delite,但想了解如何使上述功能发挥作用。请帮忙!

是的,如果您希望每次都创建observable,则应该使用.dedefe((运算符。

Observable.defer(()-> Observable.just("some arg"))
.subscribeOn(Schedulers.io))
.map(t -> getDataFromDb())
.map(v ->  {
     // handle your result
})
.subscribe(..);

相关内容

  • 没有找到相关文章

最新更新