MSBuildCodeMetrics 0.1.0

É com satisfação que anuncio o meu primeiro release open source: http://ericlemes.github.io/MSBuildCodeMetrics/

É um conjunto de tasks MSBuild para auxiliar na extração de métricas de repositórios de controle de versão, como linha de código, complexidade ciclomática, etc.

Atualmente existem algumas métricas que eu fiz e outras que dependem do Visual Studio para serem extraídas.

Em breve, pretendo extender as tasks para tirar métricas também do FluentCodeMetrics do Elemar Jr.

Quem quiser entender melhor pra que serve e como usar métricas, sugiro o trabalho do Leandro Daniel (que também usei como referência): http://leandrodaniel.com/index.php/code-metrics-parte-1-metricas-de-codigo-sao-aliadas-do-arquiteto/.

O projeto é open source está no git hub, logo quem quiser contribuir, fique à vontade.

Tasks MSBuild para métricas simples

Para resolver um problema pontual, codifiquei a toque de caixa duas tasks MSBuild para tirar algumas métricas de código. Provavelmente tem outra coisa por aí, melhor escrita, mas com a melhor das intenções, estou publicando para que possam usar, criticar ou colaborar.

O objetivo das tasks são:

  • CountFilesByExtension: Varre uma lista de arquivos e retorna a quantidade de arquivos por extensão
  • CountProjectsByProjectType: Varre uma lista de arquivos, abre os arquivos e identifica o tipo de projeto (Library, Windows, etc). Retorna a quantidade de projetos deste dado tipo

Código no git hub: https://github.com/ericlemes/EricLemes.MSBuildTasks

Inversão de Dependências

Introdução

unclebob

Neste post, gostaria de falar um pouco de umas impressões que tive sobre o livro “Designing Object-Oriented C++ Applications Using the Booch Method”, do Robert C. Martin (aka Uncle Bob).

Comecei a ler este livro com o objetivo de “beber na fonte”, voltar lá para os primórdios do início do desenvolvimento orientado a objeto para tentar entender como chegamos na complexidade que atualmente temos e também tentar entender se essa complexidade que estamos inserindo em nome da “orientação a objetos” tem fundamento mesmo e de onde vem esse fundamento.

Não sei se poderia ter uma fonte melhor para atingir meus objetivos. Este livro é (pasmem!) de 1995 e soa bastante atual. Vamos lembrar onde estávamos em 1995. Não sei qual era a realidade de vocês, mas na minha não se falava em UML, a escola técnica que freqüentei explicava o que é DFD (diagrama de fluxo de dados) e na minha realidade imperava o desenvolvimento em dBase, Clipper, e começávamos a sonhar em desenvolver aplicações “windows” em VB (não sei nem se na versão 3 ainda) e no meu caso Delphi 1.0, 16 bits. Windows 3.1 era talvez o sistema operacional mais popular.

É nesse contexto que o Uncle Bob falava sobre alguns tópicos que pretendo resumir aqui.

Gerenciando complexidade

É engraçado ler um livro de 1995 falando de uma “crise de software” em relação à como gerir a complexidade no desenvolvimento de software. Nada poderia ser mais atual. Ele também cita o aumento da demanda aos usuários (o aumento da expectativa) comparado com a velocidade que as técnicas de desenvolvimento de software evoluem.

O Uncle Bob cita neste livro que o grande motivador do design orientado a objetos era buscar uma forma de gerenciar melhor a complexidade do software.

Por que OOD é diferente?

Este ponto me gerou uma reflexão bastante profunda em como fazemos software. Se eu pensar no começo da minha carreira, basicamente o pensamento era baseado na análise estruturada. Pensávamos nos dados e em pequenos processos que extraiam os dados para o usuário, o usuário trabalhava esses dados e devolvia para o repositório. Tudo sempre muito orientado a dados, que talvez seja uma herança do COBOL/Mainframe que carregamos até hoje.

A estrutura de uma aplicação era muito simples. Um menu, em cada menu uma tela e em cada tela uma funcionalidade. Ninguém parava de fato para pensar em como organizar esta lógica ou como gerenciar melhor as dependências. Cada funcionalidade era gerada do zero, geralmente usando algumas bibliotecas de infra-estrutura com uma conotação fortemente técnica, quando muito existia uma “herança de form” (no caso do Delphi) para agilizar o desenvolvimento e abstrair a lógica de como o usuário operava a “tela”. Pouco falava-se de classes de domínio, camada de negócio ou qualquer coisa do tipo.

A explicação deste ponto dada pelo livro é muito interessante. Vou fazer uma tradução livre da resposta dada no livro para a pergunta “Por que design orientado a objetos é diferente?”:

UncleBob-OOD

“Imagine um design estruturado que consiste em módulos dispostos através de uma árvore. A raiz da árvore é o módulo principal. Os nós da árvore são módulos utilitários que atendem funções de baixo nível. A árvore representa uma hierarquia de chamadas. Cada módulo chama o módulo que está imediatamente abaixo e é chamado por módulos imediatamente acima.

Quais módulos capturam as políticas de importância primária da aplicação? Os que estão próximos do topo, é claro. Os módulos que estão abaixo simplesmente implementam os menores detalhes. Os módulos no topo estão preocupados com os maiores problemas e implementam as maiores soluções.

Assim, quanto mais alto se está na hierarquia, mais abstratos os conceitos são, mais pertinentes eles são para o domínio do problema da aplicação e menos pertinentes para o domínio da solução da aplicação. Quanto mais abaixo os módulos estão na hierarquia, mais detalhados os conceitos se tornam, menos relevantes eles são para o domínio do problema e mais relevantes eles são para o domínio da solução.

Dessa forma, os módulos que estão no topo chamam os módulos dele. Em outras palavras, os módulos que contem as abstrações relevantes dependem dos módulos que tem detalhes irrelevantes! Isso significa que alterações feitas nos detalhes afetam as abstrações e qualquer tentativa de reusar alguma dessas abstrações carregam junto os detalhes que delas dependem.

Em OOD tentamos inverter essas dependências. Nós criamos detalhes que dependem fortemente destas abstrações. Essa inversão de dependências é a principal diferença entre OOD (design orientado a objeto) e as técnicas tradicionais. “

Vou tentar explicar o que entendi disso com um exemplo prático de uma implementação arcaica.

Se voltarmos para a época do dBase/Clipper (com uma idéia bem DOS), tínhamos o menu principal da aplicação que chamava diretamente um programa que cadastrava o pedido. Esse programa pegava o input do usuário e jogava para um programa que gravava o pedido. Esse por sua vez, chamava uma série de subprogramas para validar CEP, endereço, CPF, CGC (nesse tempo não tinha CNPJ!). Depois esse mesmo programa gravava os dados na tabela (tabelas DBF, é claro!). Todo o processamento de quando o pedido era feito, por exemplo, sensibilização de estoque era feito chamando também um subprograma de estoque que por sua vez puxava todos seus subprogramas de validação e também sua lógica de persistência no banco de dados.

