Categoria: Arquitetura

Joseph Yoder no Brasil

6a0120a85dcdae970b012877701400970c-piJoseph Yoder estará ministrando um curso de TDD e Refactoring no Brasil!

Pra quem não conhece o Yoder, ele é um dos fundadores da Refactory, Inc., ao lado do Ralph Johnson e associados como a Rebecca Wirfs-Brock.

Sim, o Ralph Johnson, é um dos quatro da Gang-of-Four, autores do Design Patterns: Elements of Reusable Object-Oriented Software, na minha opinião, uma das pedras fundamentais da nossa profissão.

Em resumo, uma oportunidade talvez única realizar esse curso e conhecer uma figura ímpar como o Yoder. As inscrições podem ser feitas neste link

Inversão de Dependências

Introdução

unclebob

Neste post, gostaria de falar um pouco de umas impressões que tive sobre o livro “Designing Object-Oriented C++ Applications Using the Booch Method”, do Robert C. Martin (aka Uncle Bob).

Comecei a ler este livro com o objetivo de “beber na fonte”, voltar lá para os primórdios do início do desenvolvimento orientado a objeto para tentar entender como chegamos na complexidade que atualmente temos e também tentar entender se essa complexidade que estamos inserindo em nome da “orientação a objetos” tem fundamento mesmo e de onde vem esse fundamento.

Não sei se poderia ter uma fonte melhor para atingir meus objetivos. Este livro é (pasmem!) de 1995 e soa bastante atual. Vamos lembrar onde estávamos em 1995. Não sei qual era a realidade de vocês, mas na minha não se falava em UML, a escola técnica que freqüentei explicava o que é DFD (diagrama de fluxo de dados) e na minha realidade imperava o desenvolvimento em dBase, Clipper, e começávamos a sonhar em desenvolver aplicações “windows” em VB (não sei nem se na versão 3 ainda) e no meu caso Delphi 1.0, 16 bits. Windows 3.1 era talvez o sistema operacional mais popular.

É nesse contexto que o Uncle Bob falava sobre alguns tópicos que pretendo resumir aqui.

Gerenciando complexidade

É engraçado ler um livro de 1995 falando de uma “crise de software” em relação à como gerir a complexidade no desenvolvimento de software. Nada poderia ser mais atual. Ele também cita o aumento da demanda aos usuários (o aumento da expectativa) comparado com a velocidade que as técnicas de desenvolvimento de software evoluem.

O Uncle Bob cita neste livro que o grande motivador do design orientado a objetos era buscar uma forma de gerenciar melhor a complexidade do software.

Por que OOD é diferente?

Este ponto me gerou uma reflexão bastante profunda em como fazemos software. Se eu pensar no começo da minha carreira, basicamente o pensamento era baseado na análise estruturada. Pensávamos nos dados e em pequenos processos que extraiam os dados para o usuário, o usuário trabalhava esses dados e devolvia para o repositório. Tudo sempre muito orientado a dados, que talvez seja uma herança do COBOL/Mainframe que carregamos até hoje.

A estrutura de uma aplicação era muito simples. Um menu, em cada menu uma tela e em cada tela uma funcionalidade. Ninguém parava de fato para pensar em como organizar esta lógica ou como gerenciar melhor as dependências. Cada funcionalidade era gerada do zero, geralmente usando algumas bibliotecas de infra-estrutura com uma conotação fortemente técnica, quando muito existia uma “herança de form” (no caso do Delphi) para agilizar o desenvolvimento e abstrair a lógica de como o usuário operava a “tela”. Pouco falava-se de classes de domínio, camada de negócio ou qualquer coisa do tipo.

A explicação deste ponto dada pelo livro é muito interessante. Vou fazer uma tradução livre da resposta dada no livro para a pergunta “Por que design orientado a objetos é diferente?”:

UncleBob-OOD

“Imagine um design estruturado que consiste em módulos dispostos através de uma árvore. A raiz da árvore é o módulo principal. Os nós da árvore são módulos utilitários que atendem funções de baixo nível. A árvore representa uma hierarquia de chamadas. Cada módulo chama o módulo que está imediatamente abaixo e é chamado por módulos imediatamente acima.

Quais módulos capturam as políticas de importância primária da aplicação? Os que estão próximos do topo, é claro. Os módulos que estão abaixo simplesmente implementam os menores detalhes. Os módulos no topo estão preocupados com os maiores problemas e implementam as maiores soluções.

Assim, quanto mais alto se está na hierarquia, mais abstratos os conceitos são, mais pertinentes eles são para o domínio do problema da aplicação e menos pertinentes para o domínio da solução da aplicação. Quanto mais abaixo os módulos estão na hierarquia, mais detalhados os conceitos se tornam, menos relevantes eles são para o domínio do problema e mais relevantes eles são para o domínio da solução.

Dessa forma, os módulos que estão no topo chamam os módulos dele. Em outras palavras, os módulos que contem as abstrações relevantes dependem dos módulos que tem detalhes irrelevantes! Isso significa que alterações feitas nos detalhes afetam as abstrações e qualquer tentativa de reusar alguma dessas abstrações carregam junto os detalhes que delas dependem.

Em OOD tentamos inverter essas dependências. Nós criamos detalhes que dependem fortemente destas abstrações. Essa inversão de dependências é a principal diferença entre OOD (design orientado a objeto) e as técnicas tradicionais. ”

Vou tentar explicar o que entendi disso com um exemplo prático de uma implementação arcaica.

Se voltarmos para a época do dBase/Clipper (com uma idéia bem DOS), tínhamos o menu principal da aplicação que chamava diretamente um programa que cadastrava o pedido. Esse programa pegava o input do usuário e jogava para um programa que gravava o pedido. Esse por sua vez, chamava uma série de subprogramas para validar CEP, endereço, CPF, CGC (nesse tempo não tinha CNPJ!). Depois esse mesmo programa gravava os dados na tabela (tabelas DBF, é claro!). Todo o processamento de quando o pedido era feito, por exemplo, sensibilização de estoque era feito chamando também um subprograma de estoque que por sua vez puxava todos seus subprogramas de validação e também sua lógica de persistência no banco de dados.

Qualquer mudança realizada na lógica de estoque, poderia quebrar a lógica do pedido. Qualquer nova necessidade do pedido como por exemplo, gerar uma nota fiscal, demandava uma alteração nos programas de pedido. Ou seja, a lógica macro demandada por este pedido estava dependendo de detalhes de implementação de pedido.

As abordagens de separação em camadas resolvem grande parte do isolamento da lógica de apresentação, negócio e persistência, porém, algumas inerentes ao domínio de negócio são pensadas aqui. Por exemplo, quando falamos de sensibilização de estoque, se pensarmos num mesmo sistema, numa mesma estrutura de base de dados, é relativamente simples pensar em fazer uma chamada e atualizar o estoque, porém, se pensarmos num sistema externo, acessível através de um web service, por exemplo, essa lógica começa a ficar mais complexa e cada vez mais o pedido acoplado nisso.

A idéia para quebrar este conceito seria fazer uma abstração da interface de estoque, de forma que com diferentes implementações (transparentes para o pedido), o comportamento de como atualizar o estoque muda, sem a necessidade do pedido mudar. Aqui é que está essa idéia de “inverter as dependências”. O pedido não depende de como o estoque está implementado.

Poderia-se pensar mais além, e fazer uma abstração para “ações” executadas no domínio após a conclusão do pedido. Toda a lógica de gerar uma nota fiscal ou sensibilizar um estoque poderia ser implementada através dessas ações.

Fiz questão de usar um exemplo baseado no domínio de negócio para não “sujarmos” o conceito com a idéia de separação em camadas.

O interessante deste exemplo, é que mesmo que usemos técnicas modernas de domain driven design, patterns e separações de camadas, poderíamos continuar não enxergando essas abstrações e deixando o pedido acoplado na nota fiscal e no estoque.

Vale lembrar que utilizei esse domínio como um exemplo. Nem sempre colocar toda essa separação aqui pode ser uma boa idéia dependendo do sistema e da flexibilidade necessária.

Open-Closed Principle

Talvez este livro seja a publicação que definiu este princípio. A idéia aqui é que a partir do momento que encontramos essa abstração e conseguimos aplicar a inversão de dependências (no sentido que a lógica macro não depende dos detalhes), conseguimos criar classes que estão fechadas para alterações e abertas para extensões. Isso é considerado um dos principais princípios do design orientado a objetos.

