Mês: maio 2012

Integrações entre Sistemas – Parte 9 – Estatísticas

Objetivo

Após algum tempo criando estes exercícios de diferentes métodos de integração, resolvi executar uma bateria de testes em todos eles para tentar chegar num comparativo de tempos e prós e contras de cada um deles.

Foram alguns dias, de máquinas superaquecendo e desligando, ajustes nos códigos criados para conseguir chegar nestes resultados. Espero que gostem.

O Método

Para chegar nestes resultados, executei os exemplos criados nas partes 1 a 8 com diferentes quantidades de dados: 20.000, 50.000, 500.000 e 1 milhão de registros. Para eliminar o problema de cache de bancos de dados e outras pequenas variações que podem acontecer nesse tipo de teste, executei cada um deles 10 vezes e tirei uma média dos tempos.

Em todos os métodos sempre foi utilizada a mesma massa de dados e as mesmas máquinas, somente para tentarmos observar a diferença de tempos de cada um dos métodos.

Demorou algo em torno de 4 dias pra conseguir rodar tudo, mas valeu a pena.

Tempos de execução

Separei os resultados em 2 gráficos para simplificar a visualização e comparação dos resultados:

O método File Transfer (extração simples) está no gráfico somente para referência. Ele não possui a abordagem request/response necessária para o cenário que estamos exercitando. Somente coloquei ele nos testes para termos uma base da diferença de uma extração simples para um request/response. Percebemos que a diferença dá em torno de 35%.

Outra comparação interessante está no método de Tcp Client. A abordagem single thread é de 55 a 80% mais lenta que a abordagem multi thread.

Mais um número assustador é a comparação Websphere MQ x MSMQ. Usei filas transacionais no MSMQ e filas persistentes no Websphere MQ. O MSMQ ficou de 85 a 100% mais lento que o Websphere MQ. Neste caso, prefiro acreditar que minha implementação está ruim. Pretendo investigar mais. Outra coisa que pode ter dado diferença neste caso é que o Websphere MQ utiliza-se de uma segunda thread para processar.

Outra comparação clássica é net.tcp x SOAP. No cenário de requests pequenos (sem trabalhar com pequenos lotes), a diferença é gigante. No caso da separação por pequenos lotes de 1000, o SOAP mostrou-se de 26 a 34% mais lento.

Para compararmos os métodos mais rápidos, vou separar a comparação por volumes, começando por 20.000 registros:

Método Tempo em segundos (20K registros) % Diferença absoluta
Websphere MQ 8,92
Http Request 9,93 11,39 1,02
Tcp Client (multi thread) 10,68 19,80 1,77
File Transfer (extração simples) 14,72 65,06 5,80
MSMQ 15,94 78,80 7,03
Net TCP (lotes) 17,95 101,29 9,03
File Transfer (request/response) 19,26 115,98 10,34
SOAP (lotes) 22,40 151,26 13,49
TCP Client (single thread) 27,80 211,83 18,89

O percentual representa o quanto o método em questão é mais lento em relação ao mais rápido, por exemplo, o cenário de File Transfer (request/response) se mostrou 115% mais lento que o cenário usando Websphere MQ (mais que o dobro do tempo).

Agora os resultados para 50.000 registros:

Método Tempo em segundos (50K registros) % Diferença absoluta
Http Request 22,02
File Transfer (extração simples) 23,35 6,04 1,33
Websphere MQ 24,61 11,76 2,59
Tcp Client (multi thread) 28,05 27,36 6,03
File Transfer (request/response) 32,63 48,18 10,61
Net TCP (lotes) 41,73 89,49 19,71
MSMQ 45,59 107,02 23,57
TCP Client (single thread) 52,32 137,60 30,30
SOAP (lotes) 52,79 139,71 30,77

Novamente vemos que conforme o volume aumenta, alguns métodos melhoram de desempenho e outros pioram. O aumento dos registros também faz com que a diferença percentual entre os métodos fique menos significativa, mas a diferença absoluta de alguma forma significativa. Por exemplo, a diferença em segundos de file transfer para http é de 10s para 50.000 registros.

Resultados para 500.000 registros:

Método Tempo em segundos (500K registros) % Diferença absoluta
Http Request 230,19
Websphere MQ 233,63 1,50 3,44
File Transfer (extração simples) 250,76 8,94 20,57
File Transfer (request/response) 347,30 50,88 117,12
Tcp Client (multi thread) 351,39 52,65 121,20
Net TCP (lotes) 404,00 75,51 173,82
MSMQ 443,33 92,60 213,15
SOAP (lotes) 533,62 131,82 303,43
TCP Client (single thread) 545,48 136,97 315,29

Agora percebemos que a medida que o volume aumenta o Websphere MQ começa a ter um desempenho melhor, mas surpreendentemente apesar do consumo absurdo de memória e gerando muita paginação no sistema operacional, o método Http Request ainda funciona muito bem. Acredito que com muita concorrência ele não desempenhará tão bem assim devido ao alto consumo de memória.

Por último, os resultados para 1 milhão de registros:

Método Tempo em segundos (1 milhão de registros) % Diferença absoluta
Websphere MQ 417,36
Http Request 428,95 2,78 11,59
File Transfer (extração simples) 495,27 18,67 77,91
File Transfer (request/response) 657,09 57,44 239,73
Tcp Client (multi thread) 671,37 60,86 254,01
Net TCP (lotes) 788,20 88,85 370,84
MSMQ 836,12 100,34 418,76
SOAP (lotes) 1059,69 153,90 642,33
TCP Client (single thread) 1079,72 158,70 662,36

Agora, com alto volume, percebemos que o Websphere MQ atinge o topo novamente, ficando novamente em 2º lugar o Http Request e o método até então “preferido” pela maioria, File Transfer fica em 3º lugar. Eu já suspeitava disso, mas ele ficou em 3º.

File Transfer

Acredito que file transfer ainda seja o método mais utilizado para transferência de grandes massas de informação entre 2 sistemas. Para muitos, ainda é o método mais rápido. Com os resultados obtidos através destes testes (usando 1 milhão de registros como base de comparação) e mantendo somente os métodos mais populares, chegamos nas seguintes diferenças:

Método Tempo em segundos (1 milhão registros) % Diferença absoluta (s) Diferança absoluta (min)
File Transfer (request/response) 657,09
Websphere MQ 417,36 -36,48 -239,73 -4,00
Net TCP (lotes) 788,20 19,95 131,11 2,19
SOAP (lotes) 1059,69 61,27 402,61 6,71

