AWS Step FunctionsでDynamoDBを操作するためのAWS SDK統合

AWS Step FunctionsからAWS SDKを使用してDynamoDBテーブルの作成、項目の追加・更新・削除などを行う方法を解説します

はじめに

AWS Step Functionsは、AWSのサーバーレスワークフローサービスで、複数のAWSサービスを組み合わせて複雑なビジネスロジックを実装できます。Step Functionsの強力な機能の一つに「AWS SDK統合」があります。これにより、Lambda関数を作成することなく、Step Functionsから直接AWSサービスのAPIを呼び出すことができます。

この記事では、AWS Step FunctionsからAWS SDK統合を使用してAmazon DynamoDBの操作を行う方法について詳しく解説します。テーブルの作成、項目の追加・更新・取得・削除など、基本的なDynamoDB操作をStep Functionsから直接実行する方法を学びましょう。

AWS SDK統合とDynamoDBの最適化統合

Step FunctionsでDynamoDBを操作するには、2つの方法があります:

  1. 最適化された統合(Optimized Integration): 基本的なCRUD操作(GetItem、PutItem、UpdateItem、DeleteItem)に特化した統合方法です。
  2. AWS SDK統合: DynamoDBのすべてのAPI操作にアクセスできる汎用的な統合方法です。

最適化された統合は、以下の形式でリソースを指定します:

arn:aws:states:::dynamodb:操作名

例えば、項目を取得するには:

arn:aws:states:::dynamodb:getItem

AWS SDK統合は、以下の形式でリソースを指定します:

arn:aws:states:::aws-sdk:dynamodb:操作名

例えば、テーブルを作成するには:

arn:aws:states:::aws-sdk:dynamodb:createTable

IAMロールの設定

Step FunctionsからDynamoDB操作を行うには、適切なIAMロールの設定が必要です。以下は、DynamoDBテーブルの作成、項目の操作などを行うために必要な権限を持つポリシーの例です:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:CreateTable",
        "dynamodb:DeleteTable",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:BatchWriteItem",
        "dynamodb:BatchGetItem"
      ],
      "Resource": "*"
    }
  ]
}

実際の運用では、より制限された権限を設定することをお勧めします。例えば、特定のテーブルに対する操作のみを許可するなど、最小権限の原則に従って設定しましょう。

DynamoDBテーブルの作成

Step FunctionsからDynamoDBテーブルを作成するには、AWS SDK統合のcreateTable APIアクションを使用します。以下は、テーブルを作成するTaskステートの例です:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:createTable",
  "Parameters": {
    "TableName": "MyTable",
    "AttributeDefinitions": [
      {
        "AttributeName": "id",
        "AttributeType": "S"
      }
    ],
    "KeySchema": [
      {
        "AttributeName": "id",
        "KeyType": "HASH"
      }
    ],
    "ProvisionedThroughput": {
      "ReadCapacityUnits": 5,
      "WriteCapacityUnits": 5
    }
  },
  "ResultPath": "$.tableInfo",
  "Next": "NextState"
}

このタスクは、「MyTable」という名前のテーブルを作成します。プライマリキーは「id」という文字列型の属性です。

DynamoDB項目の取得(GetItem)

DynamoDBから項目を取得するには、最適化された統合のgetItem APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:getItem",
  "Parameters": {
    "TableName": "MyTable",
    "Key": {
      "id": {
        "S.$": "$.itemId"
      }
    }
  },
  "ResultPath": "$.item",
  "Next": "NextState"
}

このタスクは、入力の$.itemIdの値をキーとして使用し、「MyTable」から項目を取得します。結果は$.itemに格納されます。

AWS SDK統合を使用する場合は、以下のように記述します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:getItem",
  "Parameters": {
    "TableName": "MyTable",
    "Key": {
      "id": {
        "S.$": "$.itemId"
      }
    },
    "ConsistentRead": true
  },
  "ResultPath": "$.item",
  "Next": "NextState"
}

DynamoDB項目の追加(PutItem)

テーブルに新しい項目を追加するには、putItem APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:putItem",
  "Parameters": {
    "TableName": "MyTable",
    "Item": {
      "id": {
        "S.$": "$.itemId"
      },
      "name": {
        "S.$": "$.name"
      },
      "age": {
        "N.$": "$.age"
      },
      "timestamp": {
        "N.$": "$$.State.EnteredTime"
      }
    },
    "ReturnValues": "ALL_OLD"
  },
  "ResultPath": "$.putResult",
  "Next": "NextState"
}

