Mês: abril 2014

Integrações entre Sistemas – Parte 14 – Web Service (Async)

Introdução

Com a evolução do framework e a implementação do modelo Async/Await, fiquei curioso sobre como seria o desempenho em relação aos demais métodos de integração já apresentados nas demais partes desta série (Ver o Índice para localizar as demais).

O que muda na implementação?

Basicamente, peguei a mesma implementação usada na parte 4 para SOAP e NET.TCP e converti todas as chamadas, do cliente e do servidor para o modelo async/await. Na série sobre programação paralela (parte 4), expliquei no que consiste o modelo e a razão dos ganhos de desempenho apresentados por ele.

No código servidor, fizemos as seguintes alterações:

        public List<ServiceTable> GetServiceTablesAsynchronous(int IDInicial, int IDFinal)
        {
            if (String.IsNullOrEmpty(connString))
                GetConnString();
            List<ServiceTable> l = new List<ServiceTable>();
            Queue<Task<ServiceTable>> queue = new Queue<Task<ServiceTable>>();
            for (int i = IDInicial; i <= IDFinal; i++)
                queue.Enqueue(DAO.GetServiceTableAsync(connString, i));

            while (queue.Count > 0)
            {
                Task<ServiceTable> task = queue.Dequeue();
                task.Wait();
                l.Add(task.Result);
            }
            return l;
        }

A idéia do código acima é que cada chamada que vai no banco de dados é executada de forma assíncrona, sendo que a chamada subsequente não espera o término dela para fazer a próxima requisição ao banco. Por isso é usada uma Queue para guardar todas as tasks e aguardar o término delas antes de retornar a resposta a quem chamou o serviço externamente.

Óbvio que seria muito mais eficiente fazer a query, assíncrona com todo o lote de uma vez só, mas para manter o mesmo cenário utilizada nas 13 partes anteriores dessa série, utilizei essa abordagem.

A chamada ao banco, tem a seguinte implementação:

        public static async Task<ServiceTable> GetServiceTableAsync(string ConnString, int ServiceTableID, TaskLoggingHelper Log)
        {
            ServiceTable result = new ServiceTable();

            SqlConnection conn = new SqlConnection(ConnString);
            conn.Open();

            SqlCommand cmd = new SqlCommand("select ServiceTableID, DescServiceTable, Value, CreationDate, StringField1, StringField2 " +
                    "from ServiceTable where ServiceTableID = @ServiceTableID", conn);

            using (conn)
            {
                SqlParameter p1 = cmd.Parameters.Add("@ServiceTableID", SqlDbType.Int);
                p1.Value = ServiceTableID;

                SqlDataReader rd = await cmd.ExecuteReaderAsync();
                rd.Read();
                using (rd)
                {
                    result.ServiceTableID = rd.GetInt32(0);
                    result.DescServiceTable = rd.GetString(1);
                    result.Value = (float)rd.GetDouble(2);
                    result.CreationDate = rd.GetDateTime(3);
                    result.StringField1 = rd.GetString(4);
                    result.StringField2 = rd.GetString(5);
                }
            }

            if (Log != null)
                Log.LogMessage("Getting ServiceTableID: " + ServiceTableID.ToString());

            return result;
        }

A principal mudança no código foi “await cmd.ExecuteReaderAsync();”. Basicamente utilizando novamente o modelo assíncrono para cada cutucada no banco de dados.

No código do cliente, utilizamos o stub do WCF em sua versão assícrona. Basicamente, ao adicionar as referências, utilizamos as opções:

ServiceRef1

ServiceRef2

Os métodos gerados para o serviço WCF ganham o sufixo “Async” no final e os mesmos retornam Tasks, invés das tradicionais implementações síncronas.

O consumo deles foi escrito da seguinte maneira:

            int count = 1;
            Queue<Task<ServiceTable[]>> tasks = new Queue<Task<ServiceTable[]>>();            
            for (int i = 0; i < TotalBatches; i++)
            {
                tasks.Enqueue(client.GetServiceTablesAsynchronousAsync(count, count + (BatchSize - 1)));                
                count += BatchSize;
                
            }

            Queue<Task> queue2 = new Queue<Task>();

            while (tasks.Count > 0)
            {                
                Task<ServiceTable[]> task = tasks.Dequeue();
                                
                task.Wait();
                ServiceTable[] stArray = task.Result;

                foreach (ServiceTable t in stArray)
                {

                    ServiceTable t2 = new ServiceTable();
                    t2.ServiceTableID = t.ServiceTableID;
                    t2.DescServiceTable = t.DescServiceTable;
                    t2.Value = t.Value;
                    t2.CreationDate = t.CreationDate;
                    t2.StringField1 = t.StringField1;
                    t2.StringField2 = t.StringField2;

                    queue2.Enqueue(DAO.ProcessServiceTableAsync(ConnString, t2));
                }                
            }

            while (queue2.Count > 0)
                queue2.Dequeue().Wait();

