2017-02-20 30 views
0

Я изучаю rxSwift, и я хочу сделать сервис для взаимодействия сервера longpolling c этой службой, имитирующей постоянное соединение. Я написал это, но мне кажется, разве это не могло быть сделано лучше? Возможно ли каким-то образом повторить Observable, независимо от ошибки, и в зависимости от ответа сервера longpoll.LongPolling С rxSwift

Может ли кто-нибудь поделиться решением? Или помогите с советом? Как лучше организовать? Я хотел бы видеть лучшее решение, так как только начал изучать rxswift

class LongPollingService { 

    public var messageReciver: PublishSubject<EventProtocol> = PublishSubject<EventProtocol>() 

    private let transport = DefaultTransport() 

    private let disposeBag = DisposeBag() 

    private var currentRequestInfo = Variable<LongpollingServerInfo?>(nil) 

    private var currentRequestDisposable: Disposable? 

    private var currentLongpollingConnection: Disposable? // Subsribee for request server info 

    private var eventListener : Disposable? 

    private var currentReqursiveConnection: Disposable? // Subscriber for event listener from longpoll server 

    func startObservableEvents() { 
     getServerConnection() 
     subscribeServerInfo() 
     //testing listen events 
     eventListener = messageReciver.showMessagesInDebugMode().subscribe() 
     eventListener?.addDisposableTo(disposeBag) 
    } 

    func disconnect() { 
     currentRequestDisposable?.dispose() 
     currentLongpollingConnection?.dispose() 
     currentReqursiveConnection?.dispose() 
    } 

    private func subscribeServerInfo() { 
     currentLongpollingConnection = currentRequestInfo 
      .asObservable() 
      .filter({$0 != nil}) 
      .subscribe(onNext: { [weak self] (info) in 
       guard let sSelf = self else { return } 
       sSelf.subscribeToEvents(timeStamp: info!.ts) 
      }) 
     currentLongpollingConnection?.addDisposableTo(disposeBag) 
    } 

    private func subscribeToEvents(timeStamp: TimeInterval) { 
     if let serverInfo = currentRequestInfo.value { 
      currentReqursiveConnection?.dispose() 
      currentReqursiveConnection = getEventsFromLongpollServer(serverInfo: serverInfo, with: timeStamp) 
       .flatMap(parseUpdates) 
       .flatMap(reciveEvents) 
       .showErrorsSwiftMessagesInDebugMode() 
       .subscribe(onNext: { [weak self] updates in 
        guard let sSelf = self else { return } 
        sSelf.subscribeToEvents(timeStamp: updates) 
       }, 
       onError: { [weak self] error in 
        guard let sSelf = self else { return } 
         if let error = error as? LongPollError { 
          switch error { 
          case .olderHistory(let ts): sSelf.subscribeToEvents(timeStamp: ts) 
          default: sSelf.getServerConnection() 
          } 
         } 
       }) 
      currentReqursiveConnection?.addDisposableTo(disposeBag) 
     } 
    } 

    private func getServerConnection() { 
     //get longpolling server info for connection. 
     currentRequestDisposable = getLongpollServerInfo() 
      .subscribe(onNext: {[weak self] info in 
       guard let sSelf = self else { return } 
       sSelf.currentRequestInfo.value = info 
      }) 
     currentRequestDisposable?.addDisposableTo(disposeBag) 
    } 

    private func parseUpdates(json: Any) throws -> Observable<LongPollingUpdates> { 
     let response = try Mapper<LongPollingUpdates>().map(JSONObject: json) 
     return .just(response) 
    } 

    private func reciveEvents(updates:LongPollingUpdates) throws -> Observable<TimeInterval> { 
     if let errors = updates.failed { 
      throw parseErrors(errors: errors) 
     } 
     if let events = updates.updates { 
      parseUpdates(updates: events) 
     } 
     return Observable.just(updates.timeStamp!) 
    } 

    private func parseUpdates(updates: [[Any]]) { 
     updates.forEach { (array) in 
      let firstElementInUpdate = array.first 
      if let update = firstElementInUpdate as? Int { 
       switch update { 
       case 1: break 
       case 2: break 
       case 3: break 
       case 4: messageReciver.onNext(NewMessage(array: array)) 
       default: break 
       } 
      } 
     } 
    } 

    private func parseErrors(errors: [String: Any]) -> LongPollError { 
     if let error = errors["failed"] as? Int { 
      switch error { 
      case 1: 
       guard let ts = errors["ts"] as? TimeInterval else { return .unkownError } 
       return .olderHistory(ts: ts) 
      case 2: return .needNewkey 
      case 3: return .needCaseAndTs 
      case 4: return .unkownVersion 
      default: 
       return .unkownError 
      } 
     } 
     return .unkownError 
    } 

    private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo, with ts: TimeInterval) -> Observable<Any> { 
     let url = buildLongPollingServerRoute(from: serverInfo, with: ts) 
     let request = buldLongPollRequst(route: url) 
     let requestConvert = try? URLEncoding.default.encode(request!, with: nil) 
     return transport.makeRequest(request: requestConvert!) 
    } 

    private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo) -> Observable<Any> { 
     let url = buildLongPollingServerRoute(from: serverInfo) 
     let request = buldLongPollRequst(route: url) 
     let requestConvert = try? URLEncoding.default.encode(request!, with: nil) 
     return transport.makeRequest(request: requestConvert!) 
    } 

    private func getLongpollServerInfo() -> Observable<LongpollingServerInfo> { 
     let request = MessageRouter.getLongpollServer(useSsl: false, needPts: false) 
     return transport.makeModel(request: request) 
    } 

} 

ответ

1

Так если у вас есть функции, как:

func getData() -> Observable<Data> 

И вы хотите длительный опрос его в определенной period, вы можете сделать что-то вроде этого:

Observable<Int>.interval(period, scheduler: MainScheduler.instance) 
    .map { _ in return } 
    .flatMap(getData) 
    .subscribe(/* ... handle data ... */) 
    .disposed(by: disposeBag) 

Вы можете использовать другие планировщики, чем MainScheduler, если это более уместно.

Теперь, если вы хотите также обрабатывать Error сек, что getData может излучать, и вы не хотите, чтобы обязательно отписываться длинный опрос, то вы можете сделать это:

func handleError(error: Error) -> Observable<Data> { 
    return Observable.empty() 
} 

Observable<Int>.interval(period, scheduler: MainScheduler.instance) 
    .map { _ in return } 
    .flatMap { return getData.catchError(handleError) } 
    .subscribe(/* ... handle data ... */) 
    .disposed(by: disposeBag) 

Вы также можете анализировать ошибки в handleError и решите, хотите ли вы продолжить, испустив пустой Observable или отменив длительный опрос, выпустив еще одну ошибку.

+0

Спасибо, но этот интервал не подходит для моих целей, мне нужно, чтобы событие не было определенной периодичностью, и со времени последнего события, которое приходит с сервером. subscribeToEvents (timeStamp: - timestamp это параметр как последнее событие, которое произошло на сервере. – Zept