AWS Step FunctionsでDynamoDBを操作するためのAWS SDK統合
AWS Step FunctionsからAWS SDKを使用してDynamoDBテーブルの作成、項目の追加・更新・削除などを行う方法を解説します
はじめに
AWS Step Functionsは、AWSのサーバーレスワークフローサービスで、複数のAWSサービスを組み合わせて複雑なビジネスロジックを実装できます。Step Functionsの強力な機能の一つに「AWS SDK統合」があります。これにより、Lambda関数を作成することなく、Step Functionsから直接AWSサービスのAPIを呼び出すことができます。
この記事では、AWS Step FunctionsからAWS SDK統合を使用してAmazon DynamoDBの操作を行う方法について詳しく解説します。テーブルの作成、項目の追加・更新・取得・削除など、基本的なDynamoDB操作をStep Functionsから直接実行する方法を学びましょう。
AWS SDK統合とDynamoDBの最適化統合
Step FunctionsでDynamoDBを操作するには、2つの方法があります:
- 最適化された統合(Optimized Integration): 基本的なCRUD操作(GetItem、PutItem、UpdateItem、DeleteItem)に特化した統合方法です。
- AWS SDK統合: DynamoDBのすべてのAPI操作にアクセスできる汎用的な統合方法です。
最適化された統合は、以下の形式でリソースを指定します:
arn:aws:states:::dynamodb:操作名
例えば、項目を取得するには:
arn:aws:states:::dynamodb:getItem
AWS SDK統合は、以下の形式でリソースを指定します:
arn:aws:states:::aws-sdk:dynamodb:操作名
例えば、テーブルを作成するには:
arn:aws:states:::aws-sdk:dynamodb:createTable
IAMロールの設定
Step FunctionsからDynamoDB操作を行うには、適切なIAMロールの設定が必要です。以下は、DynamoDBテーブルの作成、項目の操作などを行うために必要な権限を持つポリシーの例です:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:CreateTable",
"dynamodb:DeleteTable",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:BatchWriteItem",
"dynamodb:BatchGetItem"
],
"Resource": "*"
}
]
}
実際の運用では、より制限された権限を設定することをお勧めします。例えば、特定のテーブルに対する操作のみを許可するなど、最小権限の原則に従って設定しましょう。
DynamoDBテーブルの作成
Step FunctionsからDynamoDBテーブルを作成するには、AWS SDK統合のcreateTable APIアクションを使用します。以下は、テーブルを作成するTaskステートの例です:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:createTable",
"Parameters": {
"TableName": "MyTable",
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
},
"ResultPath": "$.tableInfo",
"Next": "NextState"
}
このタスクは、「MyTable」という名前のテーブルを作成します。プライマリキーは「id」という文字列型の属性です。
DynamoDB項目の取得(GetItem)
DynamoDBから項目を取得するには、最適化された統合のgetItem APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "MyTable",
"Key": {
"id": {
"S.$": "$.itemId"
}
}
},
"ResultPath": "$.item",
"Next": "NextState"
}
このタスクは、入力の$.itemIdの値をキーとして使用し、「MyTable」から項目を取得します。結果は$.itemに格納されます。
AWS SDK統合を使用する場合は、以下のように記述します:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:getItem",
"Parameters": {
"TableName": "MyTable",
"Key": {
"id": {
"S.$": "$.itemId"
}
},
"ConsistentRead": true
},
"ResultPath": "$.item",
"Next": "NextState"
}
DynamoDB項目の追加(PutItem)
テーブルに新しい項目を追加するには、putItem APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "MyTable",
"Item": {
"id": {
"S.$": "$.itemId"
},
"name": {
"S.$": "$.name"
},
"age": {
"N.$": "$.age"
},
"timestamp": {
"N.$": "$$.State.EnteredTime"
}
},
"ReturnValues": "ALL_OLD"
},
"ResultPath": "$.putResult",
"Next": "NextState"
}
このタスクは、入力からitemId、name、ageの値を取得し、現在の時刻とともに新しい項目として「MyTable」に追加します。
DynamoDB項目の更新(UpdateItem)
既存の項目を更新するには、updateItem APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "MyTable",
"Key": {
"id": {
"S.$": "$.itemId"
}
},
"UpdateExpression": "SET #name = :name, #status = :status, #updatedAt = :timestamp",
"ExpressionAttributeNames": {
"#name": "name",
"#status": "status",
"#updatedAt": "updatedAt"
},
"ExpressionAttributeValues": {
":name": {
"S.$": "$.newName"
},
":status": {
"S": "UPDATED"
},
":timestamp": {
"N.$": "$$.State.EnteredTime"
}
},
"ReturnValues": "ALL_NEW"
},
"ResultPath": "$.updateResult",
"Next": "NextState"
}
このタスクは、指定されたIDの項目の名前とステータスを更新し、更新時刻も記録します。
DynamoDB項目の削除(DeleteItem)
項目を削除するには、deleteItem APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:deleteItem",
"Parameters": {
"TableName": "MyTable",
"Key": {
"id": {
"S.$": "$.itemId"
}
},
"ReturnValues": "ALL_OLD"
},
"ResultPath": "$.deleteResult",
"Next": "NextState"
}
このタスクは、指定されたIDの項目を「MyTable」から削除します。
DynamoDBのクエリ操作(Query)
特定の条件に一致する項目を検索するには、AWS SDK統合のquery APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
"Parameters": {
"TableName": "MyTable",
"IndexName": "GSI1",
"KeyConditionExpression": "status = :status AND createdAt > :date",
"ExpressionAttributeValues": {
":status": {
"S": "PENDING"
},
":date": {
"S.$": "$.startDate"
}
}
},
"ResultPath": "$.queryResult",
"Next": "NextState"
}
このタスクは、ステータスが「PENDING」で、指定された日付より後に作成された項目を検索します。
DynamoDBのスキャン操作(Scan)
テーブル全体をスキャンするには、AWS SDK統合のscan APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:scan",
"Parameters": {
"TableName": "MyTable",
"FilterExpression": "contains(#tags, :tag)",
"ExpressionAttributeNames": {
"#tags": "tags"
},
"ExpressionAttributeValues": {
":tag": {
"S.$": "$.tagToSearch"
}
}
},
"ResultPath": "$.scanResult",
"Next": "NextState"
}
このタスクは、「tags」属性に特定のタグを含む項目をすべて検索します。
バッチ操作(BatchGetItem、BatchWriteItem)
複数の項目を一度に取得するには、AWS SDK統合のbatchGetItem APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:batchGetItem",
"Parameters": {
"RequestItems": {
"MyTable": {
"Keys": [
{
"id": {
"S.$": "$.ids[0]"
}
},
{
"id": {
"S.$": "$.ids[1]"
}
}
]
}
}
},
"ResultPath": "$.batchGetResult",
"Next": "NextState"
}
複数の項目を一度に書き込むには、AWS SDK統合のbatchWriteItem APIアクションを使用します:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:batchWriteItem",
"Parameters": {
"RequestItems": {
"MyTable": [
{
"PutRequest": {
"Item": {
"id": {
"S.$": "$.items[0].id"
},
"name": {
"S.$": "$.items[0].name"
}
}
}
},
{
"DeleteRequest": {
"Key": {
"id": {
"S.$": "$.itemToDelete"
}
}
}
}
]
}
},
"ResultPath": "$.batchWriteResult",
"Next": "NextState"
}
このタスクは、一つの項目を追加し、別の項目を削除するバッチ操作を実行します。
実践的なワークフロー例:タスク管理システム
以下は、DynamoDBを使用したタスク管理システムの実践的なワークフロー例です:
{
"Comment": "タスク管理ワークフロー",
"StartAt": "CreateTask",
"States": {
"CreateTask": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "Tasks",
"Item": {
"taskId": {
"S.$": "$.taskId"
},
"timestamp": {
"N.$": "$$.State.EnteredTime"
},
"status": {
"S": "STARTED"
},
"description": {
"S.$": "$.description"
}
}
},
"ResultPath": "$.createResult",
"Next": "WaitForProcessing"
},
"WaitForProcessing": {
"Type": "Wait",
"Seconds": 30,
"Next": "UpdateTaskStatus"
},
"UpdateTaskStatus": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "Tasks",
"Key": {
"taskId": {
"S.$": "$.taskId"
}
},
"UpdateExpression": "SET #status = :status, #completedAt = :timestamp",
"ExpressionAttributeNames": {
"#status": "status",
"#completedAt": "completedAt"
},
"ExpressionAttributeValues": {
":status": {
"S": "COMPLETED"
},
":timestamp": {
"N.$": "$$.State.EnteredTime"
}
},
"ReturnValues": "ALL_NEW"
},
"ResultPath": "$.updateResult",
"Next": "GetTaskDetails"
},
"GetTaskDetails": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "Tasks",
"Key": {
"taskId": {
"S.$": "$.taskId"
}
}
},
"ResultPath": "$.taskDetails",
"End": true
}
}
}
このワークフローは以下の処理を行います:
- 新しいタスクをDynamoDBに作成し、ステータスを「STARTED」に設定
- 30秒間待機(タスク処理をシミュレート)
- タスクのステータスを「COMPLETED」に更新し、完了時刻を記録
- 最終的なタスクの詳細を取得
エラーハンドリング
AWS SDK統合を使用する際のエラーハンドリングは重要です。DynamoDB操作に関連するエラーは、DynamoDB.ErrorNameの形式で返されます。例えば、テーブルが存在しない場合はDynamoDB.ResourceNotFoundExceptionエラーが発生します。
以下は、エラーハンドリングを含むテーブル作成の例です:
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:createTable",
"Parameters": {
"TableName": "MyTable",
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
},
"Catch": [
{
"ErrorEquals": ["DynamoDB.ResourceInUseException"],
"Next": "TableAlreadyExists"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleOtherErrors"
}
],
"Next": "TableCreated"
}
条件付き操作
DynamoDBでは、条件式を使用して項目の操作を制御できます。以下は、条件付き更新の例です:
{
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "MyTable",
"Key": {
"id": {
"S.$": "$.itemId"
}
},
"UpdateExpression": "SET #status = :newStatus",
"ConditionExpression": "#status = :currentStatus",
"ExpressionAttributeNames": {
"#status": "status"
},
"ExpressionAttributeValues": {
":newStatus": {
"S": "APPROVED"
},
":currentStatus": {
"S": "PENDING"
}
},
"ReturnValues": "ALL_NEW"
},
"Catch": [
{
"ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
"Next": "ConditionFailed"
}
],
"Next": "UpdateSucceeded"
}
この例では、項目のステータスが「PENDING」の場合のみ、「APPROVED」に更新します。条件が満たされない場合は、ConditionalCheckFailedExceptionエラーが発生し、「ConditionFailed」ステートに遷移します。
まとめ
この記事では、AWS Step FunctionsからAWS SDK統合を使用してAmazon DynamoDBの操作を行う方法について解説しました。テーブルの作成、項目の追加・更新・取得・削除、クエリ、スキャン、バッチ操作など、基本的なDynamoDB操作をStep Functionsから直接実行できることがわかりました。
AWS SDK統合を活用することで、Lambda関数を作成することなく、Step Functionsのワークフロー内でDynamoDBを操作できます。これにより、サーバーレスアプリケーションの開発がより簡単になり、コードの保守性も向上します。
実際のプロジェクトでは、セキュリティのためにIAMロールの権限を最小限に設定し、エラーハンドリングを適切に実装することをお勧めします。また、大量のデータを扱う場合は、DynamoDBのスロットリング制限やStep Functionsの入出力サイズ制限(256KB)に注意してください。