Essa tabela compara os métodos sempre em relação a file transfer. Websphere MQ é 36% mais rápido enquanto o SOAP é 61% mais lento que file transfer, mas se compararmos em tempos absolutos, temos uma variação de 6,71 minutos do SOAP para file transfer em 1 milhão de registros!

Será que a diferença de tempo vale todo o trabalho adicionado pelo método de file transfer, desde a sincronização dos arquivos, até a necessidade de customizar a entrega da informação para cada sistema diferente que solicite o arquivo? Na minha opinião não. Se a questão é desempenho, partir para mensageria é uma idéia melhor. Se a questão é velocidade de desenvolvimento, ficaria com net.tcp ou SOAP.

Prós e contras

Esse tipo de comparativo é muito ruim e subjetivo, mas resolvi montar alguns critérios, para analisarmos cada um dos métodos sob diferentes perspectivas:

Critério Websphere MQ Http Request File Transfer Tcp Client Net TCP SOAP MSMQ
Desempenho Bom Bom Regular Regular Regular Ruim Ruim
Facilidade de implementação Regular Regular Ruim Ruim Bom Bom Regular
Aderência a SOA Bom Ruim Ruim Ruim Ruim Bom Bom
Necessidade de parse de conteúdo Ruim Ruim Ruim Ruim Bom Bom Ruim
Abrangência de plataformas (baixa plataforma) Bom Bom Bom Bom Ruim Bom Ruim
Abrangência de plataformas (considerando alta plataforma) Bom Ruim Regular Regular Ruim Ruim Ruim
Acoplamento entre origem e destino Regular Ruim Ruim Ruim Bom Bom Regular
Custo com licenças de uso Ruim Bom Bom Bom Bom Bom Bom
Aderência a padrões de mercado (conteúdo da mensagem) Ruim Ruim Ruim Ruim Ruim Bom Ruim
Dependência de disponibilidade do sistema destino Bom Ruim Bom Ruim Ruim Ruim Bom
  • Desempenho: Este critério é baseado nos tempos aqui discutidos
  • Facilidade de implementação: Neste critério, estou considerando complexidades mais da mecânica de transferência do que de formato de arquivos. No Websphere MQ por exemplo, é necessário tratar a comunicação de forma assícrona, que adiciona alguma complexidade. No caso de File Transfer é necessário criar uma mecânica de sinalização para não pergar o arquivo em escrita e em leitura e ainda no caso de TCP, criar todo um protocolo para se estabelecer como as informações serão transferidas, problemas que em SOAP, por exemplo não existem.
  • Aderência a SOA: Todos os métodos aqui tratam comunicação ponto a ponto. Em arquiteturas mais desenvolvidas e numa malha grande de sistemas, sabemos que comunicação ponto a ponto torna o ambiente quase impossível de se gerenciar. No caso de MQ, SOAP é fácil implementar uma ESB na comunicação sem ter que mexer praticamente nada em provedor e consumidor de serviço.
  • Necessidade de parse de conteúdo: Sempre que falamos de parse de conteúdo manualmente, estamos sujeitos a erros. Hoje existem parsers de XML, schemas e diversas ferramentas para isso, mas é sempre uma fonte de erro. Quando falamos em alta plataforma, por exemplo, isso já pode não ser muito trivial. Alguns métodos como SOAP e Net.tcp removem essa complexidade.
  • Abrangência de plataformas (baixa plataforma): Esse critério trata-se da facilidade de utilizar esse método de integração em diferentes plataformas (Unix, Java, etc). Websphere MQ tem implementações para várias plataformas, o que já não é verdade para MSMQ (apenas Windows). File Transfer, HTTP e TCP são comuns em outras plataformas. Net.tcp que é proprietário da Microsoft também é um issue neste caso.
  • Abrangência de plataformas (considerando alta): Alta plataforma é algo extremamente fechado. Websphere MQ é uma excelente solução neste caso pois existe implementação para mainframe. É possível usar file transfer e TCP, mas a complexidade é bastante alta. Em ambos os casos, existe a preocupação na conversão ASCII/EBCDIC.
  • Acoplamento entre origem e destino: Falando de acoplamento, me refiro à necessidade de ambos os lados assumirem uma série de premissas em relação a formato, protocolo, conteúdo, etc. Os únicos métodos que abstraem isso melhor são SOAP e Net.tcp, devido à existencia de um WSDL que evita uma série de problemas em relação à premissas de como o conteúdo será formatado.
  • Custo com licenças de uso: Este critério pesa mais para o Websphere MQ que é um produto da IBM com custo de licenciamento.
  • Aderência a padrões de mercado: Neste critério está contemplada a facilidade de interoperabilidade com outras tecnologias. A única realmente padrão utilizada aqui é o SOAP
  • Dependência de disponibilidade do sistema destino: Este critério trata-se do quanto o sistema origem depende do sistema destino. Em métodos como SOAP, TCP, Http Request, a espera da resposta do sistema é imediata, devido à natureza síncrona da comunicação. Em caso de falha do sistema destino, o sistema origem falha também. Quando falamos de MQ, MSMQ ou File Transfer, uma vez que o arquivo ou mensagem esteja disponível para o sistema destino, o sistema origem não para no caso de falhas. Quando ocorre recuperação das falhas, as respostas são enviadas e o processamento segue, com menor necessidade de intervenção manual.

Código Fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Conclusão

Como podemos observar, integração entre sistemas com alto volume é algo que deve ser pensado e planejado com calma. Em relação a todos os métodos apresentados, a grande supresa pra mim ficou no Websphere MQ. Não esperava que o desempenho fosse tão bom.

Me parece que para um cenário “enterprise”, com várias plataformas, sistemas, a associação do MQ como transporte com um middleware como barramento (ESB) é a solução para a grande maioria dos problemas de integração, com bom desempenho e baixo acoplamento.

Mesmo o padrão SOAP apresentando várias vantagens, o desempenho pode ser um ponto importante em muitos casos. No trade-off do desempenho do MQ contra a simplicidade de desenvolvimento do SOAP, a princípio fico com o desempenho do MQ na maioria dos casos.

Anúncios

Integrações entre Sistemas – Parte 8 – Websphere MQ

Objetivo

Nesta parte veremos como realizar integração utilizando Websphere MQ. A lógica é muito parecida com a do MSMQ, mudam poucas coisas na implementação. O complicadinho mesmo é se acostumar com como fazer o setup do Websphere MQ e configurar os canais.

Setup

Para realizar estes testes, utilizei o Websphere MQ versão 7.1. É possível baixá-lo para avaliação gratuitamente direto no site da IBM.

