ブログ

AWS Lambda で Amazon Aurora のスロークエリログを Slack 通知する

はじめに

こんにちは! ソフトウェアエンジニアの新幡駿と笠井誠斗です。

フライウィールでは、企業のデータ利活用をさまざまな形で支援しています。その一環として、処理されたデータを元に顧客が操作できるように、顧客向け管理画面の Web アプリケーションも開発しています。また、それを支えるデータベースとして Amazon Aurora MySQL がよく用いられています。

そんな管理画面の開発の際に、処理のボトルネックとなるような実行時間がかかる SQL クエリ、いわゆる「スロークエリ」が問題となることがあります。そのようなスロークエリを早く検知・解決することは、長期的に見てシステムの運用負荷の軽減につながります。そこでフライウィールでは、Aurora MySQL の スロークエリを Slack に通知してくれる Slack bot を開発、導入しています。

上記画像がスロークエリ通知 Slack bot の投稿の例です。 処理にかかった時間や、 どんな内容のクエリだったのかが表示されています。クエリ本文は、 syntax highlight を活用するため、メッセージに直接埋め込むのではなく、SQL のファイルタイプを指定したファイルを添付する形をとっています。

今回の記事では、このようなスロークエリ通知 Slack bot の作成の仕方、及び実際に運用して得られた知見を紹介したいと思います。

全体の構成

全体の構成からご紹介します。

まず、 Aurora MySQL から CloudWatch Logs にスロークエリログを出力するように設定します。そして、そのスロークエリログを AWS Lambda を用いて Slack bot に投稿させるようにします。なお、Slack bot の token は AWS Secrets Manger に登録し、その secret の ID と投稿先 Slack channel の ID を環境変数として Lambda function に渡しています。

この構成や実装に際しては、以下のブログを参考にさせていただきましたので、この記事と併せてご参照ください。
Amazon Aurora のスロークエリを Slack に通知する – enomotodev’s blog

次からそれぞれの構成要素を詳しく見ていきましょう。

スロークエリログ出力の設定

まず、Amazon RDS のスロークエリログを CloudWatch Logs に出力するように設定する必要があります。

この設定は、クラスタを選択して Modify(変更)ボタンを押し、 Additional configuration (追加設定) > Log exports(ログのエクスポート) の Slow query log(スロークエリログ) のチェックボックスを選択することで可能です。

以下の公式ドキュメントもご参照ください。
Publishing Amazon Aurora MySQL logs to Amazon CloudWatch Logs – Amazon Aurora 

Slack bot の設定

次に Slack bot の設定を見ていきましょう。

Slack API: Applications のページから Slack App を作成することができます。上記の通知例の通り、syntax highlight のためにクエリ本文をファイルとして添付する形としたかったため、 Bot Token Scopes として files:write を追加しています。

AWS Secrets Manager への Slack App bot token の登録

AWS Secrets Manager に、前述の Slack App bot token を登録します。これにより、 Lambda Function がセキュアに token を取得して使用できます。フライウィールでは後述する Terraform で Secret の管理を行っていることもあり、詳しい設定方法はここでは割愛いたしますが、手動で設定する場合には以下のドキュメントをご参照ください。
Create an AWS Secrets Manager secret – AWS Secrets Manager 

AWS Lambda のコード

スロークエリログを受け取り、Slack 通知を行うような Lambda function を作成します。

実装の大まかな流れとしては以下のようになります。

1.   CloudWatch Logs からログが Base64 エンコードされた zip ファイルとして渡されるのでそれを decode し unzip する
2.   parse_slowquery_message という関数でスロークエリログを正規表現でのマッチングによりパースする
3.   パースした結果からユーザ名を取得し、ログを通知するか決める
4.   Secrets Manager から Slack App bot token を取得する
5.   Slack の file upload を用いて、スロークエリを投稿する

以降、それぞれのステップについて説明した後で、最後にコード全体を紹介します。

1. CloudWatch Logs から decode して unzip する

Using Lambda with CloudWatch Logs – AWS Lambda にあるように、 CloudWatch Logs は Base64 エンコードされた zip ファイルとして Lambda function に渡されることになります。そのため、最初に以下のように  unzip する処理が必要です。