Qualquer mudança realizada na lógica de estoque, poderia quebrar a lógica do pedido. Qualquer nova necessidade do pedido como por exemplo, gerar uma nota fiscal, demandava uma alteração nos programas de pedido. Ou seja, a lógica macro demandada por este pedido estava dependendo de detalhes de implementação de pedido.

As abordagens de separação em camadas resolvem grande parte do isolamento da lógica de apresentação, negócio e persistência, porém, algumas inerentes ao domínio de negócio são pensadas aqui. Por exemplo, quando falamos de sensibilização de estoque, se pensarmos num mesmo sistema, numa mesma estrutura de base de dados, é relativamente simples pensar em fazer uma chamada e atualizar o estoque, porém, se pensarmos num sistema externo, acessível através de um web service, por exemplo, essa lógica começa a ficar mais complexa e cada vez mais o pedido acoplado nisso.

A idéia para quebrar este conceito seria fazer uma abstração da interface de estoque, de forma que com diferentes implementações (transparentes para o pedido), o comportamento de como atualizar o estoque muda, sem a necessidade do pedido mudar. Aqui é que está essa idéia de “inverter as dependências”. O pedido não depende de como o estoque está implementado.

Poderia-se pensar mais além, e fazer uma abstração para “ações” executadas no domínio após a conclusão do pedido. Toda a lógica de gerar uma nota fiscal ou sensibilizar um estoque poderia ser implementada através dessas ações.

Fiz questão de usar um exemplo baseado no domínio de negócio para não “sujarmos” o conceito com a idéia de separação em camadas.

O interessante deste exemplo, é que mesmo que usemos técnicas modernas de domain driven design, patterns e separações de camadas, poderíamos continuar não enxergando essas abstrações e deixando o pedido acoplado na nota fiscal e no estoque.

Vale lembrar que utilizei esse domínio como um exemplo. Nem sempre colocar toda essa separação aqui pode ser uma boa idéia dependendo do sistema e da flexibilidade necessária.

Open-Closed Principle

Talvez este livro seja a publicação que definiu este princípio. A idéia aqui é que a partir do momento que encontramos essa abstração e conseguimos aplicar a inversão de dependências (no sentido que a lógica macro não depende dos detalhes), conseguimos criar classes que estão fechadas para alterações e abertas para extensões. Isso é considerado um dos principais princípios do design orientado a objetos.

No exemplo acima, a partir do momento que encontramos essa abstração para o pedido, ele passa a estar fechado para alterações (não preciso mexer mais nele para implementar novas ações) e aberto para extensões (implementando novas ações, ele pode ter seu comportamento extendido).

Contextos e Métricas

Outros pontos muito legais abordados neste livro é a idéia de contextos e métricas. No livro, Uncle Bob nos trás a idéia que a partir do momento que enxergamos as principais abstrações do software, conseguimos criar contextos. Ou seja, uma abstração e todas as implementações por trás dela consistem numa importante “unidade” no software.

A proposta é que essa unidade seja usada para divisão e composição de times de desenvolvimento, ou seja, como separar as atividades. É interessante perceber naquela época a preocupação dele com isso, ou seja, como os desenvolvedores conseguem ter alguma autonomia no design e continuar trabalhando de forma colaborativa. A idéia é que a partir do momento que exista uma convenção em como visualizar as abstrações, essa divisão é possível.

Para complementar ainda ele propõe um modelo de métricas para aferir se as dependências da aplicação estão bem equilibradas ou não, se existem classes que carregam muitas dependências. Em 1995!

Fazendo um paralelo com o exemplo aqui dado, um time poderia continuar a implementação do pedido e como acessar essa interface de ações. Outros times poderiam trabalhar no estoque e na nota fiscal com alguma liberdade no design e com a garantia que as coisas se encaixarão no futuro (garantidas pela interface).

Conclusão

Quando resolvi ler uma publicação antiga, escrita 18 anos antes deste post, meu principal objetivo foi tentar entender as motivações do design orientado a objeto e cruzar com diversas discussões que participo hoje. Vejo muita complexidade sendo adicionada desnecessariamente no software em nome da orientação a objetos.

Quando vi o conceito de “inversão de dependências” do Uncle Bob, na hora me identifiquei com a forma dele pensar. Hoje vejo softwares escritos em diversas camadas, utilizando várias tecnologias modernas, mas que não conseguem enxergar essas abstrações dentro do contexto de negócio e aplicar o conceito de inversão de dependências.

Alguns exemplos dessas situações, consigo citar de forma breve:

  • Aplicações em “n” camadas, utilizando MVC, ORM e outras coisas modernas, mas que possuem o problema de rateio n vezes no modelo, escrito e replicado em diversos lugares. Por que não criar uma abstração para isso?
  • Aplicações com “n” interfaces de integração que devido à diferença de formato, reescrevem toda a lógica de extração com regra de negócio replicada em várias interfaces. Geralmente as chamadas para interfaces de integração também estão diretamente acopladas no código.

Era isso!

Integrações entre Sistemas – Parte 13 – Rabbit MQ

Objetivo

