Mês: setembro 2012

Integrações entre Sistemas – Parte 10 – TCP Server, revisitado

Objetivo

Após algum tempinho sem escrever (o tempo vai ficando curto!), resolvi retormar alguns compromissos que tinha assumido. Um deles foi lá na parte quando abordei o TCP Server que percebi que a implementação estava muito longe de ser boa.

Retomei o tema, com o objetivo de “bater” o melhor número que eu tinha que era o Websphere MQ. Óbvio que o MQ tem diversas outras features melhor do que simplesmente performance, mas apenas para fins didáticos, gostaria de chegar num número melhor.

Melhorando a performance do TCP Server e Client

A primeira coisa que percebi é que o I/O síncrono e bloqueante é muito ruim. Na prática significa que a cada requisição, o cliente e o servidor estão esperando toda a transferência de dados terminar antes de começar a trabalhar. Eu queria evitar isso.

Por sorte, a própria API de Socket e Streams da framework do .NET permite trabalhar com I/O não bloqueante. A idéia é simples, ao dar um BeginRead, o código continua a executar e quando a transferência terminou um callback é chamado.

Na teoria é muito simples, porém, na implementação, debugar e implementar processamentos totalmente assíncronos não são lá tarefas muito simples.

O segundo principal ponto de otimização foi separar em threads diferentes o processamento das informações da transferência delas, visando tornar a espera por I/O a menor possível. Veremos como isso se traduz em código.

TCP Client, nova implementação

Antes de começar a falar em código, peço desculpas pela bagunça. Tem muito a refatorar neste código ainda, mas resolvi publicar mesmo assim, pois o ótimo é inimigo do bom. Aos poucos vou refinando para chegar numa implementação boa. Ainda não considero isso uma implementação de referência (está bem longe disso ainda!).

Pra variar um pouquinho, tudo foi implementado em MSBuild, logo, se tem dúvidas, vale a pena visitar os posts que explicam os conceitos do MSBuild.

O Execute da task do TCP Client segue:


		public override bool Execute()
		{
			Stopwatch watch = new Stopwatch();
			watch.Start();			

			Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");

			TcpClient tcpClient = new TcpClient();
			tcpClient.Connect(HostName, Port);
			NetworkStream stream = tcpClient.GetStream();			

			WriteHeader(stream);
			ReadHeader(stream);

			Thread t = new Thread(ProcessInputQueue);
			t.Start();

			sendDone.WaitOne();
			receiveDone.WaitOne();

			tcpClient.Close();
			watch.Stop();

			processingDone.WaitOne();

			Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

			return true;
		}

Aqui praticamente nada muda. O código somente conecta no servidor TCP, WriteHeader e ReadHeader iniciam a transferência assíncrona (veremos em detalhes), iniciamos a segunda thread para processar as respostas do servidor. sendDone, receiveDone são “ManualResetEvent”, uma forma mais elegante de fazer a thread principal esperar o término do processamento pelas demais. Basicamente sendDone marca o término do envio das informações para o servidor, receiveDone o retorno das respostas e processingDone o término do processamento.

O ManualResetEvent, funciona de uma forma muito simples. Sempre que você criar uma nova instância dele, você o marca como “false” se deseja que o mesmo comece travado e no caso de encontrar um “WaitOne” no meio do caminho a execução fica travada até que outra thread dê um “Set” no objeto, liberando a execução do método principal. É isso que fazemos por aqui.

Sim, programação multi thread é bem mais complexa e envolve objetos de sincronização.

Agora detalharemos o método WriteHeader:


		private void WriteHeader(NetworkStream stream)
		{
			batchesSent++;

			if (batchesSent > TotalBatches)
			{
				//Mark end of transfer.
				byte[] b = new byte[sizeof(long)];
				b = BitConverter.GetBytes((long)0);
				stream.Write(b, 0, sizeof(long));
				sendDone.Set();
				return;
			}

			Log.LogMessage("Sending request " + batchesSent.ToString());

			TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext();
			ctx.OutputStream = new MemoryStream();
			ctx.ClientStream = stream;
			util.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1));
			ctx.OutputStream.Seek(0, SeekOrigin.Begin);			

			byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length);
			stream.BeginWrite(header, 0, header.Length, BeginWriteCallback, ctx);			

			recordCount += BatchSize;			
		}

