Integrações entre Sistemas – Parte 8 – Websphere MQ

Objetivo

Nesta parte veremos como realizar integração utilizando Websphere MQ. A lógica é muito parecida com a do MSMQ, mudam poucas coisas na implementação. O complicadinho mesmo é se acostumar com como fazer o setup do Websphere MQ e configurar os canais.

Setup

Para realizar estes testes, utilizei o Websphere MQ versão 7.1. É possível baixá-lo para avaliação gratuitamente direto no site da IBM.

Para realizar os testes, utilizei filas persistentes e dois canais entre a máquina cliente e a máquina servidora (um de entrada e um de saída em cada uma das máquinas). Não vou abordar aqui o passo a passo de como criar e configurar os canais, já que o objetivo deste post não é detalhar as possíveis configurações do Websphere MQ.

Servidor

Para a implementação do Websphere MQ utilizei um assembly separado, pois o Websphere MQ necessita das classes da IBM implementadas no assembly amqmdnet.dll.

Segue o código do servidor:

        public override bool Execute()
        {
            Log.LogMessage("Starting Websphere MQ Server");

            IBM.WMQ.MQQueueManager queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName);
            IBM.WMQ.MQQueue queue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED);

            StreamUtil u = new StreamUtil();
            int count = 0;

            while (true)
            {
                try
                {
                    IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
                    queue.Get(msg);

                    MemoryStream ms = new MemoryStream();
                    MemoryStream respStream = new MemoryStream();

                    MQUtil.MQMessageToStream(msg, ms);
                    msg.ClearMessage();

                    u.ProcessClientBigRequest(ConnString, ms, respStream, false, null);

                    MQUtil.StreamToMQMessage(respStream, msg);
                    queueManager.Put(OutputQueueName, msg);
                    queueManager.Commit();

                    msg.ClearMessage();

                    count++;

                    Log.LogMessage("Processed " + count.ToString());                    
                }
                catch (IBM.WMQ.MQException ex)
                {
                    if (ex.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE)
                        throw;
                    else
                        Thread.Sleep(50);
                }
            }
        }

O servidor não tem muito segredo. Ele abre a fila de entrada dele e em seguida converte a mensagem de entrada para um MemoryStream. A implementação do Websphere MQ não tem isso pronto, por isso foi necessária uma classe auxiliar para fazer essa conversão. Este ponto poderia ser otimizado, fazendo com que a stream não fosse duplicada. Fazendo uma implementação da classe Stream que saiba entregar neste formato o conteúdo de uma MQMessage resolveria o problema.

Uma vez convertida a entrada, o método ProcessClientBigRequest é usado na entrada e o resultado também é convertido de Stream para mensagem para que possa ser jogado na fila.

O código da classe MQUtil segue:

    public static class MQUtil
    {
        public static void StreamToMQMessage(Stream stream, IBM.WMQ.MQMessage msg)
        {
            stream.Seek(0, SeekOrigin.Begin);
            byte[] buffer = new byte[100 * 1024];

            int read = stream.Read(buffer, 0, buffer.Length);
            while (read > 0)
            {
                msg.Write(buffer, 0, read);
                read = stream.Read(buffer, 0, buffer.Length);
            }
        }

        public static void MQMessageToStream(IBM.WMQ.MQMessage msg, Stream stream)
        {
            byte[] buffer = new byte[100 * 1024];
            int bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength;
            buffer = msg.ReadBytes(bytesToRead);            
            stream.Write(buffer, 0, bytesToRead);

            while (msg.DataLength > 0)
            {
                bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength;
                buffer = msg.ReadBytes(bytesToRead);                
                stream.Write(buffer, 0, bytesToRead);
            }
            stream.Seek(0, SeekOrigin.Begin);
        }

Aqui também, apesar de parecer uma implementação complexa, é bastante simples. A idéia é percorrer a mensagem, lendo em buffers de 100Kb e jogando numa memory stream, nos dois sentidos. A implementação da interface da IBM é um pouco diferente das que estamos acostumados no mundo Microsoft, mas nada que seja muito distante.

Cliente

O código do cliente também não é muito complexo. A grande diferença da implementação para o MSMQ é que não existe um evento de “chegou uma mensagem” na implementação da IBM. Para isso, criei uma thread que fica “checando” a fila e dando algumas dormidinhas caso não ache nada, só para evitar que a CPU bata 100%.

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            Log.LogMessage("Starting Websphere MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName);
            messageCount = TotalBatches;

            inputQueue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED);
            Thread t = new Thread(ProcessMQQueue);
            t.Start();            

            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();

                MemoryStream ms = new MemoryStream();
                StreamUtil u = new StreamUtil();
                u.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
                MQUtil.StreamToMQMessage(ms, msg);
                queueManager.Put(OutputQueueName, msg);
                count += BatchSize;
                Log.LogMessage("Sent " + count.ToString());                
            }

            while (!finished)
                Thread.Sleep(250);

            watch.Stop();
            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            return true;
        }

        private void ProcessMQQueue()
        {
            StreamUtil u = new StreamUtil();
            bool keepListening = true;
            while (keepListening)
            {
                try
                {
                    IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
                    inputQueue.Get(msg);
                    queueManager.Commit();
                    messageCount--;

                    MemoryStream ms = new MemoryStream();
                    MQUtil.MQMessageToStream(msg, ms);                    
                    msg.ClearMessage();

                    Log.LogMessage("Waiting for more " + messageCount.ToString());                    

                    u.ImportarStream(ConnString, ms);

                    if (messageCount <= 0)
                    {
                        keepListening = false;
                        finished = true;
                    }
                }
                catch (IBM.WMQ.MQException exception)
                {
                    if (exception.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE)
                        throw;
                    else
                        Thread.Sleep(100);
                }

            }
        }

Código fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Conclusão

Nos números, a implementação com Websphere MQ ficou bem melhor que a de MSMQ. Na implementação inicial de MSMQ sem filas transacionais, o Websphere MQ ficou mais lento. Alguns fatores que podemos observar é o uso de filas persistentes, canais e também a duplicidade da stream de memória que pode ser otimizada. Não acho que a diferença tenha sido significativa.

Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s