SQL Server Broker事务在出现病毒消息异常时完成

本文关键字:异常 消息 病毒 Broker Server 事务 SQL | 更新日期: 2023-09-27 18:19:34

我在C#应用程序中使用2008 R2中的SQL Server Broker,并试图处理SQL Server检测到有害消息并禁用目标队列的情况。

当这种情况发生时,当我试图接收消息时,会抛出SqlException。此时,我正在使用的SqlTransaction似乎不再可提交。

我将使用本教程与我的C#代码一起进行演示。

首先使用教程中的T-SQL代码创建必要的service broker对象,并发送一条消息,使其位于Target队列中。

CREATE MESSAGE TYPE
       [//AWDB/1DBSample/RequestMessage]
       VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE
       [//AWDB/1DBSample/ReplyMessage]
       VALIDATION = WELL_FORMED_XML;
GO
CREATE CONTRACT [//AWDB/1DBSample/SampleContract]
      ([//AWDB/1DBSample/RequestMessage]
       SENT BY INITIATOR,
       [//AWDB/1DBSample/ReplyMessage]
       SENT BY TARGET
      );
GO
CREATE QUEUE TargetQueue1DB;
CREATE SERVICE
       [//AWDB/1DBSample/TargetService]
       ON QUEUE TargetQueue1DB
       ([//AWDB/1DBSample/SampleContract]);
GO
CREATE QUEUE InitiatorQueue1DB;
CREATE SERVICE
       [//AWDB/1DBSample/InitiatorService]
       ON QUEUE InitiatorQueue1DB;
GO
DECLARE @InitDlgHandle UNIQUEIDENTIFIER;
DECLARE @RequestMsg NVARCHAR(100);
BEGIN TRANSACTION;
BEGIN DIALOG @InitDlgHandle
     FROM SERVICE
      [//AWDB/1DBSample/InitiatorService]
     TO SERVICE
      N'//AWDB/1DBSample/TargetService'
     ON CONTRACT
      [//AWDB/1DBSample/SampleContract]
     WITH
         ENCRYPTION = OFF;
SELECT @RequestMsg =
       N'<RequestMsg>Message for Target service.</RequestMsg>';
SEND ON CONVERSATION @InitDlgHandle
     MESSAGE TYPE 
     [//AWDB/1DBSample/RequestMessage]
     (@RequestMsg);
SELECT @RequestMsg AS SentRequestMsg;
COMMIT TRANSACTION;
GO

接下来运行这个C#代码,它是一个控制台应用程序。

using System.Data.SqlClient;
namespace ServerConsoleApplication
{
    class Program
    {
        static SqlConnection conn = null;
        static void Main(string[] args)
        {
            conn = new SqlConnection("connection string");
            conn.Open();
            Receive(); // 1
            Receive(); // 2
            Receive(); // 3
            Receive(); // 4
            Receive(); // 5
            Receive(); // 6 - Poison Message exception invoked
            conn.Close();
        }
        static void Receive()
        {
            using (SqlTransaction tran = conn.BeginTransaction())
            {
                try
                {
                    using (SqlCommand waitCommand = conn.CreateCommand())
                    {
                        waitCommand.Transaction = tran;
                        waitCommand.CommandText = string.Format("WAITFOR (RECEIVE TOP (1) conversation_handle, convert(xml,message_body) FROM TargetQueue1DB), TIMEOUT 1000");
                        using (SqlDataReader reader = waitCommand.ExecuteReader())
                        {
                        }
                    }
                    // Rollback on purpose to cause the poison message
                    tran.Rollback();
                }
                catch (SqlException ex)
                {
                    if (ex.Number == 9617)
                    {
                        // Re-Enable the queue
                        using (SqlCommand enableCmd = conn.CreateCommand())
                        {
                            enableCmd.Transaction = tran;
                            enableCmd.CommandText = string.Format(@"ALTER QUEUE TargetQueue1DB WITH STATUS = ON");
                            enableCmd.ExecuteNonQuery();
                        }
                        System.Data.SqlTypes.SqlGuid handle = System.Data.SqlTypes.SqlGuid.Null;
                        // Pull the poison message off the queue
                        using (SqlCommand waitCommand = conn.CreateCommand())
                        {
                            waitCommand.Transaction = tran;
                            waitCommand.CommandText = string.Format("WAITFOR (RECEIVE TOP (1) conversation_handle, convert(xml,message_body) FROM TargetQueue1DB), TIMEOUT 1000");
                            using (SqlDataReader reader = waitCommand.ExecuteReader())
                            {
                                while (reader.Read())
                                {
                                    handle = reader.GetSqlGuid(0);
                                }
                            }
                        }
                        // End the conversation just for clean up
                        using (SqlCommand endCmd = conn.CreateCommand())
                        {
                            endCmd.Transaction = tran;
                            endCmd.CommandText = "End Conversation @handle";
                            endCmd.Parameters.Add("@handle", System.Data.SqlDbType.UniqueIdentifier);
                            endCmd.Parameters["@handle"].Value = handle;
                            endCmd.ExecuteNonQuery();
                        }
                        // Commit the transaction so the message is removed from queue.
                        tran.Commit();
                    }
                }
            }
        }
    }
}

上面的代码只是行为的一个演示。当然,你通常不会像这样接收和呼叫Rollback。

Receive方法接收消息并在事务上调用Rollback以激发有害消息行为。在对Receive的第六次调用中,由于队列按预期被禁用,因此引发了SQLException。

此时,我想重新启用队列,删除有害消息并结束对话(不需要结束)。这一切都有效,但我提交了事务,因为我真的想把毒消息从队列中删除。

结果Commit调用上抛出异常,声明

此SqlTransaction已完成;它不再可用。

在第6次调用Receive时没有调用回滚或Commit,该事务是如何完成的?

此外,TargetQueue1DB中的消息是如何被删除的?我认为receive不会将消息从队列中删除,除非它在已提交的事务中。但是,如果在调用提交之前查看TargetQueue1DB,则队列为空。

如果您稍微修改代码,使waitCommand在捕获SqlException时处于作用域中,您将看到waitCommandinstance的Connection和Transaction属性已设置为null。这对我来说是一种奇怪的行为。

SQL Server Broker事务在出现病毒消息异常时完成

SqlTransaction的客户端状态不一定反映服务器上的事务状态。考虑您捕获的异常是否为1205,死锁。在这种情况下,在服务器中引发异常之前,事务已在服务器上回滚,即使当前帧中有一个SqlTransaction对象既没有提交也没有回滚。

在catch块中,您需要处理当前事务对象,并启动一个新的事务对象来执行错误处理逻辑。

该消息被删除是因为您执行了捕获处理逻辑,而没有在服务器上启动实际事务。您使用了已过期的不再相关的tran对象。您的RECEIVE已立即提交(完全没有相关交易)。