2013-02-15 2 views
111

Я пытаюсь создать сценарий перехода на шину сообщения Redis с приложением SignalR.Использование SignalR с Redis Messagebus при сбое с использованием ConnectionStils.Connect BookSleeve()

Сначала мы попробовали простой аппаратный перегрузчик балансировки нагрузки, который просто контролировал два сервера Redis. Приложение SignalR указывало на единственную конечную точку HLB. Затем я сбой одного сервера, но не смог успешно получить какие-либо сообщения на втором сервере Redis без утилизации пула приложений SignalR. Предположительно, это связано с тем, что ему необходимо выдать команды настройки новой шине сообщений Redis.

В отношении SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus использует код RedisConnection() Bookslee для подключения к одному Redis для pub/sub.

Я создал новый класс, RedisMessageBusCluster(), который использует ConnectionUtils.Connect() Booksleeve для подключения к одному в кластере серверов Redis.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using BookSleeve; 
using Microsoft.AspNet.SignalR.Infrastructure; 

namespace Microsoft.AspNet.SignalR.Redis 
{ 
    /// <summary> 
    /// WIP: Getting scaleout for Redis working 
    /// </summary> 
    public class RedisMessageBusCluster : ScaleoutMessageBus 
    { 
     private readonly int _db; 
     private readonly string[] _keys; 
     private RedisConnection _connection; 
     private RedisSubscriberConnection _channel; 
     private Task _connectTask; 

     private readonly TaskQueue _publishQueue = new TaskQueue(); 

     public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver) 
      : base(resolver) 
     { 
      _db = db; 
      _keys = keys.ToArray(); 

      // uses a list of connections 
      _connection = ConnectionUtils.Connect(serverList); 

      //_connection = new RedisConnection(host: server, port: port, password: password); 

      _connection.Closed += OnConnectionClosed; 
      _connection.Error += OnConnectionError; 


      // Start the connection - TODO: can remove this Open as the connection is already opened, but there's the _connectTask is used later on 
      _connectTask = _connection.Open().Then(() => 
      { 
       // Create a subscription channel in redis 
       _channel = _connection.GetOpenSubscriberChannel(); 

       // Subscribe to the registered connections 
       _channel.Subscribe(_keys, OnMessage); 

       // Dirty hack but it seems like subscribe returns before the actual 
       // subscription is properly setup in some cases 
       while (_channel.SubscriptionCount == 0) 
       { 
        Thread.Sleep(500); 
       } 
      }); 
     } 


     protected override Task Send(Message[] messages) 
     { 
      return _connectTask.Then(msgs => 
      { 
       var taskCompletionSource = new TaskCompletionSource<object>(); 

       // Group messages by source (connection id) 
       var messagesBySource = msgs.GroupBy(m => m.Source); 

       SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource); 

       return taskCompletionSource.Task; 
      }, 
      messages); 
     } 

     private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource) 
     { 
      if (!enumerator.MoveNext()) 
      { 
       taskCompletionSource.TrySetResult(null); 
      } 
      else 
      { 
       IGrouping<string, Message> group = enumerator.Current; 

       // Get the channel index we're going to use for this message 
       int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length; 

       string key = _keys[index]; 

       // Increment the channel number 
       _connection.Strings.Increment(_db, key) 
            .Then((id, k) => 
            { 
             var message = new RedisMessage(id, group.ToArray()); 

             return _connection.Publish(k, message.GetBytes()); 
            }, key) 
            .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource) 
            .ContinueWithNotComplete(taskCompletionSource); 
      } 
     } 

     private void OnConnectionClosed(object sender, EventArgs e) 
     { 
      // Should we auto reconnect? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e) 
     { 
      // How do we bubble errors? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnMessage(string key, byte[] data) 
     { 
      // The key is the stream id (channel) 
      var message = RedisMessage.Deserialize(data); 

      _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages)); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (_channel != null) 
       { 
        _channel.Unsubscribe(_keys); 
        _channel.Close(abort: true); 
       } 

       if (_connection != null) 
       { 
        _connection.Close(abort: true); 
       }     
      } 

      base.Dispose(disposing); 
     } 
    } 
} 

Booksleeve имеет свой собственный механизм для определения хозяина, и будет автоматически переключены на другой сервер, и теперь тестирует с SignalR.Chat.

В web.config я установил список доступных серверов:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/> 

Тогда в Application_Start():

 // Redis cluster server list 
     string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"]; 

     List<string> eventKeys = new List<string>(); 
     eventKeys.Add("SignalR.Redis.FailoverTest"); 
     GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys); 

я добавил два дополнительных методов Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys) 
{ 
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys); 
} 

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys) 
{ 
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver)); 
    resolver.Register(typeof(IMessageBus),() => bus.Value); 

    return resolver; 
} 