Durante minhas pesquisas, tomei conhecimento do Rabbit MQ (http://www.rabbitmq.com/) que trata-se de uma plataforma open-source para mensageria, disponível em diversos sistemas operacionais (Windows, Linux/Unix, Mac OS X e Amazon EC2).

Resolvi conhecê-lo e aproveitei para também incluí-lo junto aos demais benchmarks.

Instalação

Utilizei o Rabbit MQ em plataforma Windows. O processo de instalação dele não tem grande segredo. A única curiosidade é que por ele ter sido escrito em Erlang (http://erlang.org/), o runtime do Erlang precisa ser instalado.

Outro componente interessante de ser instalado (você vai precisar dele para rodar os exemplos) é o cliente .NET para o RabbitMQ. Você o encontra na página de downloads do Rabbit MQ. A documentação da API pode ser encontrada na página de documentação do Rabbit MQ. As vezes eu me surpreendo com a simplicidade e a organização de projetos open source.

Operação

O Rabbit MQ é relativamente simples. Não explorei todas as suas features, apenas o necessário para executar este teste/exemplo.

Existe um “Command Prompt” do Rabbit MQ (instalado junto com o servidor).

Alguns comandos úteis:

rabbitmqctl status

Mostra uma série de indicadores sobre o status do servidor.

rabbitmqctl environment

Mostra uma série de informações sobre o ambiente do servidor, entre elas uma das mais úteis que é a porta tcp que está sendo ouvida (default 5672).

rabbitmqctl list_queues

Lista as filas criadas.

rabbitmq-plugins enable rabbitmq_management

Habilita o plugin de gerenciamento do RabbitMQ. Uma vez que a instalação do plugin é realizada, basta reiniciar o servidor e acessá-lo através da URL http://localhost:15672/. Não esqueça de trocar localhost pelo IP ou hostname do seu servidor. O usuário default é guest/guest.

Cliente Rabbit MQ

A única coisa que fiz aqui foi pegar minha implementação de Websphere MQ e trocar as chamadas para o Rabbit MQ. Sim, eu poderia ter criado uma abstração mas como meu objetivo aqui é apenas entender como funciona e realizar alguns benchmarks, resolvi ser objetivo.

A implementação do cliente é muito simples. O método principal que executa a Task MSBuild segue:

		public override bool Execute()
		{
			Stopwatch watch = new Stopwatch();
			watch.Start();

			Log.LogMessage("Starting Rabbit MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");

			ConnectionFactory factory = new ConnectionFactory();
			factory.HostName = HostName;
			IConnection conn = factory.CreateConnection();
			IModel channel = conn.CreateModel();			
			channel.QueueDeclare(OutputQueueName, false, false, false, null);			
			IBasicProperties props = channel.CreateBasicProperties();
			props.ContentType = "text/xml";
			props.DeliveryMode = 2; //persistent

			inputChannel = conn.CreateModel();			
			inputChannel.QueueDeclare(InputQueueName, false, false, false, null);			

			messageCount = TotalBatches;

			System.Threading.Tasks.Task.Factory.StartNew(ProcessInputQueue);

			int count = 1;
			for (int i = 0; i < TotalBatches; i++)
			{
				MemoryStream ms = new MemoryStream();
				StreamUtil.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
				channel.BasicPublish("", OutputQueueName, props, ms.GetBuffer());

				count += BatchSize;
				Log.LogMessage("Sent " + count.ToString());
			}

			finishedEvent.WaitOne();

			watch.Stop();
			Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
			conn.Close();

			return true;
		}

ConnectionFactory é o objeto da API que encapsula uma conexão com uma fila. A criação do channel, QueueDeclare seguem o modelinho da API do Rabbit MQ.

O método que efetivamente faz a publicação na fila é o channel.BasicPublish. Existem algumas regras para publicações com tópicos, publish/subscribe, mas para o nosso exemplo, somente o BasicPublish resolve o problema.

Este método também inicia uma Task (que internamente inicia uma segunda thread) para processar a fila de resposta para o cliente, no método ProcessInputQueue. O código dele segue:

		private void ProcessInputQueue()
		{			
			while (true)
			{
				BasicGetResult result = inputChannel.BasicGet(InputQueueName, true);
				if (result == null)
				{
					Thread.Sleep(250);
					continue;
				}
				MemoryStream ms = new MemoryStream(result.Body);
				messageCount--;

				Log.LogMessage("Waiting for more " + messageCount.ToString());

				StreamUtil.ImportarStream(ConnString, ms);

				if (messageCount <= 0)				
					break;									
			}

			finishedEvent.Set();
		}

Mesma dinâmica do Websphere MQ. Aqui fico num loop infinito, pegando as mensagens. Sempre que não tem nada a processar (result == null), dou uma pequena dormidinha na thread somente para não matar a CPU da máquina. Pega a resposta e processa. Sem grandes segredos.

Servidor RabbitMQ

O servidor segue a mesma dinâmica:

		public override bool Execute()
		{
			Log.LogMessage("Starting RabbitMQ Server");

			ConnectionFactory factory = new ConnectionFactory();
			factory.HostName = "localhost";
			IConnection conn = factory.CreateConnection();

			IModel inputChannel = conn.CreateModel();
			inputChannel.QueueDeclare(InputQueueName, false, false, false, null);

			IConnection conn2 = factory.CreateConnection();			

			IBasicProperties props = inputChannel.CreateBasicProperties();
			props.ContentType = "text/xml";
			props.DeliveryMode = 2; //persistent

			IModel outputChannel = conn2.CreateModel();			
			outputChannel.QueueDeclare(OutputQueueName, false, false, false, null);			

			int count = 0;

			while (true)
			{
				BasicGetResult result = inputChannel.BasicGet(InputQueueName, true);
				if (result == null)
				{
					Thread.Sleep(250);
					continue;
				}

				MemoryStream ms = new MemoryStream(result.Body);
				result = null; // let GC work.
				MemoryStream respStream = new MemoryStream();

				StreamUtil.ProcessClientBigRequest(ConnString, ms, respStream, false, null);

				outputChannel.BasicPublish("", OutputQueueName, props, respStream.GetBuffer());
				respStream = null; //let GC work

				count++;

				Log.LogMessage("Processed " + count.ToString());

			}

Comparativo dos resultados

Como nem tudo são rosas, a máquina que estava usando como servidor resolveu morrer e tive que reinstalar tudo novamente. Por tabela, perdi a confiança nos números que tinha executado anteriormente e acabei reexecutando todos os testes no novo ambiente, com o objetivo de manter a premissa de “mesmo payload, mesmo ambiente” que utilizei desde o início dessa série.

Mesmo assim, continuei executando 10 vezes cada teste com cada um dos volumes (20k, 50k, 500k, 1M) e o número apresentado aqui é a média dos 10, para evitar diferenças que ocorrem numa ou outra execução devido a cache de banco, paginação de memória, garbage collector, etc.

Os números são:

integracoes1

integracoes2

Como os números mostram, o desempenho do RabbitMQ não foi satisfatório. Fiquei bastante frustrado com esse resultado. Apesar de toda a simplicidade da implementação, da instalação, etc., o número ficou praticamente a mesma coisa do que usar Web Services em SOAP.

Além disso, apesar de todo o trabalho para montar um novo ambiente (que mudou de 32 para 64 bits na máquina servidora), houve uma piora significativa dos números do File Transfer. Vou investigar mais a fundo as razões desta piora dado o novo ambiente instalado.

Código fonte

O código fonte das implementações acima foi atualizado no Git Hub: https://github.com/ericlemes/IntegrationTests

Conclusão

Eu vi várias pessoas recomendando o RabbitMQ, o que me motivou a realizar esta pesquisa comparando também com o MSMQ e Websphere MQ. Acho que ainda vale a pena uma investigação mais aprofundada de como otimizar o RabbitMQ antes de simplesmente descartá-lo. Este trabalho é bastante superficial.

Mas acho que a principal conclusão que eu chego é que antes de sair utilizando uma nova tecnologia, vale a pena buscar informações sobre ela, testar, brincar, comparar. Baseado nos números aqui obtidos, numa implementação entre plataformas distintas (java/.NET por exemplo), ficaria com SOAP e numa implementação apenas em plataforma Windows, com o MSMQ.

Reorganizando conteúdo

De uns tempos pra cá, percebi que nem eu encontrava mais o conteúdo no meu blog sem dar um search no blog ou no google e fiquei um pouco incomodado com isso. Resolvi escrever um índice para ajudar a encontrar o conteúdo um pouco mais atemporal.

Acabei construindo esse índice como uma página que ficará fixa no blog, no link: http://ericlemes.com/indice/

O interessante deste processo de criar o índice é que nem eu tinha percebido quanta coisa acabei escrevendo nesse tempo todo (acho que comecei em 2009). Algumas coisas foram escritas para ajudar desenvolvedores do time que estavam começando, outras para compartilhar pensamentos, documentar descobertas, realizar pequenos benchmarks, mas o mais interessante é lembrar do começo de tudo, quando resolvi investir tempo no blog.

Naquela oportunidade sofria um pouco com a sensação de falta de conclusão das atividades. Aquele sentimento de que os projetos não terminavam e eu queria fazer coisas que tivessem começo, meio e fim. Cada post é assim. Nasce com uma idéia, uma pesquisa, as vezes fica muito maior do que eu pensava no começo, mas sempre que eu publico e depois recebo os feedbacks acabo tendo essa sensação legal de trabalho concluído.

Bom, mais um post concluído. Era isso.

Os números de 2012

Acabei de receber o relatório do WordPress com as stats de 2012. Foram 19.000 acessos em 2012. Um número relativamente baixo, mas para um blog que começou de brincadeira no qual posto pouco mais de uma vez por mês, fiquei satisfeito com o resultado.

Obrigado a todos que acompanharam, discutiram ou de alguma forma motivaram estes posts. Abraço e feliz 2013 a todos!

Integrações entre Sistemas – Parte 12 – MSMQ, Revisitado

Objetivo

Na parte 7 dessa série vimos uma implementação baseada em MSMQ que ficou muito atrás da implementação em Websphere MQ e eu tinha me comprometido a revisá-la. O objetivo deste post é apresentar essa revisão.

A princípio imaginei que o resultado ruim estava relacionado a filas transacionais, mas na prática percebi que o problema estava principalmente relacionado ao tratamento do recebimento da fila de forma assíncrona.

Melhorias realizadas

Realizei algumas melhorias pequenas no código, substituindo o loop deselegante na thread principal por um ManualResetEvent e outras melhorias menores, mas o ponto chave estava no tratamento do evento “ReceiveCompleted” da fila.

Criei uma Queue e uma segunda thread para tratar de forma paralela, não obstruindo o caminho do MQ com código de processamento e deixando o canal de recepção de mensagens livre para dar mais vazão à fila de entrada. Ficou assim:


		private void inQueue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
		{		
			
			lock (messageInputQueue)
			{
				messageInputQueue.Enqueue(e.Message);
				enqueueCount++;
				Log.LogMessage("Enqueued " + enqueueCount.ToString() + " messages");
			}
		
			if (enqueueCount < TotalBatches)
				inputQueue.BeginReceive();
		}

messageInputQueue trata-se da Queue a que me referi. É utilizado um “lock” para evitar concorrência entre a thread que empilha e a que desempilha as mensagens.

Este é o código que trata o processamento das mensagens, executado através de uma Task:

		private void ProcessInputQueue()
		{
			while (true)
			{
				Message msg = null;

				lock (messageInputQueue)
				{
					if (messageInputQueue.Count > 0)
						msg = messageInputQueue.Dequeue();
				}

				if (msg == null)
				{
					Thread.Sleep(50);
					continue;
				}
				
				StreamUtil.ImportarStream(ConnString, msg.BodyStream);
				Log.LogMessage("Processed message " + messageCount.ToString());					
				messageCount--;					

				if (messageCount <= 0)
					break;
			}
			processingDone.Set();
		}

Resultados

Abaixo segue os resultados completos inserindo também as execuções do MSMQ com filas transacionais e não-transacionais:

Integracoes-chart1

Integracoes-chart2

Como os resultados mostram, a diferença do Websphere MQ para o MSMQ não foi significativa, mas as diferenças entre a implementação anterior em MSMQ para a nova é absurdamente grande (praticamente o dobro!).

Código fonte

O código fonte completo está atualizado no Git Hub: https://github.com/ericlemes/IntegrationTests

Integrações entre Sistemas – Parte 11 – TCP Server com Task Parallel Library

Objetivo

Tcp Server… de novo??

Sim. Na verdade essa história começou logo que eu publiquei o post da parte 10. Meu amigo Alexandre Costa (aka Magoo) estava acompanhando o post depois que discutimos o assunto num café e logo que saiu, ele me mandou via twitter: “Por que você não usou a TPL?”.

Estragou meu dia. Toca a procurar sobre a tal da TPL e entender o que muda na prática.

A idéia aqui foi essa, fui atrás do que é a lib, uma forma de aplicá-la e ver o quanto agrega ao problema como um todo.

TPL ou Task Parallel Library

A Task Parallel Library é uma lib para trabalhar com paralelismo no .NET. A documentação da mesma encontra-se em: http://msdn.microsoft.com/en-us/library/dd460717.aspx.

No meu problema, percebi que tinha 2 situações em específico que ela poderia ajudar:

Vamos à prática.

Tcp Server 3

Nesta versão do cliente e servidor do TCP, o que muda na prática é:

		public void ProcessInputQueue()
		{
			while (true)
			{
				TcpClient2InputStreamContext ctx = null;
				lock (inputQueue)
				{
					if (inputQueue.Count &gt; 0)
						ctx = inputQueue.Dequeue();					
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				Parallel.Invoke(() =&gt;
				{
					Stream inputStream = ctx.InputStream;
					ctx.InputStream = null;
					inputStream.Seek(0, SeekOrigin.Begin);
					Log.LogMessage(&quot;Before import stream &quot; + ctx.ID.ToString());
					StreamUtil.ImportarStream(ConnString, inputStream, Log);
					Log.LogMessage(&quot;Stream imported &quot; + ctx.ID.ToString());
					responsesProcessed++;
				});

				if (responsesProcessed == TotalBatches)
					break;
				
			}

			processingDone.Set();
		}

Fiz outras alterações no código, colocando um ID na pergunta e devolvendo pelo servidor, para melhorar situações de debug. Eu realmente peguei uma série de bugs nesses meus exemplos relacionados a sincronização. Ainda não está totalmente estável, mas como disse anteriormente, está longe de ser uma implementação de referência para o assunto.

Tcp Server 4

Além das melhorias feitas na versão 3, na 4 eu implementei também o pattern assíncrono proposto. Na prática, tudo que é BeginRead, EndRead e BeginWrite e EndWrite são substituídos por construções na TPL. Isso tudo foi tratado no cliente e no servidor.

Na prática, não muda nada. Existe a vantagem de poder inserir um callback para tratamento de exceções (exceções em threads não param o fluxo principal e são bem chatas de debugar) e não ter que chamar o “EndRead” no callback (ele já é chamado automaticamente pela TPL).

Fica mais ou menos assim:

		private void WriteHeader(NetworkStream stream)
		{
			batchesSent++;

			if (batchesSent &gt; TotalBatches)
			{
				//Mark end of transfer.
				byte[] b = new byte[sizeof(long)];
				b = BitConverter.GetBytes((long)0);
				stream.Write(b, 0, sizeof(long));
				sendDone.Set();
				return;
			}

			Log.LogMessage(&quot;Sending request &quot; + batchesSent.ToString());

			TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext();
			ctx.OutputStream = new MemoryStream();
			ctx.ClientStream = stream;
			StreamUtil.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1));
			ctx.OutputStream.Seek(0, SeekOrigin.Begin);								

			byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length);

			Task.Factory.FromAsync&lt;byte[], int, int&gt;(stream.BeginWrite, stream.EndWrite, header, 0, header.Length, ctx).ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);			

			recordCount += BatchSize;
		}