No exemplo acima, a partir do momento que encontramos essa abstração para o pedido, ele passa a estar fechado para alterações (não preciso mexer mais nele para implementar novas ações) e aberto para extensões (implementando novas ações, ele pode ter seu comportamento extendido).

Contextos e Métricas

Outros pontos muito legais abordados neste livro é a idéia de contextos e métricas. No livro, Uncle Bob nos trás a idéia que a partir do momento que enxergamos as principais abstrações do software, conseguimos criar contextos. Ou seja, uma abstração e todas as implementações por trás dela consistem numa importante “unidade” no software.

A proposta é que essa unidade seja usada para divisão e composição de times de desenvolvimento, ou seja, como separar as atividades. É interessante perceber naquela época a preocupação dele com isso, ou seja, como os desenvolvedores conseguem ter alguma autonomia no design e continuar trabalhando de forma colaborativa. A idéia é que a partir do momento que exista uma convenção em como visualizar as abstrações, essa divisão é possível.

Para complementar ainda ele propõe um modelo de métricas para aferir se as dependências da aplicação estão bem equilibradas ou não, se existem classes que carregam muitas dependências. Em 1995!

Fazendo um paralelo com o exemplo aqui dado, um time poderia continuar a implementação do pedido e como acessar essa interface de ações. Outros times poderiam trabalhar no estoque e na nota fiscal com alguma liberdade no design e com a garantia que as coisas se encaixarão no futuro (garantidas pela interface).

Conclusão

Quando resolvi ler uma publicação antiga, escrita 18 anos antes deste post, meu principal objetivo foi tentar entender as motivações do design orientado a objeto e cruzar com diversas discussões que participo hoje. Vejo muita complexidade sendo adicionada desnecessariamente no software em nome da orientação a objetos.

Quando vi o conceito de “inversão de dependências” do Uncle Bob, na hora me identifiquei com a forma dele pensar. Hoje vejo softwares escritos em diversas camadas, utilizando várias tecnologias modernas, mas que não conseguem enxergar essas abstrações dentro do contexto de negócio e aplicar o conceito de inversão de dependências.

Alguns exemplos dessas situações, consigo citar de forma breve:

  • Aplicações em “n” camadas, utilizando MVC, ORM e outras coisas modernas, mas que possuem o problema de rateio n vezes no modelo, escrito e replicado em diversos lugares. Por que não criar uma abstração para isso?
  • Aplicações com “n” interfaces de integração que devido à diferença de formato, reescrevem toda a lógica de extração com regra de negócio replicada em várias interfaces. Geralmente as chamadas para interfaces de integração também estão diretamente acopladas no código.

Era isso!

Integrações entre Sistemas – Parte 13 – Rabbit MQ

Objetivo