A idéia do WriteHeader é iniciar a transferência de um batch de informações. Aqui apenas está o controle de quantos lotezinhos já foram enviados. Quando chegamos no último, mandamos um request cujos 8 primeiros bytes são “0”, marcando o término da transferência.

Como a transferência é toda assíncrona, ela segue este padrão do BeginWrite com callbacks. A lógica é simples. Vou mandar um pedaço de uma stream e quando ela chegar no servidor, o método BeginWriteCallback será chamado.

Mas como eu saberei continuar a transferência? A resposta está num objeto de estado que é passado para o callback. Esse objeto é a classe TcpClient2OutputStreamContext (desculpe pelo nome, não refatorei ainda!).

A instância desse objeto carrega todo o contexto necessário para continuar a transferência, ou seja, a Stream que estou enviando (gerada em GenerateBigRequest), a Stream na qual estou escrevendo (ClientStream). Lembrando que o código passa por BeginWrite e segue a vida, ou seja, não trava a execução.

Assim que a informação chegar no servidor, o método BeginWriteCallback é executado:


		private void BeginWriteCallback(IAsyncResult result)
		{
			TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)result.AsyncState;
			ctx.ClientStream.EndWrite(result);			

			if (ctx.OutputStream.Position < ctx.OutputStream.Length)
			{
				byte[] buffer = new byte[bufferSize];
				int read = ctx.OutputStream.Read(buffer, 0, buffer.Length);
				ctx.ClientStream.BeginWrite(buffer, 0, read, BeginWriteCallback, ctx);
			}
			else
			{
				WriteHeader(ctx.ClientStream);
			}
		}


A primeira linha, obtém de volta o contexto. Esse é um “pattern” do framework para streams assíncronas, inclusive a chamada do EndWrite.

Então nesse caso, se a stream de saída ainda não chegou no fim, outro “chunk” (ou naco) é enviado. Se a stream chegou ao fim, parto pro próximo lote, chamando WriteHeader novamente.

O interessante do modelo assíncrono é que enquanto eu estou transferindo as informações do BeginWrite para o servidor, a execução continua, de forma que leituras pela resposta acontecem quase que simultaneamente com as escritas. Nada impede que antes de eu receber a chamada do callback, ocorra depois de um ReadHeader, ou BeginReadCallback que veremos a seguir:


		private void ReadHeader(NetworkStream stream)
		{
			contextCount++;
			TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext();
			ctx.ID = contextCount;
			ctx.ClientStream = stream;
			ctx.Header = new byte[sizeof(long)];
			ctx.HeaderRead = false;
			
			stream.BeginRead(ctx.Header, 0, ctx.Header.Length, BeginReadCallback, ctx);
		}

A idéia do ReadHeader é ler um “long” com o tamanho da stream que virá em seguida e a partir daí começar a ler a Stream como um todo. Isso significa que do lado do servidor já houve um processamento e ele já respondeu o tamanho da resposta para o cliente.

Aqui o modelo assíncrono é um pouquinho mais chato. O TcpClient2InputStreamContext amarra um buffer no mesmo (no caso a propriedade Header). Quando o callback for chamado, o conteúdo recebido do servidor será inserido neste buffer.

		private void BeginReadCallback(IAsyncResult result)
		{
			TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)result.AsyncState;
			int bytesRead = ctx.ClientStream.EndRead(result);
			Log.LogMessage("Read " + bytesRead.ToString() + " bytes");
			
			if (!ctx.HeaderRead)
			{
				Log.LogMessage("Reading Header");
				ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0);
				ctx.TotalRead = 0;
				Log.LogMessage("Response Size: " + ctx.ResponseSize.ToString());
				ctx.Header = null; //I don't need this buffer anymore. 
				ctx.HeaderRead = true;
				ctx.InputStream = new MemoryStream();
				ctx.InputStream.SetLength(ctx.ResponseSize);

				if (ctx.ResponseSize > 0)
				{
					ctx.Buffer = new byte[bufferSize];
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
				}
				else
					receiveDone.Set();
			}
			else
			{
				ctx.TotalRead += bytesRead;
				Log.LogMessage("Writing " + bytesRead.ToString() + " to input stream");
				ctx.InputStream.Write(ctx.Buffer, 0, bytesRead);				

				if (ctx.TotalRead < ctx.ResponseSize)					
				{					
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.Buffer = new byte[bufferSize];
					ctx.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
				}
				else
				{
					Log.LogMessage("Finished reading " + ctx.InputStream.Length.ToString() + " bytes");
					lock (inputQueue)
						inputQueue.Enqueue(ctx);					

					ReadHeader(ctx.ClientStream);
				}
			}														
		}

