В настоящее время я отправляю сообщения устройств в IoTHub на экземпляре Azure, а затем все сообщения отправляются в EventHub для обработки.Отправка/получение пакетных сообщений на шлюз Azure Protocol
Моя цель - использовать Azure Protocol Cloud Gateway, чтобы выступать в качестве посредника для получения пакетных сообщений, а затем разворачивать их перед отправкой их для обработки. Благодаря размещению сообщений, это позволит мне уменьшить количество передаваемых данных, сократив расходы на использование данных. После того, как данные находятся в облаке, он может быть без сжатия, а затем обработан в обычном режиме.
После некоторых исследований и игры с Gateway на моей локальной машине и использования некоторых тестов Unit, встроенных в решение, я видел, как сообщения отправляются на шлюз/IoTHub.
ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
Stopwatch sw = Stopwatch.StartNew();
await this.CleanupDeviceQueueAsync(hubConnectionStringBuilder.HostName, device);
var clientScenarios = new ClientScenarios(hubConnectionStringBuilder.HostName, this.deviceId, this.deviceSas);
var group = new MultithreadEventLoopGroup();
string targetHost = this.tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
var readHandler1 = new ReadListeningHandler(CommunicationTimeout);
Bootstrap bootstrap = new Bootstrap()
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(this.ComposeClientChannelInitializer(targetHost, readHandler1));
IChannel clientChannel = await bootstrap.ConnectAsync(this.ServerAddress, protocolGatewayPort);
this.ScheduleCleanup(() => clientChannel.CloseAsync());
Task testWorkTask = Task.Run(async() =>
{ //Where the messaging actually starts and sends
Tuple<EventData, string>[] ehMessages = await CollectEventHubMessagesAsync(receivers, 2); //Async task for recieving messages back from the IoTHub
Tuple<EventData, string> qos0Event = Assert.Single(ehMessages.Where(x => TelemetryQoS0Content.Equals(x.Item2, StringComparison.Ordinal)));
Tuple<EventData, string> qos1Event = Assert.Single(ehMessages.Where(x => TelemetryQoS1Content.Equals(x.Item2, StringComparison.Ordinal)));
string qosPropertyName = ConfigurationManager.AppSettings["QoSPropertyName"];
var qos0Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS0Content));
qos0Notification.Properties[qosPropertyName] = "0";
qos0Notification.Properties["subTopic"] = "tips";
await serviceClient.SendAsync(this.deviceId, qos0Notification);
var qos1Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS1Content));
qos1Notification.Properties["subTopic"] = "firmware-update";
await serviceClient.SendAsync(this.deviceId, qos1Notification);
var qos2Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content));
qos2Notification.Properties[qosPropertyName] = "2";
qos2Notification.Properties["subTopic"] = "critical-alert";
await serviceClient.SendAsync(this.deviceId, qos2Notification);
var qos2Notification2 = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content2));
qos2Notification2.Properties[qosPropertyName] = "2";
await serviceClient.SendAsync(this.deviceId, qos2Notification2);
});
Так что "ServiceClient" отправляет 4 сообщения в этом тестовом модуле: qos0Notification, qos1Notification, qos2Notification, qos2Notification2, и использует метод SendAsync для передачи информации.
Метод SendAsync является частью базового кода для приложения и недоступен для просмотра. Этот метод также принимает DeviceId и Message Object. Сообщение имеет 3 перегрузки для Object: Base, Byte Stream или Byte Array.
После шлюза инициализации он получает это сообщения, используя этот метод:
public override void ChannelRead(IChannelHandlerContext context, object message)
{
var packet = message as Packet;
if (packet == null)
{
CommonEventSource.Log.Warning($"Unexpected message (only `{typeof(Packet).FullName}` descendants are supported): {message}", this.ChannelId);
return;
}
this.lastClientActivityTime = DateTime.UtcNow; // notice last client activity - used in handling disconnects on keep-alive timeout
if (this.IsInState(StateFlags.Connected) || packet.PacketType == PacketType.CONNECT)
{
this.ProcessMessage(context, packet);
}
else
{
if (this.IsInState(StateFlags.ProcessingConnect))
{
Queue<Packet> queue = this.connectPendingQueue ?? (this.connectPendingQueue = new Queue<Packet>(4));
queue.Enqueue(packet);
}
else
{
// we did not start processing CONNECT yet which means we haven't received it yet but the packet of different type has arrived.
ShutdownOnError(context, string.Empty, new InvalidOperationException($"First packet in the session must be CONNECT. Observed: {packet}, channel id: {this.ChannelId}, identity: {this.identity}"));
}
}
}
Я чувствую, что это будет самое лучшее место, чтобы разворачивать любые пакетные сообщения. Как только у нас будет список сообщений, мы отправим их в ProcessMessage, чтобы определить, какое это сообщение и как его обрабатывать.
Кажется, что для этого не так много информации, так как это очень новое.
Каков ваш вопрос? – CSharpRocks