Melhor maneira de mover mensagens do DLQ no Amazon SQS?

87

Qual é a prática recomendada para mover mensagens de uma fila de mensagens não entregues de volta para a fila original no Amazon SQS?

Seria

  1. Obter mensagem de DLQ
  2. Escreva a mensagem na fila
  3. Excluir mensagem de DLQ

Ou existe uma maneira mais simples?

Além disso, a AWS eventualmente terá uma ferramenta no console para mover as mensagens do DLQ?

Matt Dell
fonte
github.com/garryyao/replay-aws-dlq funciona muito bem
Ulad Kasach
também outra alternativa github.com/mercury2269/sqsmover
Sergey

Respostas:

131

Aqui está um hack rápido. Definitivamente, essa não é a melhor opção nem a recomendada.

  1. Defina a fila SQS principal como DLQ para o DLQ real com Máximo de Recebimentos como 1.
  2. Visualize o conteúdo em DLQ (isso moverá as mensagens para a fila principal, pois este é o DLQ para o DLQ real)
  3. Remova a configuração para que a fila principal não seja mais o DLQ do DLQ real
Rajkumar
fonte
12
Sim, isso é muito um hack - mas uma boa opção para uma solução rápida se você sabe o que está fazendo e não tem tempo para resolver isso da maneira adequada #yolo
Thomas Watson
14
Mas a contagem de recebimento não é redefinida para 0 quando você faz isso. Seja cuidadoso.
Rajdeep Siddhapura
1
A abordagem certa é configurar a Política de Redrive em SQS com contagem máxima de recebimento e ela moverá automaticamente a mensagem para DLQ quando cruzar a contagem de recebimento definida e, em seguida, escreverá um thread de leitor para ler DLQ.
Ash
5
Você é um génio.
JefClaes
1
Eu criei uma ferramenta CLI para este problema alguns meses atrás: github.com/renanvieira/phoenix-letter
MaltMaster
14

Existem alguns scripts que fazem isso para você:

# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 
Ulad Kasach
fonte
1
Esta é a maneira mais simples, ao contrário da resposta aceita. Basta executá-lo a partir do terminal que tem a propriedade AWS env vars definida:npx replay-aws-dlq DL_URI MAIN_URI
Vasyl Boroviak 01 de
Nota erro de digitação: dql -> dlq # install npm install replay-aws-dlq;
Lee Oades
Isso funcionou perfeitamente para mim (note, eu só tentei o baseado em go). Parecia mover as mensagens em etapas e não todas de uma vez (o que é bom) e até tinha uma barra de progresso. Melhor do que a resposta aceita IMO.
Yevgeny Ananin
13

Não precisa mover a mensagem porque ela virá com muitos outros desafios, como mensagens duplicadas, cenários de recuperação, mensagem perdida, verificação de eliminação de duplicação e etc.

Aqui está a solução que implementamos -

Normalmente, usamos o DLQ para erros transitórios, não para erros permanentes. Então, peguei a abordagem abaixo -

  1. Leia a mensagem do DLQ como uma fila normal

    Benefícios
    • Para evitar o processamento duplicado de mensagens
    • Melhor controle no DLQ- Como eu coloquei um cheque, para processar apenas quando a fila normal estiver completamente processada.
    • Amplie o processo com base na mensagem em DLQ
  2. Em seguida, siga o mesmo código que a fila regular está seguindo.

  3. Mais confiável no caso de abortar o trabalho ou o processo foi encerrado durante o processamento (por exemplo, instância interrompida ou processo encerrado)

    Benefícios
    • Reutilização de código
    • Manipulação de erros
    • Recuperação e repetição da mensagem
  4. Estenda a visibilidade da mensagem para que nenhum outro thread as processe.

    Beneficiar
    • Evite processar o mesmo registro por vários threads.
  5. Exclua a mensagem apenas quando houver um erro permanente ou com êxito.

    Beneficiar
    • Continue processando até obter um erro temporário.
Cinza
fonte
Eu realmente gosto da sua abordagem! Como você define "erro permanente" neste caso?
DMac the Destroyer
Qualquer coisa maior que o código de status HTTP> 200 <500 é um erro permanente
Ash
esta é de fato uma boa abordagem na produção. no entanto, acho que esta postagem está perguntando simplesmente como reenviar mensagens de DLQ para a fila normal. o que às vezes é útil se você souber o que está fazendo.
linehrr
Isso é o que estou dizendo que você não deve fazer isso. Porque se você fizer isso criará mais problemas. Podemos mover a mensagem como qualquer outro envio de mensagem, mas perderemos as funcionalidades DLQ como contagem de recebimento, visibilidade e tudo. Será tratado como uma nova mensagem.
Ash
6

