2013-07-15 3 views
0

Я пытаюсь создать библиотеку STOMP, которая может быть использована для подключения к RabbitMQ из хром-приложений. Мои первоначальные эксперименты из внешнего хрома работали хорошо. Однако я не могу преобразовать код Socket для работы из хром-приложений. Можно ли изменить код клиента api ниже для работы из хром-приложений?Клиент STOMP для приложений Chrome

ответ

1

Мои первоначальные эксперименты были успешными. Мне бы хотелось увидеть полный Socket API в библиотеках dart на стороне клиента, чтобы мы могли использовать такой код непосредственно из приложений Chrome. Хотя есть некоторые попытки предоставить Socket API с использованием возможностей js socket для хром, он не так чист, как тот, который в пакете dart.io доступен для кода на стороне сервера.

Вот код. Чтобы использовать его, разверните RabbitMQ и включите плагин STOMP.

import 'dart:io'; 
import 'dart:async'; 

void main() { 
    List<String> versions = ["1.2","1.1","1.0"]; 
    String host = "127.0.0.1"; // localhost 
    int port = 61613; // rabbitmq default port 
    Socket.connect(host, 61613).then((connection) { 
    String hostpath="/"; 
    String login = "guest"; // rabbitmq default login 
    String passcode = "guest"; // rabbitmq default passcode 
    stomp(connection, versions, hostpath, login, passcode); // stomp connect 
    connection 
     .transform(new StompTransformer()) 
     .listen((frame) { 
      if(frame.headers.containsKey("ack")) { 
      ack(connection, frame.headers["ack"]); 
      } 
      dumpStompFrame(frame); 
      stdout.write("enter a message> "); 
     }, 
     onDone:() { print("done"); }, 
     onError: (e) { print (e); }); 
    subscribe(connection, "/queue/a", 1, "client", true); 
    stdin 
     .transform(new StringDecoder()) 
     .transform(new LineTransformer()) 
     .listen((line) { 
     send(connection, "/queue/a", line); 
     }); 
    }); 
} 

/* 
* STOMP 
* accept-version:1.0,1.1,2.0 
* host:/ 
* 
* ^@ 
*/ 