Para realizar os testes, utilizei filas persistentes e dois canais entre a máquina cliente e a máquina servidora (um de entrada e um de saída em cada uma das máquinas). Não vou abordar aqui o passo a passo de como criar e configurar os canais, já que o objetivo deste post não é detalhar as possíveis configurações do Websphere MQ.

Servidor

Para a implementação do Websphere MQ utilizei um assembly separado, pois o Websphere MQ necessita das classes da IBM implementadas no assembly amqmdnet.dll.

Segue o código do servidor:

        public override bool Execute()
        {
            Log.LogMessage("Starting Websphere MQ Server");

            IBM.WMQ.MQQueueManager queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName);
            IBM.WMQ.MQQueue queue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED);

            StreamUtil u = new StreamUtil();
            int count = 0;

            while (true)
            {
                try
                {
                    IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
                    queue.Get(msg);

                    MemoryStream ms = new MemoryStream();
                    MemoryStream respStream = new MemoryStream();

                    MQUtil.MQMessageToStream(msg, ms);
                    msg.ClearMessage();

                    u.ProcessClientBigRequest(ConnString, ms, respStream, false, null);

                    MQUtil.StreamToMQMessage(respStream, msg);
                    queueManager.Put(OutputQueueName, msg);
                    queueManager.Commit();

                    msg.ClearMessage();

                    count++;

                    Log.LogMessage("Processed " + count.ToString());                    
                }
                catch (IBM.WMQ.MQException ex)
                {
                    if (ex.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE)
                        throw;
                    else
                        Thread.Sleep(50);
                }
            }
        }

O servidor não tem muito segredo. Ele abre a fila de entrada dele e em seguida converte a mensagem de entrada para um MemoryStream. A implementação do Websphere MQ não tem isso pronto, por isso foi necessária uma classe auxiliar para fazer essa conversão. Este ponto poderia ser otimizado, fazendo com que a stream não fosse duplicada. Fazendo uma implementação da classe Stream que saiba entregar neste formato o conteúdo de uma MQMessage resolveria o problema.

Uma vez convertida a entrada, o método ProcessClientBigRequest é usado na entrada e o resultado também é convertido de Stream para mensagem para que possa ser jogado na fila.

O código da classe MQUtil segue:

    public static class MQUtil
    {
        public static void StreamToMQMessage(Stream stream, IBM.WMQ.MQMessage msg)
        {
            stream.Seek(0, SeekOrigin.Begin);
            byte[] buffer = new byte[100 * 1024];

            int read = stream.Read(buffer, 0, buffer.Length);
            while (read > 0)
            {
                msg.Write(buffer, 0, read);
                read = stream.Read(buffer, 0, buffer.Length);
            }
        }

        public static void MQMessageToStream(IBM.WMQ.MQMessage msg, Stream stream)
        {
            byte[] buffer = new byte[100 * 1024];
            int bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength;
            buffer = msg.ReadBytes(bytesToRead);            
            stream.Write(buffer, 0, bytesToRead);

            while (msg.DataLength > 0)
            {
                bytesToRead = msg.DataLength > buffer.Length ? buffer.Length : msg.DataLength;
                buffer = msg.ReadBytes(bytesToRead);                
                stream.Write(buffer, 0, bytesToRead);
            }
            stream.Seek(0, SeekOrigin.Begin);
        }

Aqui também, apesar de parecer uma implementação complexa, é bastante simples. A idéia é percorrer a mensagem, lendo em buffers de 100Kb e jogando numa memory stream, nos dois sentidos. A implementação da interface da IBM é um pouco diferente das que estamos acostumados no mundo Microsoft, mas nada que seja muito distante.

Cliente

O código do cliente também não é muito complexo. A grande diferença da implementação para o MSMQ é que não existe um evento de “chegou uma mensagem” na implementação da IBM. Para isso, criei uma thread que fica “checando” a fila e dando algumas dormidinhas caso não ache nada, só para evitar que a CPU bata 100%.

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            Log.LogMessage("Starting Websphere MQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            queueManager = new IBM.WMQ.MQQueueManager(QueueManagerName);
            messageCount = TotalBatches;

            inputQueue = queueManager.AccessQueue(InputQueueName, IBM.WMQ.MQC.MQOO_INPUT_SHARED);
            Thread t = new Thread(ProcessMQQueue);
            t.Start();            

            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();

                MemoryStream ms = new MemoryStream();
                StreamUtil u = new StreamUtil();
                u.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
                MQUtil.StreamToMQMessage(ms, msg);
                queueManager.Put(OutputQueueName, msg);
                count += BatchSize;
                Log.LogMessage("Sent " + count.ToString());                
            }

            while (!finished)
                Thread.Sleep(250);

            watch.Stop();
            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            return true;
        }

        private void ProcessMQQueue()
        {
            StreamUtil u = new StreamUtil();
            bool keepListening = true;
            while (keepListening)
            {
                try
                {
                    IBM.WMQ.MQMessage msg = new IBM.WMQ.MQMessage();
                    inputQueue.Get(msg);
                    queueManager.Commit();
                    messageCount--;

                    MemoryStream ms = new MemoryStream();
                    MQUtil.MQMessageToStream(msg, ms);                    
                    msg.ClearMessage();

                    Log.LogMessage("Waiting for more " + messageCount.ToString());                    

                    u.ImportarStream(ConnString, ms);

                    if (messageCount <= 0)
                    {
                        keepListening = false;
                        finished = true;
                    }
                }
                catch (IBM.WMQ.MQException exception)
                {
                    if (exception.ReasonCode != IBM.WMQ.MQC.MQRC_NO_MSG_AVAILABLE)
                        throw;
                    else
                        Thread.Sleep(100);
                }

            }
        }

Código fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Conclusão

Nos números, a implementação com Websphere MQ ficou bem melhor que a de MSMQ. Na implementação inicial de MSMQ sem filas transacionais, o Websphere MQ ficou mais lento. Alguns fatores que podemos observar é o uso de filas persistentes, canais e também a duplicidade da stream de memória que pode ser otimizada. Não acho que a diferença tenha sido significativa.

Integrações entre Sistemas – Parte 7 – MSMQ

Objetivo

Nesta parte veremos como realizar integração através de MSMQ. MSMQ é uma tecnologia baseada em filas de propriedade da Microsoft (concorrente direta do Websphere MQ, anteriormente conhecido como MQSeries, da IBM).

Esse método de integração é popularmente conhecido como “mensageria”. É muito comum no mercado financeiro. Não estaremos abordando aqui todos os possíveis mecanismos de integração, apenas o que está sendo explorado nesta série que é a integração de grande volume de dados entre duas aplicações.

Este método também tem algumas características interessantes:

  • Ele pode ser transacional, ou seja, existe uma garantia de entrega e processamento das mensagens
  • Devido a esta característica transacional, a mensagem só pode ser entregue de forma atômica (inteira, nunca em pedaços)
  • Nos sistemas de mensageria, o sistema origem sempre entrega a mensagem para a fila de mensagens e o sistema destino pega a informação desta fila, o que faz com que exista um menor acoplamento entre eles (óbvio que eles precisam conhecer a mesma fila). O sistema destino também não precisa estar disponível para que a mensagem seja entregue (isso não quer dizer que ele não precise estar disponível para que a mensagem seja processada)
  • O sistema de filas garante a entrega da mensagem, porém, não faz nenhum tratamento em relação ao formato da mensagem. Desta forma, precisa existir um formato acordado entre os sistemas. Por essa razão, sistemas de mensageria são muito interessantes se usados em conjunto com barramentos de serviço (message brokers), pois esse trabalho de transformação tanto técnica quanto no conteúdo da mensagem (enriquecimento, por exemplo) pode ser realizado por uma ferramenta especialista nisso, tirando código de integração de dentro dos sistemas

Todas as características acima aplicam-se tanto ao MSMQ quanto ao Websphere MQ. No próximo post, falarei sobre Websphere MQ.

Setup

No nosso exemplo, precisaremos apenas criar as filas e configurar o MSMQ na máquina servidora.

O MSMQ é um componente do Windows. Ele não vem instalado por Default. No Windows 7, para instalá-lo basta seguir os passos:

  • Painel de Controle | Programas e Recursos | Ativar ou desativar recursos do Windows
  • Habilitar todos os itens abaixo de Servidor do MSMQ (Microsoft Message Queue)

Após clicar em Ok, o MSMQ será instalado.

Após a instalação, será necessário configurar as filas. Para isso, no Windows Explorer, clique com o botão direito no computador em seguida em “Gerenciar”. Em serviços e aplicativos aparecerá “Enfileiramento de mensagens”.

Em “filas privativas”, clique com o botão direito e nova. Em seguida criaremos duas filas (transacionais): integrationtests.in, integrationtests.out.

Servidor

O servidor é bastante simples, segue o código:

        public override bool  Execute()
        {
            Log.LogMessage("Starting MSMQ Server");

            outputQueue = new MessageQueue(OutputQueueName);

            inputQueue = new MessageQueue(InputQueueName);
            inputQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(queue_ReceiveCompleted);
            inputQueue.BeginReceive();

            while (true)
                Thread.Sleep(250);                        
        }

        private void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
        {
            
            Log.LogMessage("Message received " + messageCount.ToString());
            messageCount++;

            Message resp = new Message();
            resp.BodyStream = new MemoryStream();
            
            util.ProcessClientBigRequest(ConnString, e.Message.BodyStream, resp.BodyStream, false, null);
            resp.BodyStream.Seek(0, SeekOrigin.Begin);
            
            outputQueue.Send(resp, MessageQueueTransactionType.Single);

            inputQueue.BeginReceive();
        }

A idéia aqui é ouvir o evento “ReceiveCompleted” da fila, já disparar um BeginReceive. Isso significa que a cada mensagem que pingar na fila, o método queue_ReceiveCompleted é chamado.

Sempre que chega uma nova mensagem, o servidor processa o request, e já armazena a resposta na própria stream da mensagem de resposta, passando a mesma como parâmetro no método ProcessClientBigRequest.

A mensagem com a resposta é colocada na fila de saída.

Cliente

O cliente segue a mesma filosofia do servidor:

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            Log.LogMessage("Starting MSMQ transfer with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            inputQueue = new MessageQueue(InputQueueName);
            inputQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(inQueue_ReceiveCompleted);
            inputQueue.BeginReceive();

            MessageQueue queue = new MessageQueue(OutputQueueName);            
            int count = 1;            
            for (int i = 0; i < TotalBatches; i++)
            {
                Message msg = new Message();
                msg.BodyStream = new MemoryStream();
                
                util.GenerateBigRequest(msg.BodyStream, false, count, count + (BatchSize - 1));
                msg.BodyStream.Seek(0, SeekOrigin.Begin);
                queue.Send(msg, MessageQueueTransactionType.Single);                

                count += BatchSize;
                messageCount++;
            }

            while (!finished)
                Thread.Sleep(250);

            watch.Stop();
            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            return true;
        }

        private void inQueue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
        {
            inputQueue.BeginReceive();  

            Log.LogMessage("Received message. Waiting for " + messageCount.ToString() + " messages");

            messageCount--;
            util.ImportarStream(ConnString, e.Message.BodyStream);            

            if (messageCount <= 0)
                finished = true;            
        }

O cliente também inicializa sua fila de entrada e começa a esperar as mensagens de resposta. Em seguida, gera os lotes de requisições. Para cada mensagem recebida, o contador messageCount é decrementado, para saber a hora de parar de receber mensagens e calcular os tempos.

Código fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Conclusão

Surpreendentemente este método foi um dos mais performáticos sem o uso de filas transacionais, sempre seguindo essa idéia de quebrar em pequenos lotes. Se cada requisição gerar uma mensagem, ele também fica bastante prejudicado (como nos web services).

Usando filas transacionais, a performance caiu bastante. Essa implementação ainda precisa ser revista. Acredito que dê para melhorar significativamente a performance dela (ver post: Integrações entre sistemas – Parte 12 – MSMQ, revisitado).

O principal ganho aqui é que os processadores das duas máquinas estão trabalhando simultaneamente, devido à natureza assíncrona da comunicação. No momento que o cliente está gerando sua segunda requisição, já está processando a primeira resposta, ou seja, os intervalos em que existe espera pela rede, as respostas estão sendo processadas, tanto no cliente quanto no servidor.

Integrações entre Sistemas – Parte 6 – TCP Server

Objetivo

Nesta parte veremos como criar e transferir informações utilizando um TCP Server. A idéia é criar um método personalizado de transferir as informações, buscando melhor desempenho.

Em TCP, fiz dois testes, um que realiza toda a comunicação de uma forma síncrona e outro que o cliente processa as requisições numa segunda thread. A diferença de performance dos dois é significativa.

Não consegui nestes exemplos chegar numa implementação ótima. A idéia é somente ilustrar o método de transferência e já nos dar uma idéia de que é mais complexo que os demais. Ficarei devendo por enquanto uma implementação otimizada, mas depois que publicar a primeira “rodada” de experimentos, pretendo chegar numa implementação melhor.

Servidor

