Подписавшись на несколько источников в RxJava


Я использовал RxJava соблюдать несколько источников, а затем выполнить действие в зависимости от значений наблюдаемых. Я хочу знать, если это может быть дополнительно упрощена.

Цель: Полную реализацию RxJava.

Что у меня есть:

  1. Номер (ОРМ базы данных) для хранения локальных баз данных с помощью RxJava.
  2. Веб-сервиса API с модифицированной с помощью RxJava.

Условия:

  1. Запуск веб-сервиса звоните в регулярном интервале.
  2. В WebService вызова выполнить в зависимости от результата с данными, полученными из базы данных.

Код:

DAO-класс

@Query("select * from Login")
Flowable<List<Login>> getLoginRx();

Класс API

@Multipart
@POST("..")
Flowable<NotificationResponse> notification();

Java-класс

private Subscriber<List<Login>> loginSubscriber;
private Subscriber<NotificationResponse> notifSubscriber;
private Observer<Long> timerObserver;
private RoomDb roomDb;

@Override
protected void onCreate(Bundle b){
    roomDb = RoomDb.getInstance(this);
    notifSubscriber = new Subscriber<NotificationResponse>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(1);
            notifSubscription = s;
        }

        @Override
        public void onNext(NotificationResponse nr) {
            if (nr.getSuccess()) {
                if (NetworkClient.STATUS_OK.equals(nr.getData().getStatus())) {
                    updateCounts(nr.getData());
                }
            }
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    };

    timerObserver = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            networkCallRepeat = d;
        }

        @Override
        public void onNext(Long aLong) {
            NetworkClient.getInstance(MainActivity.this).getNetworkAPI().notification(NetworkClient.stringPart(NetworkAPI.STAFF_TOKEN))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io())
                    .subscribe(notifSubscriber);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };
    loginSubscriber = new Subscriber<List<Login>>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(1);
            loginSubscription = s;
        }

        @Override
        public void onNext(List<Login> logins) {
            if (logins != null && logins.size() > 0) {
                updateLoginState(logins.get(0));
                Observable
                        .interval(0L, 1L, TimeUnit.MINUTES)
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(timerObserver);
            } else {
                if (networkCallRepeat != null && !networkCallRepeat.isDisposed()) {
                    networkCallRepeat.dispose();
                }
                updateLoginState(null);
            }
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    };
}

@Override
protected void onResume() {
    super.onResume();
    roomDb.loginDao().getLoginRx()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loginSubscriber);
}


Комментарии