buffer = BytesIO(base64.b64decode(json.dumps(event["awslogs"]["data"])))
with gzip.GzipFile(mode="rb", fileobj=buffer) as sb:
    data = sb.read()

2. スロークエリのパース

ステップ 1 の結果として得られるスロークエリは以下のような形をしています。

# Time: 2023-10-22T02:57:55.655927Z
# User@Host: xxxxxxxxx[xxxxxxxxx] @ [10.13.103.170] Id: 2638113
# Query_time: 35.549734 Lock_time: 0.000164 Rows_sent: 1 Rows_examined: 15535
use xxxx;
SET timestamp=1697943475;
SELECT SLEEP(20);

このスロークエリログから、クエリを発行したユーザ名やクエリ自体などを抽出するために、正規表現によるマッチングを以下のように実装しています。

def parse_slowquery_message(text: str) -> dict[str, str]:
    """Parse slow query log sent from cloudwatch."""
    pattern = re.compile(
        r"""
        User@Host:\ ([\w-]+)\[[\w-]+\]\ @.*     # user
        Id:\s* (\d+).*                          # id
        Query_time:\ ([\d\.]+).*                # query_time
        Lock_time:\ ([\d\.]+).*                 # lock_time
        Rows_sent:\ (\d+).*                     # rows_sent
        Rows_examined:\ (\d+)\n                 # rows_examined
        (.*)                                    # query
        """,
        re.S | re.X,
    ) 
    match = re.search(pattern, text)
    if not match:
        raise ValueError(f"Failed to parse text: {text}")
    return {
        "user": match[1],
        "id": match[2],
        "query_time": match[3],
        "lock_time": match[4],
        "rows_sent": match[5],
        "rows_examined": match[6],
        "query": match[7],
    }

この実装の正規表現については https://github.com/fiverr/node-rds-slow-log-parse/blob/master/internal/pattern.go を参考にしています。

3. パースした結果からユーザ名を取得し、ログを通知するか決める

スロークエリの原因となったクエリを発行したユーザ名をチェックし、Slack にログを通知するか、あるいはスキップして無視するかを決定します。parse_slowquery_message で得られた辞書型のオブジェクトのうち、 user フィールドにユーザ名が入っているので、そこをチェックします。

これは、通知を行う際には特定のユーザーだけ監視したい、という要望によるものです。例えば、Web アプリケーションの発行したクエリは処理時間の短さが重要であり注目したい一方で、データパイプラインが発行するクエリやオペレータが障害調査や運用のために発行したクエリなどについては、処理時間が多少かかっても問題がないため通知する必要がありません。

ここでは環境変数 USER_NAME で監視対象にしたいユーザ名を指定する形になっています。具体的にはこの環境変数に Web アプリケーションがデータベースアクセスに使用しているユーザ名を設定することになります。

res = parse_slowquery_message(message)
if not res["user"] == USER_NAME:
    logger.fatal("Query is from user %s, skipping", res["user"])
    sys.exit(0)

4. Secrets Manager から Slack App bot token を取得する

Slack に投稿する際の bot token を取得するために Secrets Manager を呼び出します。この際、Secret 取得時に指定する ID について、環境変数 SLACK_BOT_SECRET_ID の値を使用する形にしています。こうすることで、「環境変数を変えれば別の Secret ID を設定できる」という形になり、再利用性が高まります。

secrets_manager_client = boto3.client(service_name="secretsmanager")
get_secret_value_response = secrets_manager_client.get_secret_value(
    SecretId=SLACK_BOT_SECRET_ID
)

5. Slack の file upload を用いて、スロークエリを投稿する

files.upload method | Slack を用いて、スロークエリを投稿します。前述の通り、通知の表示時に syntax highlight が効くよう、クエリ本文は file type に SQL を指定したファイルを添付する形としています。また、クエリ本文以外の情報も通知に含めるため、 Slack コメント本文である  initial_comment を指定しそこに Query Time などの情報を付与しています。


    initial_comment = textwrap.dedent(
        f"""
        *New Slow Query Alert* :rotating_light:
        Author {res['user']} : {res['id']}
        Query Time: {res['query_time']}
        Lock Time: {res['lock_time']}
        Rows Sent: {res['rows_sent']}
        Rows Examined: {res['rows_examined']}
    """
    )
    try:
        result = slack_client.files_upload(
            channels=SLACK_CHANNEL_ID,
            initial_comment=initial_comment,
            content=res["query"],
            filetype="sql",
            title="New Slow Query Alert",
        )
        logger.info(result)

    except SlackApiError as e:
        logger.error(f"Error uploading file: {e}")
        raise e