O trecho que compõe a chamada da TPL é o Task.Factory.FromAsync. Vou tentar explicar:

  • No FromAsync, significa que os 3 parâmetros tratados pela função serão dos tipos especificados (que são os parâmetros da função BeginWrite)
  • stream.BeginWrite é o método que será chamado, usando o pattern assíncrono
  • stream.EndWrite é o método que será chamado imediatamente antes do callback. Ele é chamamdo automaticamente pela biblioteca, por isso foi removido do callback (no caso BeginWriteCallback)
  • header, 0 e header.Length são os parâmetros que serão passados para BeginWrite
  • ctx é o objeto de estado que será devolvido também no callback.
  • Em seguida, pode-se encadear vários “ContinueWith”. O primeiro, BeginWriteCallback é de fato para tratar o processamento da Stream. O segundo será chamado somente no caso de erro, para mandar para a console o erro

Agora veremos como fica o callback:

		private void BeginWriteCallback(Task task)
		{
			TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)task.AsyncState;			

			if (ctx.OutputStream.Position &lt; ctx.OutputStream.Length)
			{
				byte[] buffer = new byte[bufferSize];
				int read = ctx.OutputStream.Read(buffer, 0, buffer.Length);
				
				Task t = Task.Factory.FromAsync&lt;byte[], int, int&gt;(ctx.ClientStream.BeginWrite, ctx.ClientStream.EndWrite, buffer, 0, read, ctx);
				t.ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);		
			}
			else
			{
				WriteHeader(ctx.ClientStream);
			}
		}

