2010-12-03 7 views
10

Я экспериментирую с ZeroMQ и пытаюсь получить что-то работает. Моя первая мысль заключалась в том, чтобы настроить REP/REQ с помощью транспорта inproc, чтобы увидеть, могу ли я отправлять сообщения между двумя потоками. Большая часть приведенного ниже кода взята из примеров clzmq, но, похоже, не работает.Использование ZeroMQ с C# с транспортом inproc

И сервер, и клиент связаны с транспортом, но когда клиент пытается сделать Send, он блокируется и просто сидит там. У меня нет опыта ZeroMQ, поэтому я не уверен, где искать в первую очередь, любая помощь будет принята с благодарностью. Вот виновный (оскорбительным) код:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     [Test] 
     public void Should() 
     { 
      var clientThread = new Thread(StartClient); 
      clientThread.Start(); 

      var serverThread = new Thread(StartServer); 
      serverThread.Start(); 

      clientThread.Join(); 
      serverThread.Join(); 

      Console.WriteLine("Done with life"); 
     } 

     private void StartServer() 
     { 


      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 
      } 

      Console.WriteLine("Done with server"); 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
         1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
         latency.ToString("f2")); 
       } 
      } 

      Console.WriteLine("Done with client"); 
     } 

    } 
} 

Edit:

Я получил эту работу с помощью ниже ответ, но он также потребовал, чтобы я изменить Bind к Connect, что имеет смысл, когда вы думаете об этом, поскольку у нас есть привязка сервера к локальному транспорту и клиент, подключающийся к удаленному транспорту. Вот обновленный код:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     private static Context ctx; 

     [Test] 
     public void Should() 
     { 
      using (ctx = new Context(1)) 
      { 
       var clientThread = new Thread(StartClient); 
       clientThread.Start(); 

       var serverThread = new Thread(StartServer); 
       serverThread.Start(); 

       clientThread.Join(); 
       serverThread.Join(); 

       Console.WriteLine("Done with life"); 
      } 
     } 

     private void StartServer() 
     { 
      try 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 

       Console.WriteLine("Done with server"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      try 
      { 
       // Initialise 0MQ infrastructure 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Connect(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
            1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
             latency.ToString("f2")); 
       } 

       Console.WriteLine("Done with client"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

    } 
} 

ответ

14

Я считаю, что оба потока должны использовать один и тот же контекст. Руководство Zeromq рекомендует не использовать более одного контекста в процессе. Создайте контекст, разделите этот контекст между обоими потоками. Это должно сработать.

От http://zguide.zeromq.org/chapter:all

Вы должны создать «контекст» объект для вашего процесса, и передать в всех потоков. Контекст собирает состояние ØMQ. Чтобы создать соединение через транспортный поток inproc: как сервер, так и клиентский поток должен совместно использовать тот же объект контекста.

+0

Это было очень полезно, спасибо! – jonnii

2

Только один конец может привязываться к другому, необходимо подключиться, вы можете иметь несколько соединений.