您的位置:首页 > 财经 > 产业 > 双十一运用了哪些网络营销方式_上海美术设计公司_论坛推广网站_刷赞业务推广网站

双十一运用了哪些网络营销方式_上海美术设计公司_论坛推广网站_刷赞业务推广网站

2025/4/10 23:36:57 来源:https://blog.csdn.net/sixpp/article/details/146906964  浏览:    关键词:双十一运用了哪些网络营销方式_上海美术设计公司_论坛推广网站_刷赞业务推广网站
双十一运用了哪些网络营销方式_上海美术设计公司_论坛推广网站_刷赞业务推广网站

在这里插入图片描述

文章目录

    • 4.1 RxJava示例
      • 4.1.1 创建Observable
        • 基本创建方式
        • 高级创建方式
        • 特殊用途Observable
      • 4.1.2 订阅Observer
        • 基本订阅方式
        • 背压处理
        • 调度器控制
        • 组合订阅
      • 4.1.3 使用操作符
        • 转换操作符
        • 过滤操作符
        • 组合操作符
        • 错误处理操作符
        • 条件操作符
        • 数学和聚合操作符
        • 实用操作符![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/453b1e743b334afb961ea96a8edf996e.png)
        • 实际应用示例

在这里插入图片描述

4.1 RxJava示例

RxJava是ReactiveX在Java虚拟机上的实现,它使用可观察序列来构建异步和基于事件的程序。RxJava提供了丰富的操作符来处理异步数据流,使开发者能够以声明式的方式组合异步操作。
在这里插入图片描述

4.1.1 创建Observable

Observable是RxJava中的基本构建块,代表一个可观察的数据源,能够发射0到N个数据项,然后可能以一个完成或错误通知终止。

基本创建方式
  1. Observable.just():创建一个发射指定值的Observable

    Observable<String> observable = Observable.just("Hello", "World");
    
  2. Observable.fromIterable():从集合创建Observable

    List<String> list = Arrays.asList("Apple", "Banana", "Cherry");
    Observable<String> observable = Observable.fromIterable(list);
    
  3. Observable.range():创建一个发射特定整数序列的Observable

    Observable<Integer> observable = Observable.range(1, 5); // 1, 2, 3, 4, 5
    
  4. Observable.interval():创建一个按固定时间间隔发射整数序列的Observable

    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS); // 0, 1, 2... 每秒
    
  5. Observable.create():自定义Observable创建

    Observable<String> observable = Observable.create(emitter -> {emitter.onNext("First");emitter.onNext("Second");emitter.onComplete();
    });
    
  6. Observable.empty():创建一个不发射任何数据但正常终止的Observable

    Observable<String> observable = Observable.empty();
    
  7. Observable.error():创建一个不发射任何数据但以错误终止的Observable

    Observable<String> observable = Observable.error(new RuntimeException("Error occurred"));
    
  8. Observable.never():创建一个不发射任何数据也不终止的Observable

    Observable<String> observable = Observable.never();
    
高级创建方式

在这里插入图片描述

  1. Observable.defer():延迟创建,直到有观察者订阅

    Observable<Long> observable = Observable.defer(() -> Observable.just(System.currentTimeMillis()));
    
  2. Observable.fromCallable():从Callable创建,适合可能有异常抛出的场景

    Observable<String> observable = Observable.fromCallable(() -> {if (Math.random() > 0.5) {throw new IOException("Random error");}return "Success";
    });
    
  3. Observable.fromFuture():从Future创建

    Future<String> future = Executors.newSingleThreadExecutor().submit(() -> "Result from Future");
    Observable<String> observable = Observable.fromFuture(future);
    
  4. Observable.generate():同步生成复杂流

    Observable<Integer> observable = Observable.generate(() -> 0, // 初始状态(state, emitter) -> {emitter.onNext(state);if (state == 10) {emitter.onComplete();}return state + 1;}
    );
    
  5. Observable.merge():合并多个Observable

    Observable<String> first = Observable.just("A", "B", "C");
    Observable<String> second = Observable.just("1", "2", "3");
    Observable<String> merged = Observable.merge(first, second);
    
  6. Observable.concat():顺序连接多个Observable

    Observable<String> concatenated = Observable.concat(first, second);
    
  7. Observable.zip():组合多个Observable

    Observable<String> zipped = Observable.zip(first, second, (f, s) -> f + s);
    
