2017-02-22 53 views
0

Извините, если у этого вопроса уже есть ответ, но если это так, я не могу найти его на этом сайте.Обработка запросов из нескольких потоков в одном потоке - .NET Core

Во-первых - этот вопрос является специфичным для .NET ядра (v1.1.0 на момент написания)

У меня есть сборка третьей стороны, которая будет обрабатывать только запросы, сделанные к нему, если они происходят в том же потоке , Эта сборка представляет собой библиотеку RabbitMQ и подробную информацию о проблеме, которая может быть или не быть релевантной can be found here. В принципе - у меня есть несколько потоков, которые потенциально могут вызвать эту сборку, но если запросы происходят из разных потоков, то это исключение.

Так что, чтобы обойти это, я пытаюсь создать поток, который заблокирован, поэтому не истекает - и должен каким-то образом иметь все вызовы этой сборки, обработанные в этом потоке.

Моя первая попытка состояла в том, чтобы создать мероприятие и подписаться на мероприятие в заблокированной теме. то любая другая нить начнет - вызовет это событие, которое, как я думал, может быть поднято на правильной нити, чтобы я мог выполнить свое желание иметь запросы на сборку третьей стороны, обрабатываемые в одном потоке.

я теперь (больно так) понимаю, что it is not possible in .NET core to begin-invoke an event in .Net core :(

Пример, который демонстрирует проблему:

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     Program program = new Program(); 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 
     program.StartNewThreadToBeCalled(resetEvent); 

     program.CallBobMultipleTimes(resetEvent); 

     Console.ReadLine(); 
    } 

    private void CallBobMultipleTimes(ManualResetEvent resetEvent) 
    { 
     resetEvent.WaitOne(); 
     for(int i=0 ; i<100 ; i++) 
      ThreadPool.QueueUserWorkItem(x=>CallBob(null, null)); //Can't BeginInvoke in .NET Core 
    } 

    private void StartNewThreadToBeCalled(ManualResetEvent resetEvent) 
    { 
     ThreadPool.QueueUserWorkItem(x=> 
     { 
      Bob bob = new Bob(); 

      CallBob += (obj, e)=> bob.InvokeMeOnOneThreadOnly(); 
      resetEvent.Set();     
      ManualResetEvent mre = new ManualResetEvent(false); 
      mre.WaitOne(); //Real implementation will block forever - this should be the only thread handles the processing of requests. 
     }); 
    } 

    public event EventHandler CallBob; 
} 

public class Bob 
{ 
    private List<int> ThreadIdentifiers = new List<int>(); 
    private static object SyncObject = new object(); 


    public void InvokeMeOnOneThreadOnly() 
    { 
     lock(SyncObject) 
     { 
      int currentThreadId = Thread.CurrentThread.ManagedThreadId; 
      if(ThreadIdentifiers.Any()) 
      { 
       if(!ThreadIdentifiers.Contains(currentThreadId)) 
        Console.WriteLine("Don't call me from multiple threads!"); 
      } 
      else 
       ThreadIdentifiers.Add(currentThreadId); 
     } 
    } 
} 

Я получил эту работу, создавая оболочку вокруг ConcurrentQueue, который оповещает, когда добавляется что-то Я потом обрабатываю это событие на своем специальном потоке сборок третьей стороны и выбираю запросы с этой очереди до тех пор, пока он не исчерпан ... не уверен, почему это работает, когда другой способ не будет ?!

Есть ли лучший способ (чем тот, который я нашел) для меня обрабатывать несколько запросов из нескольких разных потоков в одном потоке в ядре .NET?

+0

Поскольку очередь стандартная реализация потребительских производителей рисунка это не очень удивительно, у ou've получил его работу ... Нет никакого способа узнать, почему «другой способ» не работает, поскольку нет четкого объяснения того, что «другой способ» («путем создания делегатов» - это не один). Если вам нужен ответ, вам нужно прояснить, что вы пробовали для «другого пути» или объяснить, каким образом вы хотите улучшить решение очереди - в текущем состоянии сообщение слишком велико. –

+0

@Alexei Я собирался уйти с работы, когда я разместил - так что не было времени, чтобы получить более подробный пост, я попытаюсь расширить пример кода, когда я получу некоторое свободное время позже. – Jay

+0

@AlexeiLevenkov Я обновил фрагмент кода, который объясняет возникшую у меня проблему. Забыв что-нибудь еще, что я попробовал, теперь возникает вопрос о моей первоначальной проблеме, репо, которая сводит на нет код, который у меня есть к исходной проблеме, и объясняет, что я сделал, чтобы он работал. Мой вопрос действительно - каков правильный способ сделать эту работу? То, что я сделал с concurrentQueue, кажется взломанным. – Jay

ответ

2

Сложно сказать, что вы пытаетесь сделать из кода. Но другой способ - использовать BlockingCollection<T>.Фоновый поток может добавить значения в эту коллекцию и один поток мог сидеть и попытаться получить значения из коллекции, например:

while(continueProcessingCollection) 
{ 
    if(blockingCollection.TryTake(out value, TimeSpan.FromSeconds(myTimeout))) 
    { 
     // process value, decide to toggle continueProcessingCollection 
    } 
} 

фона нить может добавить новые значения в коллекцию блокирующей:

blockingCollection.Add(newValue); 

И, где-то нужно создать переменную BlockingCollection:

var blockingCollection = new BlockingCollection<MyType>(); 
+0

Я только что посмотрел это на MSDN, так что я понимаю (теперь), что если вы вызовете TryTake(), он будет блокироваться до тех пор, пока в коллекцию не будет добавлено что-то? Затем, когда что-то добавляется, оно будет разблокировано и код будет продолжен? – Jay

+0

@jay он вернет true, если в коллекции есть что-то, или false, если оно тайм-аут. Вам нужно знать, какие критерии остановить цикл (тайм-аут, количество обработанных вещей и т. Д.). Но он будет блокировать только до тех пор, пока ваш тайм-аут (я использовал 1 секунду для моего примера). –

+0

Тогда это именно то, что мне нужно - намного лучше, чем моя хакерская параллельная очередь. Благодаря! – Jay

1

delegateBeginInvoke использует threadpool запланировать callback, который не решит вашу проблему. Это в основном проблема Producer–consumer с несколькими производителями и одним потребителем. .NET core содержит some structures реализация IProducerConsumerCollection<T> вы можете использовать для решения этой проблемы.

Не путайте с Control.BeginInvoke (winform) и Dispatcher.BeginInvoke (WPF), которые ставят очередь обратного вызова в потоке графического интерфейса.

+0

Я посмотрю на этот шаблон, поскольку я не знал, что для этого есть интерфейс .net. Спасибо, я дам вам знать, как я нахожусь – Jay