Tabela de fila FIFO para vários trabalhadores no SQL Server

15

Eu estava tentando responder à seguinte pergunta do stackoverflow:

Depois de postar uma resposta um tanto ingênua, achei que colocaria meu dinheiro onde estava minha boca e realmente testaria o cenário que estava sugerindo, para ter certeza de que não estava enviando o OP em uma louca perseguição. Bem, acabou sendo muito mais difícil do que eu pensava (não há surpresa para ninguém, tenho certeza).

Aqui está o que eu tentei e pensei:

  • Primeiro, tentei um TOP 1 UPDATE com ORDER BY dentro de uma tabela derivada, usando ROWLOCK, READPAST. Isso gerou impasses e também processou itens fora de ordem. Ele deve estar o mais próximo possível do FIFO, impedindo erros que exijam a tentativa de processar a mesma linha mais de uma vez.

  • Então eu tentei selecionar o próximo QueueID desejado em uma variável, utilizando várias combinações de READPAST, UPDLOCK, HOLDLOCK, e ROWLOCKpara preservar exclusivamente a linha para atualização por essa sessão. Todas as variações que experimentei sofreram dos mesmos problemas de antes e, em certas combinações READPAST, com queixas:

    Você pode especificar apenas o bloqueio READPAST nos níveis de isolamento READ COMMITTED ou REPEATABLE READ.

    Isso foi confuso porque foi LIDO COMPROMISSO. Já deparei com isso antes e é frustrante.

  • Desde que comecei a escrever essa pergunta, Remus Rusani postou uma nova resposta para a pergunta. Li seu artigo vinculado e vi que ele estava usando leituras destrutivas, pois ele disse em sua resposta que "não é realisticamente possível manter travas durante a duração das chamadas pela web". Depois de ler o que o artigo dele diz sobre pontos de acesso e páginas que exigem bloqueio para fazer qualquer atualização ou exclusão, eu temo que, mesmo que eu fosse capaz de descobrir os bloqueios corretos para fazer o que estou procurando, ele não seria escalável e poderia não lide com simultaneidade maciça.

No momento, não tenho certeza para onde ir. É verdade que a manutenção de bloqueios enquanto a linha é processada não pode ser alcançada (mesmo que não tenha suporte a altos tps ou simultaneidade maciça)? o que estou perdendo?

Na esperança de que pessoas mais inteligentes que eu e pessoas mais experientes que eu possam ajudar, abaixo está o script de teste que eu estava usando. Ele voltou ao método TOP 1 UPDATE, mas deixei o outro método, comentei, caso você queira explorar isso também.

Cole cada um deles em uma sessão separada, execute a sessão 1 e rapidamente todos os outros. Em cerca de 50 segundos, o teste terminará. Veja as Mensagens de cada sessão para ver o que ele fez (ou como falhou). A primeira sessão mostrará um conjunto de linhas com um instantâneo tirado uma vez por segundo detalhando os bloqueios presentes e os itens da fila em processamento. Às vezes funciona e outras não.

Sessão 1

/* Session 1: Setup and control - Run this session first, then immediately run all other sessions */
IF Object_ID('dbo.Queue', 'U') IS NULL
   CREATE TABLE dbo.Queue (
      QueueID int identity(1,1) NOT NULL,
      StatusID int NOT NULL,
      QueuedDate datetime CONSTRAINT DF_Queue_QueuedDate DEFAULT (GetDate()),
      CONSTRAINT PK_Queue PRIMARY KEY CLUSTERED (QueuedDate, QueueID)
   );

IF Object_ID('dbo.QueueHistory', 'U') IS NULL
   CREATE TABLE dbo.QueueHistory (
      HistoryDate datetime NOT NULL,
      QueueID int NOT NULL
   );

IF Object_ID('dbo.LockHistory', 'U') IS NULL
   CREATE TABLE dbo.LockHistory (
      HistoryDate datetime NOT NULL,
      ResourceType varchar(100),
      RequestMode varchar(100),
      RequestStatus varchar(100),
      ResourceDescription varchar(200),
      ResourceAssociatedEntityID varchar(200)
   );

IF Object_ID('dbo.StartTime', 'U') IS NULL
   CREATE TABLE dbo.StartTime (
      StartTime datetime NOT NULL
   );

SET NOCOUNT ON;

