在非同步的資料處理當中,Queue 是常見的應用,本文使用 .NET Framework + AWS Toolkit for Visual Studio 控制有 FIFO 的 AWS SQS。
Version
Windows 10 Pro 1709 16299.251 .NET Framework 4.6.2 Visual Studio 2017 15.5.7 AWS Toolkit for Visual Studio
到 AWS 官網下載 AWS Toolkit for Visual Studio
選擇 AWS Toolkit for Visual Studio 2017
按 Download
開始下載
AWSToolkitPackage.vsix
會下載到 Downloads
目錄下
執行 AWSToolkitPackage
安裝 AWS Toolkit for Visual Studio
安裝完成後,重新啟動 Visual Studio 2017 就會出現 AWS Getting Started
首頁。
建立 Project
File -> New Project
Visual C# -> AWS Samples -> App Services -> AWS SQS Sample
第一次建立專案時,會要求你建立 Account Profile
與 Region
。
按 +
新增
輸入 Access Key ID
與 Secret Access Key
會自動選擇美西的 server
AWS 並不是所有的 SQS 都支援 FIFO,如東京的 server 的 SQS 就不支援 FIFO
寫入 Message 進 Queue
Program.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 using System;using Amazon.SQS;using Amazon.SQS.Model;namespace SQSSendMessage { class Program { public static void Main (string [] args ) { var amazonSqsClient = new AmazonSQSClient(); string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ; try { for (var i = 0 ; i < 3 ; i++) { var message = "message" + i; var sendMessageRequest = new SendMessageRequest { QueueUrl = myQueueUrl, MessageBody = message, MessageGroupId = "Senao" }; amazonSqsClient.SendMessage(sendMessageRequest); Console.WriteLine("Send message {0}" , message); } } catch (AmazonSQSException ex) { Console.WriteLine("Caught Exception: " + ex.Message); Console.WriteLine("Response Status Code: " + ex.StatusCode); Console.WriteLine("Error Code: " + ex.ErrorCode); Console.WriteLine("Error Type: " + ex.ErrorType); Console.WriteLine("Request ID: " + ex.RequestId); } Console.WriteLine("Press Enter to continue..." ); Console.Read(); } } }
11 行
1 2 var amazonSqsClient = new AmazonSQSClient();
要使用 SQS,首先要建立 AmazonSQSClient
物件,所有的操作都從 AmazonSQSClient
開始。
14 行
1 2 string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ;
無論是 寫入 message
、讀出 message
或者 刪除 message
,都必須靠 QueueUrl
辨識你所要控制的 queue。
23 行
1 2 3 4 5 6 7 8 9 var sendMessageRequest = new SendMessageRequest{ QueueUrl = myQueueUrl, MessageBody = message, MessageGroupId = "oomusou" }; amazonSqsClient.SendMessage(sendMessageRequest); Console.WriteLine("Send message {0}" , message);
建立 SendMessageRequest
DTO,其中 QueueUrl
為 queue 的 URL;而 MessageBody
為要傳入 queue 的內容。
FIFO queue 與 Standard queue 最大的差別在於 MessageGroupId
,根據 AWS 文件,只有相同的 MessageGroupId
才能使用 FIFO queue,並保證 message 會 先進先出
,若沒指定 MessageGroupId
則視為 Standard Queue,並不保證 message 會 先進先出
。
再將 SendMessageRequest
DTO 傳入 AmazonSQSClient.SendMessage()
,正式將資料寫入 queue。
從 Queue 讀出 Message
Program.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 using System;using Amazon;using Amazon.SQS;using Amazon.SQS.Model;namespace SQSReceiveMessage { class Program { public static void Main (string [] args ) { var amazonSqsClient = new AmazonSQSClient(); string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo" ; try { for (var i = 0 ; i < 3 ; i++) { var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = myQueueUrl }; var receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest); if (receiveMessageResponse.Messages != null ) { var message = receiveMessageResponse.Messages[0 ]; if (!string .IsNullOrEmpty(message.Body)) { Console.WriteLine("Read message: {0}" , message.Body); } var messageRecieptHandle = message.ReceiptHandle; var deleteMessageRequest = new DeleteMessageRequest { QueueUrl = myQueueUrl, ReceiptHandle = messageRecieptHandle }; amazonSqsClient.DeleteMessage(deleteMessageRequest); Console.WriteLine("Delete message: {0}" , message.Body); } } } catch (AmazonSQSException ex) { Console.WriteLine("Caught Exception: " + ex.Message); Console.WriteLine("Response Status Code: " + ex.StatusCode); Console.WriteLine("Error Code: " + ex.ErrorCode); Console.WriteLine("Error Type: " + ex.ErrorType); Console.WriteLine("Request ID: " + ex.RequestId); } Console.WriteLine("Press Enter to continue..." ); Console.Read(); } } }
39 行
1 2 3 4 5 6 var receiveMessageRequest = new ReceiveMessageRequest{ QueueUrl = myQueueUrl }; var receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest);
建立 ReceiveMessageRequest
DTO,其中 QueueUrl
為 queue 的 URL。
再將 ReceiveMessageRequest
DTO 傳入 AmazonSQSClient.ReceiveMessage()
,正式從 queue 讀出資料。
45 行
1 2 3 4 5 6 7 8 9 if (receiveMessageResponse.Messages != null ){ var message = receiveMessageResponse.Messages[0 ]; if (!string .IsNullOrEmpty(message.Body)) { Console.WriteLine("Read message: {0}" , message.Body); } }
其中 receiveMessageResponse.Messages
為 List<Message>
,預設每次 ReceiveMessage()
只會從 queue 讀出一筆 message,且資料在 message.Body
內。
55 行
1 2 3 4 5 6 7 8 var messageRecieptHandle = message.ReceiptHandle;var deleteMessageRequest = new DeleteMessageRequest{ QueueUrl = myQueueUrl, ReceiptHandle = messageRecieptHandle }; amazonSqsClient.DeleteMessage(deleteMessageRequest);
建立 DeleteMessageRequest
DTO,其中 QueueUrl
為 queue 的 URL;此外 ReceipeHandle
為要刪除 message 的 handle。
ReceipeHandle
由 message.ReceiptHandle
獲得。
再將 DeleteMessageRequest
DTO 傳入 AmazonSQSClient.DeleteMessage()
,正式從 queue 刪除資料。
根據 AWS SQS 特性,若不從 SQS 刪除 message, 若繼續下 ReceiveMessage()
,仍然會讀取到相同的 message,唯有刪除了 message
之後,才會依 FIFO 順序讀到下一筆 message
依序寫入 message0
、message1
與 message2
三筆資料進有 FIFO 的 AWS SQS。
依序讀出 message0
、message1
與 message2
。
Q&A
Q : 如何查看 AWS SQS API ?
到 AWS SDK for .NET API Reference 官網,查詢 Amazon.SQS
的 AmazonSQSClient
,所有 SQS 的 API 都是由這裡展開出來。
Conclusion
FIFO queue 與 Standard queue 最大的差異在於 SendMessage()
的 DTO 必須包含 MessageGroupId
,才能保證相同 MessageGroupId
有 FIFO 的 先進先出
特性
ReceiveMessage()
之後還必須 DeleteMessage()
,才能確保 SendMessage()
會收到下一筆 message,否則仍然會繼續收到目前的 message
Sample Code
完整的範例可以在我的 GitHub 上找到
Reference
AWS , AWS SDK for .NET Developer Guide AWS , AWS Toolkit for Visual Studio AWS , Messaging Using Amazon SQS AWS , AWS SDK for .NET API Reference