実際に Lambda function を作成する際は、 import されているパッケージを含めた zip ファイルを作成する必要があります。詳しくは Working with .zip file archives for Python Lambda functions – AWS Lambda をご覧ください。

以下にコード全体を記載いたします。少し長いので、クリックして開いてご覧ください。

クリックして AWS Lambda のコード全体を開く
import base64
import gzip
import json
import logging
import os
import re
import sys
import textwrap
from io import BytesIO

import boto3
from aws_lambda_typing import context as context_
from aws_lambda_typing import events
from botocore.exceptions import ClientError
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

logger = logging.getLogger()
logger.setLevel(logging.INFO)
SLACK_CHANNEL_ID = os.environ.get("SLACK_CHANNEL_ID") # 投稿する Slack のチャンネルの Id
SLACK_BOT_SECRET_ID = os.environ.get("SLACK_BOT_SECRET_ID") # AWS Secrets Manager に登録した Secrets の Id
USER_NAME = os.environ.get("USER_NAME") # スロークエリをモニタリングしたデータベースのユーザ


def parse_slowquery_message(text: str) -> dict[str, str]:
    """Parse slow query log sent from cloudwatch."""
    pattern = re.compile(
        r"""
        User@Host:\ ([\w-]+)\[[\w-]+\]\ @.*     # user
        Id:\s* (\d+).*                          # id
        Query_time:\ ([\d\.]+).*                # query_time
        Lock_time:\ ([\d\.]+).*                 # lock_time
        Rows_sent:\ (\d+).*                     # rows_sent
        Rows_examined:\ (\d+)\n                 # rows_examined
        (.*)                                    # query
        """,
        re.S | re.X,
    )
    logger.info(text)
    match = re.search(pattern, text)
    if not match:
        raise ValueError(f"Failed to parse text: {text}")
    return {
        "user": match[1],
        "id": match[2],
        "query_time": match[3],
        "lock_time": match[4],
        "rows_sent": match[5],
        "rows_examined": match[6],
        "query": match[7],
    }

def lambda_handler(
    event: events.CloudWatchLogsEvent, _context: context_.Context
) -> None:
    if not SLACK_CHANNEL_ID:
        raise ValueError("SLACK_CHANNEL_ID is not set")
    if not SLACK_BOT_SECRET_ID:
        raise ValueError("SLACK_BOT_SECRET_ID is not set")
    if not USER_NAME:
        raise ValueError("USER_NAME is not set")
    # Base64エンコードされた.gzipファイルアーカイブをdecodeしてunzip
    buffer = BytesIO(base64.b64decode(json.dumps(event["awslogs"]["data"])))
    with gzip.GzipFile(mode="rb", fileobj=buffer) as sb:
        data = sb.read()

    log = json.loads(data.decode("utf-8"))
    message = str(log["logEvents"][0]["message"])
    res = parse_slowquery_message(message)
    if not res["user"] == USER_NAME:
        logger.fatal("Query is from user %s, skipping", res["user"])
        sys.exit(0)
    try:
        secrets_manager_client = boto3.client(service_name="secretsmanager")
        get_secret_value_response = secrets_manager_client.get_secret_value(
            SecretId=SLACK_BOT_SECRET_ID
        )
    except ClientError as e:
        raise e
    secret = json.loads(get_secret_value_response["SecretString"])
    slack_client = WebClient(token=secret["SLACK_BOT_TOKEN"])
    initial_comment = textwrap.dedent(
        f"""
        *New Slow Query Alert* :rotating_light:
        Author {res['user']} : {res['id']}
        Query Time: {res['query_time']}
        Lock Time: {res['lock_time']}
        Rows Sent: {res['rows_sent']}
        Rows Examined: {res['rows_examined']}
    """
    )
    try:
        result = slack_client.files_upload(
            channels=SLACK_CHANNEL_ID,
            initial_comment=initial_comment,
            content=res["query"],
            filetype="sql",
            title="New Slow Query Alert",
        )
        logger.info(result)


    except SlackApiError as e:
        logger.error(f"Error uploading file: {e}")
        raise e