IF (SELECT Count(*) FROM dbo.Queue) < 10000 BEGIN
   TRUNCATE TABLE dbo.Queue;

   WITH A (N) AS (SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1),
   B (N) AS (SELECT 1 FROM A Z, A I, A P),
   C (N) AS (SELECT Row_Number() OVER (ORDER BY (SELECT 1)) FROM B O, B W)
   INSERT dbo.Queue (StatusID, QueuedDate)
   SELECT 1, DateAdd(millisecond, C.N * 3, GetDate() - '00:05:00')
   FROM C
   WHERE C.N <= 10000;
END;

TRUNCATE TABLE dbo.StartTime;
INSERT dbo.StartTime SELECT GetDate() + '00:00:15'; -- or however long it takes you to go run the other sessions
GO
TRUNCATE TABLE dbo.QueueHistory;
SET NOCOUNT ON;

DECLARE
   @Time varchar(8),
   @Now datetime;
SELECT @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;

DECLARE @i int,
@QueueID int;
SET @i = 1;
WHILE @i <= 33 BEGIN
   SET @Now  = GetDate();
   INSERT dbo.QueueHistory
   SELECT
      @Now,
      QueueID
   FROM
      dbo.Queue Q WITH (NOLOCK)
   WHERE
      Q.StatusID <> 1;

   INSERT dbo.LockHistory
   SELECT
      @Now,
      L.resource_type,
      L.request_mode,
      L.request_status,
      L.resource_description,
      L.resource_associated_entity_id
   FROM
      sys.dm_tran_current_transaction T
      INNER JOIN sys.dm_tran_locks L
         ON L.request_owner_id = T.transaction_id;
   WAITFOR DELAY '00:00:01';
   SET @i = @i + 1;
END;

WITH Cols AS (
   SELECT *, Row_Number() OVER (PARTITION BY HistoryDate ORDER BY QueueID) Col
   FROM dbo.QueueHistory
), P AS (
   SELECT *
   FROM
      Cols
      PIVOT (Max(QueueID) FOR Col IN ([1], [2], [3], [4], [5], [6], [7], [8])) P
)
SELECT L.*, P.[1], P.[2], P.[3], P.[4], P.[5], P.[6], P.[7], P.[8]
FROM
   dbo.LockHistory L
   FULL JOIN P
      ON L.HistoryDate = P.HistoryDate

/* Clean up afterward
DROP TABLE dbo.StartTime;
DROP TABLE dbo.LockHistory;
DROP TABLE dbo.QueueHistory;
DROP TABLE dbo.Queue;
*/

Sessão 2