A idéia é muito similar ao lado do servidor. Enquanto eu ainda não recebi a resposta para processar, continuo enviando requisições. Ao terminar de enviar todas as requisições, espero a primeira resposta, e vou processando o resultado de cada uma delas.

O nome do método GetServiceTablesAsynchronousAsync ficou bem estranho, porque minha falta de criatividade colocou o sufixo “Asynchronous” no nome do método no servidor (para diferenciar do outro método, síncrono) e ao gerar a chamada do método, o WCF adicionou o sufixo Async novamente.

O método que vai no banco de dados, do lado do cliente também foi implementado de forma assíncrona:

        public static async System.Threading.Tasks.Task ProcessServiceTableAsync(string ConnString, ServiceTable table)
        {
            SqlConnection conn = new SqlConnection(ConnString);
            conn.Open();

            SqlCommand cmd = new SqlCommand("insert into ClientTable (ClientTableID, DescClientTable, Value, CreationDate, StringField1, StringField2)" +
                    "values (@ClientTableID, @DescClientTable, @Value, @CreationDate, @StringField1, @StringField2)", conn);

            using (conn)
            {

                SqlParameter p1 = cmd.Parameters.Add("@ClientTableID", SqlDbType.Int);
                SqlParameter p2 = cmd.Parameters.Add("@DescClientTable", SqlDbType.VarChar, 200);
                SqlParameter p3 = cmd.Parameters.Add("@Value", SqlDbType.Float);
                SqlParameter p4 = cmd.Parameters.Add("@CreationDate", SqlDbType.DateTime);
                SqlParameter p5 = cmd.Parameters.Add("@StringField1", SqlDbType.VarChar, 200);
                SqlParameter p6 = cmd.Parameters.Add("@StringField2", SqlDbType.VarChar, 200);

                p1.Value = table.ServiceTableID;
                p2.Value = table.DescServiceTable;
                p3.Value = table.Value;
                p4.Value = table.CreationDate;
                p5.Value = table.StringField1;
                p6.Value = table.StringField2;

                await cmd.ExecuteNonQueryAsync();
            }
        }

O segredo aqui está em ExecuteNonQueryAsync. Isso significa que antes de receber a resposta do insert, já estou preparando o próximo insert.

A implementação toda tem por objetivo eliminar toda a espera entre cliente e servidor.

Tempos de execução

Como novamente tive mudanças no meu ambiente, fui obrigado a refazer os tempos. Como são muitos números e demora algumas preciosas horas reexecutá-los fiz apenas alguns métodos para conseguirmos ter uma relação de comparação com os demais métodos. A metodologia é a mesma. Cada teste é executado 10 vezes e o tempo apresentado aqui é a média.

Temos os seguintes tempos:

Comparativo1

Comparativo2

Como observamos, a implementação de NET.TCP ficou mais rápida que o meu antigo socket server multi thread! Até os métodos estudados, este era o mais rápido e mais complexo de implementar.

Isso significa que agora vou ter que reimplementar os servidores TCP puros novamente, utilizando dos mesmos benefícios do async/await. Quem sabe num post futuro?

Conclusão

A Microsoft fez um belo trabalho na implementação dessas API’s baseadas em async/await. Conseguiu tornar muito simples a aplicação prática de um conceito de difícil implementação.

Era isso!

Programação Paralela – Parte 4 – I/O Assícrono com Async/Await

Introdução

Na parte 3 desta série, vimos como o I/O assíncrono pode ser mais eficiente, liberando a CPU da thread que espera por I/O para fazer outra atividade, aumentando o desempenho geral da aplicação.

Vimos também como essa implementação pode ser complexa. O objetivo deste post é mostrar como as novas features de async/await, implementadas no framework 4.5 podem simplificar este processo.

Utilizando o async/await