Durante minhas pesquisas, tomei conhecimento do Rabbit MQ (http://www.rabbitmq.com/) que trata-se de uma plataforma open-source para mensageria, disponível em diversos sistemas operacionais (Windows, Linux/Unix, Mac OS X e Amazon EC2).

Resolvi conhecê-lo e aproveitei para também incluí-lo junto aos demais benchmarks.

Instalação

Utilizei o Rabbit MQ em plataforma Windows. O processo de instalação dele não tem grande segredo. A única curiosidade é que por ele ter sido escrito em Erlang (http://erlang.org/), o runtime do Erlang precisa ser instalado.

Outro componente interessante de ser instalado (você vai precisar dele para rodar os exemplos) é o cliente .NET para o RabbitMQ. Você o encontra na página de downloads do Rabbit MQ. A documentação da API pode ser encontrada na página de documentação do Rabbit MQ. As vezes eu me surpreendo com a simplicidade e a organização de projetos open source.

Operação

O Rabbit MQ é relativamente simples. Não explorei todas as suas features, apenas o necessário para executar este teste/exemplo.

Existe um “Command Prompt” do Rabbit MQ (instalado junto com o servidor).

Alguns comandos úteis:

rabbitmqctl status

Mostra uma série de indicadores sobre o status do servidor.

rabbitmqctl environment

Mostra uma série de informações sobre o ambiente do servidor, entre elas uma das mais úteis que é a porta tcp que está sendo ouvida (default 5672).

rabbitmqctl list_queues

Lista as filas criadas.

rabbitmq-plugins enable rabbitmq_management

Habilita o plugin de gerenciamento do RabbitMQ. Uma vez que a instalação do plugin é realizada, basta reiniciar o servidor e acessá-lo através da URL http://localhost:15672/. Não esqueça de trocar localhost pelo IP ou hostname do seu servidor. O usuário default é guest/guest.

Cliente Rabbit MQ

A única coisa que fiz aqui foi pegar minha implementação de Websphere MQ e trocar as chamadas para o Rabbit MQ. Sim, eu poderia ter criado uma abstração mas como meu objetivo aqui é apenas entender como funciona e realizar alguns benchmarks, resolvi ser objetivo.

A implementação do cliente é muito simples. O método principal que executa a Task MSBuild segue:

		public override bool Execute()
		{
			Stopwatch watch = new Stopwatch();
			watch.Start();

			Log.LogMessage("Starting Rabbit MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");

			ConnectionFactory factory = new ConnectionFactory();
			factory.HostName = HostName;
			IConnection conn = factory.CreateConnection();
			IModel channel = conn.CreateModel();			
			channel.QueueDeclare(OutputQueueName, false, false, false, null);			
			IBasicProperties props = channel.CreateBasicProperties();
			props.ContentType = "text/xml";
			props.DeliveryMode = 2; //persistent

			inputChannel = conn.CreateModel();			
			inputChannel.QueueDeclare(InputQueueName, false, false, false, null);			

			messageCount = TotalBatches;

			System.Threading.Tasks.Task.Factory.StartNew(ProcessInputQueue);

			int count = 1;
			for (int i = 0; i < TotalBatches; i++)
			{
				MemoryStream ms = new MemoryStream();
				StreamUtil.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
				channel.BasicPublish("", OutputQueueName, props, ms.GetBuffer());

				count += BatchSize;
				Log.LogMessage("Sent " + count.ToString());
			}

			finishedEvent.WaitOne();

			watch.Stop();
			Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
			conn.Close();

			return true;
		}

ConnectionFactory é o objeto da API que encapsula uma conexão com uma fila. A criação do channel, QueueDeclare seguem o modelinho da API do Rabbit MQ.

O método que efetivamente faz a publicação na fila é o channel.BasicPublish. Existem algumas regras para publicações com tópicos, publish/subscribe, mas para o nosso exemplo, somente o BasicPublish resolve o problema.

Este método também inicia uma Task (que internamente inicia uma segunda thread) para processar a fila de resposta para o cliente, no método ProcessInputQueue. O código dele segue:

		private void ProcessInputQueue()
		{			
			while (true)
			{
				BasicGetResult result = inputChannel.BasicGet(InputQueueName, true);
				if (result == null)
				{
					Thread.Sleep(250);
					continue;
				}
				MemoryStream ms = new MemoryStream(result.Body);
				messageCount--;

				Log.LogMessage("Waiting for more " + messageCount.ToString());

				StreamUtil.ImportarStream(ConnString, ms);

				if (messageCount <= 0)				
					break;									
			}

			finishedEvent.Set();
		}

Mesma dinâmica do Websphere MQ. Aqui fico num loop infinito, pegando as mensagens. Sempre que não tem nada a processar (result == null), dou uma pequena dormidinha na thread somente para não matar a CPU da máquina. Pega a resposta e processa. Sem grandes segredos.

Servidor RabbitMQ

O servidor segue a mesma dinâmica:

		public override bool Execute()
		{
			Log.LogMessage("Starting RabbitMQ Server");

			ConnectionFactory factory = new ConnectionFactory();
			factory.HostName = "localhost";
			IConnection conn = factory.CreateConnection();

			IModel inputChannel = conn.CreateModel();
			inputChannel.QueueDeclare(InputQueueName, false, false, false, null);

			IConnection conn2 = factory.CreateConnection();			

			IBasicProperties props = inputChannel.CreateBasicProperties();
			props.ContentType = "text/xml";
			props.DeliveryMode = 2; //persistent

			IModel outputChannel = conn2.CreateModel();			
			outputChannel.QueueDeclare(OutputQueueName, false, false, false, null);			

			int count = 0;

			while (true)
			{
				BasicGetResult result = inputChannel.BasicGet(InputQueueName, true);
				if (result == null)
				{
					Thread.Sleep(250);
					continue;
				}

				MemoryStream ms = new MemoryStream(result.Body);
				result = null; // let GC work.
				MemoryStream respStream = new MemoryStream();

				StreamUtil.ProcessClientBigRequest(ConnString, ms, respStream, false, null);

				outputChannel.BasicPublish("", OutputQueueName, props, respStream.GetBuffer());
				respStream = null; //let GC work

				count++;

				Log.LogMessage("Processed " + count.ToString());

			}

Comparativo dos resultados

Como nem tudo são rosas, a máquina que estava usando como servidor resolveu morrer e tive que reinstalar tudo novamente. Por tabela, perdi a confiança nos números que tinha executado anteriormente e acabei reexecutando todos os testes no novo ambiente, com o objetivo de manter a premissa de “mesmo payload, mesmo ambiente” que utilizei desde o início dessa série.

Mesmo assim, continuei executando 10 vezes cada teste com cada um dos volumes (20k, 50k, 500k, 1M) e o número apresentado aqui é a média dos 10, para evitar diferenças que ocorrem numa ou outra execução devido a cache de banco, paginação de memória, garbage collector, etc.

Os números são:

integracoes1

integracoes2

Como os números mostram, o desempenho do RabbitMQ não foi satisfatório. Fiquei bastante frustrado com esse resultado. Apesar de toda a simplicidade da implementação, da instalação, etc., o número ficou praticamente a mesma coisa do que usar Web Services em SOAP.

Além disso, apesar de todo o trabalho para montar um novo ambiente (que mudou de 32 para 64 bits na máquina servidora), houve uma piora significativa dos números do File Transfer. Vou investigar mais a fundo as razões desta piora dado o novo ambiente instalado.

Código fonte

O código fonte das implementações acima foi atualizado no Git Hub: https://github.com/ericlemes/IntegrationTests

Conclusão

Eu vi várias pessoas recomendando o RabbitMQ, o que me motivou a realizar esta pesquisa comparando também com o MSMQ e Websphere MQ. Acho que ainda vale a pena uma investigação mais aprofundada de como otimizar o RabbitMQ antes de simplesmente descartá-lo. Este trabalho é bastante superficial.

Mas acho que a principal conclusão que eu chego é que antes de sair utilizando uma nova tecnologia, vale a pena buscar informações sobre ela, testar, brincar, comparar. Baseado nos números aqui obtidos, numa implementação entre plataformas distintas (java/.NET por exemplo), ficaria com SOAP e numa implementação apenas em plataforma Windows, com o MSMQ.

Integrações entre Sistemas – Parte 12 – MSMQ, Revisitado

Objetivo

Na parte 7 dessa série vimos uma implementação baseada em MSMQ que ficou muito atrás da implementação em Websphere MQ e eu tinha me comprometido a revisá-la. O objetivo deste post é apresentar essa revisão.

A princípio imaginei que o resultado ruim estava relacionado a filas transacionais, mas na prática percebi que o problema estava principalmente relacionado ao tratamento do recebimento da fila de forma assíncrona.

Melhorias realizadas

Realizei algumas melhorias pequenas no código, substituindo o loop deselegante na thread principal por um ManualResetEvent e outras melhorias menores, mas o ponto chave estava no tratamento do evento “ReceiveCompleted” da fila.

Criei uma Queue e uma segunda thread para tratar de forma paralela, não obstruindo o caminho do MQ com código de processamento e deixando o canal de recepção de mensagens livre para dar mais vazão à fila de entrada. Ficou assim:


		private void inQueue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
		{		
			
			lock (messageInputQueue)
			{
				messageInputQueue.Enqueue(e.Message);
				enqueueCount++;
				Log.LogMessage("Enqueued " + enqueueCount.ToString() + " messages");
			}
		
			if (enqueueCount < TotalBatches)
				inputQueue.BeginReceive();
		}

messageInputQueue trata-se da Queue a que me referi. É utilizado um “lock” para evitar concorrência entre a thread que empilha e a que desempilha as mensagens.

Este é o código que trata o processamento das mensagens, executado através de uma Task:

		private void ProcessInputQueue()
		{
			while (true)
			{
				Message msg = null;

				lock (messageInputQueue)
				{
					if (messageInputQueue.Count > 0)
						msg = messageInputQueue.Dequeue();
				}

				if (msg == null)
				{
					Thread.Sleep(50);
					continue;
				}
				
				StreamUtil.ImportarStream(ConnString, msg.BodyStream);
				Log.LogMessage("Processed message " + messageCount.ToString());					
				messageCount--;					

				if (messageCount <= 0)
					break;
			}
			processingDone.Set();
		}

Resultados

Abaixo segue os resultados completos inserindo também as execuções do MSMQ com filas transacionais e não-transacionais:

Integracoes-chart1

Integracoes-chart2

Como os resultados mostram, a diferença do Websphere MQ para o MSMQ não foi significativa, mas as diferenças entre a implementação anterior em MSMQ para a nova é absurdamente grande (praticamente o dobro!).

Código fonte

O código fonte completo está atualizado no Git Hub: https://github.com/ericlemes/IntegrationTests

Integrações entre Sistemas – Parte 11 – TCP Server com Task Parallel Library

Objetivo

Tcp Server… de novo??

Sim. Na verdade essa história começou logo que eu publiquei o post da parte 10. Meu amigo Alexandre Costa (aka Magoo) estava acompanhando o post depois que discutimos o assunto num café e logo que saiu, ele me mandou via twitter: “Por que você não usou a TPL?”.

Estragou meu dia. Toca a procurar sobre a tal da TPL e entender o que muda na prática.

A idéia aqui foi essa, fui atrás do que é a lib, uma forma de aplicá-la e ver o quanto agrega ao problema como um todo.

TPL ou Task Parallel Library

A Task Parallel Library é uma lib para trabalhar com paralelismo no .NET. A documentação da mesma encontra-se em: http://msdn.microsoft.com/en-us/library/dd460717.aspx.

No meu problema, percebi que tinha 2 situações em específico que ela poderia ajudar:

Vamos à prática.

Tcp Server 3

Nesta versão do cliente e servidor do TCP, o que muda na prática é:

		public void ProcessInputQueue()
		{
			while (true)
			{
				TcpClient2InputStreamContext ctx = null;
				lock (inputQueue)
				{
					if (inputQueue.Count &gt; 0)
						ctx = inputQueue.Dequeue();					
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				Parallel.Invoke(() =&gt;
				{
					Stream inputStream = ctx.InputStream;
					ctx.InputStream = null;
					inputStream.Seek(0, SeekOrigin.Begin);
					Log.LogMessage(&quot;Before import stream &quot; + ctx.ID.ToString());
					StreamUtil.ImportarStream(ConnString, inputStream, Log);
					Log.LogMessage(&quot;Stream imported &quot; + ctx.ID.ToString());
					responsesProcessed++;
				});

				if (responsesProcessed == TotalBatches)
					break;
				
			}

			processingDone.Set();
		}

Fiz outras alterações no código, colocando um ID na pergunta e devolvendo pelo servidor, para melhorar situações de debug. Eu realmente peguei uma série de bugs nesses meus exemplos relacionados a sincronização. Ainda não está totalmente estável, mas como disse anteriormente, está longe de ser uma implementação de referência para o assunto.

Tcp Server 4

Além das melhorias feitas na versão 3, na 4 eu implementei também o pattern assíncrono proposto. Na prática, tudo que é BeginRead, EndRead e BeginWrite e EndWrite são substituídos por construções na TPL. Isso tudo foi tratado no cliente e no servidor.

Na prática, não muda nada. Existe a vantagem de poder inserir um callback para tratamento de exceções (exceções em threads não param o fluxo principal e são bem chatas de debugar) e não ter que chamar o “EndRead” no callback (ele já é chamado automaticamente pela TPL).

Fica mais ou menos assim:

		private void WriteHeader(NetworkStream stream)
		{
			batchesSent++;

			if (batchesSent &gt; TotalBatches)
			{
				//Mark end of transfer.
				byte[] b = new byte[sizeof(long)];
				b = BitConverter.GetBytes((long)0);
				stream.Write(b, 0, sizeof(long));
				sendDone.Set();
				return;
			}

			Log.LogMessage(&quot;Sending request &quot; + batchesSent.ToString());

			TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext();
			ctx.OutputStream = new MemoryStream();
			ctx.ClientStream = stream;
			StreamUtil.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1));
			ctx.OutputStream.Seek(0, SeekOrigin.Begin);								

			byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length);

			Task.Factory.FromAsync&lt;byte[], int, int&gt;(stream.BeginWrite, stream.EndWrite, header, 0, header.Length, ctx).ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);			

			recordCount += BatchSize;
		}