O callback recebe como parâmetro a Task. A propriedade AsyncState é quem devolve o objeto de estado passado no chamada da função como parâmetro.

Outro exemplo interessante é a leitura. Como neste caso, existe o retorno da quantidade de bytes que foi lida na stream que é muito importante para o tratamento da leitura. Essa construção fica da seguinte forma:

		private void ReadHeader(NetworkStream stream)
		{
			contextCount++;
			TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext();
			ctx.ID = contextCount;
			ctx.ClientStream = stream;
			ctx.Header = new byte[sizeof(long)];
			ctx.HeaderRead = false;

			Task&lt;int&gt;.Factory.FromAsync&lt;byte[], int, int&gt;(stream.BeginRead, stream.EndRead, ctx.Header, 0, ctx.Header.Length, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ;			
		}

Neste caso, usa-se a construção Task. Apenas isso muda. O callback fica da seguinte maneira:

		private void BeginReadCallback(Task&lt;int&gt; task)
		{
			TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)task.AsyncState;
			int bytesRead = task.Result;
			Log.LogMessage(&quot;Read &quot; + bytesRead.ToString() + &quot; bytes&quot;);

			if (!ctx.HeaderRead)
			{
				Log.LogMessage(&quot;Reading Header&quot;);
				ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0);
				ctx.TotalRead = 0;
				Log.LogMessage(&quot;Response Size: &quot; + ctx.ResponseSize.ToString());
				ctx.Header = null; //I don't need this buffer anymore. 
				ctx.HeaderRead = true;
				ctx.InputStream = new MemoryStream();
				ctx.InputStream.SetLength(ctx.ResponseSize);

				if (ctx.ResponseSize &gt; 0)
				{
					ctx.Buffer = new byte[bufferSize];
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) &gt; ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage(&quot;Reading &quot; + bytesToRead.ToString() + &quot; bytes&quot;);

					Task&lt;int&gt;.Factory.FromAsync&lt;byte[], int, int&gt;(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead,
						ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ;					
				}
				else
					receiveDone.Set();
			}
			else
			{
				ctx.TotalRead += bytesRead;
				Log.LogMessage(&quot;Writing &quot; + bytesRead.ToString() + &quot; to input stream&quot;);
				ctx.InputStream.Write(ctx.Buffer, 0, bytesRead);

				if (ctx.TotalRead &lt; ctx.ResponseSize)
				{
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) &gt; ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage(&quot;Reading &quot; + bytesToRead.ToString() + &quot; bytes&quot;);
					ctx.Buffer = new byte[bufferSize];

					Task&lt;int&gt;.Factory.FromAsync&lt;byte[], int, int&gt;(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);
				}
				else
				{
					Log.LogMessage(&quot;Finished reading &quot; + ctx.InputStream.Length.ToString() + &quot; bytes&quot;);
					lock (inputQueue)
						inputQueue.Enqueue(ctx);

					ReadHeader(ctx.ClientStream);
				}
			}
		}

Os pontos relevantes é que o tipo recebido no callback também é Task. A forma de obter o objeto de estado é a mesma e a resposta do método BeginRead, vem pelo parâmetro task.Result. O resto, mesma coisa.

Os tempos

Agora é a parte divertida. Vou comparar as 3 coisas: O modo “roots” que eu fiz anteriormente, criando todas as threads e tratamentos manualmente (Tcp Server 2), a versão 3 que somente trata o processamento de forma paralela e a versão 4 que também implementa o pattern assíncrono. Seguem os números:

Integracoes-chart1

Integracoes-chart2

Código fonte

O código fonte está atualizado no git hub: https://github.com/ericlemes/IntegrationTests.

Ele tem uma série de coisas a melhorar, existem problemas de isolamento de responsabilidades e muita coisa a refatorar, porém, acredito que atinge o objetivo aqui proposto.

Conclusão

Como vemos nos números, as diferenças são insignificantes. Meu palpite é que a quantidade de paralelismo envolvida é muito pequena. O pesado do processamento está mesmo em tratar as streams de forma assíncrona com I/O não bloqueante (implementado no servidor 2).

Sobre a TPL, ela implementa uma sintaxe bastante elegante. Não pude estudá-la de forma exaustiva, mas a princípio não vejo nada que ela traga de novo em relação ao tratamento manual das threads.

Isso mais uma vez reforça minha idéia de aprender os conceitos na raiz. O princípio de como lidar com threads é o mesmo independente de como é a implementação. Muitas dessas bibliotecas tem por objetivo tornar a sintaxe apenas mais simples, o que pode ser as vezes uma armadilha. A simplicidade por vezes acaba escondendo o conceito que existe por tras da tecnologia fazendo com que a usemos de forma incorreta. Este exercício é bem didático neste sentido. Existe um consenso de que “sempre” paralelizar trás ganhos de performance o que na prática não se mostra verdadeiro.

Integrações entre Sistemas – Parte 10 – TCP Server, revisitado

Objetivo

Após algum tempinho sem escrever (o tempo vai ficando curto!), resolvi retormar alguns compromissos que tinha assumido. Um deles foi lá na parte quando abordei o TCP Server que percebi que a implementação estava muito longe de ser boa.

Retomei o tema, com o objetivo de “bater” o melhor número que eu tinha que era o Websphere MQ. Óbvio que o MQ tem diversas outras features melhor do que simplesmente performance, mas apenas para fins didáticos, gostaria de chegar num número melhor.

Melhorando a performance do TCP Server e Client

A primeira coisa que percebi é que o I/O síncrono e bloqueante é muito ruim. Na prática significa que a cada requisição, o cliente e o servidor estão esperando toda a transferência de dados terminar antes de começar a trabalhar. Eu queria evitar isso.