Para variar, o servidor foi implementado numa task MSBuild. O código principal é o seguinte:

        public override bool Execute()
        {
            Log.LogMessage("Listening to TCP Connections on port " + Port.ToString());

            IPAddress ipAddress = IPAddress.Any;
            TcpListener tcpListener = new TcpListener(ipAddress, Port);
            tcpListener.Start();
            tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

            while (true)
                Thread.Sleep(250);                                    
        }

Este código somente deixa o servidor “ouvindo” numa porta (inicialmente usei 8081, tudo é parametrizável na task). Quando receber um conexão, ele começa a processar (de forma assíncrona, no resultado de BeginAcceptSocketCallback) e libera o servidor para receber outra conexão.

O código do método BeginAcceptSocketCallback segue:

        private void BeginAcceptSocketCallback(IAsyncResult result)
        {
            StreamUtil u = new StreamUtil();

            TcpListener tcpListener = (TcpListener)result.AsyncState;
            TcpClient tcpClient = tcpListener.EndAcceptTcpClient(result);
            
            tcpListener.BeginAcceptTcpClient(BeginAcceptSocketCallback, tcpListener);

            Stream clientStream = tcpClient.GetStream();

            int headerBytes;
            int count = 1;

            do {
                byte[] header = new byte[8];
                headerBytes = clientStream.Read(header, 0, 8);
                long size = BitConverter.ToInt64(header, 0);

                Log.LogMessage("Processing " + count.ToString());
                count++;

                MemoryStream ms = ReadBatch(clientStream, size);

                if (ms.Length > 0)               
                    WriteOutput(ms, u, clientStream);                                    
                else               
                    headerBytes = 0;                
            }
            while (headerBytes > 0);
            Log.LogMessage("Finished request.");
        }

Este código é executado a cada conexão recebida de um cliente. Convencionei um protocolo simplista entre cliente e servidor. O cliente manda um “long” com o tanto de dados que vai transferir, o servidor lê esse long e passa a receber a quantidade de bytes informada para dentro de uma memory stream. Em seguida, dá esse memory stream para processar pelo ProcessarClientBigRequest. O resultado também é armazenado numa memory stream. Em seguida, o servidor avisa o cliente quantos bytes vão voltar, e o cliente repete o procedimento do lado de lá.

Explicando passo a passo o método acima:

  • BeginAcceptTcpClient avisa o servidor que outra conexão pode ser recebida
  • No início do “do/while”, a quantidade de bytes fornecida pelo cliente é armazenada em size (recebe em char[] e converte para long)
  • O método ReadBatch lê a quantidade de bytes fornecida e devolve o MemoryStream correspondente
  • Se chegou algo no MemoryStream, a resposta é fornecida escrita pelo método WriteOutput
  • A comunicação termina quando o cliente fala que tem “zero” bytes a transmitir

O código de ReadBatch segue:

        private MemoryStream ReadBatch(Stream clientStream, long size)
        {            
            MemoryStream ms = new MemoryStream();
            byte[] buffer = new byte[100 * 1024];

            long tmpSize = size;

            int read;
            do{
                int bytesToRead = size > buffer.Length ? buffer.Length : (int)size;
                read = clientStream.Read(buffer, 0, bytesToRead);

                ms.Write(buffer, 0, read);

                tmpSize -= read;                
            }
            while (tmpSize > 0);                      

            return ms;
        }

Este método é o “feijão com arroz” de trabalhar com Streams. O buffer é um local de armazenamento temporário, apenas para ler em pedaços a stream. Ler de uma vez não é uma boa idéia. Aqui o código poderia ser otimizado, de uma forma que evitasse essa MemoryStream intermediária.

A idéia aqui é simples, vir enchendo o buffer e descarregando o memory stream, até que não exista mais nada pra ler. No final, devolver o memory stream.

O código do WriteOutput segue:

        private void WriteOutput(MemoryStream ms, StreamUtil u, Stream clientStream)
        {
            byte[] header = new byte[sizeof(Int64)];

            ms.Seek(0, SeekOrigin.Begin);
            MemoryStream outputStream = new MemoryStream();
            u.ProcessClientBigRequest(ConnString, ms, outputStream, false, null);
            outputStream.Seek(0, SeekOrigin.Begin);
            header = BitConverter.GetBytes(outputStream.Length);
            clientStream.Write(header, 0, header.Length);
            outputStream.CopyTo(clientStream);            
        }

Esse método, dado um memory stream, faz o processamento (ProcessClientBigRequest) numa memory stream (pra poder saber o tamanho), em seguida, envia o tamanho e copia a memory stream de volta para o cliente.

Essa é toda a lógica do servidor. Ele pode ser executado através do batch runserver-tcptestserver.bat (executa a task MSBuild correspondente).

Cliente – Single Thread

Neste teste, seguimos a seguinte abordagem:

  • O cliente manda a quantidade de bytes que vai transferir do lote, em seguida um lote de requests (Ex.: 1000 pedidos)
  • O servidor processa e escreve a quantidade de bytes e a resposta
  • O cliente processa a resposta, inicia o próximo request
  • O processo segue, até o final do lote

O código principal do teste segue:

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect(HostName, Port);

            Log.LogMessage("Starting TCP transfer (single thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            NetworkStream stream = tcpClient.GetStream();
            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                Log.LogMessage("Processing " + count.ToString());

                MemoryStream ms = GenerateRequest(count);
                WriteHeader(ms, stream);
                long totalBytes = ReadHeader(stream);

                MemoryStream inputStream = ReadResponse(totalBytes, stream);
                _util.ImportarStream(ConnString, inputStream);

                count += BatchSize;                
            }            
            stream.Write(new byte[8], 0, 8);            

            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");
            tcpClient.Close();

            return true;
        }

Este código começa iniciando o relógio, a conexão TCP. Para cada lote, executa GenerateRequest para obter o xml com o request, executa WriteHeader (escreve a quantidade de bytes a transferir e o request na stream do servidor), lê a quantidade de bytes retornado pelo servidor, através de ReadHeader. Em seguida, lê a resposta através de ReadResponse e importa o resultado (ImportarStream).

O código de GenerateRequest segue:

        private MemoryStream GenerateRequest(int count)
        {
            MemoryStream ms = new MemoryStream();            
            _util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
            ms.Seek(0, SeekOrigin.Begin);

            return ms;
        }

O código de WriteHeader segue:

        private void WriteHeader(MemoryStream ms, Stream stream)
        {
            byte[] header = BitConverter.GetBytes(ms.Length);
            stream.Write(header, 0, header.Length);
            ms.CopyTo(stream);
        }

Sem muito segredo aqui. Envia o tamanho e copia a stream com o request na stream de rede.