CloudWatch Logs Subscription Filter の設定

最後に、Lambda function が CloudWatch からスロークエリログを随時受け取るように Subscription Filter を設定します。フライウィールでは、後述する Terraform で設定行っているため説明は割愛いたしますが、手動で設定する場合には以下のドキュメントを参照ください。
Using Lambda with CloudWatch Logs – AWS Lambda
Using CloudWatch Logs subscription filters – Amazon CloudWatch

Terraform を用いたAWS リソースの宣言

さて、これまで見てきた今回のスロークエリ通知の仕組みは、特定のプロジェクトだけでなく、さまざまなプロジェクトで活用できるものです。フライウィールではさまざまなプロジェクトの開発が行われており、このセットアップを各プロジェクトに向けて毎度手動で行うのはちょっと面倒臭そうです。

フライウィールでは、Terraform を用いて宣言的にインフラストラクチャを管理しています。それを活用し、必要なAWS リソースの宣言を Terraform module として定義して再利用を可能とすることで、セットアップの手間を最小限にしています。その仕組みもご紹介します。

Variable の宣言

まず、module として使用するために変数を宣言します。具体的には以下のようになります。

variable "aws_region" {
  type = string
}

variable "aws_account_id" {
  type = string
}

variable "rds_cluster_name" {
  description = "RDS cluster to monitor slow query logs."
  type        = string
}

# Lambda の SLACK_CHANNEL_ID に渡す
variable "slack_channel_id" {
  description = "Slack channel ID to notify slow query logs."
  type        = string
}
# Slack App bot token の値
variable "slackbot_secret_value" {
  description = "Slackbot secret value stored in AWS Secrets Manager."
  type        = string  sensitive   = true
}
# Lambda の USER_NAME に渡す
variable "user_name" {
  description = "Database user name to track slow queries."
  type        = string
}

variable "lambda_s3_bucket" {
  description = "ID of the S3 bucket where the lambda zip file is stored."
  type        = string
}

variable "lambda_s3_prefix" {
  description = "Prefix of the S3 bucket where the lambda zip file is stored."
  type        = string
}

locals {
  # クラスタを指定したら自動的に RDS のスロークエリの log group 名は定まっている
  rds_slow_query_log_group_name = "/aws/rds/cluster/${var.rds_cluster_name}/slowquery"
  rds_slow_query_log_group_arn  = "arn:aws:logs:${var.aws_region}:${var.aws_account_id}:log-group:${local.rds_slow_query_log_group_name}:*"

  # Lambda の SLACK_BOT_SECRET_ID に渡す
  slackbot_secret_name          = "slowquery_slackbot_secret"
}

AWS Secrets Manager への Slack App bot token の登録

Slack App bot token の AWS Secrets Manager への登録を記述します。上記で宣言された変数のうち var.slackbot_secret_value や local.slackbot_secret_name の値を参照しています。

resource "aws_secretsmanager_secret" "slackbot_secret" {
  name = local.slackbot_secret_name
}

resource "aws_secretsmanager_secret_version" "slackbot_secret" {
  secret_id     = aws_secretsmanager_secret.slackbot_secret.id
  secret_string = var.slackbot_secret_value
}

Lambda Function / IAM Role の宣言

次に、Lambda function 本体とそれに紐づける IAM Role の設定をします。今回の Lambda function はログ (function 自体のデバッグログ) を出力するための CloudWatch Logs にログをポストする権限と、Slack トークンを取得するための AWS Secrets Manager のシークレットを取得する権限を付与しています。具体的には以下のようになります。

resource "aws_iam_role" "role_for_lambda_post_slow_query_to_slack" {
  name = "role_for_lambda_post_slow_query_to_slack"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Action    = "sts:AssumeRole"
        Principal = { Service = "lambda.amazonaws.com" }
      }
    ]
  })
}

resource "aws_iam_role_policy" "allow_access_to_cloudwatchlogs" {
  name = "allow_access_to_cloudwatchlogs"
  role = aws_iam_role.role_for_lambda_post_slow_query_to_slack.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy" "allow_access_to_secretsmanager" {
  name = "allow_access_to_secretsmanager"
  role = aws_iam_role.role_for_lambda_post_slow_query_to_slack.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = "secretsmanager:GetSecretValue"
        Resource = aws_secretsmanager_secret.slackbot_secret.arn
      }
    ]
  })
}