Por sorte, a própria API de Socket e Streams da framework do .NET permite trabalhar com I/O não bloqueante. A idéia é simples, ao dar um BeginRead, o código continua a executar e quando a transferência terminou um callback é chamado.

Na teoria é muito simples, porém, na implementação, debugar e implementar processamentos totalmente assíncronos não são lá tarefas muito simples.

O segundo principal ponto de otimização foi separar em threads diferentes o processamento das informações da transferência delas, visando tornar a espera por I/O a menor possível. Veremos como isso se traduz em código.

TCP Client, nova implementação

Antes de começar a falar em código, peço desculpas pela bagunça. Tem muito a refatorar neste código ainda, mas resolvi publicar mesmo assim, pois o ótimo é inimigo do bom. Aos poucos vou refinando para chegar numa implementação boa. Ainda não considero isso uma implementação de referência (está bem longe disso ainda!).

Pra variar um pouquinho, tudo foi implementado em MSBuild, logo, se tem dúvidas, vale a pena visitar os posts que explicam os conceitos do MSBuild.

O Execute da task do TCP Client segue:


		public override bool Execute()
		{
			Stopwatch watch = new Stopwatch();
			watch.Start();			

			Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");

			TcpClient tcpClient = new TcpClient();
			tcpClient.Connect(HostName, Port);
			NetworkStream stream = tcpClient.GetStream();			

			WriteHeader(stream);
			ReadHeader(stream);

			Thread t = new Thread(ProcessInputQueue);
			t.Start();

			sendDone.WaitOne();
			receiveDone.WaitOne();

			tcpClient.Close();
			watch.Stop();

			processingDone.WaitOne();

			Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

			return true;
		}

Aqui praticamente nada muda. O código somente conecta no servidor TCP, WriteHeader e ReadHeader iniciam a transferência assíncrona (veremos em detalhes), iniciamos a segunda thread para processar as respostas do servidor. sendDone, receiveDone são “ManualResetEvent”, uma forma mais elegante de fazer a thread principal esperar o término do processamento pelas demais. Basicamente sendDone marca o término do envio das informações para o servidor, receiveDone o retorno das respostas e processingDone o término do processamento.

O ManualResetEvent, funciona de uma forma muito simples. Sempre que você criar uma nova instância dele, você o marca como “false” se deseja que o mesmo comece travado e no caso de encontrar um “WaitOne” no meio do caminho a execução fica travada até que outra thread dê um “Set” no objeto, liberando a execução do método principal. É isso que fazemos por aqui.

Sim, programação multi thread é bem mais complexa e envolve objetos de sincronização.

Agora detalharemos o método WriteHeader:


		private void WriteHeader(NetworkStream stream)
		{
			batchesSent++;

			if (batchesSent > TotalBatches)
			{
				//Mark end of transfer.
				byte[] b = new byte[sizeof(long)];
				b = BitConverter.GetBytes((long)0);
				stream.Write(b, 0, sizeof(long));
				sendDone.Set();
				return;
			}

			Log.LogMessage("Sending request " + batchesSent.ToString());

			TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext();
			ctx.OutputStream = new MemoryStream();
			ctx.ClientStream = stream;
			util.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1));
			ctx.OutputStream.Seek(0, SeekOrigin.Begin);			

			byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length);
			stream.BeginWrite(header, 0, header.Length, BeginWriteCallback, ctx);			

			recordCount += BatchSize;			
		}

A idéia do WriteHeader é iniciar a transferência de um batch de informações. Aqui apenas está o controle de quantos lotezinhos já foram enviados. Quando chegamos no último, mandamos um request cujos 8 primeiros bytes são “0″, marcando o término da transferência.

Como a transferência é toda assíncrona, ela segue este padrão do BeginWrite com callbacks. A lógica é simples. Vou mandar um pedaço de uma stream e quando ela chegar no servidor, o método BeginWriteCallback será chamado.

Mas como eu saberei continuar a transferência? A resposta está num objeto de estado que é passado para o callback. Esse objeto é a classe TcpClient2OutputStreamContext (desculpe pelo nome, não refatorei ainda!).

A instância desse objeto carrega todo o contexto necessário para continuar a transferência, ou seja, a Stream que estou enviando (gerada em GenerateBigRequest), a Stream na qual estou escrevendo (ClientStream). Lembrando que o código passa por BeginWrite e segue a vida, ou seja, não trava a execução.

Assim que a informação chegar no servidor, o método BeginWriteCallback é executado:


		private void BeginWriteCallback(IAsyncResult result)
		{
			TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)result.AsyncState;
			ctx.ClientStream.EndWrite(result);			

			if (ctx.OutputStream.Position < ctx.OutputStream.Length)
			{
				byte[] buffer = new byte[bufferSize];
				int read = ctx.OutputStream.Read(buffer, 0, buffer.Length);
				ctx.ClientStream.BeginWrite(buffer, 0, read, BeginWriteCallback, ctx);
			}
			else
			{
				WriteHeader(ctx.ClientStream);
			}
		}


A primeira linha, obtém de volta o contexto. Esse é um “pattern” do framework para streams assíncronas, inclusive a chamada do EndWrite.

Então nesse caso, se a stream de saída ainda não chegou no fim, outro “chunk” (ou naco) é enviado. Se a stream chegou ao fim, parto pro próximo lote, chamando WriteHeader novamente.

O interessante do modelo assíncrono é que enquanto eu estou transferindo as informações do BeginWrite para o servidor, a execução continua, de forma que leituras pela resposta acontecem quase que simultaneamente com as escritas. Nada impede que antes de eu receber a chamada do callback, ocorra depois de um ReadHeader, ou BeginReadCallback que veremos a seguir:


		private void ReadHeader(NetworkStream stream)
		{
			contextCount++;
			TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext();
			ctx.ID = contextCount;
			ctx.ClientStream = stream;
			ctx.Header = new byte[sizeof(long)];
			ctx.HeaderRead = false;
			
			stream.BeginRead(ctx.Header, 0, ctx.Header.Length, BeginReadCallback, ctx);
		}

A idéia do ReadHeader é ler um “long” com o tamanho da stream que virá em seguida e a partir daí começar a ler a Stream como um todo. Isso significa que do lado do servidor já houve um processamento e ele já respondeu o tamanho da resposta para o cliente.

