Usando SignalR com failover de messagebus Redis usando ConnectionUtils.Connect () do BookSleeve

112

Estou tentando criar um cenário de failover de barramento de mensagem Redis com um aplicativo SignalR.

No início, tentamos um failover de balanceador de carga de hardware simples, que simplesmente monitorava dois servidores Redis. O aplicativo SignalR apontou para o terminal HLB singular. Em seguida, falhei em um servidor, mas não consegui obter nenhuma mensagem com êxito no segundo servidor Redis sem reciclar o pool de aplicativos SignalR. Presumivelmente, isso ocorre porque ele precisa emitir os comandos de configuração para o novo barramento de mensagens do Redis.

A partir do SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBususa Booksleeve RedisConnection()para se conectar a um único Redis para pub / sub.

Eu criei uma nova classe, RedisMessageBusCluster()que usa Booksleeve ConnectionUtils.Connect()para se conectar a uma em um cluster de servidores Redis.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection - TODO:  can remove this Open as the connection is already opened, but there's the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve tem seu próprio mecanismo para determinar um mestre e fará failover automaticamente para outro servidor, e agora estou testando isso com SignalR.Chat.

Em web.config, eu defino a lista de servidores disponíveis:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Então em Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

Eu adicionei dois métodos adicionais para Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Agora o problema é que quando eu tenho vários pontos de interrupção habilitados, até que um nome de usuário tenha sido adicionado e, em seguida, desabilito todos os pontos de interrupção, o aplicativo funciona como esperado. No entanto, com os pontos de interrupção desabilitados desde o início, parece haver alguma condição de corrida que pode estar falhando durante o processo de conexão.

Assim, em RedisMessageCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

Tentei adicionar um Task.Waite até um adicional Sleep()(não mostrado acima) - que estavam esperando / etc, mas ainda obtinham erros.

O erro recorrente parece estar em Booksleeve.MessageQueue.cs~ ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

Onde uma exceção de fila fechada está sendo lançada.

Prevejo outro problema: como a conexão do Redis é feita Application_Start(), pode haver alguns problemas ao "reconectar" a outro servidor. No entanto, acho que isso é válido quando se usa o singular RedisConnection(), onde há apenas uma conexão para escolher. No entanto, com a instrução de ConnectionUtils.Connect(), gostaria de ouvir de @dfowlerou de outros caras do SignalR como esse cenário é tratado no SignalR.

ElHaix
fonte
Vou dar uma olhada, mas: a primeira coisa que ocorre é que você não precisa ligar Openporque a conexão que você tem deve estar aberta. Não vou conseguir olhar imediatamente, pois estou me preparando para um voo
Marc Gravell
Eu acredito que há duas questões aqui. 1) como Booksleeve está lidando com um failover; 2) Como SignalR usa cursores para rastrear clientes. Quando um novo barramento de mensagem é inicializado, todos os cursores de mb1 não existem em mb2. Portanto, ao redefinir o pool de aplicativos SignalR, ele começará a funcionar - não antes, o que obviamente não é uma opção viável.
ElHaix
2
Link que descreve como SignalR usa cursores: stackoverflow.com/questions/13054592/…
ElHaix
Tente usar a versão mais recente do barramento de mensagem redis. Ele oferece suporte à transmissão de uma fábrica de conexões e à tentativa de conexão novamente quando o servidor fica inativo.
davidfowl
Você tem um link para as notas de lançamento? Obrigado.
ElHaix de

Respostas:

13

A equipe SignalR agora implementou suporte para uma fábrica de conexão personalizada com StackExchange.Redis , o sucessor do BookSleeve, que oferece suporte a conexões Redis redundantes por meio de ConnectionMultiplexer.

O problema inicial encontrado foi que, apesar de criar meus próprios métodos de extensão no BookSleeve para aceitar uma coleção de servidores, o failover não foi possível.

Agora, com a evolução do BookSleeve para StackExchange.Redis, podemos configurar a coleção de servidores / portas logo na Connectinicialização.

A nova implementação é muito mais simples do que o caminho que eu estava trilhando, na criação de um UseRedisClustermétodo, e o pluming de back-end agora oferece suporte a failover verdadeiro:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis também permite configuração manual adicional, conforme descrito na Automatic and Manual Configurationseção da documentação:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

Em essência, a capacidade de inicializar nosso ambiente de scale-out SignalR com uma coleção de servidores agora resolve o problema inicial.

ElHaix
fonte
Devo recompensar sua resposta com recompensa de 500 representantes? ;)
nicael
Bem, se você acredita que essa é a resposta :)
ElHaix
@ElHaix, já que você fez a pergunta, provavelmente você está mais qualificado para dizer se sua resposta é conclusiva ou se é apenas uma peça do quebra-cabeça - sugiro adicionar uma frase para indicar se e possivelmente como isso resolveu seu problema
Lars Höppner
Assim? Prêmio recompensa? Ou posso esperar até que atraia mais atenção.
nicael
Estou faltando alguma coisa ou está apenas em um branch de recurso, não no pacote nuget principal (2.1)? Além disso, parece que no branch bug- stackexchange ( github.com/SignalR/SignalR/tree/bug-stackexchange/src/… ), ainda não há uma maneira na classe RedisScaleoutConfiguration de fornecer seu próprio multiplexador.
Steve