Используя puma, faye-websocket-ruby и eventmachine, я пытаюсь реализовать сервер WebSocket, который расширяется для поддержки каналов с использованием redis.rb. Каждый клиент будет предоставлять канал, используя маршрут, который в настоящее время находится в разработке, как: «/ C# {random number}». Вся эта логика должна находиться на сервере, поскольку клиентами будут микропроцессорные системы Python, которые не будут поддерживать библиотеки более высокого уровня.Восстановление времени и ошибок в WebSocket и EventMachine
Мой код был основан на ruby-websockets-chat-demo, в качестве отправной точки. Одно из основных изменений заключалось в том, чтобы настроить его для поддержки нескольких каналов во время WebSocket «on open».
Код работает при нормальной работе. Однако часто, когда один клиент падает, сервер зависает до его перезапуска. Я пытаюсь решить эту проблему, но пока не могу этого сделать. Первоначально Heroku выдавал тайм-аут H12. Я реализовал тайм-аут стойки. Я попытался спасти тайм-ауты внутри сервера, но они никогда не срабатывают. Я реализовал событие «on error» внутри сервера, но он никогда не срабатывает. Чаще всего сервер просто уходит до перезапуска. Клиент должен заботиться о себе, но мне нужен сервер для восстановления и продолжения.
config.ru:
require './app'
require './middlewares/myserver_backend'
require 'rack-timeout'
use Rack::Timeout, service_timeout: 20, wait_timeout: 30, wait_overtime: 60, service_past_wait: false
use Myserver::MyserverBackend
run Myserver::App
Rack промежуточного слоя "бэкенд":
%w(faye/websocket thread redis json erb).each { |m| require m }
module Myserver
class MyserverBackend
KEEPALIVE_TIME = ENV['KEEPALIVE_TIME']
def initialize(app)
@app = app
@clients = []
@uri = URI.parse(ENV["REDISCLOUD_URL"])
@redis = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
end
def call(env)
begin
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
Thread.new do
redis_sub = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
redis_sub.subscribe(channel) do |on|
on.message do |message_channel, message|
puts "MyserverBackend>> Redis message received on channel:#{message_channel}; Message is:#{message};"
@clients.each { |clients_ws, clients_channel| clients_ws.send(message) if clients_channel == message_channel }
end
end
end
@clients << [ws, channel]
@clients.each do |clients_ws, clients_channel|
puts "MyserverBackend>> Client:#{clients_ws.object_id}; Channel:#{clients_channel};"
end
end
ws.on :message do |event|
@clients.each do |clients_ws, clients_channel|
if clients_ws == ws
puts "MyserverBackend>> Websocket message received on channel:#{clients_channel}; Message is:#{event.data};"
@redis.publish(clients_channel, sanitize(event.data))
end
end
end
ws.on :close do |event|
# Close all channels for this client first
# ws gives a channel which we use to identify it here, but we're closing all of those that are open
@clients.each { |clients_ws, clients_channel| @redis.unsubscribe(clients_channel) if clients_ws == ws }
@clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
ws = nil
end
ws.on :error do |event|
puts "Error raised:#{nil}; ws:#{ws.object_id};"
ws.close unless ws.nil?
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
rescue Rack::Timeout::RequestTimeoutError, Rack::Timeout::RequestExpiryError => exception
puts "Exception raised:#{exception}; ws:#{ws.object_id};"
ws.close(code=4999, reason=9999) unless ws.nil?
# ensure is executed immediately so it doesn't help...
end
end
private
def sanitize(message)
json = JSON.parse(message)
json.each { |key, value| json[key] = ERB::Util.html_escape(value) }
JSON.generate(json)
end
end
end
Синатра "интерфейс":
# https://github.com/heroku-examples/ruby-websockets-chat-demo
require 'rubygems'
require 'bundler'
require 'sinatra/base'
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
module Myserver
class App < Sinatra::Base
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end
end
Тестовый клиент:
# https://github.com/faye/faye-websocket-ruby/issues/52
# https://github.com/faye/faye-websocket-ruby
%w(bundler/setup faye/websocket eventmachine json).each { |m| require m }
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
class ClientWs
def self.em_run
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
EM.run do
uri = 'myserver.herokuapp.com'
#uri = 'localhost' if env == 'development'
channel = "C#{rand(999999999999).to_s}"
url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}"
@ws = Faye::WebSocket::Client.new(url)
start = Time.now
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) {
count += 1
send({'PING': channel, 'COUNT': count.to_s})
}
@ws.on :open do |event|
puts "{'OPEN':#{channel}}"
ClientWs.send({'OPEN': channel})
end
@ws.on :message do |event|
@ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address
begin
parsed = JSON.parse event.data
rescue => e
puts ">>>> [Error! Failed to parse JSON]"
puts ">>>> [#{e.message}]"
puts ">>>> #{event.data}"
end
puts ">> #{@ip_address}:#{channel}:#{event.data};"
end
@ws.on :close do |event|
timer.cancel
stop = Time.now - start
puts "#{stop} seconds;"
p [:close, event.code, event.reason]
ws = nil
ClientWs.em_run
end
end
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
ClientWs.em_run
Gemfile.lock:
GEM
remote: https://rubygems.org/
specs:
activesupport (4.2.5.1)
i18n (~> 0.7)
json (~> 1.7, >= 1.7.7)
minitest (~> 5.1)
thread_safe (~> 0.3, >= 0.3.4)
tzinfo (~> 1.1)
eventmachine (1.2.0.1-x86-mingw32)
faye-websocket (0.10.4)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
i18n (0.7.0)
json (1.8.3)
json_pure (1.8.3)
minitest (5.9.0)
multi_json (1.12.1)
oj (2.16.1)
permessage_deflate (0.1.3)
progressbar (0.21.0)
puma (3.4.0)
rack (1.6.4)
rack-protection (1.5.3)
rack
rack-timeout (0.4.2)
rake (11.2.2)
redis (3.3.0)
rollbar (2.11.5)
multi_json
sinatra (1.4.7)
rack (~> 1.5)
rack-protection (~> 1.4)
tilt (>= 1.3, < 3)
thread_safe (0.3.5)
tilt (2.0.5)
tzinfo (1.2.2)
thread_safe (~> 0.1)
websocket-driver (0.6.4)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.2)
PLATFORMS
x86-mingw32
DEPENDENCIES
activesupport (= 4.2.5.1)
bundler
faye-websocket
json_pure
oj (~> 2.16.0)
permessage_deflate
progressbar
puma
rack
rack-timeout
rake
redis (>= 3.2.0)
rollbar
sinatra
RUBY VERSION
ruby 2.2.4p230
BUNDLED WITH
1.12.5
Что видит клиент при попытке подключиться к серверу застопорился:
ruby client.rb
20.098119 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.07921 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.075731 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
конфигурации/puma.rb:
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
if env.nil? || env == 'development' || env == 'test'
concurrency = 0 # Set to zero to ensure single mode, not clustered mode
max_threads = 1
end
# WEB_CONCURRENCY and RAILS_MAX_THREADS == 1 in Heroku for now.
concurrency ||= (ENV['WEB_CONCURRENCY'] || 2)
max_threads ||= (ENV['RAILS_MAX_THREADS'] || 5)
worker_timeout 15
workers Integer(concurrency)
threads_count = Integer(max_threads)
threads threads_count, threads_count
#preload_app!
rackup DefaultRackup
port ENV['PORT'] || 3000
environment ENV['RACK_ENV'] || 'development'