Objetivo
Nesta parte veremos como criar e transferir informações utilizando um TCP Server. A idéia é criar um método personalizado de transferir as informações, buscando melhor desempenho.
Em TCP, fiz dois testes, um que realiza toda a comunicação de uma forma síncrona e outro que o cliente processa as requisições numa segunda thread. A diferença de performance dos dois é significativa.
Não consegui nestes exemplos chegar numa implementação ótima. A idéia é somente ilustrar o método de transferência e já nos dar uma idéia de que é mais complexo que os demais. Ficarei devendo por enquanto uma implementação otimizada, mas depois que publicar a primeira “rodada” de experimentos, pretendo chegar numa implementação melhor.
Servidor
Para variar, o servidor foi implementado numa task MSBuild. O código principal é o seguinte:
public override bool Execute()
{
Log.LogMessage("Listening to TCP Connections on port " + Port.ToString());
IPAddress ipAddress = IPAddress.Any;
TcpListener tcpListener = new TcpListener(ipAddress, Port);
tcpListener.Start();
tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);
while (true)
Thread.Sleep(250);
}
Este código somente deixa o servidor “ouvindo” numa porta (inicialmente usei 8081, tudo é parametrizável na task). Quando receber um conexão, ele começa a processar (de forma assíncrona, no resultado de BeginAcceptSocketCallback) e libera o servidor para receber outra conexão.
O código do método BeginAcceptSocketCallback segue:
private void BeginAcceptSocketCallback(IAsyncResult result)
{
StreamUtil u = new StreamUtil();
TcpListener tcpListener = (TcpListener)result.AsyncState;
TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result);
tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);
Stream clientStream = tcpClient.GetStream();
int headerBytes;
int count = 1;
do {
byte[] header = new byte[8];
headerBytes = clientStream.Read(header, 0, 8);
long size = BitConverter.ToInt64(header, 0);
Log.LogMessage("Processing " + count.ToString());
count++;
MemoryStream ms = ReadBatch(clientStream, size);
if (ms.Length > 0)
WriteOutput(ms, u, clientStream);
else
headerBytes = 0;
}
while (headerBytes > 0);
Log.LogMessage("Finished request.");
}
Este código é executado a cada conexão recebida de um cliente. Convencionei um protocolo simplista entre cliente e servidor. O cliente manda um “long” com o tanto de dados que vai transferir, o servidor lê esse long e passa a receber a quantidade de bytes informada para dentro de uma memory stream. Em seguida, dá esse memory stream para processar pelo ProcessarClientBigRequest. O resultado também é armazenado numa memory stream. Em seguida, o servidor avisa o cliente quantos bytes vão voltar, e o cliente repete o procedimento do lado de lá.
Explicando passo a passo o método acima:
- BeginAcceptTcpClient avisa o servidor que outra conexão pode ser recebida
- No início do “do/while”, a quantidade de bytes fornecida pelo cliente é armazenada em size (recebe em char[] e converte para long)
- O método ReadBatch lê a quantidade de bytes fornecida e devolve o MemoryStream correspondente
- Se chegou algo no MemoryStream, a resposta é fornecida escrita pelo método WriteOutput
- A comunicação termina quando o cliente fala que tem “zero” bytes a transmitir
O código de ReadBatch segue:
private MemoryStream ReadBatch(Stream clientStream, long size)
{
MemoryStream ms = new MemoryStream();
byte[] buffer = new byte[100 * 1024];
long tmpSize = size;
int read;
do{
int bytesToRead = size > buffer.Length ? buffer.Length : (int)size;
read = clientStream.Read(buffer, 0, bytesToRead);
ms.Write(buffer, 0, read);
tmpSize -= read;
}
while (tmpSize > 0);
return ms;
}
Este método é o “feijão com arroz” de trabalhar com Streams. O buffer é um local de armazenamento temporário, apenas para ler em pedaços a stream. Ler de uma vez não é uma boa idéia. Aqui o código poderia ser otimizado, de uma forma que evitasse essa MemoryStream intermediária.
A idéia aqui é simples, vir enchendo o buffer e descarregando o memory stream, até que não exista mais nada pra ler. No final, devolver o memory stream.
O código do WriteOutput segue:
private void WriteOutput(MemoryStream ms, StreamUtil u, Stream clientStream)
{
byte[] header = new byte[sizeof(Int64)];
ms.Seek(0, SeekOrigin.Begin);
MemoryStream outputStream = new MemoryStream();
u.ProcessClientBigRequest(ConnString, ms, outputStream, false, null);
outputStream.Seek(0, SeekOrigin.Begin);
header = BitConverter.GetBytes(outputStream.Length);
clientStream.Write(header, 0, header.Length);
outputStream.CopyTo(clientStream);
}
Esse método, dado um memory stream, faz o processamento (ProcessClientBigRequest) numa memory stream (pra poder saber o tamanho), em seguida, envia o tamanho e copia a memory stream de volta para o cliente.
Essa é toda a lógica do servidor. Ele pode ser executado através do batch runserver-tcptestserver.bat (executa a task MSBuild correspondente).
Cliente – Single Thread
Neste teste, seguimos a seguinte abordagem:
- O cliente manda a quantidade de bytes que vai transferir do lote, em seguida um lote de requests (Ex.: 1000 pedidos)
- O servidor processa e escreve a quantidade de bytes e a resposta
- O cliente processa a resposta, inicia o próximo request
- O processo segue, até o final do lote
O código principal do teste segue:
public override bool Execute()
{
Stopwatch watch = new Stopwatch();
watch.Start();
TcpClient tcpClient = new TcpClient();
tcpClient.Connect(HostName, Port);
Log.LogMessage("Starting TCP transfer (single thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");
NetworkStream stream = tcpClient.GetStream();
int count = 1;
for (int i = 0; i < TotalBatches; i++)
{
Log.LogMessage("Processing " + count.ToString());
MemoryStream ms = GenerateRequest(count);
WriteHeader(ms, stream);
long totalBytes = ReadHeader(stream);
MemoryStream inputStream = ReadResponse(totalBytes, stream);
_util.ImportarStream(ConnString, inputStream);
count += BatchSize;
}
stream.Write(new byte[8], 0, 8);
Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
tcpClient.Close();
return true;
}
Este código começa iniciando o relógio, a conexão TCP. Para cada lote, executa GenerateRequest para obter o xml com o request, executa WriteHeader (escreve a quantidade de bytes a transferir e o request na stream do servidor), lê a quantidade de bytes retornado pelo servidor, através de ReadHeader. Em seguida, lê a resposta através de ReadResponse e importa o resultado (ImportarStream).
O código de GenerateRequest segue:
private MemoryStream GenerateRequest(int count)
{
MemoryStream ms = new MemoryStream();
_util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
ms.Seek(0, SeekOrigin.Begin);
return ms;
}
O código de WriteHeader segue:
private void WriteHeader(MemoryStream ms, Stream stream)
{
byte[] header = BitConverter.GetBytes(ms.Length);
stream.Write(header, 0, header.Length);
ms.CopyTo(stream);
}
Sem muito segredo aqui. Envia o tamanho e copia a stream com o request na stream de rede.
O código de ReadHeader:
private long ReadHeader(Stream stream)
{
byte[] header = new byte[sizeof(long)];
int read = stream.Read(header, 0, header.Length);
long totalBytes = BitConverter.ToInt64(header, 0);
return totalBytes;
}
Também sem segredo. O código de ReadResponse:
private MemoryStream ReadResponse(long responseSize, Stream stream)
{
MemoryStream inputStream = new MemoryStream();
long totalBytes = responseSize;
byte[] buffer = new byte[100 * 1024];
int bytesToRead;
int read;
do
{
bytesToRead = totalBytes > buffer.Length ? buffer.Length : (int)totalBytes;
read = stream.Read(buffer, 0, bytesToRead);
inputStream.Write(buffer, 0, read);
totalBytes -= read;
}
while (totalBytes > 0);
inputStream.Seek(0, SeekOrigin.Begin);
return inputStream;
}
Esse é um pouquinho mais complicado. Ele armazena em totalBytes o total da resposta, calcula o quanto precisa ler (o restante, ou um buffer cheio), executa a leitura e escreve na stream temporária (inputStream). Vai decrementando totalBytes a cada leitura. Quando totalBytes chega a 0, é porque leu tudo o que precisava. A stream temporária é retornada pela função.
Cliente – Multi thread
Este teste usa uma abordagem um pouco diferente:
- O cliente inicia uma thread para processar as respostas. Ela fica lá processando eternamente.
- O cliente manda a quantidade de bytes existente no request, em seguida o request
- O cliente recebe a quantidade de bytes da resposta e a resposta.
- Invés de processá-la, o cliente joga numa queue interna e segue enviando requests e responses
- Caso o cliente tenha 10 coisas na fila pra processar ele espera antes de receber as próximas respostas.
- A thread de processamento de respostas, vai desenfileirando as respostas e processando as mesmas
A vantagem deste método é que enquanto aguarda a transferência de rede, o processador não para de trabalhar. Ainda não está na sua forma mais otimizada, mas já é um começo e dá uma diferença significativa nos tempos.
O código principal deste teste segue:
public override bool Execute()
{
Stopwatch watch = new Stopwatch();
watch.Start();
Thread t = new Thread(ProcessMemoryStreamQueue);
t.Start();
Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");
TcpClient tcpClient = new TcpClient();
tcpClient.Connect(HostName, Port);
NetworkStream stream = tcpClient.GetStream();
int count = 1;
for (int i = 0; i < TotalBatches; i++)
{
Log.LogMessage("Processing " + i.ToString());
MemoryStream ms = new MemoryStream();
util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
ms.Seek(0, SeekOrigin.Begin);
WriteHeader(stream, ms);
//Sends the request
ms.CopyTo(stream);
long totalBytes = GetBytesToRead(stream);
//Sleeps if have 10 responses to process.
while (memoryStreamQueue.Count >= 10)
Thread.Sleep(100);
MemoryStream responseStream = ReadResponse(totalBytes, stream);
//Don't process response. Just queue it. The other thread will process it.
lock (memoryStreamQueue)
memoryStreamQueue.Enqueue(responseStream);
count += BatchSize;
}
//Mark end of transfer.
stream.Write(new byte[8], 0, 8);
tcpClient.Close();
watch.Stop();
t.Abort();
Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
return true;
}
private void WriteHeader(Stream stream, Stream streamToSend)
{
byte[] header = BitConverter.GetBytes(streamToSend.Length);
stream.Write(header, 0, header.Length);
}
private long GetBytesToRead(Stream stream)
{
byte[] header = new byte[sizeof(long)];
int read = stream.Read(header, 0, header.Length);
return BitConverter.ToInt64(header, 0);
}
private MemoryStream ReadResponse(long responseSize, Stream stream)
{
MemoryStream inputStream = new MemoryStream();
byte[] buffer = new byte[100 * 1024];
int bytesToRead;
int read;
long totalBytes = responseSize;
do
{
bytesToRead = totalBytes > buffer.Length ? buffer.Length : (int)totalBytes;
read = stream.Read(buffer, 0, bytesToRead);
inputStream.Write(buffer, 0, read);
totalBytes -= read;
}
while (totalBytes > 0);
inputStream.Seek(0, SeekOrigin.Begin);
return inputStream;
}
Após fazer todo o código de inicialização, da thread, relógio, conexão TCP, os lotes começam a ser processados. Os métodos WriteHeader, GetBytesToRead e ReadResponse seguem praticamente a mesma lógica do teste anterior. O que muda realmente está no trecho:
lock (memoryStreamQueue)
memoryStreamQueue.Enqueue(responseStream);
Essa memoryStreamQueue é simplesmente uma Queue<MemoryStream> e o uso de lock é necessário para evitar concorrência entre as duas threads.
A thread auxiliar, que faz o processamento executa o seguinte código:
private void ProcessMemoryStreamQueue()
{
while (true)
{
if (memoryStreamQueue.Count > 0)
{
MemoryStream ms = null;
lock (memoryStreamQueue)
{
ms = memoryStreamQueue.Dequeue();
}
util.ImportarStream(ConnString, ms);
}
Thread.Sleep(200);
}
}
Este método é bastante simples. Fica num loop eterno (Dando dormidinhas de 200 mili para evitar um 100% CPU caso não tenha nada pra fazer). Desempilha um memory stream e faz o processamento.
Conclusão
Apesar de aparentar ser muito eficiente, este método (nesta implementação) não foi o mais performático. Obviamente, a versão multi-thread ficou mais rápida, pois otimiza um pouco mais o uso do processador.
Como percebemos também, a implementação não é lá muito simples, apesar de ser possível fazer uma “casca” de servidor e cliente abstraindo somente o conteúdo.
Código-fonte
O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.