このタスクは、入力からitemIdnameageの値を取得し、現在の時刻とともに新しい項目として「MyTable」に追加します。

DynamoDB項目の更新(UpdateItem)

既存の項目を更新するには、updateItem APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:updateItem",
  "Parameters": {
    "TableName": "MyTable",
    "Key": {
      "id": {
        "S.$": "$.itemId"
      }
    },
    "UpdateExpression": "SET #name = :name, #status = :status, #updatedAt = :timestamp",
    "ExpressionAttributeNames": {
      "#name": "name",
      "#status": "status",
      "#updatedAt": "updatedAt"
    },
    "ExpressionAttributeValues": {
      ":name": {
        "S.$": "$.newName"
      },
      ":status": {
        "S": "UPDATED"
      },
      ":timestamp": {
        "N.$": "$$.State.EnteredTime"
      }
    },
    "ReturnValues": "ALL_NEW"
  },
  "ResultPath": "$.updateResult",
  "Next": "NextState"
}

このタスクは、指定されたIDの項目の名前とステータスを更新し、更新時刻も記録します。

DynamoDB項目の削除(DeleteItem)

項目を削除するには、deleteItem APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:deleteItem",
  "Parameters": {
    "TableName": "MyTable",
    "Key": {
      "id": {
        "S.$": "$.itemId"
      }
    },
    "ReturnValues": "ALL_OLD"
  },
  "ResultPath": "$.deleteResult",
  "Next": "NextState"
}

このタスクは、指定されたIDの項目を「MyTable」から削除します。

DynamoDBのクエリ操作(Query)

特定の条件に一致する項目を検索するには、AWS SDK統合のquery APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
  "Parameters": {
    "TableName": "MyTable",
    "IndexName": "GSI1",
    "KeyConditionExpression": "status = :status AND createdAt > :date",
    "ExpressionAttributeValues": {
      ":status": {
        "S": "PENDING"
      },
      ":date": {
        "S.$": "$.startDate"
      }
    }
  },
  "ResultPath": "$.queryResult",
  "Next": "NextState"
}

このタスクは、ステータスが「PENDING」で、指定された日付より後に作成された項目を検索します。

DynamoDBのスキャン操作(Scan)

テーブル全体をスキャンするには、AWS SDK統合のscan APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:scan",
  "Parameters": {
    "TableName": "MyTable",
    "FilterExpression": "contains(#tags, :tag)",
    "ExpressionAttributeNames": {
      "#tags": "tags"
    },
    "ExpressionAttributeValues": {
      ":tag": {
        "S.$": "$.tagToSearch"
      }
    }
  },
  "ResultPath": "$.scanResult",
  "Next": "NextState"
}

このタスクは、「tags」属性に特定のタグを含む項目をすべて検索します。

バッチ操作(BatchGetItem、BatchWriteItem)

複数の項目を一度に取得するには、AWS SDK統合のbatchGetItem APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:batchGetItem",
  "Parameters": {
    "RequestItems": {
      "MyTable": {
        "Keys": [
          {
            "id": {
              "S.$": "$.ids[0]"
            }
          },
          {
            "id": {
              "S.$": "$.ids[1]"
            }
          }
        ]
      }
    }
  },
  "ResultPath": "$.batchGetResult",
  "Next": "NextState"
}

複数の項目を一度に書き込むには、AWS SDK統合のbatchWriteItem APIアクションを使用します:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:batchWriteItem",
  "Parameters": {
    "RequestItems": {
      "MyTable": [
        {
          "PutRequest": {
            "Item": {
              "id": {
                "S.$": "$.items[0].id"
              },
              "name": {
                "S.$": "$.items[0].name"
              }
            }
          }
        },
        {
          "DeleteRequest": {
            "Key": {
              "id": {
                "S.$": "$.itemToDelete"
              }
            }
          }
        }
      ]
    }
  },
  "ResultPath": "$.batchWriteResult",
  "Next": "NextState"
}

このタスクは、一つの項目を追加し、別の項目を削除するバッチ操作を実行します。

実践的なワークフロー例:タスク管理システム

以下は、DynamoDBを使用したタスク管理システムの実践的なワークフロー例です:

{
  "Comment": "タスク管理ワークフロー",
  "StartAt": "CreateTask",
  "States": {
    "CreateTask": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "Tasks",
        "Item": {
          "taskId": {
            "S.$": "$.taskId"
          },
          "timestamp": {
            "N.$": "$$.State.EnteredTime"
          },
          "status": {
            "S": "STARTED"
          },
          "description": {
            "S.$": "$.description"
          }
        }
      },
      "ResultPath": "$.createResult",
      "Next": "WaitForProcessing"
    },
    "WaitForProcessing": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "UpdateTaskStatus"
    },
    "UpdateTaskStatus": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "TableName": "Tasks",
        "Key": {
          "taskId": {
            "S.$": "$.taskId"
          }
        },
        "UpdateExpression": "SET #status = :status, #completedAt = :timestamp",
        "ExpressionAttributeNames": {
          "#status": "status",
          "#completedAt": "completedAt"
        },
        "ExpressionAttributeValues": {
          ":status": {
            "S": "COMPLETED"
          },
          ":timestamp": {
            "N.$": "$$.State.EnteredTime"
          }
        },
        "ReturnValues": "ALL_NEW"
      },
      "ResultPath": "$.updateResult",
      "Next": "GetTaskDetails"
    },
    "GetTaskDetails": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "Tasks",
        "Key": {
          "taskId": {
            "S.$": "$.taskId"
          }
        }
      },
      "ResultPath": "$.taskDetails",
      "End": true
    }
  }
}

このワークフローは以下の処理を行います:

  1. 新しいタスクをDynamoDBに作成し、ステータスを「STARTED」に設定
  2. 30秒間待機(タスク処理をシミュレート)
  3. タスクのステータスを「COMPLETED」に更新し、完了時刻を記録
  4. 最終的なタスクの詳細を取得

エラーハンドリング

AWS SDK統合を使用する際のエラーハンドリングは重要です。DynamoDB操作に関連するエラーは、DynamoDB.ErrorNameの形式で返されます。例えば、テーブルが存在しない場合はDynamoDB.ResourceNotFoundExceptionエラーが発生します。

以下は、エラーハンドリングを含むテーブル作成の例です:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:createTable",
  "Parameters": {
    "TableName": "MyTable",
    "AttributeDefinitions": [
      {
        "AttributeName": "id",
        "AttributeType": "S"
      }
    ],
    "KeySchema": [
      {
        "AttributeName": "id",
        "KeyType": "HASH"
      }
    ],
    "ProvisionedThroughput": {
      "ReadCapacityUnits": 5,
      "WriteCapacityUnits": 5
    }
  },
  "Catch": [
    {
      "ErrorEquals": ["DynamoDB.ResourceInUseException"],
      "Next": "TableAlreadyExists"
    },
    {
      "ErrorEquals": ["States.ALL"],
      "Next": "HandleOtherErrors"
    }
  ],
  "Next": "TableCreated"
}

条件付き操作

DynamoDBでは、条件式を使用して項目の操作を制御できます。以下は、条件付き更新の例です:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:updateItem",
  "Parameters": {
    "TableName": "MyTable",
    "Key": {
      "id": {
        "S.$": "$.itemId"
      }
    },
    "UpdateExpression": "SET #status = :newStatus",
    "ConditionExpression": "#status = :currentStatus",
    "ExpressionAttributeNames": {
      "#status": "status"
    },
    "ExpressionAttributeValues": {
      ":newStatus": {
        "S": "APPROVED"
      },
      ":currentStatus": {
        "S": "PENDING"
      }
    },
    "ReturnValues": "ALL_NEW"
  },
  "Catch": [
    {
      "ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
      "Next": "ConditionFailed"
    }
  ],
  "Next": "UpdateSucceeded"
}

この例では、項目のステータスが「PENDING」の場合のみ、「APPROVED」に更新します。条件が満たされない場合は、ConditionalCheckFailedExceptionエラーが発生し、「ConditionFailed」ステートに遷移します。

まとめ

この記事では、AWS Step FunctionsからAWS SDK統合を使用してAmazon DynamoDBの操作を行う方法について解説しました。テーブルの作成、項目の追加・更新・取得・削除、クエリ、スキャン、バッチ操作など、基本的なDynamoDB操作をStep Functionsから直接実行できることがわかりました。

AWS SDK統合を活用することで、Lambda関数を作成することなく、Step Functionsのワークフロー内でDynamoDBを操作できます。これにより、サーバーレスアプリケーションの開発がより簡単になり、コードの保守性も向上します。

実際のプロジェクトでは、セキュリティのためにIAMロールの権限を最小限に設定し、エラーハンドリングを適切に実装することをお勧めします。また、大量のデータを扱う場合は、DynamoDBのスロットリング制限やStep Functionsの入出力サイズ制限(256KB)に注意してください。

参考資料