2016-12-30 2 views
2

В одном из моих проектов, я следующий код, и при переходе на RxJS5, Rx.Observer кажется, больше не будет определено: неПереход от RxJS4 к RxJS5 - реализации наблюдателя

let index = 0; 

let obsEnqueue = this.obsEnqueue = new Rx.Subject(); 

this.queueStream = Rx.Observable.create(obs => { 
    var push = Rx.Observer.create(v => {    // ! error 
     if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) { 
      obs.next(v); 
     } 
    }); 
    return obsEnqueue.subscribe(push); 
}); 

this.push = (v) => { 
    obsEnqueue.next(v); 
    index++; 
}; 

это уже не работает, потому что Rx.Observer не определен

в руководстве по миграции:

https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md

он говорит:

Observer is an interface now

Однако это не должно означать, что Rx.Observer, даже если это интерфейс, не должен иметь «статический» метод, называемый create.

В любом случае, Rx.Observer больше не существует. я получаю эту ошибку:

TypeError: Cannot read property 'create' of undefined 

Как я могу создать наблюдателя каким-то образом получают аналогичные результаты моего кода выше?

ответ

1

От источника:

export interface Observer<T> { 
    closed?: boolean; 
    next: (value: T) => void; 
    error: (err: any) => void; 
    complete:() => void; 
} 

Observer<T> представляет собой интерфейс с методом onNext, onCompleted и onError. Интерфейс - это только языковая конструкция. Он просто используется компилятором typescript для проверки типов объектов, требующих Observer<T>. Он стирается при компиляции.

Класс Subscriber<T> реализует интерфейс Observer<T>. Это означает, что Subscriber<T> - это конкретный класс класса с указанными выше методами.

Вместо этого вы используете var push = Rx.Subscriber.create(v => { [...].

Примечание:

В первоначальной реализации Rx, интерфейсы были IObservable<T> и IObserver<T> и использовали методы расширения, чтобы позволить композиции. Когда дело дошло до JS, они должны были иметь методы на прототипе Observable/Observer, чтобы включить композицию, поэтому сам класс имел свои методы.

+0

, как вы думаете, код можно было бы упростить так, как Марк? –

+1

Нет, вам нужна прямая ссылка для наблюдения. – Asti

+0

в коде, который вы мне дали, я пытаюсь выяснить, какая разница между использованием this.queueStream и this.obsEnqueue, приведет ли это к тому же или нет? Зачем? Пожалуйста, lmk спасибо! –

1

Почему бы не просто встроить функцию onNext непосредственно в подписку?

this.queueStream = Rx.Observable.create(obs => { 
    return obsEnqueue.subscribe(
     v => { 
     if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) { 
      obs.next(v); 
     } 
     } 
    ); 
}); 
+0

переменная проталкивание не Defi в моем случае, похоже, нам нужен дескриптор на этом –

2

Честно говоря, я не понимаю, что вы код делает, но даже если Observer класс не существует больше он был в основном заменен Subscriber класса, который используется почти так же, как Observer.

У этого есть статический метод Subscriber.create. См. https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L32

Этот метод возвращает объект Subscriber, который впоследствии можно использовать, например obsEnqueue.subscribe(push);.

+0

LOL Я действительно не знаю, как это работает, я украл его у кого-то другого, и он работает, я просто не знаю, как :) Я думаю, там был бы способ упростить его, и Марк получает от этого –

1

Если я понял, что этот частичный код пытается сделать ...
Я думаю Нет,
Я не вижу, как вы можете сделать это «проще».
возможно, что там будет улучшена
, чтобы сделать его более «многоразовый»,
сделать это модуль?,
или, возможно, оператор Rx, если есть не один, как же ...

это может быть попыткой ему

/* 
    "dependencies": { 
     "rxjs": "^5.0.2" 
    } 
*/ 

import {Observable, Observer, Subject, Subscriber} from "rxjs"; 

export interface ICircularQueue<T> extends Observable<T> { 
    push(value: T): void; 
} 

/** 
* on every push use 'NEXT' observer/subscription, 
* in the order they've been subscribed, 
* cycling back to 1st subscription after last 
*/ 
export function create<T>(): ICircularQueue<T> { 

    let index = 0; 

    let obsEnqueue = new Subject<T>(); 

    let queueStream = Observable.create((obs: Observer<T>) => { 

     let push = Subscriber.create<T>(v => { 
      // ! error ? 
      if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) { 
       obs.next(v); 
      } 
     }); 

     return obsEnqueue.subscribe(push); 
    }); 

    queueStream.push = (v: T) => { 
     obsEnqueue.next(v); 
     index++; 
    }; 

    return queueStream; 
} 

затем основной тест ....

import * as CircularQueue from "./CircularQueue"; 
import * as assert from "assert"; 

const $in = (array: any[], x: any) => { 
    for (let $x of array) { 
     if ($x === x) { return true; } 
    } 
    return false; 
}; 

describe("CircularQueue",() => { 

    it("works",() => { 
     let queue = CircularQueue.create(); 

     let result: number[] = []; 
     queue.subscribe(x => { 
      assert.ok($in([0, 4, 8], x)); 
      result.push(0); 
     }); 
     queue.subscribe(x => { 
      assert.ok($in([1, 5, 9], x)); 
      result.push(1); 
     }); 
     queue.subscribe(x => { 
      assert.ok($in([2, 6, 10], x)); 
      result.push(2); 
     }); 
     queue.subscribe(x => { 
      assert.ok($in([3, 7, 11], x)); 
      result.push(3); 
     }); 

     for (let i = 0; i < 12; i++) { 
      queue.push(i); 
     } 

     assert.equal(result.join(), "0,1,2,3,0,1,2,3,0,1,2,3"); 
    }); 
}); 

 Смежные вопросы

  • Нет связанных вопросов^_^