Integrações entre Sistemas – Parte 11 – TCP Server com Task Parallel Library

Objetivo

Tcp Server… de novo??

Sim. Na verdade essa história começou logo que eu publiquei o post da parte 10. Meu amigo Alexandre Costa (aka Magoo) estava acompanhando o post depois que discutimos o assunto num café e logo que saiu, ele me mandou via twitter: “Por que você não usou a TPL?”.

Estragou meu dia. Toca a procurar sobre a tal da TPL e entender o que muda na prática.

A idéia aqui foi essa, fui atrás do que é a lib, uma forma de aplicá-la e ver o quanto agrega ao problema como um todo.

TPL ou Task Parallel Library

A Task Parallel Library é uma lib para trabalhar com paralelismo no .NET. A documentação da mesma encontra-se em: http://msdn.microsoft.com/en-us/library/dd460717.aspx.

No meu problema, percebi que tinha 2 situações em específico que ela poderia ajudar:

Vamos à prática.

Tcp Server 3

Nesta versão do cliente e servidor do TCP, o que muda na prática é:

		public void ProcessInputQueue()
		{
			while (true)
			{
				TcpClient2InputStreamContext ctx = null;
				lock (inputQueue)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();					
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				Parallel.Invoke(() =>
				{
					Stream inputStream = ctx.InputStream;
					ctx.InputStream = null;
					inputStream.Seek(0, SeekOrigin.Begin);
					Log.LogMessage("Before import stream " + ctx.ID.ToString());
					StreamUtil.ImportarStream(ConnString, inputStream, Log);
					Log.LogMessage("Stream imported " + ctx.ID.ToString());
					responsesProcessed++;
				});

				if (responsesProcessed == TotalBatches)
					break;
				
			}

			processingDone.Set();
		}

Fiz outras alterações no código, colocando um ID na pergunta e devolvendo pelo servidor, para melhorar situações de debug. Eu realmente peguei uma série de bugs nesses meus exemplos relacionados a sincronização. Ainda não está totalmente estável, mas como disse anteriormente, está longe de ser uma implementação de referência para o assunto.

Tcp Server 4

Além das melhorias feitas na versão 3, na 4 eu implementei também o pattern assíncrono proposto. Na prática, tudo que é BeginRead, EndRead e BeginWrite e EndWrite são substituídos por construções na TPL. Isso tudo foi tratado no cliente e no servidor.

Na prática, não muda nada. Existe a vantagem de poder inserir um callback para tratamento de exceções (exceções em threads não param o fluxo principal e são bem chatas de debugar) e não ter que chamar o “EndRead” no callback (ele já é chamado automaticamente pela TPL).

Fica mais ou menos assim:

		private void WriteHeader(NetworkStream stream)
		{
			batchesSent++;

			if (batchesSent > TotalBatches)
			{
				//Mark end of transfer.
				byte[] b = new byte[sizeof(long)];
				b = BitConverter.GetBytes((long)0);
				stream.Write(b, 0, sizeof(long));
				sendDone.Set();
				return;
			}

			Log.LogMessage("Sending request " + batchesSent.ToString());

			TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext();
			ctx.OutputStream = new MemoryStream();
			ctx.ClientStream = stream;
			StreamUtil.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1));
			ctx.OutputStream.Seek(0, SeekOrigin.Begin);								

			byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length);

			Task.Factory.FromAsync<byte[], int, int>(stream.BeginWrite, stream.EndWrite, header, 0, header.Length, ctx).ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);			

			recordCount += BatchSize;
		}

O trecho que compõe a chamada da TPL é o Task.Factory.FromAsync. Vou tentar explicar:

  • No FromAsync, significa que os 3 parâmetros tratados pela função serão dos tipos especificados (que são os parâmetros da função BeginWrite)
  • stream.BeginWrite é o método que será chamado, usando o pattern assíncrono
  • stream.EndWrite é o método que será chamado imediatamente antes do callback. Ele é chamamdo automaticamente pela biblioteca, por isso foi removido do callback (no caso BeginWriteCallback)
  • header, 0 e header.Length são os parâmetros que serão passados para BeginWrite
  • ctx é o objeto de estado que será devolvido também no callback.
  • Em seguida, pode-se encadear vários “ContinueWith”. O primeiro, BeginWriteCallback é de fato para tratar o processamento da Stream. O segundo será chamado somente no caso de erro, para mandar para a console o erro

Agora veremos como fica o callback:

		private void BeginWriteCallback(Task task)
		{
			TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)task.AsyncState;			

			if (ctx.OutputStream.Position < ctx.OutputStream.Length)
			{
				byte[] buffer = new byte[bufferSize];
				int read = ctx.OutputStream.Read(buffer, 0, buffer.Length);
				
				Task t = Task.Factory.FromAsync<byte[], int, int>(ctx.ClientStream.BeginWrite, ctx.ClientStream.EndWrite, buffer, 0, read, ctx);
				t.ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);		
			}
			else
			{
				WriteHeader(ctx.ClientStream);
			}
		}