O código de ReadHeader:

        private long ReadHeader(Stream stream)
        {
            byte[] header = new byte[sizeof(long)];
            int read = stream.Read(header, 0, header.Length);
            long totalBytes = BitConverter.ToInt64(header, 0);
            return totalBytes;
        }

Também sem segredo. O código de ReadResponse:

        private MemoryStream ReadResponse(long responseSize, Stream stream)
        {
            MemoryStream inputStream = new MemoryStream();

            long totalBytes = responseSize;
            byte[] buffer = new byte[100 * 1024];
            int bytesToRead;
            int read;
            do
            {
                bytesToRead = totalBytes > buffer.Length ? buffer.Length : (int)totalBytes;
                read = stream.Read(buffer, 0, bytesToRead);
                inputStream.Write(buffer, 0, read);

                totalBytes -= read;                                                
            }
            while (totalBytes > 0);

            inputStream.Seek(0, SeekOrigin.Begin);

            return inputStream;
        }

Esse é um pouquinho mais complicado. Ele armazena em totalBytes o total da resposta, calcula o quanto precisa ler (o restante, ou um buffer cheio), executa a leitura e escreve na stream temporária (inputStream). Vai decrementando totalBytes a cada leitura. Quando totalBytes chega a 0, é porque leu tudo o que precisava. A stream temporária é retornada pela função.

Cliente – Multi thread

Este teste usa uma abordagem um pouco diferente:

  • O cliente inicia uma thread para processar as respostas. Ela fica lá processando eternamente.
  • O cliente manda a quantidade de bytes existente no request, em seguida o request
  • O cliente recebe a quantidade de bytes da resposta e a resposta.
  • Invés de processá-la, o cliente joga numa queue interna e segue enviando requests e responses
  • Caso o cliente tenha 10 coisas na fila pra processar ele espera antes de receber as próximas respostas.
  • A thread de processamento de respostas, vai desenfileirando as respostas e processando as mesmas

A vantagem deste método é que enquanto aguarda a transferência de rede, o processador não para de trabalhar. Ainda não está na sua forma mais otimizada, mas já é um começo e dá uma diferença significativa nos tempos.

O código principal deste teste segue:

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            Thread t = new Thread(ProcessMemoryStreamQueue);
            t.Start();

            Log.LogMessage("Starting TCP transfer (multi thread) with " + TotalBatches.ToString() + " batchs with " + BatchSize.ToString() + " items each");            

            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect(HostName, Port);
            NetworkStream stream = tcpClient.GetStream();

            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                Log.LogMessage("Processing " + i.ToString());

                MemoryStream ms = new MemoryStream();
                
                util.GenerateBigRequest(ms, false, count, count + (BatchSize - 1));
                ms.Seek(0, SeekOrigin.Begin);

                WriteHeader(stream, ms);
                
                //Sends the request
                ms.CopyTo(stream);

                long totalBytes = GetBytesToRead(stream);
                
                //Sleeps if have 10 responses to process. 
                while (memoryStreamQueue.Count >= 10)
                    Thread.Sleep(100);

                MemoryStream responseStream = ReadResponse(totalBytes, stream);
                //Don't process response. Just queue it. The other thread will process it.
                lock (memoryStreamQueue)
                    memoryStreamQueue.Enqueue(responseStream);

                count += BatchSize;                
            }
            //Mark end of transfer.
            stream.Write(new byte[8], 0, 8);
                       
            tcpClient.Close();
            watch.Stop();
            t.Abort();

            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            return true;
        }
		
		        private void WriteHeader(Stream stream, Stream streamToSend)
        {
            byte[] header = BitConverter.GetBytes(streamToSend.Length);
            stream.Write(header, 0, header.Length);
        }

        private long GetBytesToRead(Stream stream)
        {
            byte[] header = new byte[sizeof(long)];
            int read = stream.Read(header, 0, header.Length);
            return BitConverter.ToInt64(header, 0);
        }

        private MemoryStream ReadResponse(long responseSize, Stream stream)
        {
            MemoryStream inputStream = new MemoryStream();
            byte[] buffer = new byte[100 * 1024];
            int bytesToRead;
            int read;
            long totalBytes = responseSize;

            do
            {
                bytesToRead =  totalBytes > buffer.Length ? buffer.Length : (int)totalBytes;
                read = stream.Read(buffer, 0, bytesToRead);

                inputStream.Write(buffer, 0, read);
                totalBytes -= read;
            }
            while (totalBytes > 0);
                        
            inputStream.Seek(0, SeekOrigin.Begin);
            return inputStream;
        }

Após fazer todo o código de inicialização, da thread, relógio, conexão TCP, os lotes começam a ser processados. Os métodos WriteHeader, GetBytesToRead e ReadResponse seguem praticamente a mesma lógica do teste anterior. O que muda realmente está no trecho:

       lock (memoryStreamQueue)
                    memoryStreamQueue.Enqueue(responseStream);

Essa memoryStreamQueue é simplesmente uma Queue<MemoryStream> e o uso de lock é necessário para evitar concorrência entre as duas threads.

A thread auxiliar, que faz o processamento executa o seguinte código:

        private void ProcessMemoryStreamQueue()
        {
            while (true)
            {
                if (memoryStreamQueue.Count > 0)
                {
                    MemoryStream ms = null;
                    lock (memoryStreamQueue)
                    {
                        ms = memoryStreamQueue.Dequeue();                        
                    }
                    util.ImportarStream(ConnString, ms);                    
                }
                Thread.Sleep(200);
            }
        }

Este método é bastante simples. Fica num loop eterno (Dando dormidinhas de 200 mili para evitar um 100% CPU caso não tenha nada pra fazer). Desempilha um memory stream e faz o processamento.

Conclusão

Apesar de aparentar ser muito eficiente, este método (nesta implementação) não foi o mais performático. Obviamente, a versão multi-thread ficou mais rápida, pois otimiza um pouco mais o uso do processador.

Como percebemos também, a implementação não é lá muito simples, apesar de ser possível fazer uma “casca” de servidor e cliente abstraindo somente o conteúdo.

Código-fonte

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Integrações entre Sistemas – Parte 5 – HTTP Request

Objetivo

Nesta parte veremos mais uma abordagem para transferência de informações que é através de um request HTTP.

Setup