O trecho que compõe a chamada da TPL é o Task.Factory.FromAsync. Vou tentar explicar:

  • No FromAsync, significa que os 3 parâmetros tratados pela função serão dos tipos especificados (que são os parâmetros da função BeginWrite)
  • stream.BeginWrite é o método que será chamado, usando o pattern assíncrono
  • stream.EndWrite é o método que será chamado imediatamente antes do callback. Ele é chamamdo automaticamente pela biblioteca, por isso foi removido do callback (no caso BeginWriteCallback)
  • header, 0 e header.Length são os parâmetros que serão passados para BeginWrite
  • ctx é o objeto de estado que será devolvido também no callback.
  • Em seguida, pode-se encadear vários “ContinueWith”. O primeiro, BeginWriteCallback é de fato para tratar o processamento da Stream. O segundo será chamado somente no caso de erro, para mandar para a console o erro

Agora veremos como fica o callback:

		private void BeginWriteCallback(Task task)
		{
			TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)task.AsyncState;			

			if (ctx.OutputStream.Position &lt; ctx.OutputStream.Length)
			{
				byte[] buffer = new byte[bufferSize];
				int read = ctx.OutputStream.Read(buffer, 0, buffer.Length);
				
				Task t = Task.Factory.FromAsync&lt;byte[], int, int&gt;(ctx.ClientStream.BeginWrite, ctx.ClientStream.EndWrite, buffer, 0, read, ctx);
				t.ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);		
			}
			else
			{
				WriteHeader(ctx.ClientStream);
			}
		}

O callback recebe como parâmetro a Task. A propriedade AsyncState é quem devolve o objeto de estado passado no chamada da função como parâmetro.

Outro exemplo interessante é a leitura. Como neste caso, existe o retorno da quantidade de bytes que foi lida na stream que é muito importante para o tratamento da leitura. Essa construção fica da seguinte forma:

		private void ReadHeader(NetworkStream stream)
		{
			contextCount++;
			TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext();
			ctx.ID = contextCount;
			ctx.ClientStream = stream;
			ctx.Header = new byte[sizeof(long)];
			ctx.HeaderRead = false;

			Task&lt;int&gt;.Factory.FromAsync&lt;byte[], int, int&gt;(stream.BeginRead, stream.EndRead, ctx.Header, 0, ctx.Header.Length, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ;			
		}

Neste caso, usa-se a construção Task. Apenas isso muda. O callback fica da seguinte maneira:

		private void BeginReadCallback(Task&lt;int&gt; task)
		{
			TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)task.AsyncState;
			int bytesRead = task.Result;
			Log.LogMessage(&quot;Read &quot; + bytesRead.ToString() + &quot; bytes&quot;);

			if (!ctx.HeaderRead)
			{
				Log.LogMessage(&quot;Reading Header&quot;);
				ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0);
				ctx.TotalRead = 0;
				Log.LogMessage(&quot;Response Size: &quot; + ctx.ResponseSize.ToString());
				ctx.Header = null; //I don't need this buffer anymore. 
				ctx.HeaderRead = true;
				ctx.InputStream = new MemoryStream();
				ctx.InputStream.SetLength(ctx.ResponseSize);

				if (ctx.ResponseSize &gt; 0)
				{
					ctx.Buffer = new byte[bufferSize];
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) &gt; ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage(&quot;Reading &quot; + bytesToRead.ToString() + &quot; bytes&quot;);

					Task&lt;int&gt;.Factory.FromAsync&lt;byte[], int, int&gt;(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead,
						ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ;					
				}
				else
					receiveDone.Set();
			}
			else
			{
				ctx.TotalRead += bytesRead;
				Log.LogMessage(&quot;Writing &quot; + bytesRead.ToString() + &quot; to input stream&quot;);
				ctx.InputStream.Write(ctx.Buffer, 0, bytesRead);

				if (ctx.TotalRead &lt; ctx.ResponseSize)
				{
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) &gt; ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage(&quot;Reading &quot; + bytesToRead.ToString() + &quot; bytes&quot;);
					ctx.Buffer = new byte[bufferSize];

					Task&lt;int&gt;.Factory.FromAsync&lt;byte[], int, int&gt;(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted);
				}
				else
				{
					Log.LogMessage(&quot;Finished reading &quot; + ctx.InputStream.Length.ToString() + &quot; bytes&quot;);
					lock (inputQueue)
						inputQueue.Enqueue(ctx);

					ReadHeader(ctx.ClientStream);
				}
			}
		}

Os pontos relevantes é que o tipo recebido no callback também é Task. A forma de obter o objeto de estado é a mesma e a resposta do método BeginRead, vem pelo parâmetro task.Result. O resto, mesma coisa.

Os tempos

Agora é a parte divertida. Vou comparar as 3 coisas: O modo “roots” que eu fiz anteriormente, criando todas as threads e tratamentos manualmente (Tcp Server 2), a versão 3 que somente trata o processamento de forma paralela e a versão 4 que também implementa o pattern assíncrono. Seguem os números:

Integracoes-chart1

Integracoes-chart2

Código fonte

O código fonte está atualizado no git hub: https://github.com/ericlemes/IntegrationTests.

Ele tem uma série de coisas a melhorar, existem problemas de isolamento de responsabilidades e muita coisa a refatorar, porém, acredito que atinge o objetivo aqui proposto.

Conclusão

Como vemos nos números, as diferenças são insignificantes. Meu palpite é que a quantidade de paralelismo envolvida é muito pequena. O pesado do processamento está mesmo em tratar as streams de forma assíncrona com I/O não bloqueante (implementado no servidor 2).

Sobre a TPL, ela implementa uma sintaxe bastante elegante. Não pude estudá-la de forma exaustiva, mas a princípio não vejo nada que ela traga de novo em relação ao tratamento manual das threads.

Isso mais uma vez reforça minha idéia de aprender os conceitos na raiz. O princípio de como lidar com threads é o mesmo independente de como é a implementação. Muitas dessas bibliotecas tem por objetivo tornar a sintaxe apenas mais simples, o que pode ser as vezes uma armadilha. A simplicidade por vezes acaba escondendo o conceito que existe por tras da tecnologia fazendo com que a usemos de forma incorreta. Este exercício é bem didático neste sentido. Existe um consenso de que “sempre” paralelizar trás ganhos de performance o que na prática não se mostra verdadeiro.

Integrações entre Sistemas – Parte 10 – TCP Server, revisitado

Objetivo

Após algum tempinho sem escrever (o tempo vai ficando curto!), resolvi retormar alguns compromissos que tinha assumido. Um deles foi lá na parte quando abordei o TCP Server que percebi que a implementação estava muito longe de ser boa.

Retomei o tema, com o objetivo de “bater” o melhor número que eu tinha que era o Websphere MQ. Óbvio que o MQ tem diversas outras features melhor do que simplesmente performance, mas apenas para fins didáticos, gostaria de chegar num número melhor.

Melhorando a performance do TCP Server e Client

A primeira coisa que percebi é que o I/O síncrono e bloqueante é muito ruim. Na prática significa que a cada requisição, o cliente e o servidor estão esperando toda a transferência de dados terminar antes de começar a trabalhar. Eu queria evitar isso.

Por sorte, a própria API de Socket e Streams da framework do .NET permite trabalhar com I/O não bloqueante. A idéia é simples, ao dar um BeginRead, o código continua a executar e quando a transferência terminou um callback é chamado.

Na teoria é muito simples, porém, na implementação, debugar e implementar processamentos totalmente assíncronos não são lá tarefas muito simples.

O segundo principal ponto de otimização foi separar em threads diferentes o processamento das informações da transferência delas, visando tornar a espera por I/O a menor possível. Veremos como isso se traduz em código.

