Integrações entre Sistemas – Parte 6 – TCP Server

Objetivo

Nesta parte veremos como criar e transferir informações utilizando um TCP Server. A idéia é criar um método personalizado de transferir as informações, buscando melhor desempenho.

Em TCP, fiz dois testes, um que realiza toda a comunicação de uma forma síncrona e outro que o cliente processa as requisições numa segunda thread. A diferença de performance dos dois é significativa.

Não consegui nestes exemplos chegar numa implementação ótima. A idéia é somente ilustrar o método de transferência e já nos dar uma idéia de que é mais complexo que os demais. Ficarei devendo por enquanto uma implementação otimizada, mas depois que publicar a primeira “rodada” de experimentos, pretendo chegar numa implementação melhor.

Servidor

Para variar, o servidor foi implementado numa task MSBuild. O código principal é o seguinte:

        public override bool Execute()
        {
            Log.LogMessage("Listening to TCP Connections on port " + Port.ToString());

            IPAddress ipAddress = IPAddress.Any;
            TcpListener tcpListener = new TcpListener(ipAddress, Port);
            tcpListener.Start();
            tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

            while (true)
                Thread.Sleep(250);                                    
        }

Este código somente deixa o servidor “ouvindo” numa porta (inicialmente usei 8081, tudo é parametrizável na task). Quando receber um conexão, ele começa a processar (de forma assíncrona, no resultado de BeginAcceptSocketCallback) e libera o servidor para receber outra conexão.

O código do método BeginAcceptSocketCallback segue:

        private void BeginAcceptSocketCallback(IAsyncResult result)
        {
            StreamUtil u = new StreamUtil();

            TcpListener tcpListener = (TcpListener)result.AsyncState;
            TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result);
            
            tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

            Stream clientStream = tcpClient.GetStream();

            int headerBytes;
            int count = 1;

            do {
                byte[] header = new byte[8];
                headerBytes = clientStream.Read(header, 0, 8);
                long size = BitConverter.ToInt64(header, 0);

                Log.LogMessage("Processing " + count.ToString());
                count++;

                MemoryStream ms = ReadBatch(clientStream, size);

                if (ms.Length > 0)               
                    WriteOutput(ms, u, clientStream);                                    
                else               
                    headerBytes = 0;                
            }
            while (headerBytes > 0);
            Log.LogMessage("Finished request.");
        }

Este código é executado a cada conexão recebida de um cliente. Convencionei um protocolo simplista entre cliente e servidor. O cliente manda um “long” com o tanto de dados que vai transferir, o servidor lê esse long e passa a receber a quantidade de bytes informada para dentro de uma memory stream. Em seguida, dá esse memory stream para processar pelo ProcessarClientBigRequest. O resultado também é armazenado numa memory stream. Em seguida, o servidor avisa o cliente quantos bytes vão voltar, e o cliente repete o procedimento do lado de lá.

Explicando passo a passo o método acima:

  • BeginAcceptTcpClient avisa o servidor que outra conexão pode ser recebida
  • No início do “do/while”, a quantidade de bytes fornecida pelo cliente é armazenada em size (recebe em char[] e converte para long)
  • O método ReadBatch lê a quantidade de bytes fornecida e devolve o MemoryStream correspondente
  • Se chegou algo no MemoryStream, a resposta é fornecida escrita pelo método WriteOutput
  • A comunicação termina quando o cliente fala que tem “zero” bytes a transmitir

O código de ReadBatch segue:

        private MemoryStream ReadBatch(Stream clientStream, long size)
        {            
            MemoryStream ms = new MemoryStream();
            byte[] buffer = new byte[100 * 1024];

            long tmpSize = size;

            int read;
            do{
                int bytesToRead = size > buffer.Length ? buffer.Length : (int)size;
                read = clientStream.Read(buffer, 0, bytesToRead);

                ms.Write(buffer, 0, read);

                tmpSize -= read;                
            }
            while (tmpSize > 0);                      

            return ms;
        }

Este método é o “feijão com arroz” de trabalhar com Streams. O buffer é um local de armazenamento temporário, apenas para ler em pedaços a stream. Ler de uma vez não é uma boa idéia. Aqui o código poderia ser otimizado, de uma forma que evitasse essa MemoryStream intermediária.

A idéia aqui é simples, vir enchendo o buffer e descarregando o memory stream, até que não exista mais nada pra ler. No final, devolver o memory stream.

