皆さんはDynamoDB Streamsって使ったことはありますか?
これがまた便利な仕組みで、使いこなすことができればDynamoDBウィザードになれるかもです。
目次
1. DynamoDB Streamsとは
DynamoDB Streamsは、テーブル内のデータ変更(追加、更新、削除)をリアルタイムで検知し、ストリームとして提供する機能です。
このストリームを利用することで、例えばデータの更新が発生した際にLambda関数をトリガーとして実行させることが可能となります。
その際、Lambda関数にはDBテーブルの変更履歴が渡されます。
2. システム構成例
・登録処理ではDynamoDBにデータが登録され、S3にもデータがアップロードされる
・DBデータにはTTLが設定されており、有効期限が過ぎるとデータが削除される
このようなシステムがあった場合、DynamoDB上のデータは時間経過で自動削除されていくも、
S3上のオブジェクトはジャンクデータとして残り続けます。
★ 構成イメージ
★ TTLでDBデータ自動削除後
そんなときに活用できるのが、DynamoDB Streamsです。
3. DynamoDB Streams導入
DBデータが削除されたら、自動でS3上の関連データも削除したいものです。
・DBデータがTTLで削除されたことを検知したい
・削除されたDBデータを取得し、Lambda関数を実行したい
実現したいことはこのような感じです。
ストリームの設定
商品テーブルにストリームの設定を行います。
「エクスポートおよびストリーム」タブより、「DynamoDBストリームの詳細」項目内の「オンにする」ボタンを押下
「ストリームをオンにする」ボタンを押下
※表示タイプとは、ストリームに送信するデータの内容です。
今回は変更されたデータのPK/SKのみが送信される「キー属性のみ」を選択しました。
これでDynamoDB側は設定完了です。
Lambdaの設定
DynamoDBストリームの送信先として、Lambdaを設定します。
TTLで削除されたデータを受け取り、そのデータと紐付く商品画像をS3上から削除する処理を書きます。
Lambdaのソースコードは次の通りです。
import os
import boto3
def lambda_handler(event, context):
for record in event['Records']:
# 削除レコードのみ処理
if record['eventName'] != 'REMOVE':
return
# S3クライアント生成
s3_client = boto3.client('s3')
# S3より商品画像ファイル削除
product_id = record['dynamodb']['Keys']['パーティションキー']['S']
s3_client.delete_object(
Bucket = "S3バケット名",
Key = f"{product_id}.png"
)
「トリガーを追加」ボタンを押下
トリガーには「DynamoDB」を選択し、対象となるテーブルを選択します。
「トリガーをアクティブ化」にチェックを付け、「追加」ボタンを押下
※Lambdaの実行ロールには、DynamoDB、及びS3に接続するためのポリシーが必要です。
今回はテストなので、「AWSLambdaInvocation-DynamoDB」と「AmazonS3FullAccess」を設定しました。
これでLambda側も設定完了です。
4. 動作確認
現在のDBとS3の内容です。
★ DynamoDB
★ S3
~~~ TTLにより、商品ID : 1のデータ自動削除 ~~~
★ DynamoDB
DBデータが削除されたため、Lambda関数がトリガーされてS3のデータを削除してくれるはず.....
★ S3
削除されていました!
5. 終わりに
DynamoDB Streamsを導入することで、データの変化をリアルタイムで検知できるようになり、特にデータ駆動型のアプリケーション開発において非常に強力なツールとなるでしょう。
今回は限定的な使い方でしたが、
さまざまな使い方を考え、DynamoDBのエキスパートとして君臨しましょう!