Gemini API (Flash 2.5) にサーバーの障害対応ができるのか試してみた

Y
2026-01-16
2026-01-16

生成AI活用していますか?

生成AIの進化が止まりませんが、インフラエンジニアとして気になるのは
「このAI、実運用のオペレーションまで任せられたりするのか?」という点です。

そこで今回は、Googleの Gemini API (gemini-2.5-flash) を活用し、
「ログをGeminiに読ませて、状況判断と復旧コマンドの選択までをAIに委ねる」
というスクリプトをPythonで書き、実際に動作するかテストしてみました。
ちなみにPythonスクリプトについてもGeminiと会話して作っています。

構成

仕組みはシンプルです。PythonスクリプトをCronで定期実行し、異常があればGemini APIに問い合わせます。

1. 監視: リソース(CPU/Disk)と主要サービス(Nginx/MySQL等)をチェック。

2. 収集: 異常があれば、関連するログ(syslog, error.logなど)を取得。

3. 思考 (Gemini): ログとシステム情報をAPIに投げる。

4. 判断: Geminiから「原因」と「実行すべきアクション(再起動など)」を受け取る。

5. 実行: 必要に応じてスクリプトがコマンド(systemctl restart等)を実行。

6. 通知: 原因と実行内容をSlackに通知

手順


EPELリポジトリがない場合はインストール

# yum install -y epel-release

Python 3.11 本体と pip をインストール

# yum install -y python3.11 python3.11-pip

PythonスクリプトからAPI経由でGeminiを呼び出したいので、google-generativeaiをインストールします。

# python3.11 -m pip install google-generativeai requests

PythonスクリプトからAPI経由でGeminiを呼び出したいので、google-generativeaiをインストールします。

# vi /usr/local/bin/auto_healer.py

ちょっと長いですが以下スクリプトをコピペします。

#!/usr/bin/env python3.11
# -*- coding: utf-8 -*-

import socket
import subprocess
import requests
import os
import sys
import json
import time
import google.generativeai as genai
import traceback
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# ==========================================
#  設定エリア
# ==========================================

os.environ["GEMINI_API_KEY"] = "##geminiのAPIキーをこちらに登録します##"
SLACK_WEBHOOK_URL = "##SlackのWebhook URLをこちらに登録します##"
STATE_FILE = "/tmp/monitor_state.json"
MODEL_NAME = "gemini-2.5-flash"

# --- 監視しきい値 ---
THRESHOLD_CPU_LOAD = 5.0
THRESHOLD_MEM_PCT  = 90.0
DISK_WARNING_THRESHOLD = 80.0
DISK_CRITICAL_THRESHOLD = 95.0
DISK_RAPID_GROWTH_PCT = 2.0
PING_TARGET = "8.8.8.8"

# ==========================================

CANDIDATE_DB_SVCS   = ["mariadb", "mysqld", "mysql"]
CANDIDATE_WEB_SVCS  = ["nginx", "httpd", "apache2"]
CANDIDATE_PHP_SVCS  = ["php-fpm", "php8.3-fpm", "php8.2-fpm", "php8.1-fpm", "php7.4-fpm"]
CANDIDATE_SSH_SVCS  = ["sshd", "ssh"]
CANDIDATE_CRON_SVCS = ["crond", "cron"]
CANDIDATE_FTP_SVCS  = ["vsftpd.target", "proftpd", "pure-ftpd"]
CANDIDATE_SMTP_SVCS = ["postfix", "sendmail", "exim4", "exim"]

CANDIDATE_DB_LOGS   = ["/var/log/mysql/error.log", "/var/log/mariadb/mariadb.log", "/var/log/mysqld.log"]
CANDIDATE_WEB_LOGS  = ["/var/log/nginx/error.log", "/var/log/httpd/error_log", "/var/log/apache2/error.log"]
CANDIDATE_PHP_LOGS  = ["/var/log/php-fpm/error.log", "/var/log/php-fpm/www-error.log"]
CANDIDATE_SYS_LOGS  = ["/var/log/messages", "/var/log/syslog", "/var/log/secure", "/var/log/maillog", "/var/log/cron"]