TCP Client, nova implementação

Antes de começar a falar em código, peço desculpas pela bagunça. Tem muito a refatorar neste código ainda, mas resolvi publicar mesmo assim, pois o ótimo é inimigo do bom. Aos poucos vou refinando para chegar numa implementação boa. Ainda não considero isso uma implementação de referência (está bem longe disso ainda!).

Pra variar um pouquinho, tudo foi implementado em MSBuild, logo, se tem dúvidas, vale a pena visitar os posts que explicam os conceitos do MSBuild.

O Execute da task do TCP Client segue:


		public override bool Execute()
		{
			Stopwatch watch = new Stopwatch();
			watch.Start();			

			Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");

			TcpClient tcpClient = new TcpClient();
			tcpClient.Connect(HostName, Port);
			NetworkStream stream = tcpClient.GetStream();			

			WriteHeader(stream);
			ReadHeader(stream);

			Thread t = new Thread(ProcessInputQueue);
			t.Start();

			sendDone.WaitOne();
			receiveDone.WaitOne();

			tcpClient.Close();
			watch.Stop();

			processingDone.WaitOne();

			Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

			return true;
		}

Aqui praticamente nada muda. O código somente conecta no servidor TCP, WriteHeader e ReadHeader iniciam a transferência assíncrona (veremos em detalhes), iniciamos a segunda thread para processar as respostas do servidor. sendDone, receiveDone são “ManualResetEvent”, uma forma mais elegante de fazer a thread principal esperar o término do processamento pelas demais. Basicamente sendDone marca o término do envio das informações para o servidor, receiveDone o retorno das respostas e processingDone o término do processamento.

O ManualResetEvent, funciona de uma forma muito simples. Sempre que você criar uma nova instância dele, você o marca como “false” se deseja que o mesmo comece travado e no caso de encontrar um “WaitOne” no meio do caminho a execução fica travada até que outra thread dê um “Set” no objeto, liberando a execução do método principal. É isso que fazemos por aqui.

Sim, programação multi thread é bem mais complexa e envolve objetos de sincronização.

Agora detalharemos o método WriteHeader:


		private void WriteHeader(NetworkStream stream)
		{
			batchesSent++;

			if (batchesSent > TotalBatches)
			{
				//Mark end of transfer.
				byte[] b = new byte[sizeof(long)];
				b = BitConverter.GetBytes((long)0);
				stream.Write(b, 0, sizeof(long));
				sendDone.Set();
				return;
			}

			Log.LogMessage("Sending request " + batchesSent.ToString());

			TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext();
			ctx.OutputStream = new MemoryStream();
			ctx.ClientStream = stream;
			util.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1));
			ctx.OutputStream.Seek(0, SeekOrigin.Begin);			

			byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length);
			stream.BeginWrite(header, 0, header.Length, BeginWriteCallback, ctx);			

			recordCount += BatchSize;			
		}

A idéia do WriteHeader é iniciar a transferência de um batch de informações. Aqui apenas está o controle de quantos lotezinhos já foram enviados. Quando chegamos no último, mandamos um request cujos 8 primeiros bytes são “0”, marcando o término da transferência.

Como a transferência é toda assíncrona, ela segue este padrão do BeginWrite com callbacks. A lógica é simples. Vou mandar um pedaço de uma stream e quando ela chegar no servidor, o método BeginWriteCallback será chamado.

Mas como eu saberei continuar a transferência? A resposta está num objeto de estado que é passado para o callback. Esse objeto é a classe TcpClient2OutputStreamContext (desculpe pelo nome, não refatorei ainda!).

A instância desse objeto carrega todo o contexto necessário para continuar a transferência, ou seja, a Stream que estou enviando (gerada em GenerateBigRequest), a Stream na qual estou escrevendo (ClientStream). Lembrando que o código passa por BeginWrite e segue a vida, ou seja, não trava a execução.

Assim que a informação chegar no servidor, o método BeginWriteCallback é executado:


		private void BeginWriteCallback(IAsyncResult result)
		{
			TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)result.AsyncState;
			ctx.ClientStream.EndWrite(result);			

			if (ctx.OutputStream.Position < ctx.OutputStream.Length)
			{
				byte[] buffer = new byte[bufferSize];
				int read = ctx.OutputStream.Read(buffer, 0, buffer.Length);
				ctx.ClientStream.BeginWrite(buffer, 0, read, BeginWriteCallback, ctx);
			}
			else
			{
				WriteHeader(ctx.ClientStream);
			}
		}


A primeira linha, obtém de volta o contexto. Esse é um “pattern” do framework para streams assíncronas, inclusive a chamada do EndWrite.

Então nesse caso, se a stream de saída ainda não chegou no fim, outro “chunk” (ou naco) é enviado. Se a stream chegou ao fim, parto pro próximo lote, chamando WriteHeader novamente.

O interessante do modelo assíncrono é que enquanto eu estou transferindo as informações do BeginWrite para o servidor, a execução continua, de forma que leituras pela resposta acontecem quase que simultaneamente com as escritas. Nada impede que antes de eu receber a chamada do callback, ocorra depois de um ReadHeader, ou BeginReadCallback que veremos a seguir:


		private void ReadHeader(NetworkStream stream)
		{
			contextCount++;
			TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext();
			ctx.ID = contextCount;
			ctx.ClientStream = stream;
			ctx.Header = new byte[sizeof(long)];
			ctx.HeaderRead = false;
			
			stream.BeginRead(ctx.Header, 0, ctx.Header.Length, BeginReadCallback, ctx);
		}

A idéia do ReadHeader é ler um “long” com o tamanho da stream que virá em seguida e a partir daí começar a ler a Stream como um todo. Isso significa que do lado do servidor já houve um processamento e ele já respondeu o tamanho da resposta para o cliente.

Aqui o modelo assíncrono é um pouquinho mais chato. O TcpClient2InputStreamContext amarra um buffer no mesmo (no caso a propriedade Header). Quando o callback for chamado, o conteúdo recebido do servidor será inserido neste buffer.

		private void BeginReadCallback(IAsyncResult result)
		{
			TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)result.AsyncState;
			int bytesRead = ctx.ClientStream.EndRead(result);
			Log.LogMessage("Read " + bytesRead.ToString() + " bytes");
			
			if (!ctx.HeaderRead)
			{
				Log.LogMessage("Reading Header");
				ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0);
				ctx.TotalRead = 0;
				Log.LogMessage("Response Size: " + ctx.ResponseSize.ToString());
				ctx.Header = null; //I don't need this buffer anymore. 
				ctx.HeaderRead = true;
				ctx.InputStream = new MemoryStream();
				ctx.InputStream.SetLength(ctx.ResponseSize);

				if (ctx.ResponseSize > 0)
				{
					ctx.Buffer = new byte[bufferSize];
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
				}
				else
					receiveDone.Set();
			}
			else
			{
				ctx.TotalRead += bytesRead;
				Log.LogMessage("Writing " + bytesRead.ToString() + " to input stream");
				ctx.InputStream.Write(ctx.Buffer, 0, bytesRead);				

				if (ctx.TotalRead < ctx.ResponseSize)					
				{					
					int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead);
					Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes");
					ctx.Buffer = new byte[bufferSize];
					ctx.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
				}
				else
				{
					Log.LogMessage("Finished reading " + ctx.InputStream.Length.ToString() + " bytes");
					lock (inputQueue)
						inputQueue.Enqueue(ctx);					

					ReadHeader(ctx.ClientStream);
				}
			}														
		}

O BeginReadCallback (tá uma bagunça, eu sei!) faz o seguinte processamento: Primeiro ele verifica se já leu o “header” (os 8 bytes iniciais que indicam o tamanho da Stream). Se não leu, ele faz a leitura e inicializa todo o contexto (input stream, ou seja, onde será gravada a entrada, tamanho da resposta, etc). Se neste momendo recebeu uma resposta com 0 bytes, indica que acabou a transferência (receiveDone.Set). Se a resposta tem tamanho, ele inicia a leitura da stream do servidor, inicializando um buffer (a leitura é em pedaços) e chamando o BeginRead. Existe uma continha ali somente para saber o quanto ler do servidor (o tamanho máximo do buffer, ou o tamanho de bytes restantes a receber).

