使用 .NET Framework 與 AWS Toolkit for Visual Studio

在非同步的資料處理當中,Queue 是常見的應用,本文使用 .NET Framework + AWS Toolkit for Visual Studio 控制有 FIFO 的 AWS SQS。

Version


Windows 10 Pro 1709 16299.251
.NET Framework 4.6.2
Visual Studio 2017 15.5.7
AWS Toolkit for Visual Studio

安裝 AWS Toolkit for Visual Studio


sqs001

  1. 到 AWS 官網下載 AWS Toolkit for Visual Studio
  2. 選擇 AWS Toolkit for Visual Studio 2017

sqs002

  1. Download 開始下載

sqs003

  1. AWSToolkitPackage.vsix 會下載到 Downloads 目錄下
  2. 執行 AWSToolkitPackage 安裝 AWS Toolkit for Visual Studio

sqs004

安裝完成後,重新啟動 Visual Studio 2017 就會出現 AWS Getting Started 首頁。

建立 Project


sqs005

File -> New Project

Visual C# -> AWS Samples -> App Services -> AWS SQS Sample

sqs006

第一次建立專案時,會要求你建立 Account ProfileRegion

  1. + 新增

sqs007

  1. 輸入 Access Key IDSecret Access Key

sqs008

  1. 會自動選擇美西的 server

AWS 並不是所有的 SQS 都支援 FIFO,如東京的 server 的 SQS 就不支援 FIFO

寫入 Message 進 Queue


Program.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using System;
using Amazon.SQS;
using Amazon.SQS.Model;

namespace SQSSendMessage
{
class Program
{
public static void Main(string[] args)
{

// SQSClient from AWSSDK
var amazonSqsClient = new AmazonSQSClient();

// FIFO quene URL
string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo";

try
{
// Send 3 messages to FIFO queue
for (var i = 0; i < 3; i++)
{
var message = "message" + i;
var sendMessageRequest = new SendMessageRequest
{
QueueUrl = myQueueUrl,
MessageBody = message,
MessageGroupId = "Senao"
};

amazonSqsClient.SendMessage(sendMessageRequest);
Console.WriteLine("Send message {0}", message);
}
}
catch (AmazonSQSException ex)
{
Console.WriteLine("Caught Exception: " + ex.Message);
Console.WriteLine("Response Status Code: " + ex.StatusCode);
Console.WriteLine("Error Code: " + ex.ErrorCode);
Console.WriteLine("Error Type: " + ex.ErrorType);
Console.WriteLine("Request ID: " + ex.RequestId);
}

Console.WriteLine("Press Enter to continue...");
Console.Read();
}
}
}

11 行

1
2
// SQSClient from AWSSDK
var amazonSqsClient = new AmazonSQSClient();

要使用 SQS,首先要建立 AmazonSQSClient 物件,所有的操作都從 AmazonSQSClient 開始。

14 行

1
2
// FIFO quene URL
string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo";

無論是 寫入 message讀出 message 或者 刪除 message,都必須靠 QueueUrl 辨識你所要控制的 queue。

23 行

1
2
3
4
5
6
7
8
9
var sendMessageRequest = new SendMessageRequest
{
QueueUrl = myQueueUrl,
MessageBody = message,
MessageGroupId = "oomusou"
};

amazonSqsClient.SendMessage(sendMessageRequest);
Console.WriteLine("Send message {0}", message);

建立 SendMessageRequest DTO,其中 QueueUrl 為 queue 的 URL;而 MessageBody 為要傳入 queue 的內容。

FIFO queue 與 Standard queue 最大的差別在於 MessageGroupId,根據 AWS 文件,只有相同的 MessageGroupId 才能使用 FIFO queue,並保證 message 會 先進先出,若沒指定 MessageGroupId 則視為 Standard Queue,並不保證 message 會 先進先出

再將 SendMessageRequest DTO 傳入 AmazonSQSClient.SendMessage(),正式將資料寫入 queue。

從 Queue 讀出 Message


Program.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using System;
using Amazon;
using Amazon.SQS;
using Amazon.SQS.Model;

