如何使用Retrofit+RxJava框架的简单封装使用
掌握之前需要相识以下知识相关知识
retrofit源码剖析
OkHttp原明白析
RXJava源码详解
上述文章只是讲了一下基本的原理,那么在实际应用里的如何和RXJAVA共同使用呢
一、怎么搭配Rxjava使用
我们知道,在使用retrofit的时候可以设置网络请求、日志、线程的适配器,其中有一个方法addCallAdapterFactory,这个方法就是为我们添加rxjava线程调治的适配器。
- //创建Retrofit对象 mRetrofit = new Retrofit .Builder() .client(mOkHttpClientBuilder.build()) .baseUrl(host) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .addConverterFactory(GsonConverterFactory.create(gson)) .build();
复制代码 如果我们不举行设置那么系统会为我们提供一个默认的适配器defaultCallAdapterFactory
- */ public Retrofit build() { ........... // Make a defensive copy of the adapters and add the default Call adapter. List callAdapterFactories = new ArrayList(this.callAdapterFactories); callAdapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor)); ........... return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories), unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly); }
复制代码 在这里层层走下去,会给我们返回一个Response的对象,在实际业务里,我们需要利用rxjava来举行一些请求的封装,实现整体代码架构更加简便
- @Override public void enqueue(final Callback callback) { checkNotNull(callback, "callback == null"); delegate.enqueue(new Callback() { @Override public void onResponse(Call call, final Response response) { callbackExecutor.execute(new Runnable() { @Override public void run() { if (delegate.isCanceled()) { // Emulate OkHttp's behavior of throwing/delivering an IOException on cancellation. callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled")); } else { callback.onResponse(ExecutorCallbackCall.this, response); } } }); } @Override public void onFailure(Call call, final Throwable t) { callbackExecutor.execute(new Runnable() { @Override public void run() { callback.onFailure(ExecutorCallbackCall.this, t); } }); } }); }
复制代码 所以引入了rxjava的封装,幸亏retrofit也是支持这种添加,我们加入了RxJava2CallAdapterFactory,最后封装返回了一个Observable的观察者对象。
- @Override public Object adapt(Call call) { Observable responseObservable = isAsync ? new CallEnqueueObservable(call) : new CallExecuteObservable(call); Observable observable; if (isResult) { observable = new ResultObservable(responseObservable); } else if (isBody) { observable = new BodyObservable(responseObservable); } else { observable = responseObservable; } if (scheduler != null) { observable = observable.subscribeOn(scheduler); } if (isFlowable) { return observable.toFlowable(BackpressureStrategy.LATEST); } if (isSingle) { return observable.singleOrError(); } if (isMaybe) { return observable.singleElement(); } if (isCompletable) { return observable.ignoreElements(); } return observable; }
复制代码 这就是我们使用rxjava的原理,为什么可以使用以及使用后返回了什么
当前我们有一个api
- public interface ApiService {//获取用户信息 @POST(RetrofitManager.ACCOUNT) Observable getTextUserInfo(@Body UserInfoRequest userInfoRequest);}
复制代码 第一步 :调用得到observable对象
- Observable observable = RetrofitManager.getInstance(activity).create(ApiService.class). getTextUserInfo(updateUserInfoRequest).map((new HttpResultFunc()));
复制代码
RetrofitManager封装对retrofit的初始化,包罗对rxjava的线程添加,
create就是调用retrofit的create,粘贴部门代码
- public class RetrofitManager { /** * 单例模式,创建RetrofitManager对象 */ public static RetrofitManager getInstance(Context activity) { if (instance == null) { synchronized (RetrofitManager.class) { instance = new RetrofitManager(activity); } }// instance.setRsa(); return instance; } private RetrofitManager(Context context) { File cacheFile = new File(context.getCacheDir(), "cache"); Cache cache = new Cache(cacheFile, 1024 * 1024 * 50); //50Mb mOkHttpClientBuilder = new OkHttpClient.Builder(); mOkHttpClientBuilder.connectTimeout(CONNECT_OUTTIME, TimeUnit.MILLISECONDS); mOkHttpClientBuilder.readTimeout(CONNECT_OUTTIME, TimeUnit.MILLISECONDS); mOkHttpClientBuilder.writeTimeout(CONNECT_OUTTIME, TimeUnit.MILLISECONDS); mOkHttpClientBuilder.retryOnConnectionFailure(true); //Logger拦截器,设置打印品级,可以打印请求日志 if (LogUtils.LOG_FLAG) { mHttpLoggingInterceptor = new HttpLoggingInterceptor(); mHttpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY); mOkHttpClientBuilder.addInterceptor(mHttpLoggingInterceptor); } //设置公共拦截器 mHttpCommonInterceptorBuilder = new HttpCommonInterceptor.Builder(); try { mHttpCommonInterceptorBuilder .addHeaderParams("xd-version-name", OSUtils.getVersionCodeName(context)) .addHeaderParams("xd-version-code", OSUtils.getVersionCode(context) + "") .addHeaderParams("xd-agent", "Android"); } catch (Exception e) { e.printStackTrace(); } setAccessToken(); mOkHttpClientBuilder.addInterceptor(mHttpCommonInterceptorBuilder.build()); mOkHttpClientBuilder.addInterceptor(new Interceptor() { @Override public Response intercept(Chain chain) throws IOException { // 以拦截到的请求为底子创建一个新的请求对象,然后插入Header Request request = null; try { request = chain.request().newBuilder() .addHeader("antiKey", RsaKeyUtil.encryptByPublicKey(CLIENT_SECRET + ":" + System.currentTimeMillis(), PUBLIC_KEY)) .addHeader("xd-request-id", RsaKeyUtil.createRandomCharData(6)) .build(); if (BuildConfig.DEBUG) { Log.e("TAG", "请求头==" + request.headers().toString()); } } catch (Exception e) { e.printStackTrace(); } // 开始请求 return chain.proceed(request); } }); //设置缓存拦截器 mHttpCacheInterceptor = new HttpCacheInterceptor(); gson = new GsonBuilder().registerTypeAdapterFactory(new NullStringToEmptyAdapterFactory()).serializeNulls().create(); mOkHttpClientBuilder.addNetworkInterceptor(mHttpCacheInterceptor); mOkHttpClientBuilder.cache(cache); //创建Retrofit对象 mRetrofit = new Retrofit .Builder() .client(mOkHttpClientBuilder.build()) .baseUrl(host) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .addConverterFactory(GsonConverterFactory.create(gson)) .build(); } private void setRsa() { try { mHttpCommonInterceptorBuilder.addHeaderParams("antiKey", RsaKeyUtil.encryptByPublicKey(CLIENT_SECRET + ":" + System.currentTimeMillis(), PUBLIC_KEY)); } catch (Exception e) { e.printStackTrace(); } } public void setAccessToken() { String token = Paper.book().read("access_token"); if (!TextUtils.isEmpty(token)) { mHttpCommonInterceptorBuilder.addHeaderParams("authorization", "bearer " + token); } else { mHttpCommonInterceptorBuilder.addHeaderParams("authorization", ""); } } public T create(Class service) { if (api == null) { T t = mRetrofit.create(service); api = t; } return (T) api; } /** * rx订阅 rx1.0的 */// public void toSubscribe(Observable o, Subscriber s) {// o.subscribeOn(Schedulers.io())// .unsubscribeOn(Schedulers.io())// .observeOn(AndroidSchedulers.mainThread())// .subscribe(s);// } /** * rx订阅 rx2.0的 */ public void toSubscribe(Observable o, DisposableObserver disposableObserver) { o.subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(disposableObserver); } //如果需要存在依赖关系请求使用map大概flatmap}
复制代码 create我们知道是调用了动态署理,serviceMethod.adapt就是调用了RxJava2CallAdapter的adapt,返回了oservable的对象
- public T create(final Class service) { Utils.validateServiceInterface(service); if (validateEagerly) { eagerlyValidateMethods(service); } return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class[] { service }, new InvocationHandler() { private final Platform platform = Platform.get(); @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable { // If the method is a method from Object then defer to normal invocation. if (method.getDeclaringClass() == Object.class) { return method.invoke(this, args); } if (platform.isDefaultMethod(method)) { return platform.invokeDefaultMethod(method, service, proxy, args); } ServiceMethod serviceMethod = (ServiceMethod) loadServiceMethod(method); OkHttpCall okHttpCall = new OkHttpCall(serviceMethod, args); return serviceMethod.adapt(okHttpCall); } }); }
复制代码 通过map内里的拦截,我们又可以对返回的参数举行定制化的操纵,使其可以或许适配差别类型的参数返回,如何调治,请参考rxjava的内容,使用map,在apply内里举行操纵.
至此,得到了应有的对象
二:使用rxjava开启线程调治
创建被观察者线程
- DisposableObserver disposable = new ProgressSubscriber(new SubscriberOnResponseListenter() { @Override public void next(Bean bean) { baseView.getNetWorkSuccess(Bean); } @Override public void error(String e) { baseView.showError(e); } }, activity, false); RetrofitManager.getInstance(activity).toSubscribe(observable, disposable);
复制代码- /** * rx订阅 rx2.0的 */ public void toSubscribe(Observable o, DisposableObserver disposableObserver) { o.subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(disposableObserver); }
复制代码
实在到这里基本就结束了,我们拿到了网络请求参数的回调,在这种结构里,将网络请求,参数设置,方法调用完全分开,适用于mvp mvc ,mvvc等各种解耦性质的架构设计.
最后,记得使用完要解绑订阅者
- public class BasePresenter { private CompositeDisposable mCompositeDisposable; private WeakReference mViewRef; public void attachModelView(V pView) { mViewRef = new WeakReference(pView); } public V getView() { if (isAttach()) { return mViewRef.get(); } else { return null; } } private boolean isAttach() { return null != mViewRef && null != mViewRef.get(); } //增加订阅者 protected void addSubscrebe(Disposable disposable) { if (null == mCompositeDisposable) { mCompositeDisposable = new CompositeDisposable(); } mCompositeDisposable.add(disposable); } //解绑订阅者 public void unSubscribe() { if (null != mCompositeDisposable) { mCompositeDisposable.clear(); } }}
复制代码
三、扩展:Folwable大概为你的网络请求添加生命周期绑定
什么是flowable,什么又是背压
这篇文章详解了flowable的先容
Android Flowable
,在这里主要相识一下背压的几种模式
在项目中,处理处罚数据过大大概发送数据频仍的地方可以使用flowable代替rxjava的observal
Compose
Android 使用RxLifecycle办理RxJava内存泄漏
在被观察者发送事件之后,可以使用lifecycle自动管理生命周期,防止内存泄露
来源:https://blog.csdn.net/czZ__czZ/article/details/111882967
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |