Objetivo
Nesta parte veremos como realizar integração utilizando Websphere MQ. A lógica é muito parecida com a do MSMQ, mudam poucas coisas na implementação. O complicadinho mesmo é se acostumar com como fazer o setup do Websphere MQ e configurar os canais.
Setup
Para realizar estes testes, utilizei o Websphere MQ versão 7.1. É possível baixá-lo para avaliação gratuitamente direto no site da IBM.
Para realizar os testes, utilizei filas persistentes e dois canais entre a máquina cliente e a máquina servidora (um de entrada e um de saída em cada uma das máquinas). Não vou abordar aqui o passo a passo de como criar e configurar os canais, já que o objetivo deste post não é detalhar as possíveis configurações do Websphere MQ.
Servidor
Para a implementação do Websphere MQ utilizei um assembly separado, pois o Websphere MQ necessita das classes da IBM implementadas no assembly amqmdnet.dll.
Segue o código do servidor:
public override bool Execute() { Log.LogMessage("Starting Websphere MQ Server"); IBM.WMQ.MQQueueManager queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName); IBM.WMQ.MQQueue queue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED); StreamUtil u = new StreamUtil(); int count = 0; while (true) { try { IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage(); queue.Get(msg); MemoryStream ms = new MemoryStream(); MemoryStream respStream = new MemoryStream(); MQUtil.MQMessageToStream(msg, ms); msg.ClearMessage(); u.ProcessClientBigRequest(ConnString, ms, respStream, false, null); MQUtil.StreamToMQMessage(respStream, msg); queueManager.Put(OutputQueueName, msg); queueManager.Commit(); msg.ClearMessage(); count++; Log.LogMessage("Processed " + count.ToString()); } catch (IBM.WMQ.MQException ex) { if (ex.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE) throw; else Thread.Sleep(50); } } }
O servidor não tem muito segredo. Ele abre a fila de entrada dele e em seguida converte a mensagem de entrada para um MemoryStream. A implementação do Websphere MQ não tem isso pronto, por isso foi necessária uma classe auxiliar para fazer essa conversão. Este ponto poderia ser otimizado, fazendo com que a stream não fosse duplicada. Fazendo uma implementação da classe Stream que saiba entregar neste formato o conteúdo de uma MQMessage resolveria o problema.
Uma vez convertida a entrada, o método ProcessClientBigRequest é usado na entrada e o resultado também é convertido de Stream para mensagem para que possa ser jogado na fila.
O código da classe MQUtil segue:
public static class MQUtil { public static void StreamToMQMessage(Stream stream, IBM.WMQ.MQMessage msg) { stream.Seek(0, SeekOrigin.Begin); byte[] buffer = new byte[100 * 1024]; int read = stream.Read(buffer, 0, buffer.Length); while (read > 0) { msg.Write(buffer, 0, read); read = stream.Read(buffer, 0, buffer.Length); } } public static void MQMessageToStream(IBM.WMQ.MQMessage msg, Stream stream) { byte[] buffer = new byte[100 * 1024]; int bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength; buffer = msg.ReadBytes(bytesToRead); stream.Write(buffer, 0, bytesToRead); while (msg.DataLength > 0) { bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength; buffer = msg.ReadBytes(bytesToRead); stream.Write(buffer, 0, bytesToRead); } stream.Seek(0, SeekOrigin.Begin); }
Aqui também, apesar de parecer uma implementação complexa, é bastante simples. A idéia é percorrer a mensagem, lendo em buffers de 100Kb e jogando numa memory stream, nos dois sentidos. A implementação da interface da IBM é um pouco diferente das que estamos acostumados no mundo Microsoft, mas nada que seja muito distante.
Cliente
O código do cliente também não é muito complexo. A grande diferença da implementação para o MSMQ é que não existe um evento de “chegou uma mensagem” na implementação da IBM. Para isso, criei uma thread que fica “checando” a fila e dando algumas dormidinhas caso não ache nada, só para evitar que a CPU bata 100%.
public override bool Execute() { Stopwatch watch = new Stopwatch(); watch.Start(); Log.LogMessage("Starting Websphere MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each"); queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName); messageCount = TotalBatches; inputQueue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED); Thread t = new Thread(ProcessMQQueue); t.Start(); int count = 1; for (int i = 0; i < TotalBatches; i++) { IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage(); MemoryStream ms = new MemoryStream(); StreamUtil u = new StreamUtil(); u.GenerateBigRequest(ms, false, count, count + (BatchSize - 1)); MQUtil.StreamToMQMessage(ms, msg); queueManager.Put(OutputQueueName, msg); count += BatchSize; Log.LogMessage("Sent " + count.ToString()); } while (!finished) Thread.Sleep(250); watch.Stop(); Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds"); return true; } private void ProcessMQQueue() { StreamUtil u = new StreamUtil(); bool keepListening = true; while (keepListening) { try { IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage(); inputQueue.Get(msg); queueManager.Commit(); messageCount--; MemoryStream ms = new MemoryStream(); MQUtil.MQMessageToStream(msg, ms); msg.ClearMessage(); Log.LogMessage("Waiting for more " + messageCount.ToString()); u.ImportarStream(ConnString, ms); if (messageCount <= 0) { keepListening = false; finished = true; } } catch (IBM.WMQ.MQException exception) { if (exception.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE) throw; else Thread.Sleep(100); } } }
Código fonte
O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.
Conclusão
Nos números, a implementação com Websphere MQ ficou bem melhor que a de MSMQ. Na implementação inicial de MSMQ sem filas transacionais, o Websphere MQ ficou mais lento. Alguns fatores que podemos observar é o uso de filas persistentes, canais e também a duplicidade da stream de memória que pode ser otimizada. Não acho que a diferença tenha sido significativa.