在非同步的資料處理當中,Queue 是常見的應用,本文使用 .NET Framework + AWS Toolkit for Visual Studio 控制有 FIFO 的 AWS SQS。
Version Windows 10 Pro 1709 16299.251
到 AWS 官網下載 AWS Toolkit for Visual Studio  
選擇 AWS Toolkit for Visual Studio 2017 
 
按 Download 開始下載 
 
AWSToolkitPackage.vsix 會下載到 Downloads 目錄下執行 AWSToolkitPackage 安裝 AWS Toolkit for Visual Studio 
 
安裝完成後,重新啟動 Visual Studio 2017 就會出現 AWS Getting Started 首頁。
建立 Project 
File -> New Project 
Visual C# -> AWS Samples -> App Services -> AWS SQS Sample 
第一次建立專案時,會要求你建立 Account Profile 與 Region。
按 + 新增 
 
輸入 Access Key ID 與 Secret Access Key 
 
會自動選擇美西的 server 
 
AWS 並不是所有的 SQS 都支援 FIFO,如東京的 server 的 SQS 就不支援 FIFO
 
寫入 Message 進 Queue Program.cs 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 using  System;using  Amazon.SQS;using  Amazon.SQS.Model;namespace  SQSSendMessage {     class  Program      {         public  static  void  Main (string [] args         {                          var  amazonSqsClient = new  AmazonSQSClient();                          string  myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ;             try              {                                  for  (var  i = 0 ; i < 3 ; i++)                 {                     var  message = "message"  + i;                     var  sendMessageRequest = new  SendMessageRequest                     {                         QueueUrl = myQueueUrl,                         MessageBody = message,                         MessageGroupId = "Senao"                      };                     amazonSqsClient.SendMessage(sendMessageRequest);                     Console.WriteLine("Send message {0}" , message);                 }             }             catch  (AmazonSQSException ex)             {                 Console.WriteLine("Caught Exception: "  + ex.Message);                 Console.WriteLine("Response Status Code: "  + ex.StatusCode);                 Console.WriteLine("Error Code: "  + ex.ErrorCode);                 Console.WriteLine("Error Type: "  + ex.ErrorType);                 Console.WriteLine("Request ID: "  + ex.RequestId);             }             Console.WriteLine("Press Enter to continue..." );             Console.Read();         }     } } 
11 行
1 2 var  amazonSqsClient = new  AmazonSQSClient();
要使用 SQS,首先要建立 AmazonSQSClient 物件,所有的操作都從 AmazonSQSClient 開始。
14 行
1 2 string  myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ;
無論是 寫入 message、讀出 message 或者 刪除 message,都必須靠 QueueUrl 辨識你所要控制的 queue。
23 行
1 2 3 4 5 6 7 8 9 var  sendMessageRequest = new  SendMessageRequest{     QueueUrl = myQueueUrl,     MessageBody = message,     MessageGroupId = "oomusou"  }; amazonSqsClient.SendMessage(sendMessageRequest); Console.WriteLine("Send message {0}" , message); 
建立 SendMessageRequest DTO,其中 QueueUrl 為 queue 的 URL;而 MessageBody 為要傳入 queue 的內容。
FIFO queue 與 Standard queue 最大的差別在於 MessageGroupId,根據 AWS 文件,只有相同的 MessageGroupId 才能使用 FIFO queue,並保證 message 會 先進先出,若沒指定 MessageGroupId 則視為 Standard Queue,並不保證 message 會 先進先出。
 
再將 SendMessageRequest DTO 傳入 AmazonSQSClient.SendMessage(),正式將資料寫入 queue。
從 Queue 讀出 Message Program.cs 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 using  System;using  Amazon;using  Amazon.SQS;using  Amazon.SQS.Model;namespace  SQSReceiveMessage {     class  Program      {         public  static  void  Main (string [] args         {                          var  amazonSqsClient = new  AmazonSQSClient();                          string  myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ;             try              {                                  for  (var  i = 0 ; i < 3 ; i++)                 {                                          var  receiveMessageRequest = new  ReceiveMessageRequest                     {                         QueueUrl = myQueueUrl                     };                     var  receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest);                     if  (receiveMessageResponse.Messages != null )                     {                         var  message = receiveMessageResponse.Messages[0 ];                         if  (!string .IsNullOrEmpty(message.Body))                         {                             Console.WriteLine("Read message: {0}" , message.Body);                         }                                                  var  messageRecieptHandle = message.ReceiptHandle;                         var  deleteMessageRequest = new  DeleteMessageRequest                         {                             QueueUrl = myQueueUrl,                             ReceiptHandle = messageRecieptHandle                         };                         amazonSqsClient.DeleteMessage(deleteMessageRequest);                         Console.WriteLine("Delete message: {0}" , message.Body);                     }                 }             }             catch  (AmazonSQSException ex)             {                 Console.WriteLine("Caught Exception: "  + ex.Message);                 Console.WriteLine("Response Status Code: "  + ex.StatusCode);                 Console.WriteLine("Error Code: "  + ex.ErrorCode);                 Console.WriteLine("Error Type: "  + ex.ErrorType);                 Console.WriteLine("Request ID: "  + ex.RequestId);             }             Console.WriteLine("Press Enter to continue..." );             Console.Read();         }     } } 
39 行
1 2 3 4 5 6 var  receiveMessageRequest = new  ReceiveMessageRequest{     QueueUrl = myQueueUrl }; var  receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest);
建立 ReceiveMessageRequest DTO,其中 QueueUrl 為 queue 的 URL。
再將 ReceiveMessageRequest DTO 傳入 AmazonSQSClient.ReceiveMessage(),正式從 queue 讀出資料。
45 行
1 2 3 4 5 6 7 8 9 if  (receiveMessageResponse.Messages != null ){     var  message = receiveMessageResponse.Messages[0 ];     if  (!string .IsNullOrEmpty(message.Body))     {         Console.WriteLine("Read message: {0}" , message.Body);     } } 
其中 receiveMessageResponse.Messages 為 List<Message>,預設每次 ReceiveMessage() 只會從 queue 讀出一筆 message,且資料在 message.Body 內。
55 行
1 2 3 4 5 6 7 8 var  messageRecieptHandle = message.ReceiptHandle;var  deleteMessageRequest = new  DeleteMessageRequest{     QueueUrl = myQueueUrl,     ReceiptHandle = messageRecieptHandle };                         amazonSqsClient.DeleteMessage(deleteMessageRequest); 
建立 DeleteMessageRequest DTO,其中 QueueUrl 為 queue 的 URL;此外 ReceipeHandle 為要刪除 message 的 handle。
ReceipeHandle 由 message.ReceiptHandle 獲得。
再將 DeleteMessageRequest DTO 傳入 AmazonSQSClient.DeleteMessage(),正式從 queue 刪除資料。
根據 AWS SQS 特性,若不從 SQS 刪除 message, 若繼續下  ReceiveMessage() ,仍然會讀取到相同的 message,唯有刪除了 message 之後,才會依 FIFO 順序讀到下一筆 message
 
依序寫入 message0、message1 與 message2 三筆資料進有 FIFO 的 AWS SQS。
依序讀出 message0、message1 與 message2 。
Q&A 
Q : 如何查看 AWS SQS API ?
 
到 AWS SDK for .NET API Reference  官網,查詢 Amazon.SQS 的 AmazonSQSClient,所有 SQS 的 API 都是由這裡展開出來。
Conclusion 
FIFO queue 與 Standard queue 最大的差異在於 SendMessage() 的 DTO 必須包含 MessageGroupId,才能保證相同 MessageGroupId 有 FIFO 的 先進先出 特性 
ReceiveMessage() 之後還必須 DeleteMessage(),才能確保 SendMessage() 會收到下一筆 message,否則仍然會繼續收到目前的 message 
Sample Code 完整的範例可以在我的 GitHub  上找到
Reference AWS , AWS SDK for .NET Developer Guide AWS , AWS Toolkit for Visual Studio AWS , Messaging Using Amazon SQS AWS , AWS SDK for .NET API Reference