Aqui o modelo assíncrono é um pouquinho mais chato. O TcpClient2InputStreamContext amarra um buffer no mesmo (no caso a propriedade Header). Quando o callback for chamado, o conteúdo recebido do servidor será inserido neste buffer.

		private void BeginReadCallback(IAsyncResult result)
		{
			TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)result.AsyncState;
			int bytesRead = ctx.ClientStream.EndRead(result);
			Log.LogMessage("Read " + bytesRead.ToString() + " bytes");
			
			if (!ctx.HeaderRead)
			{
				Log.LogMessage("Reading Header");
				ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0);
				ctx.TotalRead = 0;
				Log.LogMessage("Response Size: " + ctx.ResponseSize.ToString());
				ctx.Header = null; //I don't need this buffer anymore. 
				ctx.HeaderRead = true;
				ctx.InputStream = new MemoryStream();
				ctx.InputStream.SetLength(ctx.ResponseSize);

				if (ctx.ResponseSize > 0)
				{
					ctx.Buffer = new byte[bufferSize];
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
				}
				else
					receiveDone.Set();
			}
			else
			{
				ctx.TotalRead += bytesRead;
				Log.LogMessage("Writing " + bytesRead.ToString() + " to input stream");
				ctx.InputStream.Write(ctx.Buffer, 0, bytesRead);				

				if (ctx.TotalRead < ctx.ResponseSize)					
				{					
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.Buffer = new byte[bufferSize];
					ctx.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
				}
				else
				{
					Log.LogMessage("Finished reading " + ctx.InputStream.Length.ToString() + " bytes");
					lock (inputQueue)
						inputQueue.Enqueue(ctx);					

					ReadHeader(ctx.ClientStream);
				}
			}														
		}

O BeginReadCallback (tá uma bagunça, eu sei!) faz o seguinte processamento: Primeiro ele verifica se já leu o “header” (os 8 bytes iniciais que indicam o tamanho da Stream). Se não leu, ele faz a leitura e inicializa todo o contexto (input stream, ou seja, onde será gravada a entrada, tamanho da resposta, etc). Se neste momendo recebeu uma resposta com 0 bytes, indica que acabou a transferência (receiveDone.Set). Se a resposta tem tamanho, ele inicia a leitura da stream do servidor, inicializando um buffer (a leitura é em pedaços) e chamando o BeginRead. Existe uma continha ali somente para saber o quanto ler do servidor (o tamanho máximo do buffer, ou o tamanho de bytes restantes a receber).

Quando o callback é chamado e o Header já está lido, é checado se a resposta já chegou ao fim. Caso já tenha chegado, o cliente joga na “inputQueue” (uma fila interna) o contexto, para que seja processado pela segunda thread (método ProcessInputQueue). Veja que existe um lock para evitar concorrência entre essas duas threads brigando pela queue. Quando ainda não chegou ao fim, somente mais um teco da stream é lida.

Agora pra finalizar, explicamos o método ProcessInputQueue:


		public void ProcessInputQueue()
		{
			while (true)
			{
				TcpClient2InputStreamContext ctx = null;
				lock (inputQueue)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				ctx.InputStream.Seek(0, SeekOrigin.Begin);
				Log.LogMessage("Before import stream " + ctx.ID.ToString());
				util.ImportarStream(ConnString, ctx.InputStream);
				Log.LogMessage("Stream imported " + ctx.ID.ToString());

				responsesProcessed++;
				if (responsesProcessed == TotalBatches)
					break;
			}

			processingDone.Set();
		}
	}

A primeira coisa que estranha deste método é o fato dele ficar num loop infinito. Isso ocorre porque se ele terminar de executar, a thread que o executa também finaliza e eu não quero isso. O trabalho dele é simples. Ele desempilha e se tem algo a processar, ele processa (util.ImportarStream).

Ele conta quanto já processou, quando percebe que chegou no total de lotes que tinha a processar, finaliza e libera a thread principal (processingDone.Set()). Tem também a tradicional dormidinha caso não tenha nada a processar para evitar 100% de CPU num dos cores.

Tcp Server, nova implementação

O Execute do nosso TCP Server, tem a seguinte implementação:


		public override bool Execute()
		{
			Log.LogMessage("Listening to TCP Connections on port " + Port.ToString());

			IPAddress ipAddress = IPAddress.Any;
			TcpListener tcpListener = new TcpListener(ipAddress, Port);
			tcpListener.Start();
			tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

			serverFinished.WaitOne();
			return true;
		}


A idéia aqui é somente ficar esperando eternamente por um novo cliente. Quando este cliente conectar, BeginAcceptSocket é chamado.


		private void BeginAcceptSocketCallback(IAsyncResult result)
		{
			Log.LogMessage("Received socket connection");

			streamUtil = new StreamUtil();

			TcpListener tcpListener = (TcpListener)result.AsyncState;
			TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result);			
			NetworkStream clientStream = tcpClient.GetStream();

			ConnectionContext ctx = new ConnectionContext(clientStream, ConnString, this);
			ctx.StartProcessingInputQueue();
			BeginRead(ctx);

			tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);
		}


Quando a conexão chega, um ConnectionContext é instanciado. Essa classe gerencia uma conexão no servidor. Ela inicia o processamento de sua “input queue” (veremos em detalhes mais a frente), inicia a leitura da stream do servidor (BeginRead) e deixa o servidor esperando outra conexão.

A leitura segue a mesma idéia do cliente, primeiro lendo os 8 primeiros bytes (tamanho do request) e em seguida lendo o conteúdo, através de um BeginRead:


		public void BeginRead(ConnectionContext connectionContext)
		{
			InputStreamContext ctx = new InputStreamContext(connectionContext, bufferSize);						
			ctx.ConnectionContext.ClientStream.BeginRead(ctx.Header, 0, sizeof(long), BeginReadCallback, ctx);					

		private void BeginReadCallback(IAsyncResult result)
		{			
			InputStreamContext ctx = result.AsyncState as InputStreamContext;
			int bytesRead = ctx.ConnectionContext.ClientStream.EndRead(result);			

			if (!ctx.HeaderRead)
			{
				readCount++;
				Log.LogMessage("Reading " + readCount.ToString());
				ctx.ProcessHeaderAfterRead();

				if (ctx.RemainingBytes == 0)
					EnqueueEmptyHeader(ctx);
				else
					ReadChunk(ctx);				
			}
			else
			{
				ctx.ProcessChunkAfterRead(bytesRead);

				if (ctx.RemainingBytes == 0)
					ProcessRequest(ctx);
				else
					ReadChunk(ctx);
			}		
		}
		}

O BeginReadCallback segue uma idéia muito similar. Se ainda não li o cabeçalho, leio o mesmo. Se recebi um cabeçalho com 0 bytes, terminou a transferência e eu insiro um header vazio para avisar o cliente que não tenho mais nada a escrever. Se tenho algo a ler, leio um chunk.

Se eu já li o cabeçalho chamo ProcessChunkAfterRead (ele somente joga para a stream interna dele o conteúdo do buffer). Se terminou a leitura, chamo ProcessRequest, se ainda não terminou, chamo ReadChunk (para pegar mais um pedaço).

Aqui seguem os códigos para EnqueueEmptyHeader, ReadChunk e ProcessRequest:


		private void EnqueueEmptyHeader(InputStreamContext ctx)
		{
			Log.LogMessage("Writing empty header");
			InputStreamContext inputContext = new InputStreamContext(ctx.ConnectionContext, bufferSize);
			inputContext.EmptyResponse = true;
			ctx.ConnectionContext.InputQueue.Enqueue(inputContext);					
		}

		private void ReadChunk(InputStreamContext ctx)
		{			
			int bytesToRead = ctx.RemainingBytes > ctx.Buffer.Length ? ctx.Buffer.Length : (int)ctx.RemainingBytes;
			ctx.ConnectionContext.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
		}

		private void ProcessRequest(InputStreamContext ctx)
		{
			lock (ctx.ConnectionContext)
				ctx.ConnectionContext.InputQueue.Enqueue(ctx);			
			BeginRead(ctx.ConnectionContext);			
		}