No exemplo anterior, utilizamos um método que faz uma grande escrita em disco, seguida de um grande processamento. Vamos recriar este exemplo, utilizando o padrão async/await.

        [TestMethod]
        public void BigAsyncIOTest2()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();
            
            for (int i = 0; i &lt; 10; i++)            
                DoBigWriteAsync().Wait();
            
            watch.Stop();
            Console.WriteLine(&quot;Total elapsed (ms): &quot; + watch.ElapsedMilliseconds);
        }

        private async Task DoBigWriteAsync()
        {
            FileStream fs = new FileStream(&quot;file1.txt&quot;, FileMode.Create, FileAccess.ReadWrite);
            byte[] buffer = new byte[100 * 1024 * 1024]; //Não faça isso. Aloca 100Mb de memória.                        
            Task t = fs.WriteAsync(buffer, 0, buffer.Length);           
            Task t2 = t.ContinueWith(new Action((task) =&gt; { 
                fs.Flush();
                fs.Close();
            }));
            BubbleSort.DoBigBubbleSort(10000, null);
            await t;            
        }

O método DoBigWriteAsync teve um modificador “async” iniciado e um “await t” inserido no final. O método WriteAsync implementado pela classe Stream, provê os mesmos mecanismos. Na verdade em nível mais baixo da API é ele quem faz o tratamento da chamada de I/O assícrono.

Na prática, o que acontece nesta implementação é que cada vez que WriteAsync é chamado, uma continuação é inserida (para dar o Flush e o Close na Stream) e o BubbleSort é iniciado. Somente quando ocorre o “await t”, é aguardado o término do I/O para que o método devolva o resultado.

Na prática, o tempo que o método ficou parado com “await” não travou a thread da aplicação e sim a disponibilizou para executar outras atividades enquanto o I/O está em andamento. Aí está toda a mágica e a simplicidade do async/await. Você escreve um método com a mesma característica e simplicidade que escreve um método sícrono, porém, o framework não travará a thread.

Qual o desempenho?

Pelos testes muito simples executados nesta série, podemos perceber que há um overhead em relação ao método assíncrono puro (utilizado na parte 2 da série). O método executou em 13835 milisegundos, contra 9111 milisegundos da implementação pura. Meu palpite é que este overhead ocorre devido à mecânica da Task Parallel Library que exploraremos em seguida.

Quais as vantagens de usar?

Praticamente todas as API’s de I/O no framework estão sendo reescritas para suportar este padrão, entre elas: Acesso HTTP, Sockets, Web Services, Banco de dados (DataReader), etc. A lista completa está em: http://msdn.microsoft.com/pt-br/library/hh191443.aspx.

Você pode fazer, com baixo esforço, sua aplicação sofrer muito menos com problemas de responsividade e espera por I/O. Somente no exemplo acima, partimos de 20 segundos numa implementação totalmente síncrona para 14 segundos, basicamente trocando keywords nos métodos.

Como funciona?

Sempre que um método assíncrono (Ex.: WriteAsync) é chamado, o mesmo inicia a operação de I/O e cria uma Task que somente conclui quando o retorno do callback avisando que a operação de I/O terminou. Ou seja, quando o callback termina, a task é marcada como concluída. Nenhuma nova thread (worker thread) é criada neste processo. Este é um dos pontos que mais me gera confusão, pois apesar da classe Task fazer parte do namespace System.Threading, não significa que sempre que uma task é criada a mesma é executada numa thread separada.

Pelo próprio Task Parallel Library, toda nova task, é criada numa TaskFactory padrão, que utiliza um TaskScheduler padrão. O TaskScheduler padrão do framework é um ThreadPoolTaskScheduler. Ele tem um comportamento que a cada nova Task ele delega para seu thread pool a sua execução. Em outras palavras, ele tenta utilizar alguma thread livre do seu thread pool. Se não possui nenhuma, cria uma. Se possui threads demais (maior que a quantidade de cores da máquina), ele aguarda alguma task finalizar e pega uma das threads já criadas.

Com este comportamento, ele consegue manter o maior paralelismo possível, com a menor quantidade de trocas de contexto possível. Esperto, né?

Esse comportamento também proporciona um outro comportamento desagradável que é o fato de não sabermos exatamente em que thread a Task será executada, o que pode trazer potenciais problemas de locks, deadlocks, ou problemas com código não thread-safe sendo executado dentro de uma task. É o preço.

Por isso o overhead imposto acima. Numa execução simples, com a máquina com pouco processamento (cenário deste teste), há um incremento do tempo devido ao overhead imposto pelo processo de escalonamento das tasks. Num cenário de alta carga, o task scheduler deve valer seu preço.

Em contrapartida, sempre que executamos uma operação de I/O assíncrono no modelo async/await, esta task é enfileirada no TaskScheduler. Se ela está aguardando conclusão, a thread que originalmente estaria esperando pega outra Task e a executa. Essa outra task pode ser outra operação de I/O, um pedaço de processamento paralelo utilizado numa expressão PLINQ (parallel LINQ) ou qualquer outro processamento do framework que esteja encapsulado numa task.