O BeginReadCallback (tá uma bagunça, eu sei!) faz o seguinte processamento: Primeiro ele verifica se já leu o “header” (os 8 bytes iniciais que indicam o tamanho da Stream). Se não leu, ele faz a leitura e inicializa todo o contexto (input stream, ou seja, onde será gravada a entrada, tamanho da resposta, etc). Se neste momendo recebeu uma resposta com 0 bytes, indica que acabou a transferência (receiveDone.Set). Se a resposta tem tamanho, ele inicia a leitura da stream do servidor, inicializando um buffer (a leitura é em pedaços) e chamando o BeginRead. Existe uma continha ali somente para saber o quanto ler do servidor (o tamanho máximo do buffer, ou o tamanho de bytes restantes a receber).

Quando o callback é chamado e o Header já está lido, é checado se a resposta já chegou ao fim. Caso já tenha chegado, o cliente joga na “inputQueue” (uma fila interna) o contexto, para que seja processado pela segunda thread (método ProcessInputQueue). Veja que existe um lock para evitar concorrência entre essas duas threads brigando pela queue. Quando ainda não chegou ao fim, somente mais um teco da stream é lida.

Agora pra finalizar, explicamos o método ProcessInputQueue:


		public void ProcessInputQueue()
		{
			while (true)
			{
				TcpClient2InputStreamContext ctx = null;
				lock (inputQueue)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				ctx.InputStream.Seek(0, SeekOrigin.Begin);
				Log.LogMessage("Before import stream " + ctx.ID.ToString());
				util.ImportarStream(ConnString, ctx.InputStream);
				Log.LogMessage("Stream imported " + ctx.ID.ToString());

				responsesProcessed++;
				if (responsesProcessed == TotalBatches)
					break;
			}

			processingDone.Set();
		}
	}

A primeira coisa que estranha deste método é o fato dele ficar num loop infinito. Isso ocorre porque se ele terminar de executar, a thread que o executa também finaliza e eu não quero isso. O trabalho dele é simples. Ele desempilha e se tem algo a processar, ele processa (util.ImportarStream).

Ele conta quanto já processou, quando percebe que chegou no total de lotes que tinha a processar, finaliza e libera a thread principal (processingDone.Set()). Tem também a tradicional dormidinha caso não tenha nada a processar para evitar 100% de CPU num dos cores.

Tcp Server, nova implementação

O Execute do nosso TCP Server, tem a seguinte implementação:


		public override bool Execute()
		{
			Log.LogMessage("Listening to TCP Connections on port " + Port.ToString());

			IPAddress ipAddress = IPAddress.Any;
			TcpListener tcpListener = new TcpListener(ipAddress, Port);
			tcpListener.Start();
			tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

			serverFinished.WaitOne();
			return true;
		}


A idéia aqui é somente ficar esperando eternamente por um novo cliente. Quando este cliente conectar, BeginAcceptSocket é chamado.


		private void BeginAcceptSocketCallback(IAsyncResult result)
		{
			Log.LogMessage("Received socket connection");

			streamUtil = new StreamUtil();

			TcpListener tcpListener = (TcpListener)result.AsyncState;
			TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result);			
			NetworkStream clientStream = tcpClient.GetStream();

			ConnectionContext ctx = new ConnectionContext(clientStream, ConnString, this);
			ctx.StartProcessingInputQueue();
			BeginRead(ctx);

			tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);
		}


Quando a conexão chega, um ConnectionContext é instanciado. Essa classe gerencia uma conexão no servidor. Ela inicia o processamento de sua “input queue” (veremos em detalhes mais a frente), inicia a leitura da stream do servidor (BeginRead) e deixa o servidor esperando outra conexão.

A leitura segue a mesma idéia do cliente, primeiro lendo os 8 primeiros bytes (tamanho do request) e em seguida lendo o conteúdo, através de um BeginRead:


		public void BeginRead(ConnectionContext connectionContext)
		{
			InputStreamContext ctx = new InputStreamContext(connectionContext, bufferSize);						
			ctx.ConnectionContext.ClientStream.BeginRead(ctx.Header, 0, sizeof(long), BeginReadCallback, ctx);					

		private void BeginReadCallback(IAsyncResult result)
		{			
			InputStreamContext ctx = result.AsyncState as InputStreamContext;
			int bytesRead = ctx.ConnectionContext.ClientStream.EndRead(result);			

			if (!ctx.HeaderRead)
			{
				readCount++;
				Log.LogMessage("Reading " + readCount.ToString());
				ctx.ProcessHeaderAfterRead();

				if (ctx.RemainingBytes == 0)
					EnqueueEmptyHeader(ctx);
				else
					ReadChunk(ctx);				
			}
			else
			{
				ctx.ProcessChunkAfterRead(bytesRead);

				if (ctx.RemainingBytes == 0)
					ProcessRequest(ctx);
				else
					ReadChunk(ctx);
			}		
		}
		}