特殊用途Observable

在这里插入图片描述

  1. Subject:既是Observable又是Observer

    • PublishSubject:向所有订阅者广播所有后续事件

      PublishSubject<String> subject = PublishSubject.create();
      subject.onNext("Hello");
      subject.subscribe(System.out::println); // 不会收到"Hello"
      subject.onNext("World"); // 会打印"World"
      
    • BehaviorSubject:向新订阅者发送最近的一个事件

      BehaviorSubject<String> subject = BehaviorSubject.createDefault("Initial");
      subject.onNext("First");
      subject.subscribe(System.out::println); // 立即收到"First"
      
    • ReplaySubject:向所有订阅者重放所有事件

      ReplaySubject<String> subject = ReplaySubject.create();
      subject.onNext("Hello");
      subject.subscribe(System.out::println); // 会收到"Hello"
      
    • AsyncSubject:只在完成时发送最后一个事件

      AsyncSubject<String> subject = AsyncSubject.create();
      subject.onNext("Hello");
      subject.onNext("World");
      subject.subscribe(System.out::println); // 不会立即收到
      subject.onComplete(); // 会打印"World"
      
  2. ConnectableObservable:需要调用connect()才开始发射数据

    ConnectableObservable<String> connectable = Observable.just("A", "B", "C").publish();
    connectable.subscribe(System.out::println);
    connectable.connect(); // 此时才开始发射数据
    

4.1.2 订阅Observer

在这里插入图片描述
Observer是RxJava中的消费者,用于接收Observable发射的数据和通知。

基本订阅方式
  1. 简单订阅

    Observable.just("Hello").subscribe();
    
  2. 带Consumer的订阅

    Observable.just("Hello").subscribe(value -> System.out.println("Received: " + value), // onNexterror -> System.err.println("Error: " + error),    // onError() -> System.out.println("Completed")             // onComplete);
    
  3. 使用Observer接口

    Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("Subscribed");}@Overridepublic void onNext(String s) {System.out.println("Received: " + s);}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e);}@Overridepublic void onComplete() {System.out.println("Completed");}
    };Observable.just("Hello", "World").subscribe(observer);
    
  4. 使用Disposable控制订阅

    Disposable disposable = Observable.interval(1, TimeUnit.SECONDS).subscribe(System.out::println);// 在需要时取消订阅
    disposable.dispose();
    
背压处理

在这里插入图片描述
RxJava 2.x引入了Flowable来处理背压(Backpressure),当数据生产速度大于消费速度时:

  1. Flowable基本使用

    Flowable.range(1, 1000).onBackpressureBuffer() // 缓冲策略.observeOn(Schedulers.io()).subscribe(System.out::println);
    
  2. 背压策略

    • BUFFER:缓冲所有数据(可能导致OOM)
    • DROP:丢弃无法处理的数据
    • LATEST:只保留最新数据
    • ERROR:抛出MissingBackpressureException
    • MISSING:不实现背压,由下游处理
    Flowable.interval(1, TimeUnit.MILLISECONDS).onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped)).observeOn(Schedulers.computation()).subscribe(System.out::println);
    
调度器控制

RxJava使用Schedulers控制线程:

  1. subscribeOn:指定Observable操作执行的线程

    Observable.just("Hello").subscribeOn(Schedulers.io()) // 在IO线程执行.subscribe(System.out::println);
    
  2. observeOn:指定Observer接收数据的线程

    Observable.range(1, 5).observeOn(Schedulers.computation()) // 在计算线程接收.subscribe(System.out::println);
    
  3. 常用调度器

    • Schedulers.io():适合I/O操作(无界线程池)
    • Schedulers.computation():适合计算操作(固定大小线程池)
    • Schedulers.newThread():为每个任务创建新线程
    • Schedulers.single():单一线程顺序执行
    • Schedulers.trampoline():在当前线程排队执行
    • Schedulers.from(Executor):自定义Executor