O servidor para nosso teste foi implementado como uma aplicação Web (IntegrationTests.WebApp), num handler http genérico. Para utilizá-lo, precisamos configurar nosso IIS. Para isso, seguiremos os passos:

  • Abrir a console do IIS (inetmgr)
  • Adicionar novo aplicativo, nome “integrationtests2”, Application Pool DefaultAppPool (v. 4.0, Integrated)
  • Caminho físico: apontar para o diretório IntegrationTests\IntegrationTests.WebApp
  • Configurar a connection string do banco de dados, no arquivo web.config existente no diretório IntegrationTests\IntegrationTests.WebApp

Código do teste

O teste segue a seguinte abordagem:

  • O cliente executa um request http no servidor, baseado na stream do request, escreve o xml com a requisição para o servidor
  • O servidor, recebe o request, enquanto vai escrevendo a resposta, vai dando “Flushes” no request, de forma que a stream de resposta é devolvida em pedaços para o cliente
  • O cliente, vem lendo a stream de resposta enquanto o servidor ainda está processando o request.

Segue o fonte do servidor, escrito como um generic handler (.ashx) numa aplicação web padrão:

    public class GenericHandler : IHttpHandler
    {        
        public void ProcessRequest(HttpContext context)
        {
            if (String.IsNullOrEmpty(connString))
                GetConnString();

            if (context.Request.Headers["Flush"] == "true")
                flush = true;

            context.Response.ContentType = "text/xml";
            this.context = context;
            
            StreamUtil u = new StreamUtil();            
            u.ProcessClientBigRequest(connString, context.Request.InputStream, context.Response.OutputStream, true, Flush);
            context.Response.Flush();
            context.Response.End();
        }

        private void Flush()
        {
            if (flush)
                context.Response.Flush();
        }

Mais uma implementação simplista. A idéia aqui é obter a connection string, posteriormente, ler o header pra saber se os flushs parciais serão dados ou não. O método Flush, é dado como um callback para ProcessClientBigRequest. No meio do loop dele, ele executa o callback que vem dando Response.Flush, para cada pedacinho de xml tratado.

O código do cliente (task MSBuild) segue:

        public override bool Execute()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(new Uri(Uri));            
            request.Method = "POST";
            request.ContentType = "text/xml";

            request.BeginGetRequestStream(GetRequestStreamCallback, request);

            while (!finished)
                Thread.Sleep(250);

            watch.Stop();
            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");


            return true;
        }

        private void GetRequestStreamCallback(IAsyncResult result)
        {
            HttpWebRequest request = (HttpWebRequest)result.AsyncState;
            if (Flush)
                request.Headers.Add("Flush:true");

            Stream requestStream = request.EndGetRequestStream(result);
            Log.LogMessage("Writing request with " + BigRequestSize.ToString() + " itens. Flush " + Flush.ToString());

            StreamUtil util = new StreamUtil();
                        
            util.GenerateBigRequest(requestStream, true, BigRequestSize);
            request.BeginGetResponse(GetResponseCallback, request);
        }

        private void GetResponseCallback(IAsyncResult result)
        {
            HttpWebRequest request = (HttpWebRequest)result.AsyncState;
            HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result);

            Stream responseStream = response.GetResponseStream();
                
            Log.LogMessage("Reading response");

            _util = new StreamUtil();
            _util.ImportarStream(ConnString, responseStream);
            finished = true;            
        }

O método Execute dispara o request (usando BeginGetRequestStream) e fica aguardando o retorno. O callback do BeginGetRequestStream é o método GetRequestStreamCallback. Este método adiciona os headers, e escreve na stream de request o xml com o request, item a item. Em seguida, executa o método BeginGetResponse, que por sua vez chama o callback GetResponseCallback.

O método GetResponseCallback, interpreta a stream contida na resposta, inserindo os resultados na base.

Conclusão

A idéia inicial que eu tinha ao construir este teste era vir devolvendo o response enquanto recebia o request. Percebi que por uma característica do protocolo http, isso é impossível. Não é possível começar a escrever a resposta enquanto o request ainda está sendo lido. Isso faz com que ocorra uma bufferização do request do lado do servidor, ou seja, como ele precisa esperar chegar todo o request antes de começar a gerar a resposta, ele vai alocando memória para esta stream (no lado do servidor), até que comece a paginação. A partir do momento que ele começa a liberar a resposta, esta memória vai sendo liberada.

Apesar de muito eficiente, ao executarmos este método com um request grande, percebemos um consumo alto de memória. É um método relativamente simples de transferir informações, tem esse ponto de atenção da memória e mesmo assim, não aproveita muito bem os recursos das duas máquinas, visto que acaba serializando o processamento (precisa esperar todo o request para começar a gerar a resposta). O único ponto que ele torna mais eficiente é que à medida que a resposta vai sendo escrita, o cliente já vai processando ela.

Código fonte da solução

O código fonte está disponível no git hub: https://github.com/ericlemes/IntegrationTests.

Integrações entre Sistemas – Parte 4 – WCF (Soap e net.tcp)

Objetivo

Nesta parte faremos a discussão do código da comunicação através de Web Service e net.tcp.

Servidor

O servidor foi escrito como um serviço WCF simples, com dois endpoints configurados, um net.tcp, e um soap. O código para o serviço segue:

    public class IntegrationTestsService : IIntegrationTestsService
    {
        private static string connString;

        private void GetConnString()
        {

            connString = System.Configuration.ConfigurationManager.AppSettings["ConnString"];
        }
        
        public ServiceTable GetServiceTable(int ServiceTableID)
        {
            if (String.IsNullOrEmpty(connString))
                GetConnString();
            return DAO.GetServiceTable(connString, ServiceTableID);
        }

        public List<ServiceTable> GetServiceTables(int IDInicial, int IDFinal)
        {
            if (String.IsNullOrEmpty(connString))
                GetConnString();
            List<ServiceTable> l = new List<ServiceTable>();
            for (int i = IDInicial; i <= IDFinal; i++)
            {
                l.Add(DAO.GetServiceTable(connString, i));
            }
            return l;
        }
    }

A connection string está sendo recuperada do web.config, e os mesmos métodos da classe DAO são usados. O primeiro método, GetServiceTable é utilizado na chamada “simples”, pergunta e resposta.

O método GetServiceTables, retorna uma lista baseado num ID inicial e final. Ele será utilizado nas chamadas em lote.

WCF em pequenos requests

