2017-01-12 13 views
0

У меня есть рубиновый процесс клиента, который пытается отправить данные службе, построенной на EventMachine. Рабочий процесс прост. Клиент всегда инициирует запрос и всегда ожидает ответ в ответ.Сообщение усечено до 16k отправки из TCPSocket в EventMachine

При отправке данных из TCPSocket он, по-видимому, обрезает данные примерно до 16k. Любые рекомендации о том, что я делаю неправильно, или какие предположения, которые я сделал, мне нужно переосмыслить? Предположительно, я делаю что-то неправильно в EM, где мне нужно накапливать все данные, которые отправляются, но я не уверен, как это сделать.

# client.rb 
def send_messages arr 
    arr = Array(messages) 
    if arr.length > 0 
    logger.debug { "Sending #{ arr.length } messages" } 


    marshaled = Marshal.dump(arr) 
    logger.debug { "Marshaled data #{ marshaled.length } bytes" } # roughly 200k 

    socket = TCPSocket.new(host, port) 
    written = socket.write(marshaled) 
    logger.debug { "Apparently sent #{ written } bytes" } # same size 
    socket.close_write 

    data = socket.read 
    logger.debug { "Received #{ data.length } bytes from service" } 
    Marshal.load(data) 
    end 

rescue Exception => ex 
    logger.error "Client threw exception communicating with service :: #{ ex.message }" 
    raise ex 
ensure 
    socket.close if socket 
end 

Каротаж на стороне клиента выглядит следующим образом:

D, [2017-01-12T10:02:56.908857 #9360] DEBUG -- : Sending 1 messages 
D, [2017-01-12T10:02:56.909907 #9360] DEBUG -- : Marshaled data length 205941 bytes 
D, [2017-01-12T10:02:56.910373 #9360] DEBUG -- : Apparently sent 205941 bytes 
D, [2017-01-12T10:02:56.955270 #9360] DEBUG -- : Received 0 bytes from service 

На стороне сервера ...

class EventedServer < EM::Connection 

    attr_reader :context 

    def initialize context 
    raise "Context must be defined" unless context 
    @context = context 
    end 

    def post_init 
    logger.debug { "-- someone connected to the server" } 
    end 

    def connection_completed 
    logger.debug { "-- connection completed" } 
    end 

    def unbind 
    logger.debug { "-- unbind" } 
    end 

    def receive_data data 
    logger.debug { "-- received data at the server #{ data.length }" } # approx 16k 
    send_data(process(data)) 
    end 

    def process request 

    logger.debug { "-- about to deserialize #{ request.length } bytes" } 
    model = Marshal.load(request) 
    logger.debug { "-- received #{ model.class.name }" } 

    context.process_message(model) 

    # Send data the application needs to stay up to date. 
    response = context.application(app_name).pending_configurations 
    logger.debug { "-- about to send #{ response.keys } for #{ app_name }" } 
    Marshal.dump(response) 
    rescue Exception => e 
    logger.error("** Error in processing request of #{ request.length } bytes") 
    raise e 
    end 
end 

Каротаж на стороне сервера выглядит следующим образом:

D, [2017-01-12T10:02:56.910251 #9330] DEBUG -- : In evented server initialize... 
D, [2017-01-12T10:02:56.910319 #9330] DEBUG -- : -- someone connected to the server 
D, [2017-01-12T10:02:56.910419 #9330] DEBUG -- : -- received data at the server 16384 
D, [2017-01-12T10:02:56.910463 #9330] DEBUG -- : -- about to deserialize 16384 bytes 
E, [2017-01-12T10:02:56.912067 #9330] ERROR -- : ** Error in processing request of 16384 bytes 

ответ

0

Короткий ответ receive_data не ждет возвращения всех данных, как я думал. Требуется буфер. В моем случае любое сообщение размером более 16 тыс. Должно буферизироваться. Мой текущий код выглядит примерно так:

def post_init 
    logger.debug { "-- someone connected to the server" } 
    @buffer = [] 
    logger.debug { "-- initializing message buffer" } 
    end 

    DELIMITER = "\nDELIM\n" 

    def receive_data data 

    # NOTE: actual code checks to ensure data was not split in the delimiter 
    if data.ends_with?(DELIMITER) 
     logger.debug { "-- received data at the server #{ data.length }" } 
     @buffer << data[0...(-1*DELIMITER.length)] 
     joined_data = @buffer.join 
     @buffer = [] 

     logger.debug { "-- total data received #{ joined_data.length }" } 
     send_data(process(joined_data)) 
    else 
     logger.debug { "-- received partial data at the server #{ data.length }" } 
     @buffer << data 
    end 
    end