敞开成长之旅!这是我参加「日新计划 12 月更文应战」的第30天,点击检查活动概况

RX界说

  • Rx是一个函数库,让开发者能够利用可观察序列和LINQ风格查询操作符来编写异步和根据事情的程序
  • Rx是微软.NET的一个呼应式扩展。Rx凭借可观测的序列供给一种简略的办法来创立异步的,根据事情驱动的程序。
  • Rx便是一种呼应式编程,来创立根据事情的异步程序

RXJava简介

RxJava便是一种用Java语言完结的呼应式编程,来创立根据事情的异步程序

官方界说: 一个在 Java VM 上运用可观测的序列来组成异步的、根据事情的程序的库

RXJava原理

  • RxJava: 根据 一种扩展的观察者模式
  • RxJava的扩展观察者模式中有4个角色:

图片

简略运用办法

代码示例:

    Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
       }
     })
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer integer) throws Exception {
            Log.d(TAG, "accept integer = " + integer);
           }
         });

解释说明:

  • 创立被观察者,发送事情,即emitter.onNext(1),发送了数字1;
  • 指定被观察者的执行线程,Schedulers.io();
  • 观察者处理接收到的数据;
  • 切换线程到主线程,观察者将在主线程处理这些数据。

实战项目解析

添加依赖

compile 'io.reactivex.rxjava2:rxjava:2.0.0-RC5'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'

基理

ObservableObserver经过subscribe()办法完结订阅联系Rxjava中是**主动发送事情**的, 一旦**订阅**就开端**发送**


根本运用三个过程

Android架构师RX响应式编程——Rxjava实战项目教学

  • 第一步,创立ObservableEmitter发射器的意思, 在subscribe办法中不断调用发射器的办法; 总共有onNext()、onComplete()、onError()三个办法; 用法参考Rxjava1:

Android架构师RX响应式编程——Rxjava实战项目教学

Android架构师RX响应式编程——Rxjava实战项目教学

  • 第二步,创立Observer, 总共有onNext()、onComplete()、onError()、onSubscribe()四个办法; 其间, **onNext()、onComplete()、onError()**三个办法别离对应着第一步中**Observable****onNext()、onComplete()、onError()**三个办法, 只需**Observable**宣布(调用)对应的办法, **Observer**对应的办法就会被调用;
  • onError()onComplete是互斥的,一次只能调用一个;
  • 第三步,订阅, observable.subscribe(observer);

下面开端实战内容

  • 在app/build.gradle中添加依赖:
dependencies {
  implementation fileTree(dir: 'libs', include: ['*.jar'])
​
  implementation 'com.android.support:appcompat-v7:28.0.0'
  implementation 'io.reactivex.rxjava2:rxjava:2.0.0-RC5'
  implementation 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'
  implementation 'com.squareup.retrofit2:retrofit:2.1.0'
  implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
//   compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
​
//   compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
}

activity_main.xml:

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
  xmlns:android="http://schemas.android.com/apk/res/android"
  xmlns:tools="http://schemas.android.com/tools"
  android:id="@+id/activity_main"
  android:layout_width="match_parent"
  android:layout_height="match_parent"
  android:paddingBottom="@dimen/activity_vertical_margin"
  android:paddingLeft="@dimen/activity_horizontal_margin"
  android:paddingRight="@dimen/activity_horizontal_margin"
  android:paddingTop="@dimen/activity_vertical_margin"
  android:orientation="vertical"
  tools:context="com.cniao5.cniao5rxjava2demo.MainActivity">
​
​
  <Button
        android:layout_height="wrap_content"
        android:layout_width="match_parent"
        android:text="test"
        android:onClick="click"/>
​
  <TextView
    android:id="@+id/text"
    android:layout_width="wrap_content"
    android:layout_height="wrap_content"
    android:layout_marginTop="10dp"
    />
</LinearLayout>

MainActivity,java: 第一步,经过create()创立Observable(模拟目标:程序员), 经过onNext()发送数据:

  public Observable<String> getObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("奏笛");
        e.onNext("泡吧");
        e.onComplete();
       }
     });
   }

第二步,创立Observer(模拟目标:程序员女朋友), 创立的办法是直接new

  public Observer<String> getObserver(){
    return  new Observer<String>() {
​
      @Override
      public void onSubscribe(Disposable d) {
        Log.d("MainActivity","onSubscribe");
       }
​
      @Override
      public void onNext(String value) {
        Log.d("MainActivity","onNext");
       }
​
      @Override
      public void onError(Throwable e) {
        Log.d("MainActivity","onError");
       }
​
      @Override
      public void onComplete() {
        Log.d("MainActivity","onComplete");
       }
     };
   }

第三步,在click()中, getObservableobserver 完结订阅:

  public void click(View view) {
​
    Observable<String> observale = getObservable();
    Observer<String> observer = getObserver();
    observale.subscribe(observer);
   }

Android架构师RX响应式编程——Rxjava实战项目教学

以上便完结了一个最根本的运用; 运转作用:

Android架构师RX响应式编程——Rxjava实战项目教学

点击按钮后打印日志:

Android架构师RX响应式编程——Rxjava实战项目教学

由此能够应证, Rxjava中是主动发送事情的, 一旦Observable 被 observer 订阅了(observale.subscribe(observer);) Observable就开端发送; Observable经过自身ObservableOnSubscribe中的subscribe()中的 onNext()等办法主动宣布信息, observer接收到信息后执行对应的onNext()等办法; 在订阅之后,Observer中, onSubscribe()每次接收数据之前必须要调用的办法; onNext()则是对应Observable调用的次数去调用相应的次数; onComplete()onError()对应完结/反常状态时分调用;

 @Override
      public void onSubscribe(Disposable d) {
        Log.d("MainActivity","onSubscribe");
       }