O BeginReadCallback segue uma idéia muito similar. Se ainda não li o cabeçalho, leio o mesmo. Se recebi um cabeçalho com 0 bytes, terminou a transferência e eu insiro um header vazio para avisar o cliente que não tenho mais nada a escrever. Se tenho algo a ler, leio um chunk.

Se eu já li o cabeçalho chamo ProcessChunkAfterRead (ele somente joga para a stream interna dele o conteúdo do buffer). Se terminou a leitura, chamo ProcessRequest, se ainda não terminou, chamo ReadChunk (para pegar mais um pedaço).

Aqui seguem os códigos para EnqueueEmptyHeader, ReadChunk e ProcessRequest:


		private void EnqueueEmptyHeader(InputStreamContext ctx)
		{
			Log.LogMessage("Writing empty header");
			InputStreamContext inputContext = new InputStreamContext(ctx.ConnectionContext, bufferSize);
			inputContext.EmptyResponse = true;
			ctx.ConnectionContext.InputQueue.Enqueue(inputContext);					
		}

		private void ReadChunk(InputStreamContext ctx)
		{			
			int bytesToRead = ctx.RemainingBytes > ctx.Buffer.Length ? ctx.Buffer.Length : (int)ctx.RemainingBytes;
			ctx.ConnectionContext.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
		}

		private void ProcessRequest(InputStreamContext ctx)
		{
			lock (ctx.ConnectionContext)
				ctx.ConnectionContext.InputQueue.Enqueue(ctx);			
			BeginRead(ctx.ConnectionContext);			
		}

Vemos aqui o ProcessRequest na verdade não processa nada. Somente joga o contexto a ser processado numa queue que será processada numa segunda Thread em ProcessInputQueue.

Os métodos a seguir estão na classe InputStreamContext. Confesso que as responsabilidades das classes não estão tão claras como eu gostaria. Ainda cabe bastante refatoração aqui.


		public void ProcessHeaderAfterRead()
		{
			this.headerRead = true;
			this.remainingBytes = BitConverter.ToInt64(this.Header, 0);
		}

		public void ProcessChunkAfterRead(int bytesRead)
		{
			this.remainingBytes -= bytesRead;
			this.RequestStream.Write(this.Buffer, 0, bytesRead);
		}

Basicamente estes métodos inicializam as propriedades internas baseados no conteúdo do buffer recebido. O “AfterRead” em seus nomes são propositais que são atualizações que só são realizadas após a leitura ter sido finalizada (callback do BeginRead).

A seguir, explicamos como funciona o processamento do request. Ao receber a conexão, o método da classe ConnectionContext StartProcessingInputQueue é chamado. Este método na verdade somente inicia uma segunda thread que fica monitorando a input queue:


		public void StartProcessingInputQueue()
		{
			inputQueueThread = new Thread(ProcessInputQueue);
			inputQueueThread.Start();
		}

		public void ProcessInputQueue()
		{
			while (true)
			{
				InputStreamContext ctx = null;
				lock (this)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				OutputStreamContext outputContext = new OutputStreamContext(this);
				if (ctx.EmptyResponse)
				{
					outputContext.EmptyResponse = true;
					byte[] buffer = BitConverter.GetBytes((long)0);
					ClientStream.BeginWrite(buffer, 0, buffer.Length, tcpServer.BeginWriteCallback, outputContext);
				}
				else
				{
					ctx.RequestStream.Seek(0, SeekOrigin.Begin);
					streamUtil.ProcessClientBigRequest(connString, ctx.RequestStream, outputContext.OutputStream, false, null);
					outputContext.OutputStream.Seek(0, SeekOrigin.Begin);

					byte[] buffer = BitConverter.GetBytes(outputContext.OutputStream.Length);
					ClientStream.BeginWrite(buffer, 0, buffer.Length, tcpServer.BeginWriteCallback, outputContext);

					outputContext.WriteCompleted.WaitOne();
				}	
			}
		}

