生成AI活用していますか?
生成AIの進化が止まりませんが、インフラエンジニアとして気になるのは
「このAI、実運用のオペレーションまで任せられたりするのか?」という点です。
そこで今回は、Googleの Gemini API (gemini-2.5-flash) を活用し、
「ログをGeminiに読ませて、状況判断と復旧コマンドの選択までをAIに委ねる」
というスクリプトをPythonで書き、実際に動作するかテストしてみました。
ちなみにPythonスクリプトについてもGeminiと会話して作っています。
構成
仕組みはシンプルです。PythonスクリプトをCronで定期実行し、異常があればGemini APIに問い合わせます。
1. 監視: リソース(CPU/Disk)と主要サービス(Nginx/MySQL等)をチェック。
2. 収集: 異常があれば、関連するログ(syslog, error.logなど)を取得。
3. 思考 (Gemini): ログとシステム情報をAPIに投げる。
4. 判断: Geminiから「原因」と「実行すべきアクション(再起動など)」を受け取る。
5. 実行: 必要に応じてスクリプトがコマンド(systemctl restart等)を実行。
6. 通知: 原因と実行内容をSlackに通知
手順
EPELリポジトリがない場合はインストール
# yum install -y epel-release
Python 3.11 本体と pip をインストール
# yum install -y python3.11 python3.11-pip
PythonスクリプトからAPI経由でGeminiを呼び出したいので、google-generativeaiをインストールします。
# python3.11 -m pip install google-generativeai requests
PythonスクリプトからAPI経由でGeminiを呼び出したいので、google-generativeaiをインストールします。
# vi /usr/local/bin/auto_healer.py
ちょっと長いですが以下スクリプトをコピペします。
#!/usr/bin/env python3.11
# -*- coding: utf-8 -*-
import socket
import subprocess
import requests
import os
import sys
import json
import time
import google.generativeai as genai
import traceback
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# ==========================================
# 設定エリア
# ==========================================
os.environ["GEMINI_API_KEY"] = "##geminiのAPIキーをこちらに登録します##"
SLACK_WEBHOOK_URL = "##SlackのWebhook URLをこちらに登録します##"
STATE_FILE = "/tmp/monitor_state.json"
MODEL_NAME = "gemini-2.5-flash"
# --- 監視しきい値 ---
THRESHOLD_CPU_LOAD = 5.0
THRESHOLD_MEM_PCT = 90.0
DISK_WARNING_THRESHOLD = 80.0
DISK_CRITICAL_THRESHOLD = 95.0
DISK_RAPID_GROWTH_PCT = 2.0
PING_TARGET = "8.8.8.8"
# ==========================================
CANDIDATE_DB_SVCS = ["mariadb", "mysqld", "mysql"]
CANDIDATE_WEB_SVCS = ["nginx", "httpd", "apache2"]
CANDIDATE_PHP_SVCS = ["php-fpm", "php8.3-fpm", "php8.2-fpm", "php8.1-fpm", "php7.4-fpm"]
CANDIDATE_SSH_SVCS = ["sshd", "ssh"]
CANDIDATE_CRON_SVCS = ["crond", "cron"]
CANDIDATE_FTP_SVCS = ["vsftpd.target", "proftpd", "pure-ftpd"]
CANDIDATE_SMTP_SVCS = ["postfix", "sendmail", "exim4", "exim"]
CANDIDATE_DB_LOGS = ["/var/log/mysql/error.log", "/var/log/mariadb/mariadb.log", "/var/log/mysqld.log"]
CANDIDATE_WEB_LOGS = ["/var/log/nginx/error.log", "/var/log/httpd/error_log", "/var/log/apache2/error.log"]
CANDIDATE_PHP_LOGS = ["/var/log/php-fpm/error.log", "/var/log/php-fpm/www-error.log"]
CANDIDATE_SYS_LOGS = ["/var/log/messages", "/var/log/syslog", "/var/log/secure", "/var/log/maillog", "/var/log/cron"]
# ★現在のユーザーがrootかどうか判定
IS_ROOT = (os.geteuid() == 0)
def get_global_ip():
try: return requests.get("https://ifconfig.me", timeout=3).text.strip()
except: return "127.0.0.1"
def run_cmd_secure(cmd_list):
"""
一般ユーザーなら先頭に sudo を付与して実行するラッパー
"""
final_cmd = cmd_list
if not IS_ROOT:
final_cmd = ["sudo"] + cmd_list
# 実行 (stdoutは捨てる、stderrも捨てる)
return subprocess.run(final_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def detect_service_name(candidates):
for svc in candidates:
try:
# statusコマンドもsudo経由で行う(念のため)
res = run_cmd_secure(["systemctl", "status", svc])
if res.returncode != 4: return svc
except: continue
return None
def detect_log_path(candidates):
for path in candidates:
if os.path.exists(path): return path
return None
def check_port(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
try:
res = s.connect_ex((host, port))
s.close()
return res == 0
except: return False
def check_http(url):
try:
headers = {'User-Agent': 'HealthCheckBot/1.0'}
res = requests.get(url, headers=headers, timeout=10, verify=False)
if 200 <= res.status_code < 500: return True, f"OK ({res.status_code})"
return False, f"Status Error: {res.status_code}"
except Exception as e: return False, f"Conn Error: {e}"
def is_service_active(svc_name):
if not svc_name: return True
try:
# sudo systemctl is-active
cmd = ["systemctl", "is-active", "--quiet", svc_name]
if not IS_ROOT: cmd = ["sudo"] + cmd
subprocess.check_call(cmd)
return True
except subprocess.CalledProcessError:
return False
def check_ping(target):
try:
subprocess.check_call(["ping", "-c", "1", "-W", "2", target], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True
except: return False
def get_tail_log(filepath, lines=50):
if not filepath or not os.path.exists(filepath): return ""
try:
# ログ閲覧は権限が必要なため sudo tail を使う
cmd = ["tail", "-n", str(lines), filepath]
if not IS_ROOT: cmd = ["sudo"] + cmd
return subprocess.check_output(cmd).decode('utf-8', errors='ignore')
except: return ""
def execute_restart(svc_name):
if not svc_name: return False, "Service not found"
cmd = ["systemctl", "restart", svc_name]
if not IS_ROOT: cmd = ["sudo"] + cmd
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
return True, f"Executed: {' '.join(cmd)}"
except subprocess.CalledProcessError as e:
return False, f"Failed: {' '.join(cmd)}\nError: {e.output.decode('utf-8')}"
def check_disk_usage(previous_disk_state):
alerts = []
is_critical = False
current_disk_state = {}
try:
# df も念のため sudo
cmd = ["df", "-P"]
if not IS_ROOT: cmd = ["sudo"] + cmd
output = subprocess.check_output(cmd).decode().splitlines()[1:]
except: return False, ["Disk Check Failed"], {}
for line in output:
parts = line.split()
if len(parts) < 6: continue
mount_point = parts[5]
try: capacity_pct = int(parts[4].replace("%", ""))
except: continue
if any(x in parts[0] for x in ["tmpfs", "overlay", "loop"]): continue
current_disk_state[mount_point] = capacity_pct
if capacity_pct < DISK_WARNING_THRESHOLD: continue
if capacity_pct >= DISK_CRITICAL_THRESHOLD:
alerts.append(f"Disk CRITICAL: {mount_point} ({capacity_pct}%)")
is_critical = True
continue
prev_pct = previous_disk_state.get(mount_point, capacity_pct)
if (capacity_pct - prev_pct) >= DISK_RAPID_GROWTH_PCT:
alerts.append(f"Disk RAPID RISE: {mount_point} (+{capacity_pct - prev_pct}%)")
is_critical = True
return is_critical, alerts, current_disk_state
def check_other_resources():
alerts, info = [], []
is_critical = False
try:
with open("/proc/loadavg", "r") as f:
load = float(f.read().split()[0])
info.append(f"Load: {load}")
if load > THRESHOLD_CPU_LOAD:
alerts.append(f"High Load: {load}"); is_critical = True
except: pass
try:
# free も念のため sudo
cmd = ["free"]
if not IS_ROOT: cmd = ["sudo"] + cmd
out = subprocess.check_output(cmd).decode().splitlines()[1].split()
pct = (int(out[2]) / int(out[1])) * 100
info.append(f"Mem: {pct:.1f}%")
if pct > THRESHOLD_MEM_PCT:
alerts.append(f"High Mem: {pct:.1f}%"); is_critical = True
except: pass
return is_critical, alerts, ", ".join(info)
def load_state():
defaults = {"db":True, "web":True, "php":True, "ssh":True, "cron":True, "ftp":True, "smtp":True, "ping":True, "disk":{}}
if not os.path.exists(STATE_FILE): return defaults
try:
with open(STATE_FILE, 'r') as f:
st = json.load(f)
defaults.update(st)
return defaults
except: return defaults
def save_state(state_dict):
try:
with open(STATE_FILE, 'w') as f: json.dump(state_dict, f)
except: pass
def send_slack(msg, color="good"):
if not SLACK_WEBHOOK_URL: return
try: requests.post(SLACK_WEBHOOK_URL, json={"attachments": [{"color": color, "text": msg}]}, timeout=5)
except: pass
def ask_gemini(context, logs, svc_map, sys_info):
if not os.environ.get("GEMINI_API_KEY"): return {"action": "ESCALATE", "reason": "API Key missing"}
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
model = genai.GenerativeModel(MODEL_NAME)
prompt = f"""
You are a SRE. Analyze status, metrics, and logs.
[Situation] {context}
[System] {sys_info}
[Services] {json.dumps(svc_map)}
[Logs]
{logs}
[Instruction]
- Identify the cause.
- If Service Down -> Recommend RESTART (except DB).
- If DB is problematic -> Explain the issue but note that auto-restart is disabled.
Action IDs:
RESTART_DB, RESTART_WEB, RESTART_PHP, RESTART_SSH, RESTART_CRON, RESTART_FTP, RESTART_SMTP, ESCALATE
[Output] JSON Only:
"""
try:
response = model.generate_content(prompt)
text = response.text.replace("```json", "").replace("```", "").strip()
return json.loads(text)
except Exception as e: return {"action": "ESCALATE", "reason": f"AI Error: {str(e)}"}
def process_decision_and_notify(decision, svc_map):
action = decision.get("action")
reason = decision.get("reason")
# DB保護
if action == "RESTART_DB":
send_slack(f"🛑 [実行抑止] DB再起動は安全のため自動実行されません。\n理由: {reason}", "warning")
return
mapping = {
"RESTART_WEB": "web", "RESTART_PHP": "php",
"RESTART_SSH": "ssh", "RESTART_CRON": "cron", "RESTART_FTP": "ftp", "RESTART_SMTP": "smtp"
}
target_key = mapping.get(action)
target_svc = svc_map.get(target_key) if target_key else None
if action == "ESCALATE" or not target_svc:
send_slack(f"🛑 [実行なし] 手動対応推奨\n理由: {reason}\nAction: {action}", "warning")
else:
success, exec_msg = execute_restart(target_svc)
if success: send_slack(f"✅ [実行成功] {target_svc} 復旧\n理由: {reason}\n処理: {exec_msg}", "good")
else: send_slack(f"❌ [実行失敗] {target_svc} エラー\n理由: {reason}\nエラー: {exec_msg}", "danger")
def main():
print(f"--- Start Diagnostics ({MODEL_NAME}) / User: {os.getlogin() if hasattr(os, 'getlogin') else 'unknown'} ---")
g_ip = get_global_ip()
st = load_state()
svcs = {
"db": detect_service_name(CANDIDATE_DB_SVCS),
"web": detect_service_name(CANDIDATE_WEB_SVCS),
"php": detect_service_name(CANDIDATE_PHP_SVCS),
"ssh": detect_service_name(CANDIDATE_SSH_SVCS),
"cron": detect_service_name(CANDIDATE_CRON_SVCS),
"ftp": detect_service_name(CANDIDATE_FTP_SVCS),
"smtp": detect_service_name(CANDIDATE_SMTP_SVCS)
}
logs_path = {
"db": detect_log_path(CANDIDATE_DB_LOGS),
"web": detect_log_path(CANDIDATE_WEB_LOGS),
"php": detect_log_path(CANDIDATE_PHP_LOGS),
"sys": detect_log_path(CANDIDATE_SYS_LOGS)
}
is_disk_crit, disk_alerts, disk_st = check_disk_usage(st.get("disk", {}))
is_res_crit, res_alerts, res_info = check_other_resources()
cur = {}
cur["db"] = check_port("127.0.0.1", 3306) if svcs["db"] else True
cur["web"] = check_http(f"http://{g_ip}/")[0] if svcs["web"] else True
cur["php"] = is_service_active(svcs["php"]) if svcs["php"] else True
cur["ssh"] = (is_service_active(svcs["ssh"]) and check_port("127.0.0.1", 22)) if svcs["ssh"] else True
cur["cron"] = is_service_active(svcs["cron"]) if svcs["cron"] else True
cur["ftp"] = (is_service_active(svcs["ftp"]) and check_port("127.0.0.1", 21)) if svcs["ftp"] else True
cur["smtp"] = (is_service_active(svcs["smtp"]) and check_port("127.0.0.1", 25)) if svcs["smtp"] else True
cur["ping"] = check_ping(PING_TARGET)
print(f"Status: {cur}")
# 復旧確認
for k, v in cur.items():
if k == "disk": continue
if not st.get(k, True) and v:
send_slack(f"🎉 [復旧確認] {k.upper()} サービス/疎通が回復しました。", "good")
def collect_logs():
l = ""
if svcs["db"]: l += f"[DB]\n{get_tail_log(logs_path['db'])}\n"
if svcs["web"]: l += f"[WEB]\n{get_tail_log(logs_path['web'])}\n"
if svcs["php"]: l += f"[PHP]\n{get_tail_log(logs_path['php'])}\n"
l += f"[SYS]\n{get_tail_log(logs_path['sys'])}\n"
return l
# A. Network Down
if not cur["ping"]:
send_slack(f"⚠️ ネットワーク疎通断 ({PING_TARGET}へのPing失敗)", "danger")
st.update(cur); st["disk"] = disk_st; save_state(st)
return
# B. Disk Alert
if is_disk_crit:
msg = "Disk Alert: " + " / ".join(disk_alerts)
send_slack(f"⚠️ {msg}", "danger")
decision = ask_gemini(msg, collect_logs(), svcs, res_info)
process_decision_and_notify(decision, svcs)
st.update(cur); st["disk"] = disk_st; save_state(st)
return
# C. Resource Alert
if is_res_crit:
msg = "Resource Alert: " + " / ".join(res_alerts)
send_slack(f"⚠️ {msg}", "warning")
decision = ask_gemini(msg, collect_logs(), svcs, res_info)
process_decision_and_notify(decision, svcs)
# D. Service Down
check_order = ["ssh", "cron", "db", "web", "php", "ftp", "smtp"]
for key in check_order:
if not cur[key]:
svc_name = svcs[key]
msg = f"{key.upper()} Service Down ({svc_name})"
send_slack(f"⚠️ {msg}", "danger")
logs = f"[SYS]\n{get_tail_log(logs_path['sys'])}\n"
if key in ["db", "web", "php"]:
logs += f"[{key.upper()}]\n{get_tail_log(logs_path.get(key))}"
decision = ask_gemini(msg, logs, svcs, res_info)
process_decision_and_notify(decision, svcs)
st.update(cur); st["disk"] = disk_st; save_state(st)
if __name__ == "__main__":
try: main()
except Exception as e:
print(traceback.format_exc())
send_slack(f"💀 Script Error: {str(e)}", "danger")
権限の変更を行います。
# chmod +x /usr/local/bin/auto_healer.py
これでエラーが出なければOKです
# python3.11 /usr/local/bin/auto_healer.py
実行するユーザーも作成します。今回は「gemini-agent」としています。
# useradd -s /sbin/nologin gemini-agent
gemini-agentに一定の再起動コマンドと、調査時に必要な一部コマンドのsudo権限を付与しておきます。
# visudo -f /etc/sudoers.d/gemini-agent
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart *
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/systemctl is-active *
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/systemctl status *
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/tail
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/df
gemini-agent ALL=(ALL) NOPASSWD: /usr/bin/free
スクリプトの所有者を変更
# chown gemini-agent:gemini-agent /usr/local/bin/auto_healer.py
gemini-agentのCronに追加し5分毎にスクリプトを実行させます。
# crontab -u gemini-agent -e
# 実行ユーザーは gemini-agent になるので、sudoを内部で使います
*/5 * * * * /usr/local/bin/auto_healer.py > /dev/null 2>&1
動作テスト
一度ユーザーを切り替えてみて、 gemini-agentでコマンドを実行できるかテストしてみます。
以下のように各サービスのステータスが確認でき、Trueとなっていますので正常となっています。
# su - gemini-agent -s /bin/bash
$ /usr/local/bin/auto_healer.py
--- Start Diagnostics (gemini-2.5-flash) / User: future ---
Status: {'db': True, 'web': True, 'php': True, 'ssh': True, 'cron': True, 'ftp': True, 'smtp': True, 'ping': True}
一度rootに戻ってわざとhttpdなどを止めて実行してみます。
Web箇所のステータスがFalseとなり、Slackにも通知が来ていました。ちょっと文言とかは分かりづらいですが無事再起動したようです。
$ exit
# systemctl stop httpd
# su - gemini-agent -s /bin/bash
$ /usr/local/bin/auto_healer.py
--- Start Diagnostics (gemini-2.5-flash) / User: future ---
Status: {'db': True, 'web': False, 'php': True, 'ssh': True, 'cron': True, 'ftp': True, 'smtp': True, 'ping': True}

再度ステータスを確認すると、Trueになって起動していました。
$ /usr/local/bin/auto_healer.py
--- Start Diagnostics (gemini-2.5-flash) / User: future ---
Status: {'db': True, 'web': True, 'php': True, 'ssh': True, 'cron': True, 'ftp': True, 'smtp': True, 'ping': True}
また今回は特定に異常があった場合、安易に再起動しないようにすることもできるのか検証したかったので、DBに関しては再起動しないようにスクリプトに記載したところ、正常に機能していました。

終わりに
Gemini APIを使ったサーバー自動復旧を試してみた結果、以下の手応えと課題を感じました。
【面白かった点】
文脈理解: 単なるポート監視ではなく、「ログを見て判断する」ため、誤検知を減らせる可能性がある。
柔軟性: プロンプト次第で「特定のログが出ていたら再起動しない」といった複雑なルールも自然言語で定義できる。
Flashモデルの速さ: 監視スクリプトに組み込んでもタイムラグは気にならないレベル。
【課題】
・まれに存在しないログを根拠にする可能性がゼロではないため、商用環境では「提案までAIがやり、実行ボタンは人間が押す」という半自動化が良いかもしれない。
・サーバー内の生ログを外部APIに送信するため、個人情報や機密情報のマスキング処理は必須。※今回は検証環境下で実施しています。
とはいえ、「AIにオペレーションの一部を委譲する」というアプローチは、未来のインフラ運用の形として非常に可能性を感じる実験でした。