Vemos aqui o ProcessRequest na verdade não processa nada. Somente joga o contexto a ser processado numa queue que será processada numa segunda Thread em ProcessInputQueue.

Os métodos a seguir estão na classe InputStreamContext. Confesso que as responsabilidades das classes não estão tão claras como eu gostaria. Ainda cabe bastante refatoração aqui.


		public void ProcessHeaderAfterRead()
		{
			this.headerRead = true;
			this.remainingBytes = BitConverter.ToInt64(this.Header, 0);
		}

		public void ProcessChunkAfterRead(int bytesRead)
		{
			this.remainingBytes -= bytesRead;
			this.RequestStream.Write(this.Buffer, 0, bytesRead);
		}

Basicamente estes métodos inicializam as propriedades internas baseados no conteúdo do buffer recebido. O “AfterRead” em seus nomes são propositais que são atualizações que só são realizadas após a leitura ter sido finalizada (callback do BeginRead).

A seguir, explicamos como funciona o processamento do request. Ao receber a conexão, o método da classe ConnectionContext StartProcessingInputQueue é chamado. Este método na verdade somente inicia uma segunda thread que fica monitorando a input queue:


		public void StartProcessingInputQueue()
		{
			inputQueueThread = new Thread(ProcessInputQueue);
			inputQueueThread.Start();
		}

		public void ProcessInputQueue()
		{
			while (true)
			{
				InputStreamContext ctx = null;
				lock (this)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				OutputStreamContext outputContext = new OutputStreamContext(this);
				if (ctx.EmptyResponse)
				{
					outputContext.EmptyResponse = true;
					byte[] buffer = BitConverter.GetBytes((long)0);
					ClientStream.BeginWrite(buffer, 0, buffer.Length, tcpServer.BeginWriteCallback, outputContext);
				}
				else
				{
					ctx.RequestStream.Seek(0, SeekOrigin.Begin);
					streamUtil.ProcessClientBigRequest(connString, ctx.RequestStream, outputContext.OutputStream, false, null);
					outputContext.OutputStream.Seek(0, SeekOrigin.Begin);

					byte[] buffer = BitConverter.GetBytes(outputContext.OutputStream.Length);
					ClientStream.BeginWrite(buffer, 0, buffer.Length, tcpServer.BeginWriteCallback, outputContext);

					outputContext.WriteCompleted.WaitOne();
				}	
			}
		}

Esta segunda thread tem um comportamento muito similar à thread do cliente que monitora a input queue. O que vale ressaltar aqui é o “WriteCompleted” que é mais um manual reset event. Isso gera um efeito muito divertido pois como as respostas são devolvidas em pedaços de forma assíncrona. Nada impede que dois requests comecem a ser devolvidos simultaneamente para o cliente, por isso esse reset event, espera um aviso do BeginWriteCallback para saber quando pode começar a descarregar o segundo request.

A ausência deste controle gera um efeito interessante, pois invés de mandar o primeiro pedaço do primeiro request, depois o segundo pedaço do primeiro request e assim sucessivamente, o servidor acaba mandando o primeiro pedaço do primeiro request, o primeiro pedaço do segundo request, depois o segundo pedaço do primeiro request e assim sucessivamente. Aqui é um problema e também uma oportunidade de otimização. Acredito que se criar o cliente preparado para receber mais de um request quase que simultaneamente haverá um maior consumo de memória, porém melhor aproveitamento da CPU (menos espera) por parte do servidor. Divertido, né?

A lógica de descarregamento através do BeginWrite é a mesma. A única confusão é que na tentativa de isolar melhor as responsabilidades das classes (tarefa que ainda não conclui), o callback ficou na classe TcpTestServer2 e o processamento da fila na ConnectionContext.

Aqui segue o código de BeginWriteCallback:


		public void BeginWriteCallback(IAsyncResult result)
		{
			OutputStreamContext ctx = result.AsyncState as OutputStreamContext;
			ctx.ConnectionContext.ClientStream.EndWrite(result);

			if (ctx.EmptyResponse)
				return;

			byte[] buffer = new byte[bufferSize];
			int bytesToWrite = ctx.OutputStream.Read(buffer, 0, buffer.Length);

			if (bytesToWrite > 0)
			{
				ctx.ConnectionContext.ClientStream.BeginWrite(buffer, 0, bytesToWrite, BeginWriteCallback, ctx);
			}
			else
			{
				Log.LogMessage("Finished writing ");
				ctx.WriteCompleted.Set();
			}
		}
	}

A idéia é a mesma. Enquanto não chega no fim da stream, manda um pedaço. Quando chega, avisa o método ProcessInputQueue que ele pode descarregar a próxima stream (utilizando o WriteCompleted para fazer a sincronização).

Conclusão

Agora é a parte divertida. Segue o comparativo desta abordagem contra as demais estudadas até aqui:

Como vemos, meu primeiro objetivo que era quebrar o número do Websphere MQ foi atingido. Conseguimos com esta abordagem transferir 1 milhão de registros em 355,43 segundos ou 5,92 minutos.

A abordagem utilizada para a medição foi a mesma, utilizando o mesmo volume de dados e as mesmas máquinas. 10 execuções do novo método foram executadas e o número apresentado é a média dos 10.

Os ponto que acho importante mencionar é que a abordagem é válida, porém a implementação ainda precisa ser melhorada, principalmente no que tange a recuperação de falhas. Como a comunicação é toda assíncrona é preciso estabelecer um mecanismo mais eficiente de conversa entre cliente e servidor para conseguir continuar de onde parou no caso de uma quebra de conexão.

Em relação aos outros métodos para 1 milhão de registors, temos o seguinte comparativo:

Método Tempo em segundos (1 milhão de registros) % Diferença absoluta
Tcp Client 2 (multi thread) 355,43
Websphere MQ 417,36 17,42 61,93
Http Request 428,95 20,68 73,52
File Transfer (extração simples) 495,27 39,34 139,84
File Transfer (request/response) 657,09 84,87 301,66
Tcp Client (multi thread) 671,37 88,89 315,94
Net TCP (lotes) 788,20 121,76 432,77
MSMQ 836,12 135,24 480,69
SOAP (lotes) 1059,69 198,14 704,26
TCP Client (single thread) 1079,72 203,78 724,29

O novo servidor foi ~17% mais rápido do que a abordagem utilizando Websphere MQ.

Peço desculpas pelo post meio “work in progress”, mas achei melhor publicar assim mesmo do que esperar mais alguns meses pra finalizar toda a idéia.

Código fonte

O código fonte ainda está bem bagunçado, mas está no GitHub: https://github.com/ericlemes/IntegrationTests.

Slides da Palestra no TDC 2012

Pessoal,

Seguem os slides usados na apresentação do TDC2012: TDC2012

Seguir

Obtenha todo post novo entregue na sua caixa de entrada.

Junte-se a 347 outros seguidores