组合订阅

在这里插入图片描述

  1. 合并多个订阅

    Observable<String> first = Observable.just("A", "B", "C");
    Observable<String> second = Observable.just("1", "2", "3");Observable.merge(first, second).subscribe(System.out::println);
    
  2. 条件订阅

    Observable<String> source = Observable.just("Hello", "World", "Error");source.flatMap(s -> {if ("Error".equals(s)) {return Observable.error(new RuntimeException("Error encountered"));}return Observable.just(s.toUpperCase());
    }).subscribe(System.out::println, System.err::println);
    
  3. 资源管理

    Observable.using(() -> new FileInputStream("file.txt"), // 创建资源inputStream -> Observable.just(readFile(inputStream)), // 使用资源inputStream -> inputStream.close() // 释放资源
    ).subscribe(System.out::println);
    

4.1.3 使用操作符

RxJava提供了数百个操作符来处理Observable流,下面介绍最常用的几类操作符。

转换操作符
  1. map:对每个元素应用函数

    Observable.just("Hello", "World").map(String::toUpperCase).subscribe(System.out::println);
    
  2. flatMap:将每个元素转换为Observable并合并

    Observable.just("Hello", "World").flatMap(s -> Observable.fromArray(s.split(""))).subscribe(System.out::println);
    
  3. concatMap:类似flatMap但保持顺序

    Observable.just("Hello", "World").concatMap(s -> Observable.fromArray(s.split(""))).subscribe(System.out::println);
    
  4. switchMap:只保留最新的Observable

    Observable.just("Hello", "World").switchMap(s -> Observable.interval(100, TimeUnit.MILLISECONDS).map(i -> s + " " + i).take(5)).subscribe(System.out::println);
    
  5. cast:强制类型转换

    Observable<Object> objObs = Observable.just("Hello");
    Observable<String> strObs = objObs.cast(String.class);
    
  6. scan:累加器函数

    Observable.range(1, 5).scan((sum, item) -> sum + item).subscribe(System.out::println); // 1, 3, 6, 10, 15
    
  7. groupBy:按条件分组

    Observable.just("Apple", "Banana", "Cherry", "Date").groupBy(s -> s.length()).flatMapSingle(group -> group.toList().map(list -> group.getKey() + ": " + list)).subscribe(System.out::println);
    
过滤操作符

在这里插入图片描述

  1. filter:基于条件过滤

    Observable.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println); // 2, 4, 6, 8, 10
    
  2. take:取前N个元素

    Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(System.out::println); // 0, 1, 2, 3, 4
    
  3. skip:跳过前N个元素

    Observable.range(1, 10).skip(5).subscribe(System.out::println); // 6, 7, 8, 9, 10
    
  4. distinct:去重

    Observable.just(1, 2, 2, 3, 1, 4).distinct().subscribe(System.out::println); // 1, 2, 3, 4
    
  5. distinctUntilChanged:过滤连续重复

    Observable.just(1, 1, 2, 2, 3, 1, 1, 4).distinctUntilChanged().subscribe(System.out::println); // 1, 2, 3, 1, 4
    
  6. first/last:取第一个/最后一个元素

    Observable.range(1, 10).first(0) // 默认值.subscribe(System.out::println); // 1
    
  7. elementAt:取指定位置的元素

    Observable.range(1, 10).elementAt(5) // 索引从0开始.subscribe(System.out::println); // 6
    
  8. sample/throttleLast:定期采样

    Observable.interval(100, TimeUnit.MILLISECONDS).sample(1, TimeUnit.SECONDS).subscribe(System.out::println); // 大约每秒一个数
    
  9. debounce/throttleWithTimeout:防抖动

    Observable.create(emitter -> {emitter.onNext("H");Thread.sleep(100);emitter.onNext("He");Thread.sleep(200);emitter.onNext("Hel");Thread.sleep(300);emitter.onNext("Hell");Thread.sleep(400);emitter.onNext("Hello");
    }).debounce(350, TimeUnit.MILLISECONDS).subscribe(System.out::println); // 只输出"Hello"
    