# ★現在のユーザーがrootかどうか判定
IS_ROOT = (os.geteuid() == 0)

def get_global_ip():
    try: return requests.get("https://ifconfig.me", timeout=3).text.strip()
    except: return "127.0.0.1"

def run_cmd_secure(cmd_list):
    """
    一般ユーザーなら先頭に sudo を付与して実行するラッパー
    """
    final_cmd = cmd_list
    if not IS_ROOT:
        final_cmd = ["sudo"] + cmd_list
    
    # 実行 (stdoutは捨てる、stderrも捨てる)
    return subprocess.run(final_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def detect_service_name(candidates):
    for svc in candidates:
        try:
            # statusコマンドもsudo経由で行う(念のため)
            res = run_cmd_secure(["systemctl", "status", svc])
            if res.returncode != 4: return svc
        except: continue
    return None

def detect_log_path(candidates):
    for path in candidates:
        if os.path.exists(path): return path
    return None

def check_port(host, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(3)
    try:
        res = s.connect_ex((host, port))
        s.close()
        return res == 0
    except: return False

def check_http(url):
    try:
        headers = {'User-Agent': 'HealthCheckBot/1.0'}
        res = requests.get(url, headers=headers, timeout=10, verify=False)
        if 200 <= res.status_code < 500: return True, f"OK ({res.status_code})"
        return False, f"Status Error: {res.status_code}"
    except Exception as e: return False, f"Conn Error: {e}"

def is_service_active(svc_name):
    if not svc_name: return True
    try:
        # sudo systemctl is-active
        cmd = ["systemctl", "is-active", "--quiet", svc_name]
        if not IS_ROOT: cmd = ["sudo"] + cmd
        subprocess.check_call(cmd)
        return True
    except subprocess.CalledProcessError:
        return False

def check_ping(target):
    try:
        subprocess.check_call(["ping", "-c", "1", "-W", "2", target], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except: return False

def get_tail_log(filepath, lines=50):
    if not filepath or not os.path.exists(filepath): return ""
    try:
        # ログ閲覧は権限が必要なため sudo tail を使う
        cmd = ["tail", "-n", str(lines), filepath]
        if not IS_ROOT: cmd = ["sudo"] + cmd
        return subprocess.check_output(cmd).decode('utf-8', errors='ignore')
    except: return ""

def execute_restart(svc_name):
    if not svc_name: return False, "Service not found"
    
    cmd = ["systemctl", "restart", svc_name]
    if not IS_ROOT: cmd = ["sudo"] + cmd
    
    try:
        subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        return True, f"Executed: {' '.join(cmd)}"
    except subprocess.CalledProcessError as e:
        return False, f"Failed: {' '.join(cmd)}\nError: {e.output.decode('utf-8')}"

def check_disk_usage(previous_disk_state):
    alerts = []
    is_critical = False
    current_disk_state = {}
    try:
        # df も念のため sudo
        cmd = ["df", "-P"]
        if not IS_ROOT: cmd = ["sudo"] + cmd
        output = subprocess.check_output(cmd).decode().splitlines()[1:]
    except: return False, ["Disk Check Failed"], {}

    for line in output:
        parts = line.split()
        if len(parts) < 6: continue
        mount_point = parts[5]
        try: capacity_pct = int(parts[4].replace("%", ""))
        except: continue
        if any(x in parts[0] for x in ["tmpfs", "overlay", "loop"]): continue

        current_disk_state[mount_point] = capacity_pct
        if capacity_pct < DISK_WARNING_THRESHOLD: continue
        if capacity_pct >= DISK_CRITICAL_THRESHOLD:
            alerts.append(f"Disk CRITICAL: {mount_point} ({capacity_pct}%)")
            is_critical = True
            continue
        prev_pct = previous_disk_state.get(mount_point, capacity_pct)
        if (capacity_pct - prev_pct) >= DISK_RAPID_GROWTH_PCT:
            alerts.append(f"Disk RAPID RISE: {mount_point} (+{capacity_pct - prev_pct}%)")
            is_critical = True
    return is_critical, alerts, current_disk_state

def check_other_resources():
    alerts, info = [], []
    is_critical = False
    try:
        with open("/proc/loadavg", "r") as f:
            load = float(f.read().split()[0])
            info.append(f"Load: {load}")
            if load > THRESHOLD_CPU_LOAD:
                alerts.append(f"High Load: {load}"); is_critical = True
    except: pass
    try:
        # free も念のため sudo
        cmd = ["free"]
        if not IS_ROOT: cmd = ["sudo"] + cmd
        out = subprocess.check_output(cmd).decode().splitlines()[1].split()
        pct = (int(out[2]) / int(out[1])) * 100
        info.append(f"Mem: {pct:.1f}%")
        if pct > THRESHOLD_MEM_PCT:
            alerts.append(f"High Mem: {pct:.1f}%"); is_critical = True
    except: pass
    return is_critical, alerts, ", ".join(info)

def load_state():
    defaults = {"db":True, "web":True, "php":True, "ssh":True, "cron":True, "ftp":True, "smtp":True, "ping":True, "disk":{}}
    if not os.path.exists(STATE_FILE): return defaults
    try:
        with open(STATE_FILE, 'r') as f:
            st = json.load(f)
            defaults.update(st)
            return defaults
    except: return defaults

def save_state(state_dict):
    try:
        with open(STATE_FILE, 'w') as f: json.dump(state_dict, f)
    except: pass

def send_slack(msg, color="good"):
    if not SLACK_WEBHOOK_URL: return
    try: requests.post(SLACK_WEBHOOK_URL, json={"attachments": [{"color": color, "text": msg}]}, timeout=5)
    except: pass

def ask_gemini(context, logs, svc_map, sys_info):
    if not os.environ.get("GEMINI_API_KEY"): return {"action": "ESCALATE", "reason": "API Key missing"}
    genai.configure(api_key=os.environ["GEMINI_API_KEY"])
    model = genai.GenerativeModel(MODEL_NAME)
    
    prompt = f"""
    You are a SRE. Analyze status, metrics, and logs.
    [Situation] {context}
    [System] {sys_info}
    [Services] {json.dumps(svc_map)}
    [Logs]
    {logs}
    
    [Instruction]
    - Identify the cause.
    - If Service Down -> Recommend RESTART (except DB).
    - If DB is problematic -> Explain the issue but note that auto-restart is disabled.
    
    Action IDs:
    RESTART_DB, RESTART_WEB, RESTART_PHP, RESTART_SSH, RESTART_CRON, RESTART_FTP, RESTART_SMTP, ESCALATE
    
    [Output] JSON Only:
    """
    try:
        response = model.generate_content(prompt)
        text = response.text.replace("```json", "").replace("```", "").strip()
        return json.loads(text)
    except Exception as e: return {"action": "ESCALATE", "reason": f"AI Error: {str(e)}"}

def process_decision_and_notify(decision, svc_map):
    action = decision.get("action")
    reason = decision.get("reason")
    
    # DB保護
    if action == "RESTART_DB":
        send_slack(f"🛑 [実行抑止] DB再起動は安全のため自動実行されません。\n理由: {reason}", "warning")
        return

    mapping = {
        "RESTART_WEB": "web", "RESTART_PHP": "php",
        "RESTART_SSH": "ssh", "RESTART_CRON": "cron", "RESTART_FTP": "ftp", "RESTART_SMTP": "smtp"
    }
    target_key = mapping.get(action)
    target_svc = svc_map.get(target_key) if target_key else None
    
    if action == "ESCALATE" or not target_svc:
        send_slack(f"🛑 [実行なし] 手動対応推奨\n理由: {reason}\nAction: {action}", "warning")
    else:
        success, exec_msg = execute_restart(target_svc)
        if success: send_slack(f"✅ [実行成功] {target_svc} 復旧\n理由: {reason}\n処理: {exec_msg}", "good")
        else: send_slack(f"❌ [実行失敗] {target_svc} エラー\n理由: {reason}\nエラー: {exec_msg}", "danger")

def main():
    print(f"--- Start Diagnostics ({MODEL_NAME}) / User: {os.getlogin() if hasattr(os, 'getlogin') else 'unknown'} ---")
    g_ip = get_global_ip()
    st = load_state()
    
    svcs = {
        "db": detect_service_name(CANDIDATE_DB_SVCS),
        "web": detect_service_name(CANDIDATE_WEB_SVCS),
        "php": detect_service_name(CANDIDATE_PHP_SVCS),
        "ssh": detect_service_name(CANDIDATE_SSH_SVCS),
        "cron": detect_service_name(CANDIDATE_CRON_SVCS),
        "ftp": detect_service_name(CANDIDATE_FTP_SVCS),
        "smtp": detect_service_name(CANDIDATE_SMTP_SVCS)
    }
    
    logs_path = {
        "db": detect_log_path(CANDIDATE_DB_LOGS),
        "web": detect_log_path(CANDIDATE_WEB_LOGS),
        "php": detect_log_path(CANDIDATE_PHP_LOGS),
        "sys": detect_log_path(CANDIDATE_SYS_LOGS)
    }

    is_disk_crit, disk_alerts, disk_st = check_disk_usage(st.get("disk", {}))
    is_res_crit, res_alerts, res_info = check_other_resources()
    
    cur = {}
    cur["db"]   = check_port("127.0.0.1", 3306) if svcs["db"] else True
    cur["web"]  = check_http(f"http://{g_ip}/")[0] if svcs["web"] else True
    cur["php"]  = is_service_active(svcs["php"]) if svcs["php"] else True
    cur["ssh"]  = (is_service_active(svcs["ssh"]) and check_port("127.0.0.1", 22)) if svcs["ssh"] else True
    cur["cron"] = is_service_active(svcs["cron"]) if svcs["cron"] else True
    cur["ftp"]  = (is_service_active(svcs["ftp"]) and check_port("127.0.0.1", 21)) if svcs["ftp"] else True
    cur["smtp"] = (is_service_active(svcs["smtp"]) and check_port("127.0.0.1", 25)) if svcs["smtp"] else True
    cur["ping"] = check_ping(PING_TARGET)

    print(f"Status: {cur}")

    # 復旧確認
    for k, v in cur.items():
        if k == "disk": continue
        if not st.get(k, True) and v:
            send_slack(f"🎉 [復旧確認] {k.upper()} サービス/疎通が回復しました。", "good")

    def collect_logs():
        l = ""
        if svcs["db"]: l += f"[DB]\n{get_tail_log(logs_path['db'])}\n"
        if svcs["web"]: l += f"[WEB]\n{get_tail_log(logs_path['web'])}\n"
        if svcs["php"]: l += f"[PHP]\n{get_tail_log(logs_path['php'])}\n"
        l += f"[SYS]\n{get_tail_log(logs_path['sys'])}\n"
        return l

    # A. Network Down
    if not cur["ping"]:
        send_slack(f"⚠️ ネットワーク疎通断 ({PING_TARGET}へのPing失敗)", "danger")
        st.update(cur); st["disk"] = disk_st; save_state(st)
        return

    # B. Disk Alert
    if is_disk_crit:
        msg = "Disk Alert: " + " / ".join(disk_alerts)
        send_slack(f"⚠️ {msg}", "danger")
        decision = ask_gemini(msg, collect_logs(), svcs, res_info)
        process_decision_and_notify(decision, svcs)
        st.update(cur); st["disk"] = disk_st; save_state(st)
        return

    # C. Resource Alert
    if is_res_crit:
        msg = "Resource Alert: " + " / ".join(res_alerts)
        send_slack(f"⚠️ {msg}", "warning")
        decision = ask_gemini(msg, collect_logs(), svcs, res_info)
        process_decision_and_notify(decision, svcs)

    # D. Service Down
    check_order = ["ssh", "cron", "db", "web", "php", "ftp", "smtp"]
    for key in check_order:
        if not cur[key]:
            svc_name = svcs[key]
            msg = f"{key.upper()} Service Down ({svc_name})"
            send_slack(f"⚠️ {msg}", "danger")
            
            logs = f"[SYS]\n{get_tail_log(logs_path['sys'])}\n"
            if key in ["db", "web", "php"]:
                 logs += f"[{key.upper()}]\n{get_tail_log(logs_path.get(key))}"
            
            decision = ask_gemini(msg, logs, svcs, res_info)
            process_decision_and_notify(decision, svcs)

    st.update(cur); st["disk"] = disk_st; save_state(st)

if __name__ == "__main__":
    try: main()
    except Exception as e:
        print(traceback.format_exc())
        send_slack(f"💀 Script Error: {str(e)}", "danger")

権限の変更を行います。

# chmod +x /usr/local/bin/auto_healer.py

これでエラーが出なければOKです

# python3.11 /usr/local/bin/auto_healer.py

実行するユーザーも作成します。今回は「gemini-agent」としています。

# useradd -s /sbin/nologin gemini-agent

gemini-agentに一定の再起動コマンドと、調査時に必要な一部コマンドのsudo権限を付与しておきます。

# visudo -f /etc/sudoers.d/gemini-agent

gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart *

gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/systemctl is-active *
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/systemctl status *
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/tail
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/df
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/free

スクリプトの所有者を変更

# chown gemini-agent:gemini-agent /usr/local/bin/auto_healer.py

gemini-agentのCronに追加し5分毎にスクリプトを実行させます。

# crontab -u gemini-agent -e

# 実行ユーザーは gemini-agent になるので、sudoを内部で使います
*/5 * * * * /usr/local/bin/auto_healer.py > /dev/null 2>&1

動作テスト

一度ユーザーを切り替えてみて、 gemini-agentでコマンドを実行できるかテストしてみます。
以下のように各サービスのステータスが確認でき、Trueとなっていますので正常となっています。

# su - gemini-agent -s /bin/bash

$ /usr/local/bin/auto_healer.py

--- Start Diagnostics (gemini-2.5-flash) / User: future ---
Status: {'db': True, 'web': True, 'php': True, 'ssh': True, 'cron': True, 'ftp': True, 'smtp': True, 'ping': True}

一度rootに戻ってわざとhttpdなどを止めて実行してみます。
Web箇所のステータスがFalseとなり、Slackにも通知が来ていました。ちょっと文言とかは分かりづらいですが無事再起動したようです。

$ exit

# systemctl stop httpd

# su - gemini-agent -s /bin/bash

$ /usr/local/bin/auto_healer.py

--- Start Diagnostics (gemini-2.5-flash) / User: future ---
Status: {'db': True, 'web': False, 'php': True, 'ssh': True, 'cron': True, 'ftp': True, 'smtp': True, 'ping': True}

1-Jan-14-2026-02-43-14-9433-PM

再度ステータスを確認すると、Trueになって起動していました。

$ /usr/local/bin/auto_healer.py

--- Start Diagnostics (gemini-2.5-flash) / User: future ---
Status: {'db': True, 'web': True, 'php': True, 'ssh': True, 'cron': True, 'ftp': True, 'smtp': True, 'ping': True}

また今回は特定に異常があった場合、安易に再起動しないようにすることもできるのか検証したかったので、DBに関しては再起動しないようにスクリプトに記載したところ、正常に機能していました。

2-Jan-14-2026-02-43-14-9515-PM

終わりに

Gemini APIを使ったサーバー自動復旧を試してみた結果、以下の手応えと課題を感じました。

【面白かった点】
文脈理解: 単なるポート監視ではなく、「ログを見て判断する」ため、誤検知を減らせる可能性がある。
柔軟性: プロンプト次第で「特定のログが出ていたら再起動しない」といった複雑なルールも自然言語で定義できる。
Flashモデルの速さ: 監視スクリプトに組み込んでもタイムラグは気にならないレベル。

【課題】
・まれに存在しないログを根拠にする可能性がゼロではないため、商用環境では「提案までAIがやり、実行ボタンは人間が押す」という半自動化が良いかもしれない。
・サーバー内の生ログを外部APIに送信するため、個人情報や機密情報のマスキング処理は必須。※今回は検証環境下で実施しています。

とはいえ、「AIにオペレーションの一部を委譲する」というアプローチは、未来のインフラ運用の形として非常に可能性を感じる実験でした。