O callback recebe como parâmetro a Task. A propriedade AsyncState é quem devolve o objeto de estado passado no chamada da função como parâmetro.

Outro exemplo interessante é a leitura. Como neste caso, existe o retorno da quantidade de bytes que foi lida na stream que é muito importante para o tratamento da leitura. Essa construção fica da seguinte forma:

		private void ReadHeader(NetworkStream stream)
		{
			contextCount++;
			TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext();
			ctx.ID = contextCount;
			ctx.ClientStream = stream;
			ctx.Header = new byte[sizeof(long)];
			ctx.HeaderRead = false;

			Task<int>.Factory.FromAsync<byte[], int, int>(stream.BeginRead, stream.EndRead, ctx.Header, 0, ctx.Header.Length, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ;			
		}

Neste caso, usa-se a construção Task. Apenas isso muda. O callback fica da seguinte maneira:

		private void BeginReadCallback(Task<int> task)
		{
			TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)task.AsyncState;
			int bytesRead = task.Result;
			Log.LogMessage("Read " + bytesRead.ToString() + " bytes");

			if (!ctx.HeaderRead)
			{
				Log.LogMessage("Reading Header");
				ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0);
				ctx.TotalRead = 0;
				Log.LogMessage("Response Size: " + ctx.ResponseSize.ToString());
				ctx.Header = null; //I don't need this buffer anymore. 
				ctx.HeaderRead = true;
				ctx.InputStream = new MemoryStream();
				ctx.InputStream.SetLength(ctx.ResponseSize);

				if (ctx.ResponseSize > 0)
				{
					ctx.Buffer = new byte[bufferSize];
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");

					Task<int>.Factory.FromAsync<byte[], int, int>(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead,
						ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ;					
				}
				else
					receiveDone.Set();
			}
			else
			{
				ctx.TotalRead += bytesRead;
				Log.LogMessage("Writing " + bytesRead.ToString() + " to input stream");
				ctx.InputStream.Write(ctx.Buffer, 0, bytesRead);

				if (ctx.TotalRead < ctx.ResponseSize)
				{
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.Buffer = new byte[bufferSize];

					Task<int>.Factory.FromAsync<byte[], int, int>(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);
				}
				else
				{
					Log.LogMessage("Finished reading " + ctx.InputStream.Length.ToString() + " bytes");
					lock (inputQueue)
						inputQueue.Enqueue(ctx);

					ReadHeader(ctx.ClientStream);
				}
			}
		}

Os pontos relevantes é que o tipo recebido no callback também é Task. A forma de obter o objeto de estado é a mesma e a resposta do método BeginRead, vem pelo parâmetro task.Result. O resto, mesma coisa.

Os tempos

Agora é a parte divertida. Vou comparar as 3 coisas: O modo “roots” que eu fiz anteriormente, criando todas as threads e tratamentos manualmente (Tcp Server 2), a versão 3 que somente trata o processamento de forma paralela e a versão 4 que também implementa o pattern assíncrono. Seguem os números:

Integracoes-chart1

Integracoes-chart2

Código fonte

O código fonte está atualizado no git hub: https://github.com/ericlemes/IntegrationTests.

Ele tem uma série de coisas a melhorar, existem problemas de isolamento de responsabilidades e muita coisa a refatorar, porém, acredito que atinge o objetivo aqui proposto.

Conclusão

Como vemos nos números, as diferenças são insignificantes. Meu palpite é que a quantidade de paralelismo envolvida é muito pequena. O pesado do processamento está mesmo em tratar as streams de forma assíncrona com I/O não bloqueante (implementado no servidor 2).

Sobre a TPL, ela implementa uma sintaxe bastante elegante. Não pude estudá-la de forma exaustiva, mas a princípio não vejo nada que ela traga de novo em relação ao tratamento manual das threads.

Isso mais uma vez reforça minha idéia de aprender os conceitos na raiz. O princípio de como lidar com threads é o mesmo independente de como é a implementação. Muitas dessas bibliotecas tem por objetivo tornar a sintaxe apenas mais simples, o que pode ser as vezes uma armadilha. A simplicidade por vezes acaba escondendo o conceito que existe por tras da tecnologia fazendo com que a usemos de forma incorreta. Este exercício é bem didático neste sentido. Existe um consenso de que “sempre” paralelizar trás ganhos de performance o que na prática não se mostra verdadeiro.

Anúncios

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair / Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair / Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair / Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair / Alterar )

Conectando a %s