Теперь проблема в том, что когда у меня есть несколько точек останова, u ntil после добавления имени пользователя, затем отключите все точки останова, приложение работает должным образом. Тем не менее, когда точки останова отключены с самого начала, похоже, что некоторые условия гонки могут быть неудачными во время процесса соединения.

Таким образом, в RedisMessageCluster():

// Start the connection 
    _connectTask = _connection.Open().Then(() => 
    { 
     // Create a subscription channel in redis 
     _channel = _connection.GetOpenSubscriberChannel(); 

     // Subscribe to the registered connections 
     _channel.Subscribe(_keys, OnMessage); 

     // Dirty hack but it seems like subscribe returns before the actual 
     // subscription is properly setup in some cases 
     while (_channel.SubscriptionCount == 0) 
     { 
      Thread.Sleep(500); 
     } 
    }); 

Я попытался добавить как в Task.Wait, и даже дополнительный Sleep() (не показано выше), - которые ждали/и т.д., но по-прежнему получать ошибки.

Периодическая ошибка, кажется, в Booksleeve.MessageQueue.cs ~ пер 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll 
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821 
    --- End of inner exception stack trace --- 
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<--- 



public void Enqueue(RedisMessage item, bool highPri) 
{ 
    lock (stdPriority) 
    { 
     if (closed) 
     { 
      throw new InvalidOperationException("The queue is closed"); 
     } 

Если исключение закрытой очереди бросают.

Я предвижу еще одну проблему: поскольку соединение Redis сделано в Application_Start(), могут возникнуть некоторые проблемы при «повторном подключении» к другому серверу. Тем не менее, я думаю, что это действительно при использовании единственного RedisConnection(), где есть только одно соединение на выбор. Однако, с интродукцией ConnectionUtils.Connect(), я хотел бы услышать от @dfowler или других участников SignalR, как этот сценарий обрабатывается в SignalR.

+0

Я посмотрю, но: первое, что происходит, это то, что вам не нужно называть 'Open', так как ваше соединение должно * уже * быть открытым. Я не буду смотреть сразу, хотя, когда я готов к полету –

+0

Я считаю, что здесь есть два вопроса. 1) как Booksleeve имеет дело с отказоустойчивостью; 2) Как SignalR использует курсоры для отслеживания клиентов. Когда инициализируется новая шина сообщений, все курсоры из mb1 не выводятся на mb2. Поэтому при сбросе пула приложений SignalR он начнет работать - не раньше, что, очевидно, не является жизнеспособным вариантом. – ElHaix

+2

Ссылка, описывающая, как SignalR использует курсоры: http://stackoverflow.com/questions/13054592/how-does-signalr-redis-work-under-the-hood/13063449#13063449 – ElHaix

ответ

14

Команда SignalR теперь реализовала поддержку фабрики пользовательских подключений с StackExchange.Redis, преемником BookSleeve, который поддерживает избыточные соединения Redis через ConnectionMultiplexer.

Первоначальная проблема заключалась в том, что, несмотря на то, что мои собственные методы расширения в BookSleeve приняли коллекцию серверов, отказ был невозможным.

Теперь, с эволюцией BookSleeve на StackExchange.Redis, теперь мы можем создать коллекцию серверов/портов прямо в инициализации Connect.

Новая реализация намного проще, чем дороги я шел вниз, в создании метода UseRedisCluster и фоновая теперь поддерживает водопровод верно отказоустойчивость:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true"); 

StackExchange.Redis также позволяет дополнительная настройка вручную, как описано в разделе Automatic and Manual Configuration документации:

ConfigurationOptions config = new ConfigurationOptions 
{ 
    EndPoints = 
    { 
     { "redis0", 6379 }, 
     { "redis1", 6380 } 
    }, 
    CommandMap = CommandMap.Create(new HashSet<string> 
    { // EXCLUDE a few commands 
     "INFO", "CONFIG", "CLUSTER", 
     "PING", "ECHO", "CLIENT" 
    }, available: false), 
    KeepAlive = 180, 
    DefaultVersion = new Version(2, 8, 8), 
    Password = "changeme" 
}; 

в сущности, возможность инициализировать нашу масштабную среду SignalR с коллекцией серверов в настоящее время решает начальную proble м.

+0

Должен ли я вознаградить ваш ответ на 500 репрессий? ;) – nicael

+0

Ну, если вы считаете, что теперь это * ответ * :) – ElHaix

+0

@ElHaix, так как вы задали вопрос, вы, вероятно, наиболее квалифицированный, чтобы сказать, является ли ваш ответ окончательным или это просто кусок в головоломке - я предложите добавить предложение, чтобы указать, возможно ли и как оно решило вашу проблему. –