监控SSL证书可用性并发送Telegram通知
实现功能
检查时间策略
告警时间策略
通知时间策略
采集器公网地址汇报
websites.yml
websites.yml
websites:
group_1:
api:
- xxxx.com
- xxxx.com
app:
- xxxx.com
- xxxx.com
group_2:
api:
- xxxx.com
- xxxx.com
app:
- xxxx.com
- xxxx.com
check_ssl.py
#!/usr/bin/python3
# pip install python-whois requests psutil pyyaml
import configparser
import threading
import time
import os
import datetime
import sys
import ssl
import socket
import yaml
import psutil
import requests
PUBLIC_NETWORK = os.environ.get("PUBLIC_NETWORK", '8.8.8.8') # 公网IP地址
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", '')
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", '')
SSL_ALTER_INTERVAL = os.environ.get("SSL_ALTER_INTERVAL", 86400) # 告警间隔,单位秒
SSL_CHECK_INTERVAL = os.environ.get("SSL_CHECK_INTERVAL", 3600) # 证书检测间隔时间,单位秒
last_alert_times = {}
if TELEGRAM_BOT_TOKEN is None or TELEGRAM_CHAT_ID is None:
print(f"TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID can't be empty")
sys.exit(1)
def get_cpu_count():
"""获取 CPU 数量"""
return os.cpu_count()
def allocate_threads():
"""根据 CPU 数量分配线程数"""
cpu_count = get_cpu_count()
if cpu_count is None:
return 1 # 如果无法获取 CPU 数量,则默认为一个线程
elif cpu_count == 1:
return 1 # 如果只有一个 CPU,则使用一个线程
else:
return min(cpu_count * 3, 32) # 根据 CPU 数量动态分配线程数;min() 函数用于在多个参数中找到最小值, 例如,最多使用 16 个线程
def get_cpu_use():
'''获取 CPU 的负载率'''
cpu_load = psutil.cpu_percent(interval=1)
return cpu_load
def read_config_ini(filename):
'''读取配置文件'''
config = configparser.ConfigParser()
config.read(filename)
return config
# 读取配置文件
def read_config_yaml(filename):
with open(filename, 'r') as file:
config = yaml.safe_load(file)
return config
def send_telegram_alert(message):
'''发送 Telegram 告警'''
current_time = get_current_time()
print(f"{current_time} Sending Telegram alert for: {message}")
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
data = {"chat_id": TELEGRAM_CHAT_ID, "text": f"Collector: {PUBLIC_NETWORK} \n{message}"}
response = requests.post(url, data=data)
if response.status_code == 200:
print(f"{current_time} Message sent successfully!")
else:
print(f"{current_time} Failed to send message. Status code: {response.status_code}")
def get_current_time():
'''获取当前时间'''
current_datetime = datetime.datetime.now()
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
return formatted_datetime
def get_cert_expiry(domain):
'''获取证书时间'''
# try:
context = ssl.create_default_context()
with context.wrap_socket(socket.socket(), server_hostname=domain) as ssock:
ssock.connect((domain, 443))
cert = ssock.getpeercert()
# 获取证书到期日期
expiry_date_str = cert['notAfter']
expiry_date = datetime.datetime.strptime(expiry_date_str, "%b %d %H:%M:%S %Y %Z")
return expiry_date
# except Exception as e:
# send_telegram_alert(f"Content: {domain} {e}")
# print("An error occurred:", e)
def check_certificate_expiry(url, remark):
'''检查是否使用 https 协议'''
try:
if url.startswith("https://"):
domain = url.split("//")[1]
expiry_date = get_cert_expiry(domain)
if expiry_date:
current_date = datetime.datetime.now()
days_remaining = (expiry_date - current_date).days
# # 获取上一次告警时间
last_alert_time = last_alert_times.get(f"{remark}_{domain}", 0)
c_time = time.time()
if c_time - last_alert_time > SSL_ALTER_INTERVAL:
# 在到期前特定天数发送 Telegram 通知
if days_remaining < 7:
send_telegram_alert(f"Content: 🤦Certificate will expire in ${days_remaining} days \n{remark}: {domain}")
last_alert_times[f"{remark}_{domain}"] = c_time
# print('正在检查:', domain)
current_time = get_current_time()
print(f"{current_time} {domain} Certificate expiration date:{expiry_date}, left until expiration {days_remaining} day")
except Exception as e:
# # 获取上一次告警时间
last_alert_time = last_alert_times.get(f"{remark}_{domain}", 0)
c_time = time.time()
if c_time - last_alert_time > SSL_ALTER_INTERVAL:
send_telegram_alert(f"Content: {url} {e}")
print("An error occurred:", e)
def main():
'''主函数'''
# config = read_config_yaml('websites.yml') # 读取配置文件
while True: # 持续循环检查
config = read_config_yaml('websites.yml') # 读取配置文件
must_schems = "https://"
ssl_threads = []
THREADS = allocate_threads()
# 根据环境判断使用进程,还是线程
current_time = get_current_time()
cpu_use = get_cpu_use()
print('='*70)
if cpu_use > 100: # cpu load 大于 100
msg = f"{current_time} cpu load is high ({cpu_use}%), continue, sleep {SSL_CHECK_INTERVAL} s......."
print(msg)
send_telegram_alert(f"Content: {msg}")
time.sleep(SSL_CHECK_INTERVAL)
continue
# print(config)
for website in config['websites']:
for type_name in config['websites'][website]:
for index, domain in enumerate(config['websites'][website][type_name]):
t = threading.Thread(target=check_certificate_expiry, args=(f"{must_schems}{domain}", f"{website}_{type_name}")) # 使用多线程检查网站状态
t.start()
ssl_threads.append(t)
if len(ssl_threads) >= THREADS:
for t in ssl_threads:
t.join()
ssl_threads = []
time.sleep(SSL_CHECK_INTERVAL) # 休眠后再次检查
if __name__ == "__main__":
main()
效果
Docker 容器方式
root@monitor-1:/data/monitor/check# cat ssl_Dockerfile
FROM python:3.9-slim
MAINTAINER ck
WORKDIR /app
COPY check_ssl.py main.py
RUN pip install requests psutil pyyaml python-whois --no-cache-dir
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
CMD ["python", "main.py"]
docker build --no-cache -t check_ssl -f ssl_Dockerfile .
docker-compose up -d
docker-compose.yaml
root@monitor-1:/data/monitor/check# cat docker-compose.yml
version: "3"
services:
check_ssl:
image: check_ssl:latest
container_name: check_ssl
network_mode: host
dns:
- 8.8.8.8
environment:
- PUBLIC_NETWORK=47.57.142.166
- TELEGRAM_BOT_TOKEN=6685540055:AAHTc5hXy77YV9TPck72HYhQcU53FH82YhM
- TELEGRAM_CHAT_ID=-1002013511378
volumes:
- ${PWD}/websites.yml:/app/websites.yml
Last updated