Objetivo
Nesta parte veremos como realizar integração utilizando Websphere MQ. A lógica é muito parecida com a do MSMQ, mudam poucas coisas na implementação. O complicadinho mesmo é se acostumar com como fazer o setup do Websphere MQ e configurar os canais.
Setup
Para realizar estes testes, utilizei o Websphere MQ versão 7.1. É possível baixá-lo para avaliação gratuitamente direto no site da IBM.
Para realizar os testes, utilizei filas persistentes e dois canais entre a máquina cliente e a máquina servidora (um de entrada e um de saída em cada uma das máquinas). Não vou abordar aqui o passo a passo de como criar e configurar os canais, já que o objetivo deste post não é detalhar as possíveis configurações do Websphere MQ.
Servidor
Para a implementação do Websphere MQ utilizei um assembly separado, pois o Websphere MQ necessita das classes da IBM implementadas no assembly amqmdnet.dll.
Segue o código do servidor:
public override bool Execute()
{
Log.LogMessage("Starting Websphere MQ Server");
IBM.WMQ.MQQueueManager queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName);
IBM.WMQ.MQQueue queue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED);
StreamUtil u = new StreamUtil();
int count = 0;
while (true)
{
try
{
IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
queue.Get(msg);
MemoryStream ms = new MemoryStream();
MemoryStream respStream = new MemoryStream();
MQUtil.MQMessageToStream(msg, ms);
msg.ClearMessage();
u.ProcessClientBigRequest(ConnString, ms, respStream, false, null);
MQUtil.StreamToMQMessage(respStream, msg);
queueManager.Put(OutputQueueName, msg);
queueManager.Commit();
msg.ClearMessage();
count++;
Log.LogMessage("Processed " + count.ToString());
}
catch (IBM.WMQ.MQException ex)
{
if (ex.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE)
throw;
else
Thread.Sleep(50);
}
}
}
O servidor não tem muito segredo. Ele abre a fila de entrada dele e em seguida converte a mensagem de entrada para um MemoryStream. A implementação do Websphere MQ não tem isso pronto, por isso foi necessária uma classe auxiliar para fazer essa conversão. Este ponto poderia ser otimizado, fazendo com que a stream não fosse duplicada. Fazendo uma implementação da classe Stream que saiba entregar neste formato o conteúdo de uma MQMessage resolveria o problema.
Uma vez convertida a entrada, o método ProcessClientBigRequest é usado na entrada e o resultado também é convertido de Stream para mensagem para que possa ser jogado na fila.
O código da classe MQUtil segue:
public static class MQUtil
{
public static void StreamToMQMessage(Stream stream, IBM.WMQ.MQMessage msg)
{
stream.Seek(0, SeekOrigin.Begin);
byte[] buffer = new byte[100 * 1024];
int read = stream.Read(buffer, 0, buffer.Length);
while (read > 0)
{
msg.Write(buffer, 0, read);
read = stream.Read(buffer, 0, buffer.Length);
}
}
public static void MQMessageToStream(IBM.WMQ.MQMessage msg, Stream stream)
{
byte[] buffer = new byte[100 * 1024];
int bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength;
buffer = msg.ReadBytes(bytesToRead);
stream.Write(buffer, 0, bytesToRead);
while (msg.DataLength > 0)
{
bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength;
buffer = msg.ReadBytes(bytesToRead);
stream.Write(buffer, 0, bytesToRead);
}
stream.Seek(0, SeekOrigin.Begin);
}
Aqui também, apesar de parecer uma implementação complexa, é bastante simples. A idéia é percorrer a mensagem, lendo em buffers de 100Kb e jogando numa memory stream, nos dois sentidos. A implementação da interface da IBM é um pouco diferente das que estamos acostumados no mundo Microsoft, mas nada que seja muito distante.
Cliente
O código do cliente também não é muito complexo. A grande diferença da implementação para o MSMQ é que não existe um evento de “chegou uma mensagem” na implementação da IBM. Para isso, criei uma thread que fica “checando” a fila e dando algumas dormidinhas caso não ache nada, só para evitar que a CPU bata 100%.
public override bool Execute()
{
Stopwatch watch = new Stopwatch();
watch.Start();
Log.LogMessage("Starting Websphere MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");
queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName);
messageCount = TotalBatches;
inputQueue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED);
Thread t = new Thread(ProcessMQQueue);
t.Start();
int count = 1;
for (int i = 0; i < TotalBatches; i++)
{
IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
MemoryStream ms = new MemoryStream();
StreamUtil u = new StreamUtil();
u.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
MQUtil.StreamToMQMessage(ms, msg);
queueManager.Put(OutputQueueName, msg);
count += BatchSize;
Log.LogMessage("Sent " + count.ToString());
}
while (!finished)
Thread.Sleep(250);
watch.Stop();
Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
return true;
}
private void ProcessMQQueue()
{
StreamUtil u = new StreamUtil();
bool keepListening = true;
while (keepListening)
{
try
{
IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
inputQueue.Get(msg);
queueManager.Commit();
messageCount--;
MemoryStream ms = new MemoryStream();
MQUtil.MQMessageToStream(msg, ms);
msg.ClearMessage();
Log.LogMessage("Waiting for more " + messageCount.ToString());
u.ImportarStream(ConnString, ms);
if (messageCount <= 0)
{
keepListening = false;
finished = true;
}
}
catch (IBM.WMQ.MQException exception)
{
if (exception.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE)
throw;
else
Thread.Sleep(100);
}
}
}
Código fonte
O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.
Conclusão
Nos números, a implementação com Websphere MQ ficou bem melhor que a de MSMQ. Na implementação inicial de MSMQ sem filas transacionais, o Websphere MQ ficou mais lento. Alguns fatores que podemos observar é o uso de filas persistentes, canais e também a duplicidade da stream de memória que pode ser otimizada. Não acho que a diferença tenha sido significativa.