/* Session 2: Simulate an application instance holding a row locked for a long period, and eventually abandoning it. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET NOCOUNT ON;
SET XACT_ABORT ON;

DECLARE
   @QueueID int,
   @Time varchar(8);
SELECT @Time = Convert(varchar(8), StartTime + '0:00:01', 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;
BEGIN TRAN;

--SET @QueueID = (
--   SELECT TOP 1 QueueID
--   FROM dbo.Queue WITH (READPAST, UPDLOCK)
--   WHERE StatusID = 1 -- ready
--   ORDER BY QueuedDate, QueueID
--);

--UPDATE dbo.Queue
--SET StatusID = 2 -- in process
----OUTPUT Inserted.*
--WHERE QueueID = @QueueID;

SET @QueueID = NULL;
UPDATE Q
SET Q.StatusID = 1, @QueueID = Q.QueueID
FROM (
   SELECT TOP 1 *
   FROM dbo.Queue WITH (ROWLOCK, READPAST)
   WHERE StatusID = 1
   ORDER BY QueuedDate, QueueID
) Q

PRINT @QueueID;

WAITFOR DELAY '00:00:20'; -- Release it partway through the test

ROLLBACK TRAN; -- Simulate client disconnecting

Sessão 3

/* Session 3: Run a near-continuous series of "failed" queue processing. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET XACT_ABORT ON;
SET NOCOUNT ON;
DECLARE
   @QueueID int,
   @EndDate datetime,
   @NextDate datetime,
   @Time varchar(8);

SELECT
   @EndDate = StartTime + '0:00:33',
   @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;

WAITFOR TIME @Time;

WHILE GetDate() < @EndDate BEGIN
   BEGIN TRAN;

   --SET @QueueID = (
   --   SELECT TOP 1 QueueID
   --   FROM dbo.Queue WITH (READPAST, UPDLOCK)
   --   WHERE StatusID = 1 -- ready
   --   ORDER BY QueuedDate, QueueID
   --);

   --UPDATE dbo.Queue
   --SET StatusID = 2 -- in process
   ----OUTPUT Inserted.*
   --WHERE QueueID = @QueueID;

   SET @QueueID = NULL;
   UPDATE Q
   SET Q.StatusID = 1, @QueueID = Q.QueueID
   FROM (
      SELECT TOP 1 *
      FROM dbo.Queue WITH (ROWLOCK, READPAST)
      WHERE StatusID = 1
      ORDER BY QueuedDate, QueueID
   ) Q

   PRINT @QueueID;

   SET @NextDate = GetDate() + '00:00:00.015';
   WHILE GetDate() < @NextDate SET NOCOUNT ON;
   ROLLBACK TRAN;
END

Sessão 4 e superior - quantas você quiser

/* Session 4: "Process" the queue normally, one every second for 30 seconds. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET XACT_ABORT ON;
SET NOCOUNT ON;

DECLARE @Time varchar(8);
SELECT @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;

DECLARE @i int,
@QueueID int;
SET @i = 1;
WHILE @i <= 30 BEGIN
   BEGIN TRAN;

   --SET @QueueID = (
   --   SELECT TOP 1 QueueID
   --   FROM dbo.Queue WITH (READPAST, UPDLOCK)
   --   WHERE StatusID = 1 -- ready
   --   ORDER BY QueuedDate, QueueID
   --);

   --UPDATE dbo.Queue
   --SET StatusID = 2 -- in process
   --WHERE QueueID = @QueueID;

   SET @QueueID = NULL;
   UPDATE Q
   SET Q.StatusID = 1, @QueueID = Q.QueueID
   FROM (
      SELECT TOP 1 *
      FROM dbo.Queue WITH (ROWLOCK, READPAST)
      WHERE StatusID = 1
      ORDER BY QueuedDate, QueueID
   ) Q

   PRINT @QueueID;
   WAITFOR DELAY '00:00:01'
   SET @i = @i + 1;
   DELETE dbo.Queue
   WHERE QueueID = @QueueID;   
   COMMIT TRAN;
END
ErikE
fonte
2
As filas, conforme descrito no artigo vinculado, podem ser escaladas para centenas ou milhares de operações por segundo. Os problemas de contenção de hot spot são relevantes apenas em maior escala. Existem estratégias de mitigação conhecidas que podem atingir maior produtividade no sistema de ponta, chegando a dezenas de milhares por segundo, mas essas mitigações precisam de avaliação cuidadosa e são implementadas sob a supervisão do SQLCAT .
Remus Rusanu
Uma READPAST, UPDLOCK, ROWLOCKdesvantagem interessante é que, com meu script para capturar dados na tabela QueueHistory, não está fazendo nada. Gostaria de saber se é porque o StatusID não está comprometido? Está usando WITH (NOLOCK)tão teoricamente deve funcionar ... e funcionou antes! Não sei por que não está funcionando agora, mas provavelmente é outra experiência de aprendizado.
ErikE
Você poderia reduzir seu código ao menor exemplo que apresenta o impasse e outros problemas que você está tentando resolver?
Nick Chammas
@ Nick Vou tentar reduzir o código. Sobre seus outros comentários, há uma coluna de identidade que faz parte do índice em cluster e ordenada por após a data. Estou bastante disposto a receber uma "leitura destrutiva" (DELETE with OUTPUT), mas um dos requisitos solicitados era, no caso de uma instância de aplicativo falhar, que a linha retornasse ao processamento automaticamente. Então, minha pergunta aqui é se isso é possível.
ErikE
Tente a abordagem de leitura destrutiva e coloque os itens desenfileirados em uma tabela separada de onde eles podem ser enfileirados, se necessário. Se isso for corrigido, você poderá investir para que esse processo de re-enfileiramento funcione sem problemas.
Nick Chammas

Respostas:

10

Você precisa exatamente de 3 dicas de bloqueio

  • READPAST
  • UPDLOCK
  • ROWLOCK

Eu respondi isso anteriormente no SO: /programming/939831/sql-server-process-queue-race-condition/940001#940001

Como Remus diz, usar o service broker é mais agradável, mas essas dicas funcionam

Seu erro sobre o nível de isolamento geralmente significa replicação ou NOLOCK está envolvido.

gbn
fonte
Usar essas dicas no meu script, conforme indicado acima, gera impasses e processos fora de ordem. ( UPDATE SET ... FROM (SELECT TOP 1 ... FROM ... ORDER BY ...)) Isso significa que meu padrão UPDATE segurando uma trava não pode funcionar? Além disso, no momento em que você combina READPASTcom HOLDLOCKvocê, obtém o erro. Não há replicação neste servidor e o nível de isolamento é READ COMMITTED.
ErikE
2
@ ErikE - Tão importante quanto a maneira como você consulta a tabela é como ela está estruturada. A tabela que você está usando como fila deve ser agrupada na ordem de desenfileiramento, de modo que o próximo item a ser desenfileirado seja inequívoco . Isso é crítico. Analisando seu código acima, não vejo nenhum índice agrupado definido.
Nick Chammas
@ Nick que faz sentido perfeitamente eminente e não sei por que não pensei nisso. Adicionei a restrição PK adequada (e atualizei meu script acima) e ainda tenho impasses. No entanto, os itens agora foram processados ​​na ordem correta, impedindo o processamento repetido dos itens em conflito.
ErikE
@ErikE - 1. Sua fila deve conter apenas itens na fila. Desfileirar e item deve significar excluí-lo da tabela da fila. Vejo que você está atualizando o StatusIDpara desenfileirar um item. Isso está correto? 2. Seu pedido de desenfileiramento deve ser inequívoco. Se você estiver enfileirando itens GETDATE(), em volumes altos, é muito provável que vários itens sejam igualmente elegíveis para desenfileirar ao mesmo tempo. Isso levará a impasses. Sugiro adicionar um IDENTITYao índice clusterizado para garantir uma ordem de remoção da fila inequívoca.
6114 Nick Chammas #
1

O servidor SQL funciona muito bem para armazenar dados relacionais. Quanto a uma fila de empregos, não é tão boa. Veja este artigo escrito para o MySQL, mas também pode ser aplicado aqui. https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you

Eric Humphrey - lotes de ajuda
fonte
Obrigado, Eric. Na minha resposta original à pergunta, eu estava sugerindo o uso do SQL Server Service Broker porque sei que o método tabela como fila não é realmente o que o banco de dados foi criado. Mas acho que não é mais uma boa recomendação, porque o SB é realmente apenas para mensagens. As propriedades ACID dos dados inseridos no banco de dados o tornam um contêiner muito atraente para tentar (ab) usar. Você pode sugerir um produto alternativo e de baixo custo que funcione bem como uma fila genérica? E pode ser feito backup, etc. etc.?
ErikE
8
O artigo é culpado de uma falácia conhecida no processamento de filas: combine estado e eventos em uma única tabela (na verdade, se você olhar para os comentários do artigo, verá que eu me opus a isso há algum tempo). O sintoma típico desse problema é o campo 'processado / processando'. A combinação do estado com os eventos (por exemplo, tornar a tabela de estados a 'fila') resulta no aumento da 'fila' para tamanhos enormes (já que a tabela de estados é a fila). A separação de eventos em uma fila verdadeira leva a uma fila que "drena" (fica vazia) e isso se comporta muito melhor.
Remus Rusanu
O artigo não sugere exatamente o seguinte: a tabela da fila possui APENAS itens prontos para o trabalho.
ErikE
2
@ErikE: você está se referindo a este parágrafo, certo? também é muito fácil evitar a síndrome da mesa grande. Basta criar uma tabela separada para novos e-mails e, quando terminar de processá-los, insira-os no armazenamento de longo prazo e, em seguida, exclua-os da tabela de filas. A tabela de novos e-mails normalmente fica muito pequena e as operações são rápidas . Minha briga com isso é que é apresentada como uma solução alternativa para a questão das "grandes filas". Esta recomendação deveria ter sido na abertura do artigo, é uma questão fundamental .
Remus Rusanu
Se você começar a pensar em uma clara separação de estado x evento, iniciará um caminho muito mais fácil. Até a recomendação acima mudaria para inserir novos emails na emailstabela e na new_emailsfila. O processamento controla a new_emailsfila e atualiza o estado na emailstabela . Isso também evita o problema do estado "gordo" que viaja em filas. Se falarmos sobre processamento distribuído e filas verdadeiras , com comunicação (por exemplo, SSB), as coisas ficam mais complicadas, pois o estado compartilhado é problemático em sistemas com restrições.
Remus Rusanu