Я пытаюсь создать библиотеку STOMP, которая может быть использована для подключения к RabbitMQ из хром-приложений. Мои первоначальные эксперименты из внешнего хрома работали хорошо. Однако я не могу преобразовать код Socket для работы из хром-приложений. Можно ли изменить код клиента api ниже для работы из хром-приложений?Клиент STOMP для приложений Chrome
0
A
ответ
1
Мои первоначальные эксперименты были успешными. Мне бы хотелось увидеть полный Socket API в библиотеках dart на стороне клиента, чтобы мы могли использовать такой код непосредственно из приложений Chrome. Хотя есть некоторые попытки предоставить Socket API с использованием возможностей js socket для хром, он не так чист, как тот, который в пакете dart.io доступен для кода на стороне сервера.
Вот код. Чтобы использовать его, разверните RabbitMQ и включите плагин STOMP.
import 'dart:io';
import 'dart:async';
void main() {
List<String> versions = ["1.2","1.1","1.0"];
String host = "127.0.0.1"; // localhost
int port = 61613; // rabbitmq default port
Socket.connect(host, 61613).then((connection) {
String hostpath="/";
String login = "guest"; // rabbitmq default login
String passcode = "guest"; // rabbitmq default passcode
stomp(connection, versions, hostpath, login, passcode); // stomp connect
connection
.transform(new StompTransformer())
.listen((frame) {
if(frame.headers.containsKey("ack")) {
ack(connection, frame.headers["ack"]);
}
dumpStompFrame(frame);
stdout.write("enter a message> ");
},
onDone:() { print("done"); },
onError: (e) { print (e); });
subscribe(connection, "/queue/a", 1, "client", true);
stdin
.transform(new StringDecoder())
.transform(new LineTransformer())
.listen((line) {
send(connection, "/queue/a", line);
});
});
}
/*
* STOMP
* accept-version:1.0,1.1,2.0
* host:/
*
* ^@
*/
void stomp(Socket connection,
List<String> versions,
String hostpath,
String login, String passcode) {
connection.writeln("STOMP");
if(versions.length > 0) {
connection.writeln("accept-version:${versions.join(',')}");
}
connection.writeln("host:$hostpath");
connection.writeln("login:$login");
connection.writeln("passcode:$passcode");
connection.writeln();
connection.add([0x00]);
}
/*
* SUBSCRIBE
* id:0
* destination:/queue/foo
* ack:client
*
* ^@
*/
void subscribe(Socket connection,
String destination,
int id, String ack, bool persistent) {
connection.writeln("SUBSCRIBE");
connection.writeln("id:$id");
connection.writeln("destination:$destination");
connection.writeln("ack:$ack");
connection.writeln("persistent:$persistent");
connection.writeln();
connection.add([0x00]);
}
/*
* UNSUBSCRIBE
* id:0
*
* ^@
*/
void unsubscribe(Socket connection, int id) {
connection.writeln("UNSUBSCRIBE");
connection.writeln("id:$id");
connection.writeln();
connection.add([0x00]);
}
/*
* ACK
* id:12345
* transaction:tx1
*
* ^@
*/
void ack(Socket connection, String id, [String transaction]) {
connection.writeln("ACK");
connection.writeln("id:$id");
if(?transaction) {
connection.writeln("transaction:$transaction");
}
connection.writeln();
connection.add([0x00]);
}
/*
* NACK
* id:12345
* transaction:tx1
*
* ^@
*/
void nack(Socket connection, String id, [String transaction]) {
connection.writeln("NACK");
connection.writeln("id:$id");
if(?transaction) {
connection.writeln("transaction:$transaction");
}
connection.writeln();
connection.add([0x00]);
}
/*
* BEGIN
* transaction:tx1
*
* ^@
*/
void begin(Socket connection, String transaction) {
connection.writeln("BEGIN");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* COMMIT
* transaction:tx1
*
* ^@
*/
void commit(Socket connection, String transaction) {
connection.writeln("COMMIT");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* ABORT
* transaction:tx1
*
* ^@
*/
void abort(Socket connection, String transaction) {
connection.writeln("ABORT");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* SEND
* destination:/queue/a
* content-type:text/plain
*
* hello queue a
* ^@
*/
void send(Socket connection, String queue, String message) {
connection.writeln("SEND");
connection.writeln("destination:$queue");
connection.writeln("content-type:text/plain");
connection.writeln();
connection.write(message);
connection.add([0x00]);
}
class StompServerFrame {
String frame;
Map<String, String> headers = new Map<String, String>();
List<int> body = new List<int>();
String toString() {
StringBuffer sb = new StringBuffer();
sb.writeln(frame);
for(String key in headers.keys) {
sb.writeln("$key=${headers[key]}");
}
sb.writeln(new String.fromCharCodes(body));
return sb.toString();
}
}
void dumpStompFrame(StompServerFrame frame) {
print("BEGIN STOMP FRAME DUMP");
print(frame.toString());
print("END STOMP FRAME DUMP");
}
class StompTransformer extends StreamEventTransformer<List<int>, StompServerFrame> {
List<String> serverFrames = ['CONNECTED', 'MESSAGE', 'RECEIPT', 'ERROR'];
String state = 'COMMAND'; // 'COMMAND', 'HEADERS', 'BODY'
List<int> token = new List<int>();
StompServerFrame stompServerFrame = new StompServerFrame();
StompTransformer() {}
int lastValue = -1;
void handleData(List<int> intList, EventSink<StompServerFrame> sink) {
for(int b in intList) {
switch(state) {
case 'COMMAND':
if(b == 0x0a) { // done with command
stompServerFrame.frame = new String.fromCharCodes(token);
state = 'HEADERS';
token.clear();
} else {
token.add(b);
}
lastValue = b;
break;
case 'HEADERS':
if(b == 0x0a && lastValue == 0x0a) { // done with all headers
state = 'BODY';
token.clear();
lastValue = -1;
} else if(b == 0x0a && lastValue != 0x0a) { // done with a header
String tokenString = new String.fromCharCodes(token);
List<String> tokenStringParts = tokenString.split(":");
if(tokenStringParts.length == 2) {
stompServerFrame.headers.putIfAbsent(tokenStringParts.elementAt(0),
() => tokenStringParts.elementAt(1));
} else {
// possible header format error
print("was here with $tokenString");
}
token.clear();
lastValue = b;
} else {
token.add(b);
lastValue = b;
}
break;
case 'BODY':
if(b == 0x00) { // done with body
sink.add(stompServerFrame);
stompServerFrame = new StompServerFrame();
state = 'COMMAND';
token.clear();
lastValue = -1;
} else {
stompServerFrame.body.add(b);
}
break;
default:
break;
}
}
}
}