Quando o callback é chamado e o Header já está lido, é checado se a resposta já chegou ao fim. Caso já tenha chegado, o cliente joga na “inputQueue” (uma fila interna) o contexto, para que seja processado pela segunda thread (método ProcessInputQueue). Veja que existe um lock para evitar concorrência entre essas duas threads brigando pela queue. Quando ainda não chegou ao fim, somente mais um teco da stream é lida.

Agora pra finalizar, explicamos o método ProcessInputQueue:


		public void ProcessInputQueue()
		{
			while (true)
			{
				TcpClient2InputStreamContext ctx = null;
				lock (inputQueue)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				ctx.InputStream.Seek(0, SeekOrigin.Begin);
				Log.LogMessage("Before import stream " + ctx.ID.ToString());
				util.ImportarStream(ConnString, ctx.InputStream);
				Log.LogMessage("Stream imported " + ctx.ID.ToString());

				responsesProcessed++;
				if (responsesProcessed == TotalBatches)
					break;
			}

			processingDone.Set();
		}
	}

A primeira coisa que estranha deste método é o fato dele ficar num loop infinito. Isso ocorre porque se ele terminar de executar, a thread que o executa também finaliza e eu não quero isso. O trabalho dele é simples. Ele desempilha e se tem algo a processar, ele processa (util.ImportarStream).

Ele conta quanto já processou, quando percebe que chegou no total de lotes que tinha a processar, finaliza e libera a thread principal (processingDone.Set()). Tem também a tradicional dormidinha caso não tenha nada a processar para evitar 100% de CPU num dos cores.

Tcp Server, nova implementação

O Execute do nosso TCP Server, tem a seguinte implementação:


		public override bool Execute()
		{
			Log.LogMessage("Listening to TCP Connections on port " + Port.ToString());

			IPAddress ipAddress = IPAddress.Any;
			TcpListener tcpListener = new TcpListener(ipAddress, Port);
			tcpListener.Start();
			tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

			serverFinished.WaitOne();
			return true;
		}


A idéia aqui é somente ficar esperando eternamente por um novo cliente. Quando este cliente conectar, BeginAcceptSocket é chamado.


		private void BeginAcceptSocketCallback(IAsyncResult result)
		{
			Log.LogMessage("Received socket connection");

			streamUtil = new StreamUtil();

			TcpListener tcpListener = (TcpListener)result.AsyncState;
			TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result);			
			NetworkStream clientStream = tcpClient.GetStream();

			ConnectionContext ctx = new ConnectionContext(clientStream, ConnString, this);
			ctx.StartProcessingInputQueue();
			BeginRead(ctx);

			tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);
		}


Quando a conexão chega, um ConnectionContext é instanciado. Essa classe gerencia uma conexão no servidor. Ela inicia o processamento de sua “input queue” (veremos em detalhes mais a frente), inicia a leitura da stream do servidor (BeginRead) e deixa o servidor esperando outra conexão.

A leitura segue a mesma idéia do cliente, primeiro lendo os 8 primeiros bytes (tamanho do request) e em seguida lendo o conteúdo, através de um BeginRead:


		public void BeginRead(ConnectionContext connectionContext)
		{
			InputStreamContext ctx = new InputStreamContext(connectionContext, bufferSize);						
			ctx.ConnectionContext.ClientStream.BeginRead(ctx.Header, 0, sizeof(long), BeginReadCallback, ctx);					

		private void BeginReadCallback(IAsyncResult result)
		{			
			InputStreamContext ctx = result.AsyncState as InputStreamContext;
			int bytesRead = ctx.ConnectionContext.ClientStream.EndRead(result);			

			if (!ctx.HeaderRead)
			{
				readCount++;
				Log.LogMessage("Reading " + readCount.ToString());
				ctx.ProcessHeaderAfterRead();

				if (ctx.RemainingBytes == 0)
					EnqueueEmptyHeader(ctx);
				else
					ReadChunk(ctx);				
			}
			else
			{
				ctx.ProcessChunkAfterRead(bytesRead);

				if (ctx.RemainingBytes == 0)
					ProcessRequest(ctx);
				else
					ReadChunk(ctx);
			}		
		}
		}

O BeginReadCallback segue uma idéia muito similar. Se ainda não li o cabeçalho, leio o mesmo. Se recebi um cabeçalho com 0 bytes, terminou a transferência e eu insiro um header vazio para avisar o cliente que não tenho mais nada a escrever. Se tenho algo a ler, leio um chunk.

Se eu já li o cabeçalho chamo ProcessChunkAfterRead (ele somente joga para a stream interna dele o conteúdo do buffer). Se terminou a leitura, chamo ProcessRequest, se ainda não terminou, chamo ReadChunk (para pegar mais um pedaço).

Aqui seguem os códigos para EnqueueEmptyHeader, ReadChunk e ProcessRequest:


		private void EnqueueEmptyHeader(InputStreamContext ctx)
		{
			Log.LogMessage("Writing empty header");
			InputStreamContext inputContext = new InputStreamContext(ctx.ConnectionContext, bufferSize);
			inputContext.EmptyResponse = true;
			ctx.ConnectionContext.InputQueue.Enqueue(inputContext);					
		}

		private void ReadChunk(InputStreamContext ctx)
		{			
			int bytesToRead = ctx.RemainingBytes > ctx.Buffer.Length ? ctx.Buffer.Length : (int)ctx.RemainingBytes;
			ctx.ConnectionContext.ClientStream.BeginRead(ctx.Buffer, 0, bytesToRead, BeginReadCallback, ctx);
		}

		private void ProcessRequest(InputStreamContext ctx)
		{
			lock (ctx.ConnectionContext)
				ctx.ConnectionContext.InputQueue.Enqueue(ctx);			
			BeginRead(ctx.ConnectionContext);			
		}

Vemos aqui o ProcessRequest na verdade não processa nada. Somente joga o contexto a ser processado numa queue que será processada numa segunda Thread em ProcessInputQueue.

Os métodos a seguir estão na classe InputStreamContext. Confesso que as responsabilidades das classes não estão tão claras como eu gostaria. Ainda cabe bastante refatoração aqui.


		public void ProcessHeaderAfterRead()
		{
			this.headerRead = true;
			this.remainingBytes = BitConverter.ToInt64(this.Header, 0);
		}

		public void ProcessChunkAfterRead(int bytesRead)
		{
			this.remainingBytes -= bytesRead;
			this.RequestStream.Write(this.Buffer, 0, bytesRead);
		}

Basicamente estes métodos inicializam as propriedades internas baseados no conteúdo do buffer recebido. O “AfterRead” em seus nomes são propositais que são atualizações que só são realizadas após a leitura ter sido finalizada (callback do BeginRead).

A seguir, explicamos como funciona o processamento do request. Ao receber a conexão, o método da classe ConnectionContext StartProcessingInputQueue é chamado. Este método na verdade somente inicia uma segunda thread que fica monitorando a input queue:


		public void StartProcessingInputQueue()
		{
			inputQueueThread = new Thread(ProcessInputQueue);
			inputQueueThread.Start();
		}

		public void ProcessInputQueue()
		{
			while (true)
			{
				InputStreamContext ctx = null;
				lock (this)
				{
					if (inputQueue.Count > 0)
						ctx = inputQueue.Dequeue();
				}

				if (ctx == null)
				{
					Thread.Sleep(250);
					continue;
				}

				OutputStreamContext outputContext = new OutputStreamContext(this);
				if (ctx.EmptyResponse)
				{
					outputContext.EmptyResponse = true;
					byte[] buffer = BitConverter.GetBytes((long)0);
					ClientStream.BeginWrite(buffer, 0, buffer.Length, tcpServer.BeginWriteCallback, outputContext);
				}
				else
				{
					ctx.RequestStream.Seek(0, SeekOrigin.Begin);
					streamUtil.ProcessClientBigRequest(connString, ctx.RequestStream, outputContext.OutputStream, false, null);
					outputContext.OutputStream.Seek(0, SeekOrigin.Begin);

					byte[] buffer = BitConverter.GetBytes(outputContext.OutputStream.Length);
					ClientStream.BeginWrite(buffer, 0, buffer.Length, tcpServer.BeginWriteCallback, outputContext);

					outputContext.WriteCompleted.WaitOne();
				}	
			}
		}