Essa parece ser sua melhor opção. Existe a possibilidade de que o seu processo falhe após a etapa 2. Nesse caso, você acabará copiando a mensagem duas vezes, mas seu aplicativo deve lidar com a reenvio de mensagens (ou não se importar) de qualquer maneira.

Dave
fonte
6

aqui:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()
Brian Dilley
fonte
Isso é Python?
carlin.scott
python2 na verdade
Kristof Jozsa
4

Existe uma outra maneira de fazer isso sem escrever uma única linha de código. Considere que o nome real da fila é SQS_Queue e o DLQ para ele é SQS_DLQ. Agora siga estas etapas:

  1. Defina SQS_Queue como o dlq de SQS_DLQ. Uma vez que SQS_DLQ já é um dlq de SQS_Queue. Agora, ambos estão agindo como dlq um do outro.
  2. Defina a contagem máxima de recebimento de seu SQS_DLQ como 1.
  3. Agora leia as mensagens do console SQS_DLQ. Como a contagem de recebimento de mensagens é 1, ele enviará todas as mensagens para seu próprio dlq, que é sua fila SQS_Queue real.
Priyanka Agarwal
fonte
Isso anulará o propósito de manter um DLQ. DLQ destina-se a não sobrecarregar seu sistema quando você estiver observando falhas, para que você possa fazer isso mais tarde.
Buda,
Isso definitivamente anulará o propósito e você não será capaz de obter outros benefícios, como aumento de escala, limitação e contagem de recebimento. Além disso, você deve usar a fila regular como fila de processamento e se a contagem de recebimento de mensagens atingir 'N', ela deve ir para DLQ. Isso é o que idealmente deve ser configurado.
Ash
3
Como uma solução única para redirecionar muitas mensagens, funciona perfeitamente. Porém, não é uma boa solução de longo prazo.
nmio
Sim, isso é extremamente valioso como uma solução única para redriver mensagens (depois de corrigir o problema na fila principal). Em AWS CLI o comando que eu usei é: aws sqs receive-message --queue-url <url of DLQ> --max-number-of-messages 10. Como as mensagens máximas podem ser lidas em maiúsculas em 10, sugiro executar o comando em um loop como este:for i in {1..1000}; do <CMD>; done
Patrick Finnigan
3

Eu escrevi um pequeno script python para fazer isso, usando boto3 lib:

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

você pode obter este script neste link

esse script basicamente pode mover mensagens entre quaisquer filas arbitrárias. e suporta filas fifo, assim como você pode abastecer o message_group_idcampo.

linehrr
fonte
3

Usamos o seguinte script para redirecionar a mensagem da fila src para a fila tgt:

nome do arquivo: redrive.py

uso: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive message in (src) queue to (tgt) queue

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1. 
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.

Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    queue_url = sqs.get_queue_url(QueueName=queue_name)
    return True if queue_url.get('QueueUrl') else False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()
menrfa
fonte
0

DLQ entra em ação apenas quando o consumidor original não consegue consumir a mensagem com sucesso após várias tentativas. Não queremos excluir a mensagem, pois acreditamos que ainda podemos fazer algo com ela (talvez tentar processá-la novamente ou registrá-la ou coletar algumas estatísticas) e não queremos continuar encontrando esta mensagem repetidamente e impedir a capacidade de processar outras mensagens por trás desta.

DLQ nada mais é que outra fila. O que significa que precisaríamos escrever um consumidor para DLQ que idealmente seria executado com menos frequência (em comparação com a fila original) que consumiria de DLQ e produziria a mensagem de volta para a fila original e a excluiria de DLQ - se esse for o comportamento pretendido e nós pensamos o consumidor original estaria agora pronto para processá-lo novamente. Deve estar tudo bem se este ciclo continuar por um tempo, pois agora também temos a oportunidade de inspecionar manualmente e fazer as alterações necessárias e implantar outra versão do consumidor original sem perder a mensagem (dentro do período de retenção de mensagem, é claro - que é de 4 dias até padrão).

Seria bom se a AWS fornecesse esse recurso pronto para uso, mas ainda não o vejo - eles estão deixando isso para o usuário final usá-lo da maneira que achar adequada.

rd2
fonte