Esta segunda thread tem um comportamento muito similar à thread do cliente que monitora a input queue. O que vale ressaltar aqui é o “WriteCompleted” que é mais um manual reset event. Isso gera um efeito muito divertido pois como as respostas são devolvidas em pedaços de forma assíncrona. Nada impede que dois requests comecem a ser devolvidos simultaneamente para o cliente, por isso esse reset event, espera um aviso do BeginWriteCallback para saber quando pode começar a descarregar o segundo request.

A ausência deste controle gera um efeito interessante, pois invés de mandar o primeiro pedaço do primeiro request, depois o segundo pedaço do primeiro request e assim sucessivamente, o servidor acaba mandando o primeiro pedaço do primeiro request, o primeiro pedaço do segundo request, depois o segundo pedaço do primeiro request e assim sucessivamente. Aqui é um problema e também uma oportunidade de otimização. Acredito que se criar o cliente preparado para receber mais de um request quase que simultaneamente haverá um maior consumo de memória, porém melhor aproveitamento da CPU (menos espera) por parte do servidor. Divertido, né?

A lógica de descarregamento através do BeginWrite é a mesma. A única confusão é que na tentativa de isolar melhor as responsabilidades das classes (tarefa que ainda não conclui), o callback ficou na classe TcpTestServer2 e o processamento da fila na ConnectionContext.

Aqui segue o código de BeginWriteCallback:


		public void BeginWriteCallback(IAsyncResult result)
		{
			OutputStreamContext ctx = result.AsyncState as OutputStreamContext;
			ctx.ConnectionContext.ClientStream.EndWrite(result);

			if (ctx.EmptyResponse)
				return;

			byte[] buffer = new byte[bufferSize];
			int bytesToWrite = ctx.OutputStream.Read(buffer, 0, buffer.Length);

			if (bytesToWrite > 0)
			{
				ctx.ConnectionContext.ClientStream.BeginWrite(buffer, 0, bytesToWrite, BeginWriteCallback, ctx);
			}
			else
			{
				Log.LogMessage("Finished writing ");
				ctx.WriteCompleted.Set();
			}
		}
	}

A idéia é a mesma. Enquanto não chega no fim da stream, manda um pedaço. Quando chega, avisa o método ProcessInputQueue que ele pode descarregar a próxima stream (utilizando o WriteCompleted para fazer a sincronização).

Conclusão

Agora é a parte divertida. Segue o comparativo desta abordagem contra as demais estudadas até aqui:

Como vemos, meu primeiro objetivo que era quebrar o número do Websphere MQ foi atingido. Conseguimos com esta abordagem transferir 1 milhão de registros em 355,43 segundos ou 5,92 minutos.

A abordagem utilizada para a medição foi a mesma, utilizando o mesmo volume de dados e as mesmas máquinas. 10 execuções do novo método foram executadas e o número apresentado é a média dos 10.

Os ponto que acho importante mencionar é que a abordagem é válida, porém a implementação ainda precisa ser melhorada, principalmente no que tange a recuperação de falhas. Como a comunicação é toda assíncrona é preciso estabelecer um mecanismo mais eficiente de conversa entre cliente e servidor para conseguir continuar de onde parou no caso de uma quebra de conexão.

Em relação aos outros métodos para 1 milhão de registors, temos o seguinte comparativo:

Método Tempo em segundos (1 milhão de registros) % Diferença absoluta
Tcp Client 2 (multi thread) 355,43
Websphere MQ 417,36 17,42 61,93
Http Request 428,95 20,68 73,52
File Transfer (extração simples) 495,27 39,34 139,84
File Transfer (request/response) 657,09 84,87 301,66
Tcp Client (multi thread) 671,37 88,89 315,94
Net TCP (lotes) 788,20 121,76 432,77
MSMQ 836,12 135,24 480,69
SOAP (lotes) 1059,69 198,14 704,26
TCP Client (single thread) 1079,72 203,78 724,29

O novo servidor foi ~17% mais rápido do que a abordagem utilizando Websphere MQ.

Peço desculpas pelo post meio “work in progress”, mas achei melhor publicar assim mesmo do que esperar mais alguns meses pra finalizar toda a idéia.

Código fonte

O código fonte ainda está bem bagunçado, mas está no GitHub: https://github.com/ericlemes/IntegrationTests.