2016-06-28 14 views
1

Используя puma, faye-websocket-ruby и eventmachine, я пытаюсь реализовать сервер WebSocket, который расширяется для поддержки каналов с использованием redis.rb. Каждый клиент будет предоставлять канал, используя маршрут, который в настоящее время находится в разработке, как: «/ C# {random number}». Вся эта логика должна находиться на сервере, поскольку клиентами будут микропроцессорные системы Python, которые не будут поддерживать библиотеки более высокого уровня.Восстановление времени и ошибок в WebSocket и EventMachine

Мой код был основан на ruby-websockets-chat-demo, в качестве отправной точки. Одно из основных изменений заключалось в том, чтобы настроить его для поддержки нескольких каналов во время WebSocket «on open».

Код работает при нормальной работе. Однако часто, когда один клиент падает, сервер зависает до его перезапуска. Я пытаюсь решить эту проблему, но пока не могу этого сделать. Первоначально Heroku выдавал тайм-аут H12. Я реализовал тайм-аут стойки. Я попытался спасти тайм-ауты внутри сервера, но они никогда не срабатывают. Я реализовал событие «on error» внутри сервера, но он никогда не срабатывает. Чаще всего сервер просто уходит до перезапуска. Клиент должен заботиться о себе, но мне нужен сервер для восстановления и продолжения.

config.ru:

require './app' 
require './middlewares/myserver_backend' 
require 'rack-timeout' 
use Rack::Timeout, service_timeout: 20, wait_timeout: 30, wait_overtime: 60, service_past_wait: false 
use Myserver::MyserverBackend 
run Myserver::App 

Rack промежуточного слоя "бэкенд":

%w(faye/websocket thread redis json erb).each { |m| require m } 
module Myserver 
    class MyserverBackend 
    KEEPALIVE_TIME = ENV['KEEPALIVE_TIME'] 

    def initialize(app) 
     @app = app 
     @clients = [] 
     @uri = URI.parse(ENV["REDISCLOUD_URL"]) 
     @redis = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password) 
    end 

    def call(env) 
     begin 
     if Faye::WebSocket.websocket?(env) 
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME}) 
      ws.on :open do |event| 
      channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length] 
      Thread.new do 
       redis_sub = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password) 
       redis_sub.subscribe(channel) do |on| 
       on.message do |message_channel, message| 
        puts "MyserverBackend>> Redis  message received on channel:#{message_channel}; Message is:#{message};" 
        @clients.each { |clients_ws, clients_channel| clients_ws.send(message) if clients_channel == message_channel } 
       end 
       end 
      end 
      @clients << [ws, channel] 
      @clients.each do |clients_ws, clients_channel| 
       puts "MyserverBackend>> Client:#{clients_ws.object_id}; Channel:#{clients_channel};" 
      end 
      end 

      ws.on :message do |event| 
      @clients.each do |clients_ws, clients_channel| 
       if clients_ws == ws 
       puts "MyserverBackend>> Websocket message received on channel:#{clients_channel}; Message is:#{event.data};" 
       @redis.publish(clients_channel, sanitize(event.data)) 
       end 
      end 
      end 

      ws.on :close do |event| 
      # Close all channels for this client first 
      # ws gives a channel which we use to identify it here, but we're closing all of those that are open 
      @clients.each { |clients_ws, clients_channel| @redis.unsubscribe(clients_channel) if clients_ws == ws } 
      @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws } 
      channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length] 
      puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};" 
      ws = nil 
      end 

      ws.on :error do |event| 
      puts "Error raised:#{nil}; ws:#{ws.object_id};" 
      ws.close unless ws.nil? 
      end 

      # Return async Rack response 
      ws.rack_response 

     else 
      @app.call(env) 
     end 

     rescue Rack::Timeout::RequestTimeoutError, Rack::Timeout::RequestExpiryError => exception 
     puts "Exception raised:#{exception}; ws:#{ws.object_id};" 
     ws.close(code=4999, reason=9999) unless ws.nil? 
     # ensure is executed immediately so it doesn't help... 
     end 
    end 

    private 
    def sanitize(message) 
     json = JSON.parse(message) 
     json.each { |key, value| json[key] = ERB::Util.html_escape(value) } 
     JSON.generate(json) 
    end 
    end 
end 

Синатра "интерфейс":

# https://github.com/heroku-examples/ruby-websockets-chat-demo 
require 'rubygems' 
require 'bundler' 
require 'sinatra/base' 
ENV['RACK_ENV'] ||= 'development' 
Bundler.require 
$: << File.expand_path('../', __FILE__) 
$: << File.expand_path('../lib', __FILE__) 

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file } 
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV'] 

module Myserver 
    class App < Sinatra::Base 
    get "/" do 
     erb :"index.html" 
    end 

    get "/assets/js/application.js" do 
     content_type :js 
     @scheme = env == "production" ? "wss://" : "ws://" 
     erb :"application.js" 
    end 
    end 
end 

Тестовый клиент:

# https://github.com/faye/faye-websocket-ruby/issues/52 
# https://github.com/faye/faye-websocket-ruby 
%w(bundler/setup faye/websocket eventmachine json).each { |m| require m } 
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file } 
class ClientWs 

    def self.em_run 
    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV'] 
    EM.run do 

     uri = 'myserver.herokuapp.com' 
     #uri = 'localhost' if env == 'development' 
     channel = "C#{rand(999999999999).to_s}" 
     url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}" 
     @ws = Faye::WebSocket::Client.new(url) 
     start = Time.now 
     count ||= 0 

     timer = EventMachine.add_periodic_timer(5+rand(5)) { 
     count += 1 
     send({'PING': channel, 'COUNT': count.to_s}) 
     } 

     @ws.on :open do |event| 
     puts "{'OPEN':#{channel}}" 
     ClientWs.send({'OPEN': channel}) 
     end 

     @ws.on :message do |event| 
     @ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address 
     begin 
      parsed = JSON.parse event.data 
     rescue => e 
      puts ">>>> [Error! Failed to parse JSON]" 
      puts ">>>> [#{e.message}]" 
      puts ">>>> #{event.data}" 
     end 
     puts ">> #{@ip_address}:#{channel}:#{event.data};" 
     end 

     @ws.on :close do |event| 
     timer.cancel 
     stop = Time.now - start 
     puts "#{stop} seconds;" 
     p [:close, event.code, event.reason] 
     ws = nil 
     ClientWs.em_run 
     end 
    end 
    end 

    def self.send message 
    payload = message.is_a?(Hash) ? message : {payload: message} 
    @ws.send(payload.to_json) 
    end 

end 
ClientWs.em_run 

Gemfile.lock:

GEM 
    remote: https://rubygems.org/ 
    specs: 
    activesupport (4.2.5.1) 
     i18n (~> 0.7) 
     json (~> 1.7, >= 1.7.7) 
     minitest (~> 5.1) 
     thread_safe (~> 0.3, >= 0.3.4) 
     tzinfo (~> 1.1) 
    eventmachine (1.2.0.1-x86-mingw32) 
    faye-websocket (0.10.4) 
     eventmachine (>= 0.12.0) 
     websocket-driver (>= 0.5.1) 
    i18n (0.7.0) 
    json (1.8.3) 
    json_pure (1.8.3) 
    minitest (5.9.0) 
    multi_json (1.12.1) 
    oj (2.16.1) 
    permessage_deflate (0.1.3) 
    progressbar (0.21.0) 
    puma (3.4.0) 
    rack (1.6.4) 
    rack-protection (1.5.3) 
     rack 
    rack-timeout (0.4.2) 
    rake (11.2.2) 
    redis (3.3.0) 
    rollbar (2.11.5) 
     multi_json 
    sinatra (1.4.7) 
     rack (~> 1.5) 
     rack-protection (~> 1.4) 
     tilt (>= 1.3, < 3) 
    thread_safe (0.3.5) 
    tilt (2.0.5) 
    tzinfo (1.2.2) 
     thread_safe (~> 0.1) 
    websocket-driver (0.6.4) 
     websocket-extensions (>= 0.1.0) 
    websocket-extensions (0.1.2) 

PLATFORMS 
    x86-mingw32 

DEPENDENCIES 
    activesupport (= 4.2.5.1) 
    bundler 
    faye-websocket 
    json_pure 
    oj (~> 2.16.0) 
    permessage_deflate 
    progressbar 
    puma 
    rack 
    rack-timeout 
    rake 
    redis (>= 3.2.0) 
    rollbar 
    sinatra 

RUBY VERSION 
    ruby 2.2.4p230 

BUNDLED WITH 
    1.12.5 

Что видит клиент при попытке подключиться к серверу застопорился:

ruby client.rb 
20.098119 seconds; 
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"] 
20.07921 seconds; 
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"] 
20.075731 seconds; 
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"] 

конфигурации/puma.rb:

env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV'] 
if env.nil? || env == 'development' || env == 'test' 
    concurrency = 0 # Set to zero to ensure single mode, not clustered mode 
    max_threads = 1 
end 
# WEB_CONCURRENCY and RAILS_MAX_THREADS == 1 in Heroku for now. 
concurrency ||= (ENV['WEB_CONCURRENCY'] || 2) 
max_threads ||= (ENV['RAILS_MAX_THREADS'] || 5) 
worker_timeout 15 
workers Integer(concurrency) 
threads_count = Integer(max_threads) 
threads threads_count, threads_count 

#preload_app! 

rackup  DefaultRackup 
port  ENV['PORT']  || 3000 
environment ENV['RACK_ENV'] || 'development' 

ответ

0

Что мне нужно do было завершено событие сервера «on close». Ему нужно было очистить все, а затем перезапустить себя, чего он не делал.

Однако мне не нравится этот окончательный ответ. Вопрос в том, почему сервер закрывает магазин, заканчивается и перезапускается только потому, что клиент упал? Разве нет более чистого способа сметать детрит неудавшегося клиента? Последующее действие: это исправление действительно отвечает на этот конкретный вопрос, в любом случае, в том, что завершение onclose разрешило указанную проблему. Дальнейшие усовершенствования наполнили события WebSocket клиента в дополнение к событиям Redis, так что onclose закрывает клиент, а не сервер.

Новое событие:

ws.on :close do |event| 
    if @debug 
     puts "MyserverBackend>> Close entered. Last error:#{$!.class}:#{$!.to_s};Module:#{$0};Line:#{$.};" 
     [email protected] { |backtrace| puts backtrace } 
     exit 
    end 
    @clients.each do |clients_ws, clients_channel| 
     begin 
     @redis.unsubscribe(clients_channel) 
     rescue RuntimeError => exception 
      unless exception.to_s == "Can't unsubscribe if not subscribed." 
      raise 
      end 
     false 
     end 
    end 
    @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws } 
    channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length] 
    puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};" 
    ws = nil 
    app = Myserver::App 
    myserver = MyserverBackend.new(app) 
    myserver 
    end 

 Смежные вопросы

  • Нет связанных вопросов^_^