接下来重视一下Observer结构办法中的onSubscribe()办法;

其间留意参数Disposable d

Disposable一次性的意思; 其主要有以下两个办法:

Android架构师RX响应式编程——Rxjava实战项目教学

用法示例 (用于监听Observable发送的数据, 假如Observable发送的数据等于某个值, 就隔绝订阅联系):

Android架构师RX响应式编程——Rxjava实战项目教学

更改Observable代码:

    public Observable<String> getObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("奏笛");
                e.onNext("泡吧");
                e.onNext("酗酒");
                e.onComplete();
            }
        });
    }

运转示例,点击按钮:

Android架构师RX响应式编程——Rxjava实战项目教学

能够发现现已没有onComplete()办法的打印信息了, 因为在onNext()中途现已隔绝订阅联系了;

另外还有省略observer的简练写法

    public void click(View view) {
        Observable<String> observale = getObservable();
        observale.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                if(s.equals("奏笛")){
                    Log.d("MainActivity","收到奏笛!!这儿类似于Observer的onNext()");
                }
                if(s.equals("吟诗")){
                    Log.d("MainActivity","收到吟诗!!这儿类似于Observer的onNext()");
                }
                if(s.equals("酗酒")){
                    Log.d("MainActivity","收到酗酒!!这儿类似于Observer的onNext()");
                }
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d("MainActivity","这儿类似于Observer的onError()");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d("MainActivity","这儿类似于Observer的onComplete()");
            }
        });
    }
    public Observable<String> getObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("奏笛");
                e.onNext("吟诗");
                e.onNext("酗酒");
                e.onComplete();
            }
        });
    }

运转示例,点击按钮:

Android架构师RX响应式编程——Rxjava实战项目教学

更改上面代码,

Observable.just()创立Observable目标,

作用也是相同的; 运转示例,点击按钮,打印日志同上:

    public Observable<String> getObservable() {
        Observable observable = Observable.just("奏笛","吟诗","酗酒");
        return  observable;
    }

Android架构师RX响应式编程——Rxjava实战项目教学

或者显示在TextView上:

Android架构师RX响应式编程——Rxjava实战项目教学

Android架构师RX响应式编程——Rxjava实战项目教学

Observable.fromArray()创立Observable目标,

Observable observable = Observable.fromArray("奏笛","泡吧","吟诗"); 其实用法跟**just()**是相同的; **just()**源码如下, 里边最终也是调用**fromArray()**完结的:

Android架构师RX响应式编程——Rxjava实战项目教学

Observable.fromCallable()创立Observable目标

特色:只能回来一个数据;

Android架构师RX响应式编程——Rxjava实战项目教学

Android架构师RX响应式编程——Rxjava实战项目教学

本节笔记Activity全文(留意io.reactivex包的引用):

package com.cniao5.cniao5rxjava2demo;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import java.util.concurrent.Callable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
public class MainActivity extends AppCompatActivity {
    private TextView textView;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        textView = (TextView) findViewById(R.id.text);
    }
    public void click(View view) {
        Observable<String> observale = getObservable();
//        Observer<String> observer = getObserver();
//
//        observale.subscribe(observer);
//        observale.subscribe(new Consumer<String>() {
//            @Override
//            public void accept(String s) throws Exception {
//                Log.d("MainActivity","accept="+s);
//
//                textView.append(s);
//                textView.append("//n");
//            }
//        });
        observale.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                if(s.equals("奏笛")){
                    Log.d("MainActivity","收到奏笛!!这儿类似于Observer的onNext()");
                }
                if(s.equals("吟诗")){
                    Log.d("MainActivity","收到吟诗!!这儿类似于Observer的onNext()");
                }
                if(s.equals("酗酒")){
                    Log.d("MainActivity","收到酗酒!!这儿类似于Observer的onNext()");
                }
                textView.append(s);
                textView.append("\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d("MainActivity","这儿类似于Observer的onError()");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d("MainActivity","这儿类似于Observer的onComplete()");
            }
        });
    }
    public Observable<String> getObservable() {
//        Observable observable = Observable.just("奏笛","吟诗","酗酒");
//        Observable observable = Observable.fromArray("奏笛","泡吧","吟诗");
       return  Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "奏笛";
            }
        });
//        return  observable;
//        return Observable.create(new ObservableOnSubscribe<String>() {
//            @Override
//            public void subscribe(ObservableEmitter<String> e) throws Exception {
//                e.onNext("奏笛");
//                e.onNext("吟诗");
//                e.onNext("酗酒");
//                e.onComplete();
////                e.onError(new);
//
//            }
//        });
    }
    public Observer<String>  getObserver(){
        return   new Observer<String>() {
              Disposable dd =null;//界说一个变量局部变量
            @Override
            public void onSubscribe(Disposable d) {
                dd = d;//把这段订阅联系的Disposable变量拿下来
                Log.d("MainActivity","onSubscribe");
            }
            @Override
            public void onNext(String value) {
                Log.d("MainActivity","onNext");
                if(value.equals("酗酒")){
                    dd.dispose();//假如发送的数据等于某个值,就隔绝联系
                    Log.d("MainActivity","你的小可爱现已不想理你了!!!");
                }
            }
            @Override
            public void onError(Throwable e) {
                Log.d("MainActivity","onError");
            }
            @Override
            public void onComplete() {
                Log.d("MainActivity","onComplete");
            }
        };
    }
}