组合操作符

在这里插入图片描述

  1. merge:合并多个Observable

    Observable<String> first = Observable.interval(1, TimeUnit.SECONDS).map(i -> "First: " + i);
    Observable<String> second = Observable.interval(750, TimeUnit.MILLISECONDS).map(i -> "Second: " + i);Observable.merge(first, second).subscribe(System.out::println);
    
  2. concat:顺序连接多个Observable

    Observable.concat(Observable.just("First", "Second"),Observable.just("Third", "Fourth")
    ).subscribe(System.out::println); // First, Second, Third, Fourth
    
  3. zip:组合多个Observable

    Observable<String> letters = Observable.just("A", "B", "C");
    Observable<Integer> numbers = Observable.just(1, 2, 3);Observable.zip(letters, numbers, (l, n) -> l + n).subscribe(System.out::println); // A1, B2, C3
    
  4. combineLatest:当任一Observable发射时组合最新值

    Observable<String> letters = Observable.interval(1, TimeUnit.SECONDS).map(i -> "Letter" + (char)(i + 65));
    Observable<Integer> numbers = Observable.interval(750, TimeUnit.MILLISECONDS).map(i -> i + 1);Observable.combineLatest(letters, numbers, (l, n) -> l + n).subscribe(System.out::println);
    
  5. withLatestFrom:类似combineLatest但由主Observable触发

    letters.withLatestFrom(numbers, (l, n) -> l + n).subscribe(System.out::println);
    
  6. startWith:在Observable开始前插入数据

    Observable.just("World").startWith("Hello").subscribe(System.out::println); // Hello, World
    
  7. switchOnNext:切换Observable流

    Observable<Observable<String>> observables = Observable.just(Observable.interval(100, TimeUnit.MILLISECONDS).map(i -> "A" + i),Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "B" + i)
    );Observable.switchOnNext(observables.delay(500, TimeUnit.MILLISECONDS)).subscribe(System.out::println);
    
错误处理操作符

在这里插入图片描述

  1. onErrorReturn:出错时返回默认值

    Observable.error(new RuntimeException("Error")).onErrorReturn(e -> "Default").subscribe(System.out::println); // Default
    
  2. onErrorResumeNext:出错时切换到另一个Observable

    Observable.error(new RuntimeException("Error")).onErrorResumeNext(Observable.just("A", "B", "C")).subscribe(System.out::println); // A, B, C
    
  3. retry:重试

    Observable.create(emitter -> {System.out.println("Attempting");emitter.onError(new RuntimeException("Failed"));
    }).retry(3).subscribe(System.out::println, System.err::println);
    
  4. retryWhen:条件重试

    Observable.error(new RuntimeException("Error")).retryWhen(errors -> errors.zipWith(Observable.range(1, 3), (e, i) -> i).flatMap(i -> {System.out.println("Retry #" + i);return Observable.timer(i, TimeUnit.SECONDS);})).subscribe(System.out::println, System.err::println);
    
条件操作符
  1. all:是否所有元素满足条件

    Observable.range(1, 5).all(i -> i < 10).subscribe(System.out::println); // true
    
  2. contains:是否包含指定元素

    Observable.just("A", "B", "C").contains("B").subscribe(System.out::println); // true
    
  3. isEmpty:是否为空

    Observable.empty().isEmpty().subscribe(System.out::println); // true
    
  4. defaultIfEmpty:如果为空提供默认值

    Observable.empty().defaultIfEmpty("Default").subscribe(System.out::println); // Default
    
  5. sequenceEqual:比较两个Observable序列

    Observable.sequenceEqual(Observable.just(1, 2, 3),Observable.just(1, 2, 3)
    ).subscribe(System.out::println); // true
    
