Estou trabalhando para refatorar certos aspectos de um serviço da web existente. A maneira como as APIs de serviço são implementadas é por meio de um tipo de "pipeline de processamento", onde existem tarefas que são executadas em sequência. Sem surpresa, as tarefas posteriores podem precisar de informações calculadas por tarefas anteriores e, atualmente, a maneira como isso é feito é adicionando campos a uma classe "estado do pipeline".
Eu estive pensando (e esperando?) Que existe uma maneira melhor de compartilhar informações entre as etapas do pipeline do que ter um objeto de dados com um zilhão de campos, alguns dos quais fazem sentido para algumas etapas de processamento e não para outras. Seria uma grande dor para tornar essa classe segura para threads (não sei se isso seria possível), não há como argumentar sobre seus invariantes (e é provável que não tenha nenhum).
Eu estava folheando o livro de padrões de design da Gang of Four para encontrar alguma inspiração, mas não sentia que houvesse uma solução lá (o Memento estava um pouco no mesmo espírito, mas não exatamente). Também procurei on-line, mas no momento em que você pesquisa por "pipeline" ou "fluxo de trabalho", você é inundado por informações sobre tubos Unix ou mecanismos e estruturas de fluxo de trabalho proprietários.
Minha pergunta é: como você abordaria a questão de registrar o estado de execução de um pipeline de processamento de software, para que tarefas posteriores possam usar as informações computadas pelas anteriores? Eu acho que a principal diferença com os pipes Unix é que você não se importa apenas com a saída da tarefa imediatamente anterior.
Conforme solicitado, algum pseudocódigo para ilustrar meu caso de uso:
O objeto "contexto do pipeline" possui vários campos que as diferentes etapas do pipeline podem preencher / ler:
public class PipelineCtx {
... // fields
public Foo getFoo() { return this.foo; }
public void setFoo(Foo aFoo) { this.foo = aFoo; }
public Bar getBar() { return this.bar; }
public void setBar(Bar aBar) { this.bar = aBar; }
... // more methods
}
Cada uma das etapas do pipeline também é um objeto:
public abstract class PipelineStep {
public abstract PipelineCtx doWork(PipelineCtx ctx);
}
public class BarStep extends PipelineStep {
@Override
public PipelineCtx doWork(PipelieCtx ctx) {
// do work based on the stuff in ctx
Bar theBar = ...; // compute it
ctx.setBar(theBar);
return ctx;
}
}
Da mesma forma, para um hipotético FooStep
, que pode precisar da barra computada pelo BarStep antes dela, juntamente com outros dados. E então temos a chamada de API real:
public class BlahOperation extends ProprietaryWebServiceApiBase {
public BlahResponse handle(BlahRequest request) {
PipelineCtx ctx = PipelineCtx.from(request);
// some steps happen here
// ...
BarStep barStep = new BarStep();
barStep.doWork(crx);
// some more steps maybe
// ...
FooStep fooStep = new FooStep();
fooStep.doWork(ctx);
// final steps ...
return BlahResponse.from(ctx);
}
}
fonte
Respostas:
O principal motivo para usar um design de pipeline é que você deseja desacoplar os estágios. Ou porque um estágio pode ser usado em vários pipelines (como as ferramentas de shell do Unix) ou porque você obtém algum benefício de dimensionamento (ou seja, é possível mover facilmente de uma arquitetura de nó único para uma arquitetura de vários nós).
Em ambos os casos, cada estágio do pipeline precisa receber tudo o que precisa para fazer seu trabalho. Não há razão para que você não possa usar um armazenamento externo (por exemplo, banco de dados), mas na maioria dos casos é melhor passar os dados de um estágio para outro.
No entanto, isso não significa que você deve ou deve passar um grande objeto de mensagem com todos os campos possíveis (embora veja abaixo). Em vez disso, cada estágio no pipeline deve definir interfaces para suas mensagens de entrada e saída, que identificam apenas os dados necessários para o estágio.
Você tem muita flexibilidade na maneira de implementar seus objetos de mensagem reais. Uma abordagem é usar um grande objeto de dados que implementa todas as interfaces necessárias. Outra é criar classes de wrapper em torno de um simples
Map
. Ainda outra é criar uma classe de wrapper em torno de um banco de dados.fonte
Há alguns pensamentos que vêm à mente, primeiro dos quais é que não tenho informações suficientes.
As respostas provavelmente me farão pensar com mais cuidado sobre o design, no entanto, com base no que você disse, existem duas abordagens que eu provavelmente consideraria primeiro.
Estruture cada estágio como seu próprio objeto. O enésimo estágio terá de 1 a n-1 como uma lista de delegados. Cada estágio encapsula os dados e o processamento dos dados; reduzindo a complexidade geral e os campos em cada objeto. Você também pode ter estágios posteriores acessando os dados conforme necessário a partir de estágios muito anteriores, percorrendo os delegados. Você ainda tem um acoplamento bastante rígido entre todos os objetos, porque os resultados dos estágios (ou seja, todos os atributos) são importantes, mas são significativamente reduzidos e cada estágio / objeto é provavelmente mais legível e compreensível. Você pode torná-lo seguro, tornando a lista de delegados preguiçosa e usando uma fila segura de threads para preencher a lista de delegados em cada objeto, conforme necessário.
Como alternativa, eu provavelmente faria algo semelhante ao que você está fazendo. Um grande objeto de dados que passa por funções que representam cada estágio. Isso geralmente é muito mais rápido e leve, mas mais complexo e propenso a erros por causa de apenas uma grande pilha de atributos de dados. Obviamente não é seguro para threads.
Honestamente, eu fiz o posterior com mais frequência para ETL e alguns outros problemas semelhantes. Eu estava focado no desempenho por causa da quantidade de dados e não pela capacidade de manutenção. Além disso, foram pontuais que não seriam usados novamente.
fonte
Parece um padrão de cadeia no GoF.
Um bom ponto de partida seria olhar para o que a cadeia comum faz.
fonte
Uma primeira solução que posso imaginar é explicitar as etapas. Cada um deles se torna um objeto capaz de processar um dado e transmiti-lo ao próximo objeto de processo. Cada processo produz um novo produto (idealmente imutável), para que não haja interação entre os processos e, portanto, não haja riscos devido ao compartilhamento de dados. Se alguns processos são mais demorados do que outros, você pode colocar um buffer entre dois processos. Se você explorar corretamente um agendador para multithreading, ele alocará mais recursos para liberar os buffers.
Uma segunda solução poderia ser pensar "mensagem" em vez de pipeline, possivelmente com uma estrutura dedicada. Você tem alguns "atores" recebendo mensagens de outros atores e enviando outras mensagens para outros atores. Você organiza seus atores em um pipeline e fornece seus dados primários para um primeiro ator que inicia a cadeia. Não há compartilhamento de dados, pois o compartilhamento é substituído pelo envio de mensagens. Eu sei que o modelo de ator do Scala pode ser usado em Java, pois não há nada específico do Scala aqui, mas nunca o usei em um programa Java.
As soluções são semelhantes e você pode implementar a segunda com a primeira. Basicamente, os principais conceitos são lidar com dados imutáveis para evitar os problemas tradicionais devido ao compartilhamento de dados e criar entidades explícitas e independentes que representam os processos em seu pipeline. Se você atender a essas condições, poderá criar facilmente pipelines claros e simples e usá-los em um programa paralelo.
fonte