Mês: março 2013

Integrações entre Sistemas – Parte 13 – Rabbit MQ

Objetivo

Durante minhas pesquisas, tomei conhecimento do Rabbit MQ (http://www.rabbitmq.com/) que trata-se de uma plataforma open-source para mensageria, disponível em diversos sistemas operacionais (Windows, Linux/Unix, Mac OS X e Amazon EC2).

Resolvi conhecê-lo e aproveitei para também incluí-lo junto aos demais benchmarks.

Instalação

Utilizei o Rabbit MQ em plataforma Windows. O processo de instalação dele não tem grande segredo. A única curiosidade é que por ele ter sido escrito em Erlang (http://erlang.org/), o runtime do Erlang precisa ser instalado.

Outro componente interessante de ser instalado (você vai precisar dele para rodar os exemplos) é o cliente .NET para o RabbitMQ. Você o encontra na página de downloads do Rabbit MQ. A documentação da API pode ser encontrada na página de documentação do Rabbit MQ. As vezes eu me surpreendo com a simplicidade e a organização de projetos open source.

Operação

O Rabbit MQ é relativamente simples. Não explorei todas as suas features, apenas o necessário para executar este teste/exemplo.

Existe um “Command Prompt” do Rabbit MQ (instalado junto com o servidor).

Alguns comandos úteis:

rabbitmqctl status

Mostra uma série de indicadores sobre o status do servidor.

rabbitmqctl environment

Mostra uma série de informações sobre o ambiente do servidor, entre elas uma das mais úteis que é a porta tcp que está sendo ouvida (default 5672).

rabbitmqctl list_queues

Lista as filas criadas.

rabbitmq-plugins enable rabbitmq_management

Habilita o plugin de gerenciamento do RabbitMQ. Uma vez que a instalação do plugin é realizada, basta reiniciar o servidor e acessá-lo através da URL http://localhost:15672/. Não esqueça de trocar localhost pelo IP ou hostname do seu servidor. O usuário default é guest/guest.

Cliente Rabbit MQ

A única coisa que fiz aqui foi pegar minha implementação de Websphere MQ e trocar as chamadas para o Rabbit MQ. Sim, eu poderia ter criado uma abstração mas como meu objetivo aqui é apenas entender como funciona e realizar alguns benchmarks, resolvi ser objetivo.

A implementação do cliente é muito simples. O método principal que executa a Task MSBuild segue:

		public override bool Execute()
		{
			Stopwatch watch = new Stopwatch();
			watch.Start();

			Log.LogMessage("Starting Rabbit MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");

			ConnectionFactory factory = new ConnectionFactory();
			factory.HostName = HostName;
			IConnection conn = factory.CreateConnection();
			IModel channel = conn.CreateModel();			
			channel.QueueDeclare(OutputQueueName, false, false, false, null);			
			IBasicProperties props = channel.CreateBasicProperties();
			props.ContentType = "text/xml";
			props.DeliveryMode = 2; //persistent

			inputChannel = conn.CreateModel();			
			inputChannel.QueueDeclare(InputQueueName, false, false, false, null);			

			messageCount = TotalBatches;

			System.Threading.Tasks.Task.Factory.StartNew(ProcessInputQueue);

			int count = 1;
			for (int i = 0; i < TotalBatches; i++)
			{
				MemoryStream ms = new MemoryStream();
				StreamUtil.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
				channel.BasicPublish("", OutputQueueName, props, ms.GetBuffer());

				count += BatchSize;
				Log.LogMessage("Sent " + count.ToString());
			}

			finishedEvent.WaitOne();

			watch.Stop();
			Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
			conn.Close();

			return true;
		}

ConnectionFactory é o objeto da API que encapsula uma conexão com uma fila. A criação do channel, QueueDeclare seguem o modelinho da API do Rabbit MQ.

O método que efetivamente faz a publicação na fila é o channel.BasicPublish. Existem algumas regras para publicações com tópicos, publish/subscribe, mas para o nosso exemplo, somente o BasicPublish resolve o problema.

Este método também inicia uma Task (que internamente inicia uma segunda thread) para processar a fila de resposta para o cliente, no método ProcessInputQueue. O código dele segue:

		private void ProcessInputQueue()
		{			
			while (true)
			{
				BasicGetResult result = inputChannel.BasicGet(InputQueueName, true);
				if (result == null)
				{
					Thread.Sleep(250);
					continue;
				}
				MemoryStream ms = new MemoryStream(result.Body);
				messageCount--;

				Log.LogMessage("Waiting for more " + messageCount.ToString());

				StreamUtil.ImportarStream(ConnString, ms);

				if (messageCount <= 0)				
					break;									
			}

			finishedEvent.Set();
		}

Mesma dinâmica do Websphere MQ. Aqui fico num loop infinito, pegando as mensagens. Sempre que não tem nada a processar (result == null), dou uma pequena dormidinha na thread somente para não matar a CPU da máquina. Pega a resposta e processa. Sem grandes segredos.

Servidor RabbitMQ

O servidor segue a mesma dinâmica:

		public override bool Execute()
		{
			Log.LogMessage("Starting RabbitMQ Server");

			ConnectionFactory factory = new ConnectionFactory();
			factory.HostName = "localhost";
			IConnection conn = factory.CreateConnection();

			IModel inputChannel = conn.CreateModel();
			inputChannel.QueueDeclare(InputQueueName, false, false, false, null);

			IConnection conn2 = factory.CreateConnection();			

			IBasicProperties props = inputChannel.CreateBasicProperties();
			props.ContentType = "text/xml";
			props.DeliveryMode = 2; //persistent

			IModel outputChannel = conn2.CreateModel();			
			outputChannel.QueueDeclare(OutputQueueName, false, false, false, null);			

			int count = 0;

			while (true)
			{
				BasicGetResult result = inputChannel.BasicGet(InputQueueName, true);
				if (result == null)
				{
					Thread.Sleep(250);
					continue;
				}

				MemoryStream ms = new MemoryStream(result.Body);
				result = null; // let GC work.
				MemoryStream respStream = new MemoryStream();

				StreamUtil.ProcessClientBigRequest(ConnString, ms, respStream, false, null);

				outputChannel.BasicPublish("", OutputQueueName, props, respStream.GetBuffer());
				respStream = null; //let GC work

				count++;

				Log.LogMessage("Processed " + count.ToString());

			}

Comparativo dos resultados

Como nem tudo são rosas, a máquina que estava usando como servidor resolveu morrer e tive que reinstalar tudo novamente. Por tabela, perdi a confiança nos números que tinha executado anteriormente e acabei reexecutando todos os testes no novo ambiente, com o objetivo de manter a premissa de “mesmo payload, mesmo ambiente” que utilizei desde o início dessa série.

Mesmo assim, continuei executando 10 vezes cada teste com cada um dos volumes (20k, 50k, 500k, 1M) e o número apresentado aqui é a média dos 10, para evitar diferenças que ocorrem numa ou outra execução devido a cache de banco, paginação de memória, garbage collector, etc.

Os números são:

integracoes1

integracoes2

Como os números mostram, o desempenho do RabbitMQ não foi satisfatório. Fiquei bastante frustrado com esse resultado. Apesar de toda a simplicidade da implementação, da instalação, etc., o número ficou praticamente a mesma coisa do que usar Web Services em SOAP.

Além disso, apesar de todo o trabalho para montar um novo ambiente (que mudou de 32 para 64 bits na máquina servidora), houve uma piora significativa dos números do File Transfer. Vou investigar mais a fundo as razões desta piora dado o novo ambiente instalado.

Código fonte

O código fonte das implementações acima foi atualizado no Git Hub: https://github.com/ericlemes/IntegrationTests

Conclusão

Eu vi várias pessoas recomendando o RabbitMQ, o que me motivou a realizar esta pesquisa comparando também com o MSMQ e Websphere MQ. Acho que ainda vale a pena uma investigação mais aprofundada de como otimizar o RabbitMQ antes de simplesmente descartá-lo. Este trabalho é bastante superficial.

Mas acho que a principal conclusão que eu chego é que antes de sair utilizando uma nova tecnologia, vale a pena buscar informações sobre ela, testar, brincar, comparar. Baseado nos números aqui obtidos, numa implementação entre plataformas distintas (java/.NET por exemplo), ficaria com SOAP e numa implementação apenas em plataforma Windows, com o MSMQ.

Reorganizando conteúdo

De uns tempos pra cá, percebi que nem eu encontrava mais o conteúdo no meu blog sem dar um search no blog ou no google e fiquei um pouco incomodado com isso. Resolvi escrever um índice para ajudar a encontrar o conteúdo um pouco mais atemporal.

Acabei construindo esse índice como uma página que ficará fixa no blog, no link: https://ericlemes.com/indice/

O interessante deste processo de criar o índice é que nem eu tinha percebido quanta coisa acabei escrevendo nesse tempo todo (acho que comecei em 2009). Algumas coisas foram escritas para ajudar desenvolvedores do time que estavam começando, outras para compartilhar pensamentos, documentar descobertas, realizar pequenos benchmarks, mas o mais interessante é lembrar do começo de tudo, quando resolvi investir tempo no blog.

Naquela oportunidade sofria um pouco com a sensação de falta de conclusão das atividades. Aquele sentimento de que os projetos não terminavam e eu queria fazer coisas que tivessem começo, meio e fim. Cada post é assim. Nasce com uma idéia, uma pesquisa, as vezes fica muito maior do que eu pensava no começo, mas sempre que eu publico e depois recebo os feedbacks acabo tendo essa sensação legal de trabalho concluído.

Bom, mais um post concluído. Era isso.