Objetivo
Tcp Server… de novo??
Sim. Na verdade essa história começou logo que eu publiquei o post da parte 10. Meu amigo Alexandre Costa (aka Magoo) estava acompanhando o post depois que discutimos o assunto num café e logo que saiu, ele me mandou via twitter: “Por que você não usou a TPL?”.
Estragou meu dia. Toca a procurar sobre a tal da TPL e entender o que muda na prática.
A idéia aqui foi essa, fui atrás do que é a lib, uma forma de aplicá-la e ver o quanto agrega ao problema como um todo.
TPL ou Task Parallel Library
A Task Parallel Library é uma lib para trabalhar com paralelismo no .NET. A documentação da mesma encontra-se em: http://msdn.microsoft.com/en-us/library/dd460717.aspx.
No meu problema, percebi que tinha 2 situações em específico que ela poderia ajudar:
- Usar o Parallel.Invoke, para processar a resposta do servidor em paralelo.
- Usar o “pattern” para programação assíncrona proposto pela TPL (http://msdn.microsoft.com/en-us/library/dd997423(v=vs.100).aspx)
Vamos à prática.
Tcp Server 3
Nesta versão do cliente e servidor do TCP, o que muda na prática é:
public void ProcessInputQueue() { while (true) { TcpClient2InputStreamContext ctx = null; lock (inputQueue) { if (inputQueue.Count > 0) ctx = inputQueue.Dequeue(); } if (ctx == null) { Thread.Sleep(250); continue; } Parallel.Invoke(() => { Stream inputStream = ctx.InputStream; ctx.InputStream = null; inputStream.Seek(0, SeekOrigin.Begin); Log.LogMessage("Before import stream " + ctx.ID.ToString()); StreamUtil.ImportarStream(ConnString, inputStream, Log); Log.LogMessage("Stream imported " + ctx.ID.ToString()); responsesProcessed++; }); if (responsesProcessed == TotalBatches) break; } processingDone.Set(); }
Fiz outras alterações no código, colocando um ID na pergunta e devolvendo pelo servidor, para melhorar situações de debug. Eu realmente peguei uma série de bugs nesses meus exemplos relacionados a sincronização. Ainda não está totalmente estável, mas como disse anteriormente, está longe de ser uma implementação de referência para o assunto.
Tcp Server 4
Além das melhorias feitas na versão 3, na 4 eu implementei também o pattern assíncrono proposto. Na prática, tudo que é BeginRead, EndRead e BeginWrite e EndWrite são substituídos por construções na TPL. Isso tudo foi tratado no cliente e no servidor.
Na prática, não muda nada. Existe a vantagem de poder inserir um callback para tratamento de exceções (exceções em threads não param o fluxo principal e são bem chatas de debugar) e não ter que chamar o “EndRead” no callback (ele já é chamado automaticamente pela TPL).
Fica mais ou menos assim:
private void WriteHeader(NetworkStream stream) { batchesSent++; if (batchesSent > TotalBatches) { //Mark end of transfer. byte[] b = new byte[sizeof(long)]; b = BitConverter.GetBytes((long)0); stream.Write(b, 0, sizeof(long)); sendDone.Set(); return; } Log.LogMessage("Sending request " + batchesSent.ToString()); TcpClient2OutputStreamContext ctx = new TcpClient2OutputStreamContext(); ctx.OutputStream = new MemoryStream(); ctx.ClientStream = stream; StreamUtil.GenerateBigRequest(ctx.OutputStream, false, recordCount, recordCount + (BatchSize - 1)); ctx.OutputStream.Seek(0, SeekOrigin.Begin); byte[] header = BitConverter.GetBytes(ctx.OutputStream.Length); Task.Factory.FromAsync<byte[], int, int>(stream.BeginWrite, stream.EndWrite, header, 0, header.Length, ctx).ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); recordCount += BatchSize; }
O trecho que compõe a chamada da TPL é o Task.Factory.FromAsync. Vou tentar explicar:
- No FromAsync, significa que os 3 parâmetros tratados pela função serão dos tipos especificados (que são os parâmetros da função BeginWrite)
- stream.BeginWrite é o método que será chamado, usando o pattern assíncrono
- stream.EndWrite é o método que será chamado imediatamente antes do callback. Ele é chamamdo automaticamente pela biblioteca, por isso foi removido do callback (no caso BeginWriteCallback)
- header, 0 e header.Length são os parâmetros que serão passados para BeginWrite
- ctx é o objeto de estado que será devolvido também no callback.
- Em seguida, pode-se encadear vários “ContinueWith”. O primeiro, BeginWriteCallback é de fato para tratar o processamento da Stream. O segundo será chamado somente no caso de erro, para mandar para a console o erro
Agora veremos como fica o callback:
private void BeginWriteCallback(Task task) { TcpClient2OutputStreamContext ctx = (TcpClient2OutputStreamContext)task.AsyncState; if (ctx.OutputStream.Position < ctx.OutputStream.Length) { byte[] buffer = new byte[bufferSize]; int read = ctx.OutputStream.Read(buffer, 0, buffer.Length); Task t = Task.Factory.FromAsync<byte[], int, int>(ctx.ClientStream.BeginWrite, ctx.ClientStream.EndWrite, buffer, 0, read, ctx); t.ContinueWith(BeginWriteCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); } else { WriteHeader(ctx.ClientStream); } }
O callback recebe como parâmetro a Task. A propriedade AsyncState é quem devolve o objeto de estado passado no chamada da função como parâmetro.
Outro exemplo interessante é a leitura. Como neste caso, existe o retorno da quantidade de bytes que foi lida na stream que é muito importante para o tratamento da leitura. Essa construção fica da seguinte forma:
private void ReadHeader(NetworkStream stream) { contextCount++; TcpClient2InputStreamContext ctx = new TcpClient2InputStreamContext(); ctx.ID = contextCount; ctx.ClientStream = stream; ctx.Header = new byte[sizeof(long)]; ctx.HeaderRead = false; Task<int>.Factory.FromAsync<byte[], int, int>(stream.BeginRead, stream.EndRead, ctx.Header, 0, ctx.Header.Length, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ; }
Neste caso, usa-se a construção Task. Apenas isso muda. O callback fica da seguinte maneira:
private void BeginReadCallback(Task<int> task) { TcpClient2InputStreamContext ctx = (TcpClient2InputStreamContext)task.AsyncState; int bytesRead = task.Result; Log.LogMessage("Read " + bytesRead.ToString() + " bytes"); if (!ctx.HeaderRead) { Log.LogMessage("Reading Header"); ctx.ResponseSize = BitConverter.ToInt64(ctx.Header, 0); ctx.TotalRead = 0; Log.LogMessage("Response Size: " + ctx.ResponseSize.ToString()); ctx.Header = null; //I don't need this buffer anymore. ctx.HeaderRead = true; ctx.InputStream = new MemoryStream(); ctx.InputStream.SetLength(ctx.ResponseSize); if (ctx.ResponseSize > 0) { ctx.Buffer = new byte[bufferSize]; int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead); Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes"); Task<int>.Factory.FromAsync<byte[], int, int>(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); ; } else receiveDone.Set(); } else { ctx.TotalRead += bytesRead; Log.LogMessage("Writing " + bytesRead.ToString() + " to input stream"); ctx.InputStream.Write(ctx.Buffer, 0, bytesRead); if (ctx.TotalRead < ctx.ResponseSize) { int bytesToRead = (ctx.ResponseSize - ctx.TotalRead) > ctx.Buffer.Length ? ctx.Buffer.Length : (int)(ctx.ResponseSize - ctx.TotalRead); Log.LogMessage("Reading " + bytesToRead.ToString() + " bytes"); ctx.Buffer = new byte[bufferSize]; Task<int>.Factory.FromAsync<byte[], int, int>(ctx.ClientStream.BeginRead, ctx.ClientStream.EndRead, ctx.Buffer, 0, bytesToRead, ctx).ContinueWith(BeginReadCallback).ContinueWith(TaskExceptionHandler, TaskContinuationOptions.OnlyOnFaulted); } else { Log.LogMessage("Finished reading " + ctx.InputStream.Length.ToString() + " bytes"); lock (inputQueue) inputQueue.Enqueue(ctx); ReadHeader(ctx.ClientStream); } } }
Os pontos relevantes é que o tipo recebido no callback também é Task. A forma de obter o objeto de estado é a mesma e a resposta do método BeginRead, vem pelo parâmetro task.Result. O resto, mesma coisa.
Os tempos
Agora é a parte divertida. Vou comparar as 3 coisas: O modo “roots” que eu fiz anteriormente, criando todas as threads e tratamentos manualmente (Tcp Server 2), a versão 3 que somente trata o processamento de forma paralela e a versão 4 que também implementa o pattern assíncrono. Seguem os números:
Código fonte
O código fonte está atualizado no git hub: https://github.com/ericlemes/IntegrationTests.
Ele tem uma série de coisas a melhorar, existem problemas de isolamento de responsabilidades e muita coisa a refatorar, porém, acredito que atinge o objetivo aqui proposto.
Conclusão
Como vemos nos números, as diferenças são insignificantes. Meu palpite é que a quantidade de paralelismo envolvida é muito pequena. O pesado do processamento está mesmo em tratar as streams de forma assíncrona com I/O não bloqueante (implementado no servidor 2).
Sobre a TPL, ela implementa uma sintaxe bastante elegante. Não pude estudá-la de forma exaustiva, mas a princípio não vejo nada que ela traga de novo em relação ao tratamento manual das threads.
Isso mais uma vez reforça minha idéia de aprender os conceitos na raiz. O princípio de como lidar com threads é o mesmo independente de como é a implementação. Muitas dessas bibliotecas tem por objetivo tornar a sintaxe apenas mais simples, o que pode ser as vezes uma armadilha. A simplicidade por vezes acaba escondendo o conceito que existe por tras da tecnologia fazendo com que a usemos de forma incorreta. Este exercício é bem didático neste sentido. Existe um consenso de que “sempre” paralelizar trás ganhos de performance o que na prática não se mostra verdadeiro.