resource "aws_lambda_function" "post_slow_query_to_slack" {
  s3_bucket     = var.lambda_s3_bucket
  s3_key        = var.lambda_s3_prefix
  function_name = "post_slow_query_to_slack_function"
  role          = aws_iam_role.role_for_lambda_post_slow_query_to_slack.arn
  handler       = "flywheel.devtools.post_slow_query_to_slack.post_slow_query_to_slack.lambda_handler"
  runtime       = "python3.11"
  memory_size   = 128
  timeout       = 300

  environment {
    variables = {
      SLACK_CHANNEL_ID    = var.slack_channel_id
      SLACK_BOT_SECRET_ID = aws_secretsmanager_secret.slackbot_secret.id
      USER_NAME           = var.user_name
    }
  }
}

 

CloudWatch Logs Subscription Filter の設定

CloudWatch Logs に Subscription Filter を設定することで、ログ出力の度にLambda function が発火する設定をします。ここで CloudWatch Logs に Lambda function を実行するための Service Role を設定する必要があったのでこれも宣言します。

resource "aws_cloudwatch_log_subscription_filter" "slow_query_logfilter" {
  name            = "slow_query_logfilter"
  log_group_name  = local.rds_slow_query_log_group_name
  filter_pattern  = ""
  destination_arn = aws_lambda_function.post_slow_query_to_slack.arn
  distribution    = "ByLogStream"
}

resource "aws_lambda_permission" "post_slow_query_to_slack_allow_lambda_execution_from_cloudwatch_logs_subscription" {
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.post_slow_query_to_slack.function_name
  principal     = "logs.${var.aws_region}.amazonaws.com"
  source_arn    = local.rds_slow_query_log_group_arn
}

Module の呼び出し

ここまでできたら、あとは module block を宣言して、terraform apply するだけで、複数プロジェクトのSetupができます。
slackbot_secret_value の指定はコードからは実施せず、Input Variables – Configuration Language | Terraform | HashiCorp Developer などを参考に apply 実行時に渡します。

module "slow_query_slack_notification" {
  source = "./modules/slow_query_slack_notification"

  aws_region = "ap-northeast-1"
  aws_account_id = "XXXXXXXXX"
  rds_cluster_name = "cluster-xxxx"
  slack_channel_id = "XXXXXX"
  user_name = "operation-xxx"
  lambda_s3_bucket = "fw-XXXXXXXXX-bucket"
  lambda_s3_prefix = "dev/slow-query-slack-notification/lambda.zip"
}

実際に運用して

さて、ここまでスロークエリ通知 Slack bot の作成方法をご紹介しましたが、フライフィールではこの bot を実際に運用して日々の業務を行なっております。
通知 bot 導入以前のスロークエリ改善のプロセスは、

1.   QA 担当者が遅い画面に気づく
2.   エンジニアにフィードバックされ、エンジニアが遅いエンドポイントを特定する
3.   エンジニアが、遅いエンドポイントを元に、スロークエリになっていそうなクエリを見つける
4.   実際に Amazon CloudWatch Logs のスロークエリログと照らしあわせる

といった形になることが多く、スロークエリを特定するのに時間がかかっていました。スロークエリが Slack に通知されることで、QA 担当者に報告される前に、スロークエリに気づくことができるようになり、運用の改善に繋がりました。

さらに、Terraform でインフラを module として宣言的に管理していたことによって、Web アプリケーションを運用している全てのプロジェクトへ展開することがとても容易にできました。

まとめ

今回の記事では、フライウィールでの運用負荷を軽減させる取り組みの一つである、スロークエリ通知 Slack bot について紹介させていただきました。フライウィールでは、Terraform でインフラが管理されているので、簡単にこの bot の仕組みを作成し、さらに複数プロジェクトに導入することができました。

フライウィールでは、このようなエンジニアが生産性を高く働けるような技術基盤があります。現在、ソフトウェアエンジニアを積極採用中ですので、少しでもご興味をお持ち頂けましたら、まずはお気軽にカジュアル面談・ご応募頂ければと思います!