void stomp(Socket connection, 
    List<String> versions, 
    String hostpath, 
    String login, String passcode) { 
    connection.writeln("STOMP"); 
    if(versions.length > 0) { 
    connection.writeln("accept-version:${versions.join(',')}"); 
    } 
    connection.writeln("host:$hostpath"); 
    connection.writeln("login:$login"); 
    connection.writeln("passcode:$passcode"); 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* SUBSCRIBE 
* id:0 
* destination:/queue/foo 
* ack:client 
* 
* ^@ 
*/ 
void subscribe(Socket connection, 
    String destination, 
    int id, String ack, bool persistent) { 
    connection.writeln("SUBSCRIBE"); 
    connection.writeln("id:$id"); 
    connection.writeln("destination:$destination"); 
    connection.writeln("ack:$ack"); 
    connection.writeln("persistent:$persistent"); 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* UNSUBSCRIBE 
* id:0 
* 
* ^@ 
*/ 
void unsubscribe(Socket connection, int id) { 
    connection.writeln("UNSUBSCRIBE"); 
    connection.writeln("id:$id"); 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* ACK 
* id:12345 
* transaction:tx1 
* 
* ^@ 
*/ 
void ack(Socket connection, String id, [String transaction]) { 
    connection.writeln("ACK"); 
    connection.writeln("id:$id"); 
    if(?transaction) { 
    connection.writeln("transaction:$transaction"); 
    } 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* NACK 
* id:12345 
* transaction:tx1 
* 
* ^@ 
*/ 
void nack(Socket connection, String id, [String transaction]) { 
    connection.writeln("NACK"); 
    connection.writeln("id:$id"); 
    if(?transaction) { 
    connection.writeln("transaction:$transaction"); 
    } 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* BEGIN 
* transaction:tx1 
* 
* ^@ 
*/ 
void begin(Socket connection, String transaction) { 
    connection.writeln("BEGIN"); 
    connection.writeln("transaction:$transaction"); 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* COMMIT 
* transaction:tx1 
* 
* ^@ 
*/ 
void commit(Socket connection, String transaction) { 
    connection.writeln("COMMIT"); 
    connection.writeln("transaction:$transaction"); 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* ABORT 
* transaction:tx1 
* 
* ^@ 
*/ 
void abort(Socket connection, String transaction) { 
    connection.writeln("ABORT"); 
    connection.writeln("transaction:$transaction"); 
    connection.writeln(); 
    connection.add([0x00]); 
} 

/* 
* SEND 
* destination:/queue/a 
* content-type:text/plain 
* 
* hello queue a 
* ^@ 
*/ 
void send(Socket connection, String queue, String message) { 
    connection.writeln("SEND"); 
    connection.writeln("destination:$queue"); 
    connection.writeln("content-type:text/plain"); 
    connection.writeln(); 
    connection.write(message); 
    connection.add([0x00]); 
} 

class StompServerFrame { 
    String frame; 
    Map<String, String> headers = new Map<String, String>(); 
    List<int> body = new List<int>(); 
    String toString() { 
    StringBuffer sb = new StringBuffer(); 
    sb.writeln(frame); 
    for(String key in headers.keys) { 
     sb.writeln("$key=${headers[key]}"); 
    } 
    sb.writeln(new String.fromCharCodes(body)); 
    return sb.toString(); 
    } 
} 

void dumpStompFrame(StompServerFrame frame) { 
    print("BEGIN STOMP FRAME DUMP"); 
    print(frame.toString()); 
    print("END STOMP FRAME DUMP"); 
} 

class StompTransformer extends StreamEventTransformer<List<int>, StompServerFrame> { 
    List<String> serverFrames = ['CONNECTED', 'MESSAGE', 'RECEIPT', 'ERROR']; 
    String state = 'COMMAND'; // 'COMMAND', 'HEADERS', 'BODY' 
    List<int> token = new List<int>(); 
    StompServerFrame stompServerFrame = new StompServerFrame(); 
    StompTransformer() {} 
    int lastValue = -1; 
    void handleData(List<int> intList, EventSink<StompServerFrame> sink) { 
    for(int b in intList) { 
     switch(state) { 
     case 'COMMAND': 
      if(b == 0x0a) { // done with command 
      stompServerFrame.frame = new String.fromCharCodes(token); 
      state = 'HEADERS'; 
      token.clear(); 
      } else { 
      token.add(b); 
      } 
      lastValue = b; 
      break; 
     case 'HEADERS': 
      if(b == 0x0a && lastValue == 0x0a) { // done with all headers 
      state = 'BODY'; 
      token.clear(); 
      lastValue = -1; 
      } else if(b == 0x0a && lastValue != 0x0a) { // done with a header 
      String tokenString = new String.fromCharCodes(token); 
      List<String> tokenStringParts = tokenString.split(":"); 
      if(tokenStringParts.length == 2) { 
       stompServerFrame.headers.putIfAbsent(tokenStringParts.elementAt(0), 
       () => tokenStringParts.elementAt(1)); 
      } else { 
       // possible header format error 
       print("was here with $tokenString"); 
      } 
      token.clear(); 
      lastValue = b; 
      } else { 
      token.add(b); 
      lastValue = b; 
      } 
      break; 
     case 'BODY': 
      if(b == 0x00) { // done with body 
      sink.add(stompServerFrame); 
      stompServerFrame = new StompServerFrame(); 
      state = 'COMMAND'; 
      token.clear(); 
      lastValue = -1; 
      } else { 
      stompServerFrame.body.add(b); 
      } 
      break; 
     default: 
      break; 
     } 
    } 
    } 
}