系统资源监控
system_monitor.py
# ==================================================
# 安装依赖: pip install psutil aiohttp apscheduler
# 设置 Telegram 环境变量
# export TG_BOT_TOKEN="你的bot_token"
# export TG_CHAT_ID="你的chat_id"
# 运行: python system_monitor.py
# ==================================================
import os
import psutil
import socket
import datetime
import logging
import asyncio
import aiohttp
from logging.handlers import RotatingFileHandler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
# =========================
# 配置区
# =========================
# Usage percentage above which each metric counts as over threshold.
THRESHOLD = {
    "cpu": 80,
    "mem": 75,
    "disk": 59,
}
# Number of consecutive over-threshold samples required before alerting.
ALARM_TRIGGER_COUNT = 3
# Cooldown, in seconds, between two alerts for the same metric.
ALARM_COOLDOWN = 30
# Interval, in seconds, at which the merged alert summary is pushed.
MERGE_ALARM_INTERVAL = 30

# Telegram credentials are read from the environment; the fallbacks are
# placeholders that must be replaced with real values.
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "替换为你的BotToken")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "替换为你的ChatID")
TG_API_URL = "https://api.telegram.org/bot" + TG_BOT_TOKEN + "/sendMessage"

# Path of the rotating log file.
LOG_FILE = "system_monitor.log"
# =========================
# 日志配置
# =========================
def setup_logger():
    """Create (or return) the "SystemMonitor" logger used by this module.

    Records at INFO level and above go to two destinations:

    * a size-rotated file at ``LOG_FILE`` (10 MiB per file, 5 backups),
      formatted with date, level, source file and line number;
    * the console, formatted with time and level only.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger("SystemMonitor")
    logger.setLevel(logging.INFO)

    # logging.getLogger() hands back the same object for the same name, so a
    # second call must not attach another pair of handlers; otherwise every
    # record would be emitted twice.
    if logger.handlers:
        return logger

    # Messages contain Chinese text and emoji: pin the file encoding instead
    # of relying on the platform default.
    file_handler = RotatingFileHandler(
        LOG_FILE,
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding="utf-8",
    )
    file_handler.setFormatter(logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(filename)s:%(lineno)d | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(message)s",
        datefmt="%H:%M:%S",
    ))

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger


logger = setup_logger()
# =========================
# Telegram 通知
# =========================
class TelegramNotifier:
    """Send alert messages to a Telegram chat via the Bot API ``sendMessage`` URL."""

    def __init__(self, api_url=None, chat_id=None, timeout=10):
        """Store the endpoint, the target chat and the request timeout.

        Args:
            api_url: ``sendMessage`` URL; defaults to ``TG_API_URL``.
            chat_id: chat to post to; defaults to ``TG_CHAT_ID``.
            timeout: total number of seconds allowed for one HTTP request.
        """
        self.api_url = TG_API_URL if api_url is None else api_url
        self.chat_id = TG_CHAT_ID if chat_id is None else chat_id
        self.timeout = timeout

    async def send(self, title: str, content: str):
        """Post one message made of a bold title followed by ``content``.

        The message is first sent with ``parse_mode=Markdown``. If Telegram
        answers HTTP 400 (which is what it returns when the text contains
        unbalanced Markdown characters such as ``_`` or ``*``), the message is
        retried once as plain text so the alert is not lost.

        Failures are logged and never raised.

        Args:
            title: heading of the message.
            content: body of the message.

        Returns:
            bool: True if Telegram answered HTTP 200, False otherwise.
        """
        logger.info(f"准备发送Telegram告警: {title}")
        markdown_payload = {
            "chat_id": self.chat_id,
            "text": f"*{title}*\n\n{content}",
            "parse_mode": "Markdown",
        }
        plain_payload = {
            "chat_id": self.chat_id,
            "text": f"{title}\n\n{content}",
        }
        try:
            # Bound the request: without an explicit timeout a stalled
            # connection would keep the calling job busy for minutes.
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                status, text = await self._post(session, markdown_payload)
                if status == 400:
                    logger.warning(f"Telegram 返回 400,改用纯文本重试: {text}")
                    status, text = await self._post(session, plain_payload)
        except Exception as e:
            # repr() because timeout errors have an empty str().
            logger.error(f"Telegram告警异常: {e!r}")
            return False

        if status == 200:
            logger.info("✅ Telegram告警发送成功")
            return True
        logger.error(f"❌ Telegram告警失败: {status}, {text}")
        return False

    async def _post(self, session, payload):
        """POST ``payload`` as form data and return ``(status, response_text)``."""
        async with session.post(self.api_url, data=payload) as resp:
            return resp.status, await resp.text()


notifier = TelegramNotifier()
# =========================
# 系统信息获取
# =========================
def get_host_info():
    """Return ``(hostname, ip)`` for the local machine.

    The two lookups are independent: if one fails, the other value is still
    returned. A failed lookup is logged and replaced by its placeholder
    ("未知主机" for the hostname, "未知IP" for the address).

    Returns:
        tuple[str, str]: hostname and the local IPv4 address used for
        outbound traffic.
    """
    try:
        hostname = socket.gethostname()
        logger.info(f"获取主机名: {hostname}")
    except Exception as e:
        logger.error(f"获取主机信息失败: {e}")
        hostname = "未知主机"

    try:
        # Connecting a datagram socket sends no packet; it only makes the OS
        # choose the outbound interface, whose address getsockname() reports.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            ip = s.getsockname()[0]
        logger.info(f"获取IP地址: {ip}")
    except Exception as e:
        # Typically "no route to host" on a machine without network access;
        # the hostname obtained above is still worth reporting.
        logger.error(f"获取主机信息失败: {e}")
        ip = "未知IP"

    return hostname, ip
def get_system_info():
    """Take one sample of CPU, memory and root-disk usage.

    Note that the CPU reading uses ``psutil.cpu_percent(interval=1)``, so the
    call blocks for the length of that sampling interval.

    Returns:
        dict: ``cpu``, ``mem`` and ``disk`` hold usage percentages;
        ``mem_total``/``mem_used`` and ``disk_total``/``disk_used`` hold sizes
        in GiB rounded to two decimals (disk figures are for ``/``).
        An empty dict is returned if any reading fails.
    """
    bytes_per_gib = 1024 ** 3

    def to_gib(num_bytes):
        # Convert a byte count to GiB with two decimals.
        return round(num_bytes / bytes_per_gib, 2)

    try:
        memory = psutil.virtual_memory()
        root_disk = psutil.disk_usage('/')
        cpu_usage = psutil.cpu_percent(interval=1)
        logger.info(f"系统信息: CPU {cpu_usage}%, 内存 {memory.percent}%, 磁盘 {root_disk.percent}%")
        snapshot = dict(
            cpu=cpu_usage,
            mem=memory.percent,
            mem_total=to_gib(memory.total),
            mem_used=to_gib(memory.used),
            disk=root_disk.percent,
            disk_total=to_gib(root_disk.total),
            disk_used=to_gib(root_disk.used),
        )
    except Exception as err:
        logger.error(f"获取系统信息失败: {err}")
        return {}
    return snapshot
# =========================
# 异常计数器
# =========================
# Consecutive over-threshold sample count per metric.
alarm_counter = {metric: 0 for metric in ("cpu", "mem", "disk")}
# Time of the most recent alert per metric; None until one has been raised.
last_alarm_time = {metric: None for metric in ("cpu", "mem", "disk")}
# Alert records queued for the next merged notification.
pending_alarms = list()
# =========================
# 核心监控逻辑
# =========================
async def monitor():
    """Sample the system once and raise alerts for metrics that qualify.

    For each of ``cpu``, ``mem`` and ``disk``:

    * a value above ``THRESHOLD[metric]`` increments ``alarm_counter[metric]``;
      any other value resets it to 0;
    * once the counter has reached ``ALARM_TRIGGER_COUNT`` and more than
      ``ALARM_COOLDOWN`` seconds have passed since ``last_alarm_time[metric]``
      (or no alert was raised yet), the metric is alerted on.

    Each alerted metric is sent as its own Telegram message, and one summary
    record for the whole sample is appended to ``pending_alarms`` for the
    merged notification.
    """
    # Both collectors block (get_system_info() samples the CPU with a
    # one-second interval), so run them in the default executor instead of
    # stalling the event loop and every other scheduled job.
    loop = asyncio.get_running_loop()
    hostname, ip = await loop.run_in_executor(None, get_host_info)
    info = await loop.run_in_executor(None, get_system_info)
    now = datetime.datetime.now()
    if not info:
        logger.error("无法获取系统状态,跳过监控")
        return

    over_threshold = []   # metrics currently above their threshold
    triggered_items = []  # subset of the above that must be alerted on now

    for metric in ("cpu", "mem", "disk"):
        value = info[metric]
        logger.info(f"检查指标 {metric.upper()} 当前值 {value}% 阈值 {THRESHOLD[metric]}%")
        if value > THRESHOLD[metric]:
            over_threshold.append(metric)
            alarm_counter[metric] += 1
            logger.info(f"{metric.upper()} 超阈值,连续计数 {alarm_counter[metric]}")
            if alarm_counter[metric] >= ALARM_TRIGGER_COUNT:
                last_time = last_alarm_time[metric]
                if last_time is None or (now - last_time).total_seconds() > ALARM_COOLDOWN:
                    triggered_items.append(metric)
                    last_alarm_time[metric] = now
                    logger.info(f"{metric.upper()} 告警条件满足,将触发告警")
        else:
            if alarm_counter[metric] != 0:
                logger.info(f"{metric.upper()} 重置计数器")
            alarm_counter[metric] = 0

    # Per-metric alerts.
    for metric in triggered_items:
        msg = (
            f"🚨 {metric.upper()} 告警\n主机: {hostname} | IP: {ip}\n"
            f"{metric.upper()} 连续 {ALARM_TRIGGER_COUNT} 次超阈值\n当前值: {info[metric]}%\n"
        )
        logger.info(f"发送单指标告警: {metric.upper()}")
        await notifier.send(f"{metric.upper()} 告警", msg)

    if triggered_items:
        # Queue a record for the merged notification.
        pending_alarms.append({
            "time": now.strftime("%Y-%m-%d %H:%M:%S"),
            "hostname": hostname,
            "ip": ip,
            "details": [f"{m.upper()} 当前值 {info[m]}%" for m in triggered_items],
            "info": info,
        })
    elif over_threshold:
        # Over threshold but still warming up or cooling down: do not report
        # the system as healthy.
        names = ", ".join(m.upper() for m in over_threshold)
        logger.info(f"{names} 超阈值,但未达到告警条件或处于冷却期")
    else:
        logger.info("系统资源正常,无异常项")
# =========================
# 合并告警推送
# =========================
async def push_merged_alarms():
    """Send every queued alert record as one merged Telegram message.

    Does nothing (apart from logging) when ``pending_alarms`` is empty.
    After the send, only the records that were part of the message are
    removed from the queue.
    """
    if not pending_alarms:
        logger.info("暂无合并告警需要发送")
        return

    # Work on a snapshot: monitor() can run while the send below is awaited
    # and append new records, which must survive until the next push.
    batch = list(pending_alarms)
    logger.info(f"准备发送合并告警,共 {len(batch)} 条")

    sections = [
        f"🖥️ `{alarm['hostname']}` | {alarm['ip']} | {alarm['time']}\n"
        + "\n".join(alarm["details"])
        + "\n\n"
        for alarm in batch
    ]
    content = "*🚨 系统资源合并告警*\n\n" + "".join(sections)

    await notifier.send("系统资源告警汇总", content)

    # Records are only ever appended to the queue, so the ones just sent are
    # exactly its first len(batch) entries.
    del pending_alarms[:len(batch)]
    logger.info("✅ 已发送合并告警")
# =========================
# APScheduler 定时任务
# =========================
def start_scheduler(loop, monitor_interval=5):
    """Create, populate and start the APScheduler instance.

    Two interval jobs are registered, each limited to one running instance:

    * ``system_monitor``: runs ``monitor`` every ``monitor_interval`` seconds;
    * ``push_alarms``: runs ``push_merged_alarms`` every
      ``MERGE_ALARM_INTERVAL`` seconds.

    Args:
        loop: asyncio event loop the scheduler runs its jobs on.
        monitor_interval: seconds between two monitoring runs.

    Returns:
        AsyncIOScheduler: the started scheduler.
    """
    scheduler = AsyncIOScheduler(event_loop=loop, timezone="Asia/Shanghai")
    scheduler.add_job(
        monitor,
        "interval",
        seconds=monitor_interval,
        id="system_monitor",
        max_instances=1,
    )
    scheduler.add_job(
        push_merged_alarms,
        "interval",
        seconds=MERGE_ALARM_INTERVAL,
        id="push_alarms",
        max_instances=1,
    )
    # Report the intervals actually configured rather than fixed wording.
    logger.info(
        f"✅ 系统监控服务已启动,每 {monitor_interval} 秒检查,"
        f"每 {MERGE_ALARM_INTERVAL} 秒合并告警"
    )
    scheduler.start()
    return scheduler
# =========================
# 主入口
# =========================
if __name__ == "__main__":
    # Use the loop that is already running if there is one; otherwise create
    # a fresh loop and install it as the current one.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    scheduler = start_scheduler(loop)

    # Block here and let the scheduler drive the jobs until interrupted.
    try:
        loop.run_forever()
    except (SystemExit, KeyboardInterrupt):
        logger.warning("⛔ 系统监控服务已停止")
输出情况 system_monitor.log
2025-08-31 19:37:27 | INFO | system_monitor.py:85 | 获取主机名: VM-0-5-ubuntu
2025-08-31 19:37:27 | INFO | system_monitor.py:89 | 获取IP地址: 10.7.0.5
2025-08-31 19:37:28 | INFO | system_monitor.py:100 | 系统信息: CPU 13.6%, 内存 34.7%, 磁盘 71.4%
2025-08-31 19:37:28 | INFO | system_monitor.py:135 | 检查指标 CPU 当前值 13.6% 阈值 80%
2025-08-31 19:37:28 | INFO | system_monitor.py:135 | 检查指标 MEM 当前值 34.7% 阈值 75%
2025-08-31 19:37:28 | INFO | system_monitor.py:135 | 检查指标 DISK 当前值 71.4% 阈值 59%
2025-08-31 19:37:28 | INFO | system_monitor.py:138 | DISK 超阈值,连续计数 14
2025-08-31 19:37:28 | INFO | system_monitor.py:171 | 系统资源正常,无异常项
2025-08-31 19:37:32 | INFO | system_monitor.py:85 | 获取主机名: VM-0-5-ubuntu
2025-08-31 19:37:32 | INFO | system_monitor.py:89 | 获取IP地址: 10.7.0.5
2025-08-31 19:37:33 | INFO | system_monitor.py:100 | 系统信息: CPU 17.5%, 内存 34.6%, 磁盘 71.4%
2025-08-31 19:37:33 | INFO | system_monitor.py:135 | 检查指标 CPU 当前值 17.5% 阈值 80%
2025-08-31 19:37:33 | INFO | system_monitor.py:135 | 检查指标 MEM 当前值 34.6% 阈值 75%
2025-08-31 19:37:33 | INFO | system_monitor.py:135 | 检查指标 DISK 当前值 71.4% 阈值 59%
2025-08-31 19:37:33 | INFO | system_monitor.py:138 | DISK 超阈值,连续计数 15
2025-08-31 19:37:33 | INFO | system_monitor.py:144 | DISK 告警条件满足,将触发告警
2025-08-31 19:37:33 | INFO | system_monitor.py:158 | 发送单指标告警: DISK
2025-08-31 19:37:33 | INFO | system_monitor.py:64 | 准备发送Telegram告警: DISK 告警
2025-08-31 19:37:34 | INFO | system_monitor.py:70 | ✅ Telegram告警发送成功
Last updated