namespace SQSReceiveMessage
{
class Program
{
public static void Main(string[] args)
{

// SQSClient from AWSSDK
var amazonSqsClient = new AmazonSQSClient();

// FIFO quene URL
string myQueueUrl = "https://sqs.us-west-2.amazonaws.com/781160412246/ecfe.fifo";

try
{
// Send 3 messages to FIFO queue
for (var i = 0; i < 3; i++)
{
// Read message from FIFO queue
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = myQueueUrl
};

var receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest);
if (receiveMessageResponse.Messages != null)
{
var message = receiveMessageResponse.Messages[0];

if (!string.IsNullOrEmpty(message.Body))
{
Console.WriteLine("Read message: {0}", message.Body);
}


// Delete message from FIFO queue
var messageRecieptHandle = message.ReceiptHandle;
var deleteMessageRequest = new DeleteMessageRequest
{
QueueUrl = myQueueUrl,
ReceiptHandle = messageRecieptHandle
};
amazonSqsClient.DeleteMessage(deleteMessageRequest);
Console.WriteLine("Delete message: {0}", message.Body);
}
}
}
catch (AmazonSQSException ex)
{
Console.WriteLine("Caught Exception: " + ex.Message);
Console.WriteLine("Response Status Code: " + ex.StatusCode);
Console.WriteLine("Error Code: " + ex.ErrorCode);
Console.WriteLine("Error Type: " + ex.ErrorType);
Console.WriteLine("Request ID: " + ex.RequestId);
}


Console.WriteLine("Press Enter to continue...");
Console.Read();
}
}
}

39 行

1
2
3
4
5
6
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = myQueueUrl
};

var receiveMessageResponse = amazonSqsClient.ReceiveMessage(receiveMessageRequest);

建立 ReceiveMessageRequest DTO,其中 QueueUrl 為 queue 的 URL。

再將 ReceiveMessageRequest DTO 傳入 AmazonSQSClient.ReceiveMessage(),正式從 queue 讀出資料。

45 行

1
2
3
4
5
6
7
8
9
if (receiveMessageResponse.Messages != null)
{
var message = receiveMessageResponse.Messages[0];

if (!string.IsNullOrEmpty(message.Body))
{
Console.WriteLine("Read message: {0}", message.Body);
}
}

其中 receiveMessageResponse.MessagesList<Message>,預設每次 ReceiveMessage() 只會從 queue 讀出一筆 message,且資料在 message.Body 內。

55 行

1
2
3
4
5
6
7
8
// Delete message from FIFO queue
var messageRecieptHandle = message.ReceiptHandle;
var deleteMessageRequest = new DeleteMessageRequest
{
QueueUrl = myQueueUrl,
ReceiptHandle = messageRecieptHandle
};
amazonSqsClient.DeleteMessage(deleteMessageRequest);

建立 DeleteMessageRequest DTO,其中 QueueUrl 為 queue 的 URL;此外 ReceipeHandle 為要刪除 message 的 handle。

ReceipeHandlemessage.ReceiptHandle 獲得。

再將 DeleteMessageRequest DTO 傳入 AmazonSQSClient.DeleteMessage(),正式從 queue 刪除資料。

根據 AWS SQS 特性,若不從 SQS 刪除 message, 若繼續下 ReceiveMessage() ,仍然會讀取到相同的 message,唯有刪除了 message 之後,才會依 FIFO 順序讀到下一筆 message

sqs010

依序寫入 message0message1message2 三筆資料進有 FIFO 的 AWS SQS。

sqs011

依序讀出 message0message1message2

Q&A


Q : 如何查看 AWS SQS API ?

sqs009

AWS SDK for .NET API Reference 官網,查詢 Amazon.SQSAmazonSQSClient,所有 SQS 的 API 都是由這裡展開出來。

Conclusion


  • FIFO queue 與 Standard queue 最大的差異在於 SendMessage() 的 DTO 必須包含 MessageGroupId,才能保證相同 MessageGroupId 有 FIFO 的 先進先出 特性
  • ReceiveMessage() 之後還必須 DeleteMessage(),才能確保 SendMessage() 會收到下一筆 message,否則仍然會繼續收到目前的 message

Sample Code


完整的範例可以在我的 GitHub 上找到

Reference


AWS, AWS SDK for .NET Developer Guide
AWS, AWS Toolkit for Visual Studio
AWS, Messaging Using Amazon SQS
AWS, AWS SDK for .NET API Reference

2018-03-14