Luciano Evaristo Guerche

A Brazilian geek interested in .NET technologies

How to implement a work queue in SQL Server

The script below shows how to implement a work queue in SQL Server. Any comments are welcome.

/******************************************************************************
 * Creates WorkQueue table                                                    *
 ******************************************************************************/
CREATE TABLE dbo.WorkQueue
 (
 WQID int NOT NULL,
 Field1 varchar(10) NULL,
 Field2 varchar(10) NULL,
 Field3 varchar(10) NULL,
 Field4 varchar(50) NULL,
 Field5 varchar(50) NULL,
 FieldN varchar(50) NULL,
 Stage1Flag tinyint NOT NULL,
 Stage2Flag tinyint NOT NULL,
 Stage3Flag tinyint NOT NULL
 )  ON [PRIMARY]
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 DF_WorkQueue_Stage1Flag DEFAULT 0 FOR Stage1Flag
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 DF_WorkQueue_Stage2Flag DEFAULT 0 FOR Stage2Flag
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 DF_WorkQueue_Stage3Flag DEFAULT 0 FOR Stage3Flag
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 PK_WorkQueue PRIMARY KEY CLUSTERED
 (
 WQID
 ) ON [PRIMARY]
GO

/******************************************************************************
 * Creates WorkQueue index which will be used by TOP procedures               *
 * ATTENTION: Do not delete this index, since it is essential to the process  *
 ******************************************************************************/
CREATE  INDEX [IX_WorkQueue_Stages] ON [dbo].[WorkQueue]([Stage1Flag], [Stage2Flag], [Stage3Flag]) ON [PRIMARY]
GO

/******************************************************************************
 * Creates WorkQueueErrors to hold errors that may occur during processing    *
 ******************************************************************************/
CREATE TABLE dbo.WorkQueueErrors
 (
 WQEID int NOT NULL IDENTITY (1, 1),
 WQID int NOT NULL,
 Stage tinyint NOT NULL,
 Error varchar(255) NOT NULL
 )  ON [PRIMARY]
GO

ALTER TABLE dbo.WorkQueueErrors ADD CONSTRAINT
 PK_WorkQueueErrors PRIMARY KEY CLUSTERED
 (
 WQEID
 ) ON [PRIMARY]

GO

ALTER TABLE dbo.WorkQueueErrors ADD CONSTRAINT
 FK_WorkQueueErrors_WorkQueue FOREIGN KEY
 (
 WQID
 ) REFERENCES dbo.WorkQueue
 (
 WQID
 )
GO

/******************************************************************************
 * Creates procedure WorkQueueStage1Top                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage1Top  -- Returns one unlocked pending row; with no ORDER BY, the order (LIFO or FIFO) is not guaranteed
AS
SELECT TOP 1 dbo.WorkQueue.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE (Stage1Flag = 0) -- 0 = not processed
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage1Pop                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage1Pop
(@WQID int,
 @Stage1Flag tinyint -- ATTENTION: parameter must be set to 1 (processed without error) or 2 (processed with error(s))
)
AS
UPDATE dbo.WorkQueue
SET Stage1Flag = @Stage1Flag
WHERE WQID = @WQID
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage2Top                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage2Top  -- Returns one unlocked pending row; with no ORDER BY, the order (LIFO or FIFO) is not guaranteed
AS
SELECT TOP 1 dbo.WorkQueue.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE (Stage1Flag = 1) AND -- 1 = processed without error
      (Stage2Flag = 0)     -- 0 = not processed
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage2Pop                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage2Pop
(@WQID int,
 @Stage2Flag tinyint -- ATTENTION: parameter must be set to 1 (processed without error) or 2 (processed with error(s))
)
AS
UPDATE dbo.WorkQueue
SET Stage2Flag = @Stage2Flag
WHERE WQID = @WQID
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage3Top                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage3Top  -- Returns one unlocked pending row; with no ORDER BY, the order (LIFO or FIFO) is not guaranteed
AS
SELECT TOP 1 dbo.WorkQueue.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE (Stage1Flag = 1) AND -- 1 = processed without error
      (Stage2Flag = 1) AND -- 1 = processed without error
      (Stage3Flag = 0)     -- 0 = not processed
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage3Pop                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage3Pop
(@WQID int,
 @Stage3Flag tinyint -- ATTENTION: parameter must be set to 1 (processed without error) or 2 (processed with error(s))
)
AS
UPDATE dbo.WorkQueue
SET Stage3Flag = @Stage3Flag
WHERE WQID = @WQID
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsPush                                      *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsPush
(@WQID int,
 @Stage tinyint,
 @Error varchar(255)
)
AS
INSERT INTO dbo.WorkQueueErrors
(WQID, Stage, Error)
VALUES
(@WQID, @Stage, @Error)
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsGetStage1                                 *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsGetStage1
AS
SELECT dbo.WorkQueue.*,
       dbo.WorkQueueErrors.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages)) INNER JOIN
     dbo.WorkQueueErrors ON dbo.WorkQueue.WQID = dbo.WorkQueueErrors.WQID
WHERE (dbo.WorkQueue.Stage1Flag = 2) AND -- 2 = processed with error(s)
      (dbo.WorkQueueErrors.Stage = 1)
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsGetStage2                                 *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsGetStage2
AS
SELECT dbo.WorkQueue.*,
       dbo.WorkQueueErrors.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages)) INNER JOIN
     dbo.WorkQueueErrors ON dbo.WorkQueue.WQID = dbo.WorkQueueErrors.WQID
WHERE (dbo.WorkQueue.Stage1Flag = 1) AND -- 1 = processed without error
      (dbo.WorkQueue.Stage2Flag = 2) AND -- 2 = processed with error(s)
      (dbo.WorkQueueErrors.Stage = 2)
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsGetStage3                                 *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsGetStage3
AS
SELECT dbo.WorkQueue.*,
       dbo.WorkQueueErrors.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages)) INNER JOIN
     dbo.WorkQueueErrors ON dbo.WorkQueue.WQID = dbo.WorkQueueErrors.WQID
WHERE (dbo.WorkQueue.Stage1Flag = 1) AND -- 1 = processed without error
      (dbo.WorkQueue.Stage2Flag = 1) AND -- 1 = processed without error
      (dbo.WorkQueue.Stage3Flag = 2) AND -- 2 = processed with error(s)
      (dbo.WorkQueueErrors.Stage = 3)
RETURN
GO
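
The script creates the queue but does not show a consumer. What follows is a minimal sketch of one iteration of a stage 1 worker, not the author's actual client code (which, per the comments, is a Visual Basic program calling the procedures through ADO). It assumes the worker runs inside a transaction, because the UPDLOCK taken when the row is selected is only held until the transaction ends. The SELECT repeats the body of WorkQueueStage1Top inline so that the key can be captured in a variable; the placeholder for the actual work is hypothetical.

```sql
-- One iteration of a hypothetical stage 1 worker.
BEGIN TRANSACTION

DECLARE @WQID int

-- Same filter and hints as WorkQueueStage1Top, inlined so the key lands in a variable.
-- UPDLOCK keeps the row reserved until COMMIT/ROLLBACK; READPAST skips rows
-- that other workers have already reserved.
SELECT TOP 1 @WQID = WQID
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE Stage1Flag = 0 -- 0 = not processed

IF @WQID IS NULL
BEGIN
    -- Nothing to claim: the queue is empty or every pending row is locked.
    ROLLBACK TRANSACTION
END
ELSE
BEGIN
    -- ... stage 1 work goes here (placeholder) ...

    -- On success flag the row as 1. On failure the worker would instead call
    -- WorkQueueErrorsPush and pass @Stage1Flag = 2.
    EXEC WorkQueueStage1Pop @WQID = @WQID, @Stage1Flag = 1
    COMMIT TRANSACTION
END
```

A client that reads the result set of WorkQueueStage1Top directly would follow the same shape: begin a transaction, call the Top procedure, do the work, call the Pop procedure, commit.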

Comments

Aaron Weiker said:

Pardon my ignorance, but what is processing the work queue? Or was your intention here just to provide a storage and retrieval mechanism and let something else do the actual processing?
# February 13, 2004 9:55 AM

Roy Osherove said:

It would be easier on the eyes to post this as an article and just provide a short post with a link to it. That way the main feed page does not get too large or hard to read :)
# February 13, 2004 9:59 AM

Luciano Evaristo Guerche said:

Aaron,

I posted just the implementation. As my work queues often involve 1) getting a record, 2) inputting/processing the record in a mainframe system and 3) then setting it as processed, I usually create Visual Basic programs which consume, process and update the work queue through ADO calls to stored procedures similar to the ones I posted.

# February 13, 2004 11:01 AM

Luciano Evaristo Guerche said:

Roy,

Thanks for your comment. I am new to the blog world. When setting up/administering my blog I saw there were tabs for posts and articles, but I could not figure out the difference between articles and posts, or how to use articles at all. Would you mind commenting more on articles in .Text?
# February 13, 2004 11:03 AM

Aaron Weiker said:

Luciano,
It would be nice if in further posts you extended this and showed examples of how you could use your implementation.

I do have some questions/suggestions about why you may have done a few things, though. First of all, why did you choose only 3 stages? Would it be more extensible if instead each row was just one stage, but you had another field that identified a previous row that it was a dependency of? Now, assuming you did 3 stages per row to cut down on the number of rows in the table, it makes sense, as the larger the table, the longer it takes to parse. Then, speaking of a large table, what type of cleanup work do you recommend doing on the queue? (How long do records stay in there, do you archive them for future reference, etc.)
# February 13, 2004 11:44 AM

Luciano Evaristo Guerche said:

Aaron,

With regard to your question about the 3 stages (steps), I designed it with a modular/scalable/multiuser platform in mind. As an example, stage 1 could be "get additional customer information from the mainframe system and save it to the missing fields in the record being processed", stage 2 could be "select our nearest branch which could supply our customer as soon as possible and set the proper missing fields in the record being processed" and stage 3 could be "send an email to the selected branch with the customer and requested supplies information". Can you see what I mean? It is like splitting the whole process into parts and putting one clerk in charge of each part. The clerk in charge of stage 2 will process only records already handled successfully by the clerk in charge of stage 1, and so on.

In my case, as some of the stages involve dealing with mainframe connections, which are usually slow, I can run many instances of the module (clerk) in charge of a given stage and the work will be done in parallel, each instance pulling from the same table but querying over a different mainframe connection, provided the program encloses each record's processing inside a transaction so that other instances will not get/mess with the same record. My example was made of 3 stages, but some systems I work on deal with lots of stages (steps), each one depending on the results of the previous one, all working in parallel. My advice for you would be: stop thinking monolithic, it is not scalable at all.
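
To make the parallel-instances point concrete, here is a hedged walkthrough of two worker instances on separate connections, using only the posted procedures. The session labels and the WQID value 1 are illustrative assumptions; which row each call returns depends on the table contents, since the Top procedures have no ORDER BY.

```sql
-- Session A (first worker instance, its own connection)
BEGIN TRANSACTION
EXEC WorkQueueStage1Top
-- Suppose this returns the row with WQID = 1. The UPDLOCK hint keeps an
-- update lock on that row until session A commits or rolls back.

-- Session B (second worker instance, separate connection, A still open)
BEGIN TRANSACTION
EXEC WorkQueueStage1Top
-- READPAST makes this call skip the row locked by session A instead of
-- waiting for it, so B gets a different pending row, or an empty result
-- set if no other row is pending.

-- Session A, once its slow mainframe work has finished
EXEC WorkQueueStage1Pop @WQID = 1, @Stage1Flag = 1
COMMIT TRANSACTION
-- The lock is released and the row no longer matches Stage1Flag = 0,
-- so no stage 1 worker will pick it up again.
```

If session A rolls back or its connection drops before the Pop call, the lock is released with the flag still 0 and the row becomes available to the other instances again.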

I am not sure I fully understand what you mean by "you had another field that identified a previous row that it was a dependency of". If you were talking about the Error table, do you agree with me that some processes may produce many errors for a given processed record? In the example previously mentioned, when I try to get information from the mainframe system, it might return more than one error message, like 1) Too many customers found for given information; 2) Give me more parameters and I will try to narrow the search; 3) If you cannot do it, you are out of luck. Then I ask you, how would I fit this myriad of messages in just one field/table?
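
The one-record, many-errors case can be sketched with the posted procedures. This is only an illustration: WQID 42 and the Field1 value 'demo' are made-up, the script assumes no row with WQID 42 exists yet, and everything is rolled back at the end so the demo leaves no data behind.

```sql
BEGIN TRANSACTION

-- Demo queue row (made-up values); the stage flags default to 0.
INSERT INTO dbo.WorkQueue (WQID, Field1) VALUES (42, 'demo')

-- One error row per message returned for the same queue row.
EXEC WorkQueueErrorsPush @WQID = 42, @Stage = 1, @Error = 'Too many customers found for given information'
EXEC WorkQueueErrorsPush @WQID = 42, @Stage = 1, @Error = 'Give me more parameters and I will try to narrow the search'
EXEC WorkQueueErrorsPush @WQID = 42, @Stage = 1, @Error = 'If you cannot do it, you are out of luck'

-- Flag stage 1 as processed with error(s).
EXEC WorkQueueStage1Pop @WQID = 42, @Stage1Flag = 2

-- The join should list the demo row once per error message,
-- alongside any other stage 1 failures already in the table.
EXEC WorkQueueErrorsGetStage1

-- Undo the demo data.
ROLLBACK TRANSACTION
```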

As for cleanup work, I would recommend involving the infrastructure staff and the user who uses/pays for the system and setting up some meetings. Sometimes these meetings are funny, because the infrastructure staff ask for the database to be as lean as possible and the user says he/she needs the data kept for 10 years or more, so that he/she can run the queries/summaries he/she wants. ;-) A long time ago I read some articles which recommended creating an archive database (e.g. CustomersDb and CustomersDbArchive) or replicating tables with a suffix in the same database (e.g. Customers and CustomersArchive) and archiving old data on an established timetable. I guess it would be fine, but I have not implemented it yet because we have plenty of space on our servers, the tables are well indexed and the users often query all the data.
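
For readers who do want the suffix-table approach, the following is a rough sketch only; the author states that he has not implemented archiving. It assumes a hypothetical table dbo.WorkQueueArchive with the same columns as dbo.WorkQueue, and it treats "old" as "all three stages processed without error", because the posted table has no date column to filter on. Rows that still have entries in dbo.WorkQueueErrors are left alone so the foreign key is not violated.

```sql
BEGIN TRANSACTION

-- Fix the set of rows to move first, so that rows finishing while the
-- job runs are not deleted without having been copied.
DECLARE @Done TABLE (WQID int NOT NULL PRIMARY KEY)

INSERT INTO @Done (WQID)
SELECT q.WQID
FROM dbo.WorkQueue q
WHERE q.Stage1Flag = 1 AND q.Stage2Flag = 1 AND q.Stage3Flag = 1
  AND NOT EXISTS (SELECT * FROM dbo.WorkQueueErrors e WHERE e.WQID = q.WQID)

-- Copy the finished rows into the (hypothetical) archive table.
INSERT INTO dbo.WorkQueueArchive
    (WQID, Field1, Field2, Field3, Field4, Field5, FieldN,
     Stage1Flag, Stage2Flag, Stage3Flag)
SELECT q.WQID, q.Field1, q.Field2, q.Field3, q.Field4, q.Field5, q.FieldN,
       q.Stage1Flag, q.Stage2Flag, q.Stage3Flag
FROM dbo.WorkQueue q
INNER JOIN @Done d ON d.WQID = q.WQID

-- Remove the same rows from the live queue.
DELETE q
FROM dbo.WorkQueue q
INNER JOIN @Done d ON d.WQID = q.WQID

COMMIT TRANSACTION
```

How often such a job runs, and whether failed rows and their errors are archived too, is exactly the kind of decision the meetings described above would settle.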

Well, hope I have been clear and you can understand what I say. I am a system analyst, not a writer, so hope you understand my words.
# February 13, 2004 1:36 PM