O código do WriteOutput segue:

        private void WriteOutput(MemoryStream ms, StreamUtil u, Stream clientStream)
        {
            byte[] header = new byte[sizeof(Int64)];

            ms.Seek(0, SeekOrigin.Begin);
            MemoryStream outputStream = new MemoryStream();
            u.ProcessClientBigRequest(ConnString, ms, outputStream, false, null);
            outputStream.Seek(0, SeekOrigin.Begin);
            header = BitConverter.GetBytes(outputStream.Length);
            clientStream.Write(header, 0, header.Length);
            outputStream.CopyTo(clientStream);            
        }

Esse método, dado um memory stream, faz o processamento (ProcessClientBigRequest) numa memory stream (pra poder saber o tamanho), em seguida, envia o tamanho e copia a memory stream de volta para o cliente.

Essa é toda a lógica do servidor. Ele pode ser executado através do batch runserver-tcptestserver.bat (executa a task MSBuild correspondente).

Cliente – Single Thread

Neste teste, seguimos a seguinte abordagem:

  • O cliente manda a quantidade de bytes que vai transferir do lote, em seguida um lote de requests (Ex.: 1000 pedidos)
  • O servidor processa e escreve a quantidade de bytes e a resposta
  • O cliente processa a resposta, inicia o próximo request
  • O processo segue, até o final do lote

O código principal do teste segue:

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect(HostName, Port);

            Log.LogMessage("Starting TCP transfer (single thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            NetworkStream stream = tcpClient.GetStream();
            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                Log.LogMessage("Processing " + count.ToString());

                MemoryStream ms = GenerateRequest(count);
                WriteHeader(ms, stream);
                long totalBytes = ReadHeader(stream);

                MemoryStream inputStream = ReadResponse(totalBytes, stream);
                _util.ImportarStream(ConnString, inputStream);

                count += BatchSize;                
            }            
            stream.Write(new byte[8], 0, 8);            

            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
            tcpClient.Close();

            return true;
        }

Este código começa iniciando o relógio, a conexão TCP. Para cada lote, executa GenerateRequest para obter o xml com o request, executa WriteHeader (escreve a quantidade de bytes a transferir e o request na stream do servidor), lê a quantidade de bytes retornado pelo servidor, através de ReadHeader. Em seguida, lê a resposta através de ReadResponse e importa o resultado (ImportarStream).

O código de GenerateRequest segue:

        private MemoryStream GenerateRequest(int count)
        {
            MemoryStream ms = new MemoryStream();            
            _util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
            ms.Seek(0, SeekOrigin.Begin);

            return ms;
        }

O código de WriteHeader segue:

        private void WriteHeader(MemoryStream ms, Stream stream)
        {
            byte[] header = BitConverter.GetBytes(ms.Length);
            stream.Write(header, 0, header.Length);
            ms.CopyTo(stream);
        }

Sem muito segredo aqui. Envia o tamanho e copia a stream com o request na stream de rede.

O código de ReadHeader:

        private long ReadHeader(Stream stream)
        {
            byte[] header = new byte[sizeof(long)];
            int read = stream.Read(header, 0, header.Length);
            long totalBytes = BitConverter.ToInt64(header, 0);
            return totalBytes;
        }

Também sem segredo. O código de ReadResponse:

        private MemoryStream ReadResponse(long responseSize, Stream stream)
        {
            MemoryStream inputStream = new MemoryStream();

            long totalBytes = responseSize;
            byte[] buffer = new byte[100 * 1024];
            int bytesToRead;
            int read;
            do
            {
                bytesToRead = totalBytes > buffer.Length ? buffer.Length : (int)totalBytes;
                read = stream.Read(buffer, 0, bytesToRead);
                inputStream.Write(buffer, 0, read);

                totalBytes -= read;                                                
            }
            while (totalBytes > 0);

            inputStream.Seek(0, SeekOrigin.Begin);

            return inputStream;
        }

Esse é um pouquinho mais complicado. Ele armazena em totalBytes o total da resposta, calcula o quanto precisa ler (o restante, ou um buffer cheio), executa a leitura e escreve na stream temporária (inputStream). Vai decrementando totalBytes a cada leitura. Quando totalBytes chega a 0, é porque leu tudo o que precisava. A stream temporária é retornada pela função.

Cliente – Multi thread