Era isso.

Programação Paralela – Parte 3 – Operações de I/O e Locks

Introdução

Na parte 1 desta série, observamos que muitas vezes aumentar a quantidade de threads da aplicação sem nenhum critério pode causar pior desempenho do que manter a aplicação numa única thread. Na parte 2, como os locks também podem gerar problemas potenciais de performance. E qual a relação destes dois assuntos com I/O (input/output ou entrada/saída)?

O objetivo deste post é explicar esta relação.

O que são operações de I/O

Sempre que uma aplicação acessa algum recurso que está em algum periférico do computador (fora da CPU ou fora da memória), uma operação de I/O é realizada. Ou seja, praticamente tudo. Operações de I/O ocorrem o tempo todo nas aplicações.

Exemplos de operações de I/O são acesso a disco e acesso a rede. Se acesso a rede é uma operação de I/O, obviamente tudo que implica comunicação, seja com banco de dados utilizando o driver do fornecedor, requests HTTP, TCP, UDP, ou seja, qualquer coisa que sai para rede.

Operações de I/O síncronas geram espera nas threads

Sempre que uma operação de I/O é feita, há uma espera na thread. Explicando de uma forma mais detalhada, se sua aplicação faz um acesso a disco de forma síncrona, no momento em que o sistema operacional vai no disco, a thread está parada esperando o cabeçote do HD se mover até o local correto (seek time), realizar a leitura e devolver as informações. Existem diversos mecanismos de cache para minimizar o movimento físico do HD, mas usei um exemplo forte para que seja possível imaginar a razão da lentidão.

Em máquinas modernas esse tempo é praticamente imperceptível para humanos (fica nos milisegundos), mas do ponto de vista do computador, são vários ciclos de CPU desperdiçados que poderiam estar sendo usados para outras operações enquanto a CPU está aguardando a resposta do periférico.

Um exemplo de I/O síncrono:

        public void SynchronousIOTest()
        {
            FileStream fs = new FileStream("file1.txt", FileMode.Create, FileAccess.ReadWrite);            
            byte[] buffer = new byte[100 * 1024 * 1024]; //Não faça isso. Aloca 100Mb de memória.            
            fs.Write(buffer, 0, buffer.Length);
            fs.Flush();
            fs.Close();
        }

A idéia do código acima é que no “Write”, a thread fica esperando o disco escrever 100Mb antes de continuar.

Isso executa bem rápido. Na minha máquina demorou ~1 segundo. É válido lembrar que em servidores, com diversas aplicações acessando o disco simultaneamente, isso pode piorar significativamente.

A mesma idéia, de forma assíncrona.

        public void AsyncIOTest()
        {
            FileStream fs = new FileStream("file1.txt", FileMode.Create, FileAccess.ReadWrite);
            byte[] buffer = new byte[100 * 1024 * 1024]; //Não faça isso. Aloca 100Mb de memória.            
            fs.BeginWrite(buffer, 0, buffer.Length, null, null);
            fs.Flush();
            fs.Close();
        }

A única coisa que mudou foi o “BeginWrite” no lugar do Write. O tempo cai para 10ms. Óbvio que a implementação acima está errada, mas ela serve para ilustrar que no primeiro exemplo, a thread espera toda a operação de I/O enquanto no segundo exemplo, ela continua a execução mesmo que a operação de I/O não esteja completa.

Numa implementação correta de I/O assíncrono deveria haver o tratamento do callback. Na verdade, a API de I/O “avisa” sua aplicação quando a operação de I/O é concluída, sendo que entre o BeginWrite e a chamada do callback, você pode usar a CPU para o que bem entender.

Em outras palavras, a espera por I/O funciona exatamente como um lock na thread. No exemplo acima, usei uma escrita em disco, mas uma escrita em rede funciona exatamente da mesma maneira. Sim, toda vez que você executa o seu SqlCommand, ou enche o seu DataSet com dados, ou chama seu querido Entity Framework, NHibernate, por debaixo dos panos, essa espera por I/O está acontecendo e está travando a thread (ressalvas para quem já utiliza as novas API’s de I/O).

Afinal, como funciona o I/O assíncrono?

É uma excelente pergunta. É algo que funciona muito próximo de magia negra. A explicação simples é que sempre que ocorre uma operação de I/O, a CPU pede para o periférico realizar a operação e continua processando. CPU’s são máquinas sequenciais e não param nunca. CPU ociosa é CPU jogada fora.

Quando o periférico termina de fazer o que tem que fazer, ele gera uma interrupção (quem já configurou jumper na unha, já ouviu falar de IRQ), avisando a CPU que ele terminou.

Essencialmente, o Windows encapsulou muito bem isso numa API chamada I/O completion ports. Essa API tem a idéia de permitir que, lá do nível dos dispositivos, passando pelos drivers (kernel mode) até o código que executa em user mode, a thread que originalmente pediu a operação de I/O receba uma notificação de que ela terminou.

O post do Stephen Cleary “there is no thread” explica passo a passo como isso acontece, sofrendo diversos “empréstimos” de threads existentes até notificar a thread original. O ponto que ele detalha é que não existe uma worker thread parada, esperando a operação de I/O terminar. A thread fica executando até que seja notificada pelo sistema operacional que o I/O terminou.

Esperto, né?

Dá muita diferença?

Para exemplificar a diferença entre o I/O síncrono e assíncrono, montei o exemplo abaixo. A idéia é realizar 10 vezes um processamento que combina o uso de recursos de CPU (de novo o grande e gastão BubbleSort) com I/O.

Exemplo síncrono:

        [TestMethod]
        public void BigSyncIOTest()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            for (int i = 0; i < 10; i++)
            {                
                FileStream fs = new FileStream("file1.txt", FileMode.Create, FileAccess.ReadWrite);
                byte[] buffer = new byte[100 * 1024 * 1024]; //Não faça isso. Aloca 100Mb de memória.            
                fs.Write(buffer, 0, buffer.Length);
                fs.Flush();
                fs.Close();

                BubbleSort.DoBigBubbleSort(10000, null);
            }

            watch.Stop();
            Console.WriteLine("Total elapsed (ms): " + watch.ElapsedMilliseconds);
        }

O exemplo síncrono é realmente bem simples. Escreve 100Mb em disco, depois faz um bubble sort numa matriz invertida de 10000 elementos.

O exemplo assíncrono é mais complexo (implementado desta maneira propositalmente com o APM para fins didáticos). O método BeginWrite recebe dois parâmetros adicionais, um AsyncCallback e um object para manter estado.

A idéia deste modelo de programação é que o método passado como callback é chamado quando a operação de I/O termina.

A classe utilizada para manter estado, segue essa estrutura:

    public class State
    {
        public int Current
        {
            get;
            set;
        }

        public FileStream FileStream
        {
            get;
            set;
        }
    }

O método principal:

        private ManualResetEvent ev;

        [TestMethod]
        public void BigAsyncIOTest()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();

            ev = new ManualResetEvent(false);

            WriteFinishedCallback(null);
            ev.WaitOne();

            watch.Stop();
            Console.WriteLine("Total elapsed (ms): " + watch.ElapsedMilliseconds);
        }

        public void WriteFinishedCallback(IAsyncResult ar)
        {
            State state = null;
            if (ar == null)
            {
                state = new State();
                state.Current = 0;
            }
            else{
                state = (State)ar.AsyncState;
                state.FileStream.EndWrite(ar);
                state.FileStream.Flush();
                state.FileStream.Close();
                state.Current++;
                if (state.Current == 10)
                {
                    ev.Set();
                    return;
                }
            }            

            FileStream fs = new FileStream("file1.txt", FileMode.Create, FileAccess.ReadWrite);
            byte[] buffer = new byte[100 * 1024 * 1024]; //Não faça isso. Aloca 100Mb de memória.            
            fs.BeginWrite(buffer, 0, buffer.Length, WriteFinishedCallback, state);
            state.FileStream = fs;

            BubbleSort.DoBigBubbleSort(10000, null);

        }      

Vejam que um objeto de sincronização é necessário. Ele será acionado de dentro do método WriteFinishedCallback, que será chamado da primeira vez do método principal e será chamado recursivamente até completar 10 vezes. Realmente é bem chatinho implementar esse modelo e debugar então, fora de questão. Debugar esse tipo de código assíncrono é uma tarefa das mais ingratas.

O “payload” do processamento é o mesmo. O mesmo bubble sort gastão, e a mesma escrita em disco. Os resultados:

  • Síncrono: 21781 milisegundos
  • Assíncrono: 9111 milisegundos

A explicação é simples. No modelo assíncrono, enquanto a CPU espera I/O ela fica ordenando a matriz. No modelo síncrono, ela espera toda a operação de I/O terminar para então começar a processar o bubble sort. Fica menos da metade do processamento total, utilizando uma única thread.

Já imaginou isso escalado para toda uma aplicação, fortemente orientada a banco de dados?