Esta segunda thread tem um comportamento muito similar à thread do cliente que monitora a input queue. O que vale ressaltar aqui é o “WriteCompleted” que é mais um manual reset event. Isso gera um efeito muito divertido pois como as respostas são devolvidas em pedaços de forma assíncrona. Nada impede que dois requests comecem a ser devolvidos simultaneamente para o cliente, por isso esse reset event, espera um aviso do BeginWriteCallback para saber quando pode começar a descarregar o segundo request.

A ausência deste controle gera um efeito interessante, pois invés de mandar o primeiro pedaço do primeiro request, depois o segundo pedaço do primeiro request e assim sucessivamente, o servidor acaba mandando o primeiro pedaço do primeiro request, o primeiro pedaço do segundo request, depois o segundo pedaço do primeiro request e assim sucessivamente. Aqui é um problema e também uma oportunidade de otimização. Acredito que se criar o cliente preparado para receber mais de um request quase que simultaneamente haverá um maior consumo de memória, porém melhor aproveitamento da CPU (menos espera) por parte do servidor. Divertido, né?

A lógica de descarregamento através do BeginWrite é a mesma. A única confusão é que na tentativa de isolar melhor as responsabilidades das classes (tarefa que ainda não conclui), o callback ficou na classe TcpTestServer2 e o processamento da fila na ConnectionContext.

Aqui segue o código de BeginWriteCallback:


		public void BeginWriteCallback(IAsyncResult result)
		{
			OutputStreamContext ctx = result.AsyncState as OutputStreamContext;
			ctx.ConnectionContext.ClientStream.EndWrite(result);

			if (ctx.EmptyResponse)
				return;

			byte[] buffer = new byte[bufferSize];
			int bytesToWrite = ctx.OutputStream.Read(buffer, 0, buffer.Length);

			if (bytesToWrite > 0)
			{
				ctx.ConnectionContext.ClientStream.BeginWrite(buffer, 0, bytesToWrite, BeginWriteCallback, ctx);
			}
			else
			{
				Log.LogMessage("Finished writing ");
				ctx.WriteCompleted.Set();
			}
		}
	}

A idéia é a mesma. Enquanto não chega no fim da stream, manda um pedaço. Quando chega, avisa o método ProcessInputQueue que ele pode descarregar a próxima stream (utilizando o WriteCompleted para fazer a sincronização).

Conclusão

Agora é a parte divertida. Segue o comparativo desta abordagem contra as demais estudadas até aqui:

Como vemos, meu primeiro objetivo que era quebrar o número do Websphere MQ foi atingido. Conseguimos com esta abordagem transferir 1 milhão de registros em 355,43 segundos ou 5,92 minutos.

A abordagem utilizada para a medição foi a mesma, utilizando o mesmo volume de dados e as mesmas máquinas. 10 execuções do novo método foram executadas e o número apresentado é a média dos 10.

Os ponto que acho importante mencionar é que a abordagem é válida, porém a implementação ainda precisa ser melhorada, principalmente no que tange a recuperação de falhas. Como a comunicação é toda assíncrona é preciso estabelecer um mecanismo mais eficiente de conversa entre cliente e servidor para conseguir continuar de onde parou no caso de uma quebra de conexão.

Em relação aos outros métodos para 1 milhão de registors, temos o seguinte comparativo:

Método Tempo em segundos (1 milhão de registros) % Diferença absoluta
Tcp Client 2 (multi thread) 355,43
Websphere MQ 417,36 17,42 61,93
Http Request 428,95 20,68 73,52
File Transfer (extração simples) 495,27 39,34 139,84
File Transfer (request/response) 657,09 84,87 301,66
Tcp Client (multi thread) 671,37 88,89 315,94
Net TCP (lotes) 788,20 121,76 432,77
MSMQ 836,12 135,24 480,69
SOAP (lotes) 1059,69 198,14 704,26
TCP Client (single thread) 1079,72 203,78 724,29

O novo servidor foi ~17% mais rápido do que a abordagem utilizando Websphere MQ.

Peço desculpas pelo post meio “work in progress”, mas achei melhor publicar assim mesmo do que esperar mais alguns meses pra finalizar toda a idéia.

Código fonte

O código fonte ainda está bem bagunçado, mas está no GitHub: https://github.com/ericlemes/IntegrationTests.

Integrações entre Sistemas – Parte 9 – Estatísticas

Objetivo

Após algum tempo criando estes exercícios de diferentes métodos de integração, resolvi executar uma bateria de testes em todos eles para tentar chegar num comparativo de tempos e prós e contras de cada um deles.

Foram alguns dias, de máquinas superaquecendo e desligando, ajustes nos códigos criados para conseguir chegar nestes resultados. Espero que gostem.

O Método

Para chegar nestes resultados, executei os exemplos criados nas partes 1 a 8 com diferentes quantidades de dados: 20.000, 50.000, 500.000 e 1 milhão de registros. Para eliminar o problema de cache de bancos de dados e outras pequenas variações que podem acontecer nesse tipo de teste, executei cada um deles 10 vezes e tirei uma média dos tempos.

Em todos os métodos sempre foi utilizada a mesma massa de dados e as mesmas máquinas, somente para tentarmos observar a diferença de tempos de cada um dos métodos.

Demorou algo em torno de 4 dias pra conseguir rodar tudo, mas valeu a pena.

Tempos de execução

Separei os resultados em 2 gráficos para simplificar a visualização e comparação dos resultados:

O método File Transfer (extração simples) está no gráfico somente para referência. Ele não possui a abordagem request/response necessária para o cenário que estamos exercitando. Somente coloquei ele nos testes para termos uma base da diferença de uma extração simples para um request/response. Percebemos que a diferença dá em torno de 35%.

Outra comparação interessante está no método de Tcp Client. A abordagem single thread é de 55 a 80% mais lenta que a abordagem multi thread.

Mais um número assustador é a comparação Websphere MQ x MSMQ. Usei filas transacionais no MSMQ e filas persistentes no Websphere MQ. O MSMQ ficou de 85 a 100% mais lento que o Websphere MQ. Neste caso, prefiro acreditar que minha implementação está ruim. Pretendo investigar mais. Outra coisa que pode ter dado diferença neste caso é que o Websphere MQ utiliza-se de uma segunda thread para processar.

Outra comparação clássica é net.tcp x SOAP. No cenário de requests pequenos (sem trabalhar com pequenos lotes), a diferença é gigante. No caso da separação por pequenos lotes de 1000, o SOAP mostrou-se de 26 a 34% mais lento.

Para compararmos os métodos mais rápidos, vou separar a comparação por volumes, começando por 20.000 registros:

Método Tempo em segundos (20K registros) % Diferença absoluta
Websphere MQ 8,92
Http Request 9,93 11,39 1,02
Tcp Client (multi thread) 10,68 19,80 1,77
File Transfer (extração simples) 14,72 65,06 5,80
MSMQ 15,94 78,80 7,03
Net TCP (lotes) 17,95 101,29 9,03
File Transfer (request/response) 19,26 115,98 10,34
SOAP (lotes) 22,40 151,26 13,49
TCP Client (single thread) 27,80 211,83 18,89

O percentual representa o quanto o método em questão é mais lento em relação ao mais rápido, por exemplo, o cenário de File Transfer (request/response) se mostrou 115% mais lento que o cenário usando Websphere MQ (mais que o dobro do tempo).

Agora os resultados para 50.000 registros:

Método Tempo em segundos (50K registros) % Diferença absoluta
Http Request 22,02
File Transfer (extração simples) 23,35 6,04 1,33
Websphere MQ 24,61 11,76 2,59
Tcp Client (multi thread) 28,05 27,36 6,03
File Transfer (request/response) 32,63 48,18 10,61
Net TCP (lotes) 41,73 89,49 19,71
MSMQ 45,59 107,02 23,57
TCP Client (single thread) 52,32 137,60 30,30
SOAP (lotes) 52,79 139,71 30,77

Novamente vemos que conforme o volume aumenta, alguns métodos melhoram de desempenho e outros pioram. O aumento dos registros também faz com que a diferença percentual entre os métodos fique menos significativa, mas a diferença absoluta de alguma forma significativa. Por exemplo, a diferença em segundos de file transfer para http é de 10s para 50.000 registros.

Resultados para 500.000 registros:

Método Tempo em segundos (500K registros) % Diferença absoluta
Http Request 230,19
Websphere MQ 233,63 1,50 3,44
File Transfer (extração simples) 250,76 8,94 20,57
File Transfer (request/response) 347,30 50,88 117,12
Tcp Client (multi thread) 351,39 52,65 121,20
Net TCP (lotes) 404,00 75,51 173,82
MSMQ 443,33 92,60 213,15
SOAP (lotes) 533,62 131,82 303,43
TCP Client (single thread) 545,48 136,97 315,29

Agora percebemos que a medida que o volume aumenta o Websphere MQ começa a ter um desempenho melhor, mas surpreendentemente apesar do consumo absurdo de memória e gerando muita paginação no sistema operacional, o método Http Request ainda funciona muito bem. Acredito que com muita concorrência ele não desempenhará tão bem assim devido ao alto consumo de memória.

Por último, os resultados para 1 milhão de registros:

Método Tempo em segundos (1 milhão de registros) % Diferença absoluta
Websphere MQ 417,36
Http Request 428,95 2,78 11,59
File Transfer (extração simples) 495,27 18,67 77,91
File Transfer (request/response) 657,09 57,44 239,73
Tcp Client (multi thread) 671,37 60,86 254,01
Net TCP (lotes) 788,20 88,85 370,84
MSMQ 836,12 100,34 418,76
SOAP (lotes) 1059,69 153,90 642,33
TCP Client (single thread) 1079,72 158,70 662,36

Agora, com alto volume, percebemos que o Websphere MQ atinge o topo novamente, ficando novamente em 2º lugar o Http Request e o método até então “preferido” pela maioria, File Transfer fica em 3º lugar. Eu já suspeitava disso, mas ele ficou em 3º.

File Transfer

Acredito que file transfer ainda seja o método mais utilizado para transferência de grandes massas de informação entre 2 sistemas. Para muitos, ainda é o método mais rápido. Com os resultados obtidos através destes testes (usando 1 milhão de registros como base de comparação) e mantendo somente os métodos mais populares, chegamos nas seguintes diferenças:

Método Tempo em segundos (1 milhão registros) % Diferença absoluta (s) Diferança absoluta (min)
File Transfer (request/response) 657,09
Websphere MQ 417,36 -36,48 -239,73 -4,00
Net TCP (lotes) 788,20 19,95 131,11 2,19
SOAP (lotes) 1059,69 61,27 402,61 6,71

Essa tabela compara os métodos sempre em relação a file transfer. Websphere MQ é 36% mais rápido enquanto o SOAP é 61% mais lento que file transfer, mas se compararmos em tempos absolutos, temos uma variação de 6,71 minutos do SOAP para file transfer em 1 milhão de registros!

Será que a diferença de tempo vale todo o trabalho adicionado pelo método de file transfer, desde a sincronização dos arquivos, até a necessidade de customizar a entrega da informação para cada sistema diferente que solicite o arquivo? Na minha opinião não. Se a questão é desempenho, partir para mensageria é uma idéia melhor. Se a questão é velocidade de desenvolvimento, ficaria com net.tcp ou SOAP.

Prós e contras

Esse tipo de comparativo é muito ruim e subjetivo, mas resolvi montar alguns critérios, para analisarmos cada um dos métodos sob diferentes perspectivas:

Critério Websphere MQ Http Request File Transfer Tcp Client Net TCP SOAP MSMQ
Desempenho Bom Bom Regular Regular Regular Ruim Ruim
Facilidade de implementação Regular Regular Ruim Ruim Bom Bom Regular
Aderência a SOA Bom Ruim Ruim Ruim Ruim Bom Bom
Necessidade de parse de conteúdo Ruim Ruim Ruim Ruim Bom Bom Ruim
Abrangência de plataformas (baixa plataforma) Bom Bom Bom Bom Ruim Bom Ruim
Abrangência de plataformas (considerando alta plataforma) Bom Ruim Regular Regular Ruim Ruim Ruim
Acoplamento entre origem e destino Regular Ruim Ruim Ruim Bom Bom Regular
Custo com licenças de uso Ruim Bom Bom Bom Bom Bom Bom
Aderência a padrões de mercado (conteúdo da mensagem) Ruim Ruim Ruim Ruim Ruim Bom Ruim
Dependência de disponibilidade do sistema destino Bom Ruim Bom Ruim Ruim Ruim Bom
  • Desempenho: Este critério é baseado nos tempos aqui discutidos
  • Facilidade de implementação: Neste critério, estou considerando complexidades mais da mecânica de transferência do que de formato de arquivos. No Websphere MQ por exemplo, é necessário tratar a comunicação de forma assícrona, que adiciona alguma complexidade. No caso de File Transfer é necessário criar uma mecânica de sinalização para não pergar o arquivo em escrita e em leitura e ainda no caso de TCP, criar todo um protocolo para se estabelecer como as informações serão transferidas, problemas que em SOAP, por exemplo não existem.
  • Aderência a SOA: Todos os métodos aqui tratam comunicação ponto a ponto. Em arquiteturas mais desenvolvidas e numa malha grande de sistemas, sabemos que comunicação ponto a ponto torna o ambiente quase impossível de se gerenciar. No caso de MQ, SOAP é fácil implementar uma ESB na comunicação sem ter que mexer praticamente nada em provedor e consumidor de serviço.
  • Necessidade de parse de conteúdo: Sempre que falamos de parse de conteúdo manualmente, estamos sujeitos a erros. Hoje existem parsers de XML, schemas e diversas ferramentas para isso, mas é sempre uma fonte de erro. Quando falamos em alta plataforma, por exemplo, isso já pode não ser muito trivial. Alguns métodos como SOAP e Net.tcp removem essa complexidade.
  • Abrangência de plataformas (baixa plataforma): Esse critério trata-se da facilidade de utilizar esse método de integração em diferentes plataformas (Unix, Java, etc). Websphere MQ tem implementações para várias plataformas, o que já não é verdade para MSMQ (apenas Windows). File Transfer, HTTP e TCP são comuns em outras plataformas. Net.tcp que é proprietário da Microsoft também é um issue neste caso.
  • Abrangência de plataformas (considerando alta): Alta plataforma é algo extremamente fechado. Websphere MQ é uma excelente solução neste caso pois existe implementação para mainframe. É possível usar file transfer e TCP, mas a complexidade é bastante alta. Em ambos os casos, existe a preocupação na conversão ASCII/EBCDIC.
  • Acoplamento entre origem e destino: Falando de acoplamento, me refiro à necessidade de ambos os lados assumirem uma série de premissas em relação a formato, protocolo, conteúdo, etc. Os únicos métodos que abstraem isso melhor são SOAP e Net.tcp, devido à existencia de um WSDL que evita uma série de problemas em relação à premissas de como o conteúdo será formatado.
  • Custo com licenças de uso: Este critério pesa mais para o Websphere MQ que é um produto da IBM com custo de licenciamento.
  • Aderência a padrões de mercado: Neste critério está contemplada a facilidade de interoperabilidade com outras tecnologias. A única realmente padrão utilizada aqui é o SOAP
  • Dependência de disponibilidade do sistema destino: Este critério trata-se do quanto o sistema origem depende do sistema destino. Em métodos como SOAP, TCP, Http Request, a espera da resposta do sistema é imediata, devido à natureza síncrona da comunicação. Em caso de falha do sistema destino, o sistema origem falha também. Quando falamos de MQ, MSMQ ou File Transfer, uma vez que o arquivo ou mensagem esteja disponível para o sistema destino, o sistema origem não para no caso de falhas. Quando ocorre recuperação das falhas, as respostas são enviadas e o processamento segue, com menor necessidade de intervenção manual.

Código Fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Conclusão

Como podemos observar, integração entre sistemas com alto volume é algo que deve ser pensado e planejado com calma. Em relação a todos os métodos apresentados, a grande supresa pra mim ficou no Websphere MQ. Não esperava que o desempenho fosse tão bom.

Me parece que para um cenário “enterprise”, com várias plataformas, sistemas, a associação do MQ como transporte com um middleware como barramento (ESB) é a solução para a grande maioria dos problemas de integração, com bom desempenho e baixo acoplamento.

Mesmo o padrão SOAP apresentando várias vantagens, o desempenho pode ser um ponto importante em muitos casos. No trade-off do desempenho do MQ contra a simplicidade de desenvolvimento do SOAP, a princípio fico com o desempenho do MQ na maioria dos casos.