在非同步的資料處理當中,Queue 是常見的應用,本文使用 .NET Framework + AWS Toolkit for Visual Studio 控制有 FIFO 的 AWS SQS。
Version
Windows 10 Pro 1709 16299.251 .NET Framework 4.6.2 Visual Studio 2017 15.5.7 AWS Toolkit for Visual Studio
到 AWS 官網下載 AWS Toolkit for Visual Studio
選擇 AWS Toolkit for Visual Studio 2017
按 Download 開始下載
AWSToolkitPackage.vsix 會下載到 Downloads 目錄下
執行 AWSToolkitPackage 安裝 AWS Toolkit for Visual Studio
安裝完成後,重新啟動 Visual Studio 2017 就會出現 AWS Getting Started 首頁。
建立 Project
File -> New Project
Visual C# -> AWS Samples -> App Services -> AWS SQS Sample
第一次建立專案時,會要求你建立 Account Profile 與 Region。
按 + 新增
輸入 Access Key ID 與 Secret Access Key
會自動選擇美西的 server
AWS 並不是所有的 SQS 都支援 FIFO,如東京的 server 的 SQS 就不支援 FIFO
寫入 Message 進 Queue
Program.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 using System;using Amazon.SQS;using Amazon.SQS.Model;namespace SQSSendMessage { class Program { public static void Main (string [] args ) { var amazonSqsClient = new AmazonSQSClient(); string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ; try { for (var i = 0 ; i < 3 ; i++) { var message = "message" + i; var sendMessageRequest = new SendMessageRequest { QueueUrl = myQueueUrl, MessageBody = message, MessageGroupId = "Senao" }; amazonSqsClient.SendMessage(sendMessageRequest); Console.WriteLine("Send message {0}" , message); } } catch (AmazonSQSException ex) { Console.WriteLine("Caught Exception: " + ex.Message); Console.WriteLine("Response Status Code: " + ex.StatusCode); Console.WriteLine("Error Code: " + ex.ErrorCode); Console.WriteLine("Error Type: " + ex.ErrorType); Console.WriteLine("Request ID: " + ex.RequestId); } Console.WriteLine("Press Enter to continue..." ); Console.Read(); } } }
11 行
1 2 var amazonSqsClient = new AmazonSQSClient();
要使用 SQS,首先要建立 AmazonSQSClient 物件,所有的操作都從 AmazonSQSClient 開始。
14 行
1 2 string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ;
無論是 寫入 message、讀出 message 或者 刪除 message,都必須靠 QueueUrl 辨識你所要控制的 queue。
23 行
1 2 3 4 5 6 7 8 9 var sendMessageRequest = new SendMessageRequest{ QueueUrl = myQueueUrl, MessageBody = message, MessageGroupId = "oomusou" }; amazonSqsClient.SendMessage(sendMessageRequest); Console.WriteLine("Send message {0}" , message);
建立 SendMessageRequest DTO,其中 QueueUrl 為 queue 的 URL;而 MessageBody 為要傳入 queue 的內容。
FIFO queue 與 Standard queue 最大的差別在於 MessageGroupId,根據 AWS 文件,只有相同的 MessageGroupId 才能使用 FIFO queue,並保證 message 會 先進先出,若沒指定 MessageGroupId 則視為 Standard Queue,並不保證 message 會 先進先出。
再將 SendMessageRequest DTO 傳入 AmazonSQSClient.SendMessage(),正式將資料寫入 queue。
從 Queue 讀出 Message
Program.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 using System;using Amazon;using Amazon.SQS;using Amazon.SQS.Model;namespace SQSReceiveMessage { class Program { public static void Main (string [] args ) { var amazonSqsClient = new AmazonSQSClient(); string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ; try { for (var i = 0 ; i < 3 ; i++) { var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = myQueueUrl }; var receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest); if (receiveMessageResponse.Messages != null ) { var message = receiveMessageResponse.Messages[0 ]; if (!string .IsNullOrEmpty(message.Body)) { Console.WriteLine("Read message: {0}" , message.Body); } var messageRecieptHandle = message.ReceiptHandle; var deleteMessageRequest = new DeleteMessageRequest { QueueUrl = myQueueUrl, ReceiptHandle = messageRecieptHandle }; amazonSqsClient.DeleteMessage(deleteMessageRequest); Console.WriteLine("Delete message: {0}" , message.Body); } } } catch (AmazonSQSException ex) { Console.WriteLine("Caught Exception: " + ex.Message); Console.WriteLine("Response Status Code: " + ex.StatusCode); Console.WriteLine("Error Code: " + ex.ErrorCode); Console.WriteLine("Error Type: " + ex.ErrorType); Console.WriteLine("Request ID: " + ex.RequestId); } Console.WriteLine("Press Enter to continue..." ); Console.Read(); } } }
39 行
1 2 3 4 5 6 var receiveMessageRequest = new ReceiveMessageRequest{ QueueUrl = myQueueUrl }; var receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest);
建立 ReceiveMessageRequest DTO,其中 QueueUrl 為 queue 的 URL。
再將 ReceiveMessageRequest DTO 傳入 AmazonSQSClient.ReceiveMessage(),正式從 queue 讀出資料。
45 行
1 2 3 4 5 6 7 8 9 if (receiveMessageResponse.Messages != null ){ var message = receiveMessageResponse.Messages[0 ]; if (!string .IsNullOrEmpty(message.Body)) { Console.WriteLine("Read message: {0}" , message.Body); } }
其中 receiveMessageResponse.Messages 為 List<Message>,預設每次 ReceiveMessage() 只會從 queue 讀出一筆 message,且資料在 message.Body 內。
55 行
1 2 3 4 5 6 7 8 var messageRecieptHandle = message.ReceiptHandle;var deleteMessageRequest = new DeleteMessageRequest{ QueueUrl = myQueueUrl, ReceiptHandle = messageRecieptHandle }; amazonSqsClient.DeleteMessage(deleteMessageRequest);
建立 DeleteMessageRequest DTO,其中 QueueUrl 為 queue 的 URL;此外 ReceipeHandle 為要刪除 message 的 handle。
ReceipeHandle 由 message.ReceiptHandle 獲得。
再將 DeleteMessageRequest DTO 傳入 AmazonSQSClient.DeleteMessage(),正式從 queue 刪除資料。
根據 AWS SQS 特性,若不從 SQS 刪除 message, 若繼續下 ReceiveMessage() ,仍然會讀取到相同的 message,唯有刪除了 message 之後,才會依 FIFO 順序讀到下一筆 message
依序寫入 message0、message1 與 message2 三筆資料進有 FIFO 的 AWS SQS。
依序讀出 message0、message1 與 message2 。
Q&A
Q : 如何查看 AWS SQS API ?
到 AWS SDK for .NET API Reference 官網,查詢 Amazon.SQS 的 AmazonSQSClient,所有 SQS 的 API 都是由這裡展開出來。
Conclusion
FIFO queue 與 Standard queue 最大的差異在於 SendMessage() 的 DTO 必須包含 MessageGroupId,才能保證相同 MessageGroupId 有 FIFO 的 先進先出 特性
ReceiveMessage() 之後還必須 DeleteMessage(),才能確保 SendMessage() 會收到下一筆 message,否則仍然會繼續收到目前的 message
Sample Code
完整的範例可以在我的 GitHub 上找到
Reference
AWS , AWS SDK for .NET Developer Guide AWS , AWS Toolkit for Visual Studio AWS , Messaging Using Amazon SQS AWS , AWS SDK for .NET API Reference