Este teste também foi implementado em MSBuild, na task WCFSmallRequestsTest. A mesma task pode ser usada tanto para http quanto para net.tcp, dependendo da configuração. Segue o código:

        public override bool Execute()
        {

            Binding binding;

            if (this.EndpointType == "http")
            {
                binding = new BasicHttpBinding();
                ((BasicHttpBinding)binding).MessageEncoding = WSMessageEncoding.Text;
                ((BasicHttpBinding)binding).TextEncoding = Encoding.UTF8;
                ((BasicHttpBinding)binding).TransferMode = TransferMode.Buffered;
                ((BasicHttpBinding)binding).Security.Mode = BasicHttpSecurityMode.None;
            }
            else if (this.EndpointType == "nettcp")
            {
                binding = new NetTcpBinding();
                ((NetTcpBinding)binding).MaxReceivedMessageSize = 1024 * 1024;
                ((NetTcpBinding)binding).Security.Mode = SecurityMode.None;
                ((NetTcpBinding)binding).CloseTimeout = new TimeSpan(0, 1, 0);
                ((NetTcpBinding)binding).OpenTimeout = new TimeSpan(0, 1, 10);
                ((NetTcpBinding)binding).ReceiveTimeout = new TimeSpan(0, 1, 10);
                ((NetTcpBinding)binding).SendTimeout = new TimeSpan(0, 1, 10);
            }
            else
                throw new ArgumentException("Invalid value for EndpointType. Expected: http, nettcp");
            EndpointAddress address = new EndpointAddress(new Uri(WebServiceUri));

            IntegrationTestsService.IntegrationTestsServiceClient client = new IntegrationTestsService.IntegrationTestsServiceClient(binding, address);

            Log.LogMessage("Doing " + TotalRequests.ToString() + " calls");

            Stopwatch watch = new Stopwatch();
            watch.Start();
            
            for (int i = 1; i <= TotalRequests; i++)
            {
                ServiceTable t = null;
                bool tryAgain = true;
                while (tryAgain)
                {
                    try
                    {
                        t = client.GetServiceTable(i);
                        tryAgain = false;
                    }
                    catch (EndpointNotFoundException)
                    {
                        Thread.Sleep(100);
                        t = client.GetServiceTable(i);
                        tryAgain = true;
                    }
                }

                ServiceTable t2 = new ServiceTable();
                t2.ServiceTableID = t.ServiceTableID;
                t2.DescServiceTable = t.DescServiceTable;
                t2.Value = t.Value;
                t2.CreationDate = t.CreationDate;
                t2.StringField1 = t.StringField1;
                t2.StringField2 = t.StringField2;

                DAO.ProcessServiceTable(ConnString, t2);            
            }

            watch.Stop();
            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            
            
            return true;
        }

Neste teste, o serviço foi importado, gerando o namespace IntegrationTestsService. O método GetServiceTable é chamado de um em um para concluir a transferência de todo o lote de informações.

WCF em pequenos lotes

Este teste foi implementado na task MSBuild WCFSmallBatchesTest. Segue o código:

        public override bool Execute()
        {
            Binding binding;

            if (this.EndpointType == "http")
            {
                binding = new BasicHttpBinding();
                ((BasicHttpBinding)binding).MaxReceivedMessageSize = 2048 * 1024;
            }
            else if (this.EndpointType == "nettcp")
            {
                binding = new NetTcpBinding();
                ((NetTcpBinding)binding).MaxReceivedMessageSize = 1024 * 1024;
                ((NetTcpBinding)binding).Security.Mode = SecurityMode.None;
                ((NetTcpBinding)binding).CloseTimeout = new TimeSpan(0, 0, 10);
                ((NetTcpBinding)binding).OpenTimeout = new TimeSpan(0, 0, 10);
                ((NetTcpBinding)binding).ReceiveTimeout = new TimeSpan(0, 0, 10);
                ((NetTcpBinding)binding).SendTimeout = new TimeSpan(0, 0, 10);
            }
            else
                throw new ArgumentException("Invalid value for EndpointType. Expected: http, nettcp");
            EndpointAddress address = new EndpointAddress(new Uri(WebServiceUri));

            IntegrationTestsService.IntegrationTestsServiceClient client = new IntegrationTestsService.IntegrationTestsServiceClient(binding, address);

            Log.LogMessage("Doing " + TotalBatches.ToString() + " batch calls with " + BatchSize.ToString() + " itens each");

            Stopwatch watch = new Stopwatch();
            watch.Start();

            int count = 1;
            for (int i = 0; i < TotalBatches; i++)
            {
                ServiceTable[] stArray = client.GetServiceTables(count, count + (BatchSize - 1));

                foreach (ServiceTable t in stArray)
                {

                    ServiceTable t2 = new ServiceTable();
                    t2.ServiceTableID = t.ServiceTableID;
                    t2.DescServiceTable = t.DescServiceTable;
                    t2.Value = t.Value;
                    t2.CreationDate = t.CreationDate;
                    t2.StringField1 = t.StringField1;
                    t2.StringField2 = t.StringField2;

                    DAO.ProcessServiceTable(ConnString, t2);
                }
                count += BatchSize;
            }

            watch.Stop();
            Log.LogMessage("Total processing time: " + watch.Elapsed.TotalSeconds.ToString("0.00") + " seconds");

            return true;
        }

A idéia deste código é invés de fazer uma chamada para cada registro desejado (o que resulta num request), empacotamos o request em pequenos lotes, ou seja, colocamos o ID inicial e final do request e recebemos a resposta numa lista de objetos ServiceTable.

A sugestão inicial era de 1000 em 1000, mas fiz uma melhoria no código para que o tamanho do batch possa ser configurado. Abaixo, veremos as diferenças de tempo.

Estatísticas e Conclusão

Abaixo observamos algumas estatísticas de execução (usando duas máquinas, numa rede local):

Método Tempos net.tcp (segundos) Tempos soap (segundos)
20.000 chamadas 201,2 193,65
2000 chamadas em lotes de 10 40,87 39,75
200 chamadas em lotes de 100 24,46 25,2
20 chamadas em lotes de 1000 22,39 19,84
10 chamadas em lotes de 2000 20,23 18,82
5 chamadas em lotes de 4000 18,44 19
4 chamadas em lotes de 5000 18,69 17,94

Essas estatísticas obviamente sofrem variações em diferentes execuções, porém, percebemos que enviando as chamas em lotes obtemos ganhos significativos quando passamos para lotes de 10, depois lotes de 100. A partir daí, o ganho passa a ser muito pequeno (apesar de existir).

O trade-off aqui é que quanto maior o request, maior o custo de reprocessamento no caso de uma perda de request. Quanto mais aumentamos o request mais fugimos da característica do protocolo http, e consequentemente dependemos de configurações específicas no cliente e no servidor (aumentar o tamanho do request http, por exemplo).

Outra conclusão que podemos chegar é que o uso do net.tcp em relação ao soap, apresenta muito ganho no cenário de pequenso requests (20.000 chamadas). A medida que o request aumenta, a diferença já não é tão significativa.