数学和聚合操作符

在这里插入图片描述

  1. count:计数

    Observable.range(1, 10).count().subscribe(System.out::println); // 10
    
  2. reduce:累积计算

    Observable.range(1, 5).reduce((sum, i) -> sum + i).subscribe(System.out::println); // 15
    
  3. collect:收集到容器

    Observable.range(1, 5).collect(ArrayList::new, List::add).subscribe(System.out::println); // [1, 2, 3, 4, 5]
    
  4. toList/toMap/toSet:转换为集合

    Observable.just("A", "B", "A").toSet().subscribe(System.out::println); // [A, B]
    
  5. sum/average/max/min:数学运算

    Observable.range(1, 5).map(Integer::doubleValue).average().subscribe(System.out::println); // 3.0
    
实用操作符在这里插入图片描述
  1. doOnNext/doOnError/doOnComplete:副作用操作

    Observable.just("Hello").doOnNext(s -> System.out.println("About to emit: " + s)).doOnComplete(() -> System.out.println("Completed")).subscribe();
    
  2. materialize/dematerialize:将通知转换为对象

    Observable.just("Hello").materialize().subscribe(notification -> {if (notification.isOnNext()) {System.out.println("Value: " + notification.getValue());} else if (notification.isOnComplete()) {System.out.println("Completed");}});
    
  3. timeInterval/timestamp:添加时间信息

    Observable.interval(1, TimeUnit.SECONDS).take(3).timeInterval().subscribe(ti -> System.out.println("Value: " + ti.value() + ", Interval: " + ti.time() + "ms"));
    
  4. cache:缓存发射的数据

    Observable<Long> cached = Observable.interval(1, TimeUnit.SECONDS).take(5).cache();cached.subscribe(System.out::println); // 开始发射
    Thread.sleep(3000);
    cached.subscribe(System.out::println); // 从缓存中获取
    
  5. replay:重放给后续订阅者

    ConnectableObservable<Long> replay = Observable.interval(1, TimeUnit.SECONDS).take(5).replay();replay.connect(); // 开始发射
    Thread.sleep(3000);
    replay.subscribe(System.out::println); // 从开始重放
    
实际应用示例

在这里插入图片描述

  1. 网络请求组合

    Observable<Profile> profileObservable = userService.getProfile(userId);
    Observable<List<Friend>> friendsObservable = userService.getFriends(userId);Observable.zip(profileObservable, friendsObservable, (profile, friends) -> new UserData(profile, friends)).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(userData -> updateUI(userData),error -> showError(error));
    
  2. 搜索建议

    RxTextView.textChanges(searchInput).debounce(300, TimeUnit.MILLISECONDS).filter(text -> text.length() > 2).distinctUntilChanged().switchMap(query -> searchService.suggest(query.toString()).onErrorResumeNext(Observable.empty())).observeOn(AndroidSchedulers.mainThread()).subscribe(suggestions -> updateSuggestions(suggestions));
    
  3. 轮询检查

    Observable.interval(5, TimeUnit.SECONDS).flatMap(i -> checkStatus().retryWhen(errors -> errors.delay(1, TimeUnit.SECONDS))).distinctUntilChanged().subscribe(status -> updateStatus(status));
    
  4. 批量处理

    Observable.fromIterable(hugeList).buffer(100) // 每100个一批.flatMap(batch -> processBatch(batch).subscribeOn(Schedulers.io())).subscribe(result -> aggregateResult(result));
    
  5. 事件总线

    public class RxEventBus {private final PublishSubject<Object> subject = PublishSubject.create();public void post(Object event) {subject.onNext(event);}public <T> Observable<T> observe(Class<T> eventClass) {return subject.ofType(eventClass);}
    }
    

RxJava的操作符非常丰富,掌握这些操作符能够帮助开发者高效处理各种异步数据流场景。实际开发中,应根据具体需求选择合适的操作符组合,同时注意线程调度和资源管理,以构建高效可靠的响应式应用。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com