Depois de fazer alguns socket
programas assíncronos de "baixo ou baixo nível" (mais ou menos) anos atrás (no modo EAP ) e recentemente mover o "up" para um (APM)TcpListener
(Asynchronous Programming Model ) tentando mudar para (Padrão assíncrono baseado em tarefas (TAP) ). Eu já tive isso com a necessidade de me preocupar com todo esse 'encanamento de baixo nível'. Então eu estava pensando; por que não tentar ( Extensões Reativas ), pois pode se encaixar melhor no domínio do meu problema.async/await
RX
Muito código que escrevo tem a ver com muitos clientes que se conectam pelo Tcp ao meu aplicativo, que iniciam uma comunicação bidirecional (assíncrona). O cliente ou servidor pode, a qualquer momento, decidir que uma mensagem precisa ser enviada e fazê-lo; portanto, essa não é sua request/response
configuração clássica, mas mais uma "linha" bidirecional e em tempo real aberta para ambas as partes enviarem o que quiserem , quando quiserem. (Se alguém tiver um nome decente para descrever isso, ficarei feliz em ouvi-lo!).
O "protocolo" difere por aplicativo (e não é realmente relevante para a minha pergunta). No entanto, tenho uma pergunta inicial:
- Dado que apenas um "servidor" está em execução, mas ele deve acompanhar muitas (geralmente milhares) de conexões (por exemplo, clientes), cada uma tendo (por falta de uma descrição melhor) sua própria "máquina de estado" para acompanhar suas informações internas. estados etc, qual abordagem você prefere? EAP / TAP / APM? O RX seria considerado uma opção? Se não, por que?
Portanto, preciso trabalhar com o Async, pois a) não é um protocolo de solicitação / resposta, portanto, não posso ter um thread / cliente em uma chamada de bloqueio "aguardando mensagem" ou em chamada de envio de mensagem (no entanto, se o envio for bloqueando apenas para esse cliente, eu poderia viver com ele) eb) preciso lidar com muitas conexões simultâneas. Não vejo como fazer isso (de maneira confiável) usando o bloqueio de chamadas.
A maioria dos meus aplicativos está relacionada ao VoiP; ser ele SIP mensagens de coeficientes SIP ou PABX (relacionado) mensagens de aplicativos como freeswitch / OpenSIPS etc. mas você pode, na sua forma mais simples, tente imaginar um "chat" servidor tentando lidar com muitos clientes "chat". A maioria dos protocolos é baseada em texto (ASCII).
Então, depois de ter implementado muitas permutações diferentes das técnicas mencionadas, gostaria de simplificar meu trabalho criando um objeto que eu possa instanciar, informe sobre o que IPEndpoint
ouvir e diga sempre que algo de interesse estiver acontecendo (o que geralmente use events for, portanto, alguns EAP geralmente são misturados com as outras duas técnicas). A turma não deve se incomodar em tentar 'entender' o protocolo; deve apenas lidar com seqüências de entrada / saída. E, assim, de olho no RX, esperando que (no final) simplifique o trabalho, criei um novo "violino" do zero:
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;
class Program
{
static void Main(string[] args)
{
var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
f.Start();
Console.ReadKey();
f.Stop();
Console.ReadKey();
}
}
public class FiddleServer
{
private TcpListener _listener;
private ConcurrentDictionary<ulong, FiddleClient> _clients;
private ulong _currentid = 0;
public IPEndPoint LocalEP { get; private set; }
public FiddleServer(IPEndPoint localEP)
{
this.LocalEP = localEP;
_clients = new ConcurrentDictionary<ulong, FiddleClient>();
}
public void Start()
{
_listener = new TcpListener(this.LocalEP);
_listener.Start();
Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
//OnNext
tcpclient =>
{
//Create new FSClient with unique ID
var fsclient = new FiddleClient(_currentid++, tcpclient);
//Keep track of clients
_clients.TryAdd(fsclient.ClientId, fsclient);
//Initialize connection
fsclient.Send("connect\n\n");
Console.WriteLine("Client {0} accepted", fsclient.ClientId);
},
//OnError
ex =>
{
},
//OnComplete
() =>
{
Console.WriteLine("Client connection initialized");
//Accept new connections
_listener.AcceptTcpClientAsync();
}
);
Console.WriteLine("Started");
}
public void Stop()
{
_listener.Stop();
Console.WriteLine("Stopped");
}
public void Send(ulong clientid, string rawmessage)
{
FiddleClient fsclient;
if (_clients.TryGetValue(clientid, out fsclient))
{
fsclient.Send(rawmessage);
}
}
}
public class FiddleClient
{
private TcpClient _tcpclient;
public ulong ClientId { get; private set; }
public FiddleClient(ulong id, TcpClient tcpclient)
{
this.ClientId = id;
_tcpclient = tcpclient;
}
public void Send(string rawmessage)
{
Console.WriteLine("Sending {0}", rawmessage);
var data = Encoding.ASCII.GetBytes(rawmessage);
_tcpclient.GetStream().WriteAsync(data, 0, data.Length); //Write vs WriteAsync?
}
}
Estou ciente de que, neste "violino", há um pequeno detalhe específico da implementação; neste caso, estou trabalhando com o FreeSwitch ESL, para que o problema "connect\n\n"
, ao refatorar para uma abordagem mais genérica, seja removido.
Também estou ciente de que preciso refatorar os métodos anônimos para métodos de instância privada na classe Server; Só não tenho certeza de qual convenção (por exemplo, " OnSomething
" por exemplo) usar para os nomes dos métodos?
Esta é a minha base / ponto de partida / fundamento (que precisa de alguns "ajustes"). Eu tenho algumas perguntas sobre isso:
- Veja a pergunta "1" acima
- Estou no caminho certo? Ou minhas decisões de "design" são injustas?
- Em termos de concorrência: isso lidaria com milhares de clientes (analisando / manipulando as mensagens reais de lado)
- Sobre exceções: não sei como obter exceções geradas nos clientes "up" para o servidor ("RX-wise"); qual seria um bom caminho?
- Agora posso obter qualquer cliente conectado da minha classe de servidor (usando-o
ClientId
), supondo que eu exponha os clientes de uma maneira ou de outra e chame os métodos diretamente. Também posso chamar métodos por meio da classe Server (por exemplo, oSend(clientId, rawmessage)
método (enquanto a última abordagem seria um método de "conveniência" para obter rapidamente uma mensagem para o outro lado)). - Não tenho muita certeza de onde (e como) ir a partir daqui:
- a) Eu preciso lidar com mensagens recebidas; como eu configuraria isso? Posso obter o fluxo de curso, mas onde lidaria com a recuperação dos bytes recebidos? Acho que preciso de algum tipo de "ObservableStream" - algo em que possa me inscrever? Eu colocaria isso no
FiddleClient
orFiddleServer
? - b) Supondo que eu queira evitar o uso de eventos até que essas
FiddleClient
/FiddleServer
classes sejam implementadas mais especificamente para adaptar seu tratamento de protocolo específico à aplicação, etc. usando mais classesFooClient
/ específicasFooServer
: como eu passaria da obtenção dos dados nas classes 'Fiddle' subjacentes para suas contrapartes mais específicas?
- a) Eu preciso lidar com mensagens recebidas; como eu configuraria isso? Posso obter o fluxo de curso, mas onde lidaria com a recuperação dos bytes recebidos? Acho que preciso de algum tipo de "ObservableStream" - algo em que possa me inscrever? Eu colocaria isso no
Artigos / links que eu já li / procurei / usei como referência:
- Consumir um soquete usando extensão reativa
- Criando e assinando seqüências observáveis simples
- Como adicionar ou associar o contexto a cada Conexão / Sessão ObservableTcpListener
- Programação assíncrona com o Reactive Framework e a Task Parallel Library - Parte 1 , 2 e 3 .
- Usando extensões reativas (Rx) para programação de soquetes é prático?
- Um servidor Web orientado ao .NET Rx e um servidor Web orientado ao .NET Rx, faça 2
- Alguns tutoriais de Rx
Respostas:
Depois de ler essa afirmação, pensei imediatamente em "atores". Os atores são muito parecidos com os objetos, exceto que eles têm apenas uma única entrada na qual você passa a mensagem (em vez de chamar diretamente os métodos do objeto) e eles operam de forma assíncrona. Em um exemplo muito simplificado ... você criaria o ator e enviaria uma mensagem com o IPEndpoint e o endereço do ator para o qual enviar o resultado. Ele apaga e funciona em segundo plano. Você só ouve quando "algo de interesse" acontece. Você pode instanciar quantos atores precisar para lidar com a carga.
Eu não estou familiarizado com nenhuma biblioteca de atores em .Net, embora eu saiba que existem alguns. Eu estou familiarizado com a biblioteca TPL Dataflow (haverá uma seção sobre ela no meu livro http://DataflowBook.com ) e deve ser fácil implementar um modelo de ator simples com essa biblioteca.
fonte