Objetivo
Nesta parte veremos como criar e transferir informações utilizando um TCP Server. A idéia é criar um método personalizado de transferir as informações, buscando melhor desempenho.
Em TCP, fiz dois testes, um que realiza toda a comunicação de uma forma síncrona e outro que o cliente processa as requisições numa segunda thread. A diferença de performance dos dois é significativa.
Não consegui nestes exemplos chegar numa implementação ótima. A idéia é somente ilustrar o método de transferência e já nos dar uma idéia de que é mais complexo que os demais. Ficarei devendo por enquanto uma implementação otimizada, mas depois que publicar a primeira “rodada” de experimentos, pretendo chegar numa implementação melhor.
Servidor
Para variar, o servidor foi implementado numa task MSBuild. O código principal é o seguinte:
public override bool Execute() { Log.LogMessage("Listening to TCP Connections on port " + Port.ToString()); IPAddress ipAddress = IPAddress.Any; TcpListener tcpListener = new TcpListener(ipAddress, Port); tcpListener.Start(); tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener); while (true) Thread.Sleep(250); }
Este código somente deixa o servidor “ouvindo” numa porta (inicialmente usei 8081, tudo é parametrizável na task). Quando receber um conexão, ele começa a processar (de forma assíncrona, no resultado de BeginAcceptSocketCallback) e libera o servidor para receber outra conexão.
O código do método BeginAcceptSocketCallback segue:
private void BeginAcceptSocketCallback(IAsyncResult result) { StreamUtil u = new StreamUtil(); TcpListener tcpListener = (TcpListener)result.AsyncState; TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result); tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener); Stream clientStream = tcpClient.GetStream(); int headerBytes; int count = 1; do { byte[] header = new byte[8]; headerBytes = clientStream.Read(header, 0, 8); long size = BitConverter.ToInt64(header, 0); Log.LogMessage("Processing " + count.ToString()); count++; MemoryStream ms = ReadBatch(clientStream, size); if (ms.Length > 0) WriteOutput(ms, u, clientStream); else headerBytes = 0; } while (headerBytes > 0); Log.LogMessage("Finished request."); }
Este código é executado a cada conexão recebida de um cliente. Convencionei um protocolo simplista entre cliente e servidor. O cliente manda um “long” com o tanto de dados que vai transferir, o servidor lê esse long e passa a receber a quantidade de bytes informada para dentro de uma memory stream. Em seguida, dá esse memory stream para processar pelo ProcessarClientBigRequest. O resultado também é armazenado numa memory stream. Em seguida, o servidor avisa o cliente quantos bytes vão voltar, e o cliente repete o procedimento do lado de lá.
Explicando passo a passo o método acima:
- BeginAcceptTcpClient avisa o servidor que outra conexão pode ser recebida
- No início do “do/while”, a quantidade de bytes fornecida pelo cliente é armazenada em size (recebe em char[] e converte para long)
- O método ReadBatch lê a quantidade de bytes fornecida e devolve o MemoryStream correspondente
- Se chegou algo no MemoryStream, a resposta é fornecida escrita pelo método WriteOutput
- A comunicação termina quando o cliente fala que tem “zero” bytes a transmitir
O código de ReadBatch segue:
private MemoryStream ReadBatch(Stream clientStream, long size) { MemoryStream ms = new MemoryStream(); byte[] buffer = new byte[100 * 1024]; long tmpSize = size; int read; do{ int bytesToRead = size > buffer.Length ? buffer.Length : (int)size; read = clientStream.Read(buffer, 0, bytesToRead); ms.Write(buffer, 0, read); tmpSize -= read; } while (tmpSize > 0); return ms; }
Este método é o “feijão com arroz” de trabalhar com Streams. O buffer é um local de armazenamento temporário, apenas para ler em pedaços a stream. Ler de uma vez não é uma boa idéia. Aqui o código poderia ser otimizado, de uma forma que evitasse essa MemoryStream intermediária.
A idéia aqui é simples, vir enchendo o buffer e descarregando o memory stream, até que não exista mais nada pra ler. No final, devolver o memory stream.
O código do WriteOutput segue:
private void WriteOutput(MemoryStream ms, StreamUtil u, Stream clientStream) { byte[] header = new byte[sizeof(Int64)]; ms.Seek(0, SeekOrigin.Begin); MemoryStream outputStream = new MemoryStream(); u.ProcessClientBigRequest(ConnString, ms, outputStream, false, null); outputStream.Seek(0, SeekOrigin.Begin); header = BitConverter.GetBytes(outputStream.Length); clientStream.Write(header, 0, header.Length); outputStream.CopyTo(clientStream); }
Esse método, dado um memory stream, faz o processamento (ProcessClientBigRequest) numa memory stream (pra poder saber o tamanho), em seguida, envia o tamanho e copia a memory stream de volta para o cliente.
Essa é toda a lógica do servidor. Ele pode ser executado através do batch runserver-tcptestserver.bat (executa a task MSBuild correspondente).
Cliente – Single Thread
Neste teste, seguimos a seguinte abordagem:
- O cliente manda a quantidade de bytes que vai transferir do lote, em seguida um lote de requests (Ex.: 1000 pedidos)
- O servidor processa e escreve a quantidade de bytes e a resposta
- O cliente processa a resposta, inicia o próximo request
- O processo segue, até o final do lote
O código principal do teste segue:
public override bool Execute() { Stopwatch watch = new Stopwatch(); watch.Start(); TcpClient tcpClient = new TcpClient(); tcpClient.Connect(HostName, Port); Log.LogMessage("Starting TCP transfer (single thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each"); NetworkStream stream = tcpClient.GetStream(); int count = 1; for (int i = 0; i < TotalBatches; i++) { Log.LogMessage("Processing " + count.ToString()); MemoryStream ms = GenerateRequest(count); WriteHeader(ms, stream); long totalBytes = ReadHeader(stream); MemoryStream inputStream = ReadResponse(totalBytes, stream); _util.ImportarStream(ConnString, inputStream); count += BatchSize; } stream.Write(new byte[8], 0, 8); Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds"); tcpClient.Close(); return true; }
Este código começa iniciando o relógio, a conexão TCP. Para cada lote, executa GenerateRequest para obter o xml com o request, executa WriteHeader (escreve a quantidade de bytes a transferir e o request na stream do servidor), lê a quantidade de bytes retornado pelo servidor, através de ReadHeader. Em seguida, lê a resposta através de ReadResponse e importa o resultado (ImportarStream).
O código de GenerateRequest segue:
private MemoryStream GenerateRequest(int count) { MemoryStream ms = new MemoryStream(); _util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1)); ms.Seek(0, SeekOrigin.Begin); return ms; }
O código de WriteHeader segue:
private void WriteHeader(MemoryStream ms, Stream stream) { byte[] header = BitConverter.GetBytes(ms.Length); stream.Write(header, 0, header.Length); ms.CopyTo(stream); }
Sem muito segredo aqui. Envia o tamanho e copia a stream com o request na stream de rede.
O código de ReadHeader:
private long ReadHeader(Stream stream) { byte[] header = new byte[sizeof(long)]; int read = stream.Read(header, 0, header.Length); long totalBytes = BitConverter.ToInt64(header, 0); return totalBytes; }
Também sem segredo. O código de ReadResponse:
private MemoryStream ReadResponse(long responseSize, Stream stream) { MemoryStream inputStream = new MemoryStream(); long totalBytes = responseSize; byte[] buffer = new byte[100 * 1024]; int bytesToRead; int read; do { bytesToRead = totalBytes > buffer.Length ? buffer.Length : (int)totalBytes; read = stream.Read(buffer, 0, bytesToRead); inputStream.Write(buffer, 0, read); totalBytes -= read; } while (totalBytes > 0); inputStream.Seek(0, SeekOrigin.Begin); return inputStream; }
Esse é um pouquinho mais complicado. Ele armazena em totalBytes o total da resposta, calcula o quanto precisa ler (o restante, ou um buffer cheio), executa a leitura e escreve na stream temporária (inputStream). Vai decrementando totalBytes a cada leitura. Quando totalBytes chega a 0, é porque leu tudo o que precisava. A stream temporária é retornada pela função.
Cliente – Multi thread
Este teste usa uma abordagem um pouco diferente:
- O cliente inicia uma thread para processar as respostas. Ela fica lá processando eternamente.
- O cliente manda a quantidade de bytes existente no request, em seguida o request
- O cliente recebe a quantidade de bytes da resposta e a resposta.
- Invés de processá-la, o cliente joga numa queue interna e segue enviando requests e responses
- Caso o cliente tenha 10 coisas na fila pra processar ele espera antes de receber as próximas respostas.
- A thread de processamento de respostas, vai desenfileirando as respostas e processando as mesmas
A vantagem deste método é que enquanto aguarda a transferência de rede, o processador não para de trabalhar. Ainda não está na sua forma mais otimizada, mas já é um começo e dá uma diferença significativa nos tempos.
O código principal deste teste segue:
public override bool Execute() { Stopwatch watch = new Stopwatch(); watch.Start(); Thread t = new Thread(ProcessMemoryStreamQueue); t.Start(); Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each"); TcpClient tcpClient = new TcpClient(); tcpClient.Connect(HostName, Port); NetworkStream stream = tcpClient.GetStream(); int count = 1; for (int i = 0; i < TotalBatches; i++) { Log.LogMessage("Processing " + i.ToString()); MemoryStream ms = new MemoryStream(); util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1)); ms.Seek(0, SeekOrigin.Begin); WriteHeader(stream, ms); //Sends the request ms.CopyTo(stream); long totalBytes = GetBytesToRead(stream); //Sleeps if have 10 responses to process. while (memoryStreamQueue.Count >= 10) Thread.Sleep(100); MemoryStream responseStream = ReadResponse(totalBytes, stream); //Don't process response. Just queue it. The other thread will process it. lock (memoryStreamQueue) memoryStreamQueue.Enqueue(responseStream); count += BatchSize; } //Mark end of transfer. stream.Write(new byte[8], 0, 8); tcpClient.Close(); watch.Stop(); t.Abort(); Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds"); return true; } private void WriteHeader(Stream stream, Stream streamToSend) { byte[] header = BitConverter.GetBytes(streamToSend.Length); stream.Write(header, 0, header.Length); } private long GetBytesToRead(Stream stream) { byte[] header = new byte[sizeof(long)]; int read = stream.Read(header, 0, header.Length); return BitConverter.ToInt64(header, 0); } private MemoryStream ReadResponse(long responseSize, Stream stream) { MemoryStream inputStream = new MemoryStream(); byte[] buffer = new byte[100 * 1024]; int bytesToRead; int read; long totalBytes = responseSize; do { bytesToRead = totalBytes > buffer.Length ? buffer.Length : (int)totalBytes; read = stream.Read(buffer, 0, bytesToRead); inputStream.Write(buffer, 0, read); totalBytes -= read; } while (totalBytes > 0); inputStream.Seek(0, SeekOrigin.Begin); return inputStream; }
Após fazer todo o código de inicialização, da thread, relógio, conexão TCP, os lotes começam a ser processados. Os métodos WriteHeader, GetBytesToRead e ReadResponse seguem praticamente a mesma lógica do teste anterior. O que muda realmente está no trecho:
lock (memoryStreamQueue) memoryStreamQueue.Enqueue(responseStream);
Essa memoryStreamQueue é simplesmente uma Queue<MemoryStream> e o uso de lock é necessário para evitar concorrência entre as duas threads.
A thread auxiliar, que faz o processamento executa o seguinte código:
private void ProcessMemoryStreamQueue() { while (true) { if (memoryStreamQueue.Count > 0) { MemoryStream ms = null; lock (memoryStreamQueue) { ms = memoryStreamQueue.Dequeue(); } util.ImportarStream(ConnString, ms); } Thread.Sleep(200); } }
Este método é bastante simples. Fica num loop eterno (Dando dormidinhas de 200 mili para evitar um 100% CPU caso não tenha nada pra fazer). Desempilha um memory stream e faz o processamento.
Conclusão
Apesar de aparentar ser muito eficiente, este método (nesta implementação) não foi o mais performático. Obviamente, a versão multi-thread ficou mais rápida, pois otimiza um pouco mais o uso do processador.
Como percebemos também, a implementação não é lá muito simples, apesar de ser possível fazer uma “casca” de servidor e cliente abstraindo somente o conteúdo.
Código-fonte
O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.