Este teste usa uma abordagem um pouco diferente:

  • O cliente inicia uma thread para processar as respostas. Ela fica lá processando eternamente.
  • O cliente manda a quantidade de bytes existente no request, em seguida o request
  • O cliente recebe a quantidade de bytes da resposta e a resposta.
  • Invés de processá-la, o cliente joga numa queue interna e segue enviando requests e responses
  • Caso o cliente tenha 10 coisas na fila pra processar ele espera antes de receber as próximas respostas.
  • A thread de processamento de respostas, vai desenfileirando as respostas e processando as mesmas

A vantagem deste método é que enquanto aguarda a transferência de rede, o processador não para de trabalhar. Ainda não está na sua forma mais otimizada, mas já é um começo e dá uma diferença significativa nos tempos.

O código principal deste teste segue:

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            Thread t = new Thread(ProcessMemoryStreamQueue);
            t.Start();

            Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect(HostName, Port);
            NetworkStream stream = tcpClient.GetStream();

            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                Log.LogMessage("Processing " + i.ToString());

                MemoryStream ms = new MemoryStream();
                
                util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
                ms.Seek(0, SeekOrigin.Begin);

                WriteHeader(stream, ms);
                
                //Sends the request
                ms.CopyTo(stream);

                long totalBytes = GetBytesToRead(stream);
                
                //Sleeps if have 10 responses to process. 
                while (memoryStreamQueue.Count >= 10)
                    Thread.Sleep(100);

                MemoryStream responseStream = ReadResponse(totalBytes, stream);
                //Don't process response. Just queue it. The other thread will process it.
                lock (memoryStreamQueue)
                    memoryStreamQueue.Enqueue(responseStream);

                count += BatchSize;                
            }
            //Mark end of transfer.
            stream.Write(new byte[8], 0, 8);
                       
            tcpClient.Close();
            watch.Stop();
            t.Abort();

            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            return true;
        }
		
		        private void WriteHeader(Stream stream, Stream streamToSend)
        {
            byte[] header = BitConverter.GetBytes(streamToSend.Length);
            stream.Write(header, 0, header.Length);
        }

        private long GetBytesToRead(Stream stream)
        {
            byte[] header = new byte[sizeof(long)];
            int read = stream.Read(header, 0, header.Length);
            return BitConverter.ToInt64(header, 0);
        }

        private MemoryStream ReadResponse(long responseSize, Stream stream)
        {
            MemoryStream inputStream = new MemoryStream();
            byte[] buffer = new byte[100 * 1024];
            int bytesToRead;
            int read;
            long totalBytes = responseSize;

            do
            {
                bytesToRead =  totalBytes > buffer.Length ? buffer.Length : (int)totalBytes;
                read = stream.Read(buffer, 0, bytesToRead);

                inputStream.Write(buffer, 0, read);
                totalBytes -= read;
            }
            while (totalBytes > 0);
                        
            inputStream.Seek(0, SeekOrigin.Begin);
            return inputStream;
        }

Após fazer todo o código de inicialização, da thread, relógio, conexão TCP, os lotes começam a ser processados. Os métodos WriteHeader, GetBytesToRead e ReadResponse seguem praticamente a mesma lógica do teste anterior. O que muda realmente está no trecho:

       lock (memoryStreamQueue)
                    memoryStreamQueue.Enqueue(responseStream);

Essa memoryStreamQueue é simplesmente uma Queue<MemoryStream> e o uso de lock é necessário para evitar concorrência entre as duas threads.

A thread auxiliar, que faz o processamento executa o seguinte código:

        private void ProcessMemoryStreamQueue()
        {
            while (true)
            {
                if (memoryStreamQueue.Count > 0)
                {
                    MemoryStream ms = null;
                    lock (memoryStreamQueue)
                    {
                        ms = memoryStreamQueue.Dequeue();                        
                    }
                    util.ImportarStream(ConnString, ms);                    
                }
                Thread.Sleep(200);
            }
        }

Este método é bastante simples. Fica num loop eterno (Dando dormidinhas de 200 mili para evitar um 100% CPU caso não tenha nada pra fazer). Desempilha um memory stream e faz o processamento.

Conclusão

Apesar de aparentar ser muito eficiente, este método (nesta implementação) não foi o mais performático. Obviamente, a versão multi-thread ficou mais rápida, pois otimiza um pouco mais o uso do processador.

Como percebemos também, a implementação não é lá muito simples, apesar de ser possível fazer uma “casca” de servidor e cliente abstraindo somente o conteúdo.

Código-fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s