Spring Scheduled Task em execução em ambiente de cluster

98

Estou escrevendo um aplicativo que tem um cron job executado a cada 60 segundos. O aplicativo é configurado para escalar quando necessário em várias instâncias. Eu só quero executar a tarefa em 1 instância a cada 60 segundos (em qualquer nó). Fora da caixa, não consigo encontrar uma solução para isso e estou surpreso que não tenha sido perguntado várias vezes antes. Estou usando o Spring 4.1.6.

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
user3131879
fonte
7
Acho que Quartz é a melhor solução para você: stackoverflow.com/questions/6663182/…
selalerer
Alguma sugestão sobre como usar CronJobno kubernetes?
ch271828n

Respostas:

97

Existe um projeto ShedLock que serve exatamente para esse propósito. Você apenas anota tarefas que devem ser bloqueadas quando executadas

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

Configure Spring e um LockProvider

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}
Lukas
fonte
1
Só quero dizer "Bom trabalho!". Mas ... O bom recurso seria se a biblioteca pudesse descobrir o nome do banco de dados sem fornecê-lo explícito no código ... Exceto que funciona muito bem!
Krzysiek
Funciona para mim com o Oracle e Spring boot data jpa starter.
Mahendran Ayyarsamy Kandiar
Esta solução funciona para Spring 3.1.1.RELEASE e java 6? Por favor diga.
Vikas Sharma
Eu tentei com MsSQL e Spring boot JPA e usei o script liquibase para a parte SQL .. funciona bem .. Obrigado
planilha de
De fato, está funcionando bem. No entanto, eu encontrei um caso um pouco complexo aqui, poderia dar uma olhada. Obrigado!!! stackoverflow.com/questions/57691205/…
Dayton Wang
15

Esta é outra maneira simples e robusta de executar com segurança uma tarefa em um cluster. Você pode basear-se no banco de dados e executar a tarefa somente se o nó for o "líder" no cluster.

Além disso, quando um nó falha ou é encerrado no cluster, outro nó se torna o líder.

Tudo que você precisa é criar um mecanismo de "eleição de líder" e sempre verificar se você é o líder:

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

Siga essas etapas:

1.Defina o objeto e a tabela que contém uma entrada por nó no cluster:

@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

}

2. Criar o serviço que a) insira o nó no banco de dados, b) verifique o líder

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3. acessar o banco de dados para enviar que você está vivo

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. você está pronto! Basta verificar se você é o líder antes de executar a tarefa:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}
mspapant
fonte
Nesse caso, o que são SystemService e SettingEnum? Parece que é extremamente simples e retorna apenas um valor de tempo limite. Nesse caso, por que não apenas codificar o tempo limite?
tlavarea
@mspapant, o que é SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT? Qual é o valor ideal que devo usar aqui?
user525146
@tlavarea você implementou este código, tenho uma pergunta sobre o método DateUtils.hasExpired? é um método personalizado ou é um utils comum do apache?
user525146
10

Os trabalhos em lote e agendados são normalmente executados em seus próprios servidores autônomos, longe de aplicativos voltados para o cliente, portanto, não é um requisito comum incluir um trabalho em um aplicativo que deve ser executado em um cluster. Além disso, as tarefas em ambientes em cluster normalmente não precisam se preocupar com outras instâncias da mesma tarefa em execução em paralelo, portanto, outra razão pela qual o isolamento das instâncias de tarefa não é um grande requisito.

Uma solução simples seria configurar seus jobs dentro de um Spring Profile. Por exemplo, se sua configuração atual é:

<beans>
  <bean id="someBean" .../>

  <task:scheduled-tasks>
    <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
  </task:scheduled-tasks>
</beans>

mude para:

<beans>
  <beans profile="scheduled">
    <bean id="someBean" .../>

    <task:scheduled-tasks>
      <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
  </beans>
</beans>

Em seguida, inicie seu aplicativo em apenas uma máquina com o scheduledperfil ativado ( -Dspring.profiles.active=scheduled).

Se o servidor primário ficar indisponível por algum motivo, basta iniciar outro servidor com o perfil habilitado e tudo continuará funcionando bem.


As coisas mudam se você quiser failover automático para os trabalhos também. Em seguida, você precisará manter o trabalho em execução em todos os servidores e verificar a sincronização por meio de um recurso comum, como uma tabela de banco de dados, um cache em cluster, uma variável JMX, etc.

manish
fonte
58
Esta é uma solução válida, mas irá violar a ideia por trás de um ambiente em cluster, onde se um nó estiver inativo, o outro nó pode atender a outras solicitações. Nesta solução alternativa, se o nó com o perfil "programado" ficar inativo, este trabalho em segundo plano não será executado
Ahmed Hashem
3
Acho que poderíamos usar o Redis com operação atômica gete setpara arquivar isso.
Thanh Nguyen Van
Existem vários problemas com sua sugestão: 1. Você geralmente deseja que cada nó de um cluster tenha exatamente a mesma configuração, então eles serão 100% intercambiáveis ​​e requerem os mesmos recursos sob a mesma carga que compartilham. 2. Sua solução exigiria intervenção manual quando o nó "tarefa" fosse desativado. 3. Isso ainda não garantiria que o trabalho foi realmente executado com sucesso, porque o nó "tarefa" caiu antes de concluir o processamento da execução atual e o novo "executor de tarefa" foi criado após o primeiro cair, sem saber se tinha terminado ou não.
Moshe Bixenshpaner
1
simplesmente viola a ideia de ambientes em cluster, não pode haver nenhuma solução com a abordagem que você sugeriu. Você não pode replicar nem mesmo os servidores de perfil para garantir a disponibilidade, pois isso resultará em custo adicional e desperdício desnecessário de recursos também. A solução sugerida por @Thanh é muito mais limpa do que isso. Pense no mesmo como um MUTEX. Qualquer servidor executando o script irá adquirir um bloqueio temporário em algum cache distribuído como o redis e então prosseguir com os conceitos de bloqueio tradicional.
anuj pradhan
2

dlock é projetado para executar tarefas apenas uma vez usando índices e restrições de banco de dados. Você pode simplesmente fazer algo como abaixo.

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {

}

Veja o artigo sobre como usá-lo.

Will Hughes
fonte
3
Se estiver usando dlock .Assume, estamos usando DB para manter o bloqueio. E um dos nós no cluster caiu inesperadamente após ser bloqueado, então o que acontecerá neste cenário? Estará em estado de deadlock?
Badman
1

Estou usando uma tabela de banco de dados para fazer o bloqueio. Apenas uma tarefa por vez pode fazer uma inserção na mesa. O outro obterá uma DuplicateKeyException. A lógica de inserção e exclusão é tratada por um aspecto em torno da anotação @Scheduled. Estou usando Spring Boot 2.0

@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;  

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {

        String jobSignature = joinPoint.getSignature().toString();
        try {
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});

            Object proceed = joinPoint.proceed();

            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
            return proceed;

        }catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: "+jobSignature);
            return null;
        }
    }
}


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock(
    signature varchar(255) NOT NULL,
    date datetime DEFAULT NULL,
    PRIMARY KEY(signature)
);
RenRen
fonte
3
Você acha que vai funcionar perfeitamente? Porque se um dos nós ficar inativo após ser bloqueado, então os outros não saberão porque há bloqueio (no seu caso, a entrada de linha correspondente ao trabalho na tabela).
Badman
0

Você pode usar um agendador embutido como db-scheduler para fazer isso. Possui execuções persistentes e usa um mecanismo de bloqueio otimista simples para garantir a execução por um único nó.

Código de exemplo de como o caso de uso pode ser alcançado:

   RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
    .execute((taskInstance, executionContext) -> {
        System.out.println("Executing " + taskInstance.getTaskAndInstance());
    });

   final Scheduler scheduler = Scheduler
          .create(dataSource)
          .startTasks(recurring1)
          .build();

   scheduler.start();
Gustav Karlsson
fonte
-1

O contexto Spring não é agrupado, portanto, gerenciar a tarefa em um aplicativo distribuído é um pouco difícil e você precisa usar sistemas que suportam jgroup para sincronizar o estado e permitir que sua tarefa tenha prioridade para executar a ação. Ou você pode usar o contexto ejb para gerenciar o serviço de singleton ha clusterizado como o ambiente jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Ou você pode usar o cache clusterizado e o recurso de bloqueio de acesso entre o serviço e o primeiro serviço, o bloqueio será feito a ação ou implementar seu próprio jgroup para comunicar seu serviço e executar a ação em um nó

Abdulghaffar Al-Labadi
fonte