监控SSL证书可用性并发送Telegram通知

实现功能

  • 检查时间策略

  • 告警时间策略

  • 通知时间策略

  • 采集器公网地址汇报

websites.yml

# Top-level key read by check_ssl.py.
websites:
    # Group name; the script joins it with the type name below to build
    # the alert label "<group>_<type>".
    group_1:
        # Type name, followed by the hosts to check. Entries are bare host
        # names: the script prepends "https://" itself.
        api:
            - xxxx.com
            - xxxx.com
        app:
            - xxxx.com
            - xxxx.com
    group_2:
        api:
            - xxxx.com
            - xxxx.com
        app:
            - xxxx.com
            - xxxx.com

check_ssl.py

#!/usr/bin/python3
# pip install python-whois requests psutil pyyaml

import configparser
import threading
import time
import os
import datetime
import sys
import ssl
import socket
import yaml
import psutil
import requests






# Runtime configuration, read from environment variables.
PUBLIC_NETWORK = os.environ.get("PUBLIC_NETWORK", '8.8.8.8')        # public address of this collector, shown in every alert
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", '')
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", '')
# Environment values are always strings; convert them so the numeric
# comparisons and time.sleep() calls that use these intervals keep working
# when the variables are actually set.
SSL_ALTER_INTERVAL = int(os.environ.get("SSL_ALTER_INTERVAL", 86400))       # alert interval, in seconds
SSL_CHECK_INTERVAL = int(os.environ.get("SSL_CHECK_INTERVAL", 3600))        # certificate check interval, in seconds

# Maps "<remark>_<domain>" to a time.time() timestamp used to throttle alerts.
last_alert_times = {}


# The defaults above are '' (never None), so test for emptiness instead of
# identity with None; otherwise this guard can never trigger.
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
    print("TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID can't be empty")
    sys.exit(1)


def get_cpu_count():
    """Return the CPU count reported by ``os.cpu_count()`` (may be ``None``)."""
    n_cpus = os.cpu_count()
    return n_cpus



def allocate_threads():
    """Choose a worker-thread count from the CPU count.

    Returns 1 when the CPU count is unknown or exactly one; otherwise three
    threads per CPU, capped at 32.
    """
    cpus = get_cpu_count()
    # Unknown CPU count and single-CPU hosts both get one thread.
    if cpus is None or cpus == 1:
        return 1
    # min() enforces the upper bound of 32 threads.
    return min(3 * cpus, 32)


def get_cpu_use():
    """Return the CPU utilisation percentage measured by psutil over one second."""
    return psutil.cpu_percent(interval=1)


def read_config_ini(filename):
    """Parse *filename* as an INI file and return the ``ConfigParser`` holding it."""
    parser = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, so the
    # returned parser may simply be empty.
    parser.read(filename)
    return parser


def read_config_yaml(filename):
    """Load the YAML file *filename* and return the parsed document.

    The file is opened explicitly as UTF-8 so parsing does not depend on the
    locale of the process. ``yaml.safe_load`` is used, so only plain YAML
    data types are constructed.

    Args:
        filename: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)



def send_telegram_alert(message):
    """Send *message* to the configured Telegram chat through the Bot API.

    The text is prefixed with the collector address (``PUBLIC_NETWORK``).
    Delivery is best effort: a failure is printed and never raised, so a
    Telegram or network outage cannot take down the caller.

    Args:
        message: Alert text to deliver.
    """
    current_time = get_current_time()
    print(f"{current_time} Sending Telegram alert for: {message}")

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {"chat_id": TELEGRAM_CHAT_ID, "text": f"Collector: {PUBLIC_NETWORK} \n{message}"}
    try:
        # requests has no default timeout; without one a stalled connection
        # would block the calling thread indefinitely.
        response = requests.post(url, data=data, timeout=10)
    except requests.RequestException as e:
        # Print only the exception type: the exception text can include the
        # request URL, which embeds the bot token.
        print(f"{current_time} Failed to send message: {type(e).__name__}")
        return

    if response.status_code == 200:
        print(f"{current_time} Message sent successfully!")
    else:
        print(f"{current_time} Failed to send message. Status code: {response.status_code}")




def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def get_cert_expiry(domain):
    """Return the expiry time of the TLS certificate served by *domain*.

    Opens a TCP connection to port 443, performs a certificate-verifying TLS
    handshake and reads the ``notAfter`` field of the peer certificate.

    Args:
        domain: Host name to connect to; also used for SNI and host name
            verification.

    Returns:
        A naive ``datetime.datetime`` parsed from ``notAfter``.

    Raises:
        OSError: On DNS, connection, timeout or TLS failures
            (``ssl.SSLError`` and ``socket.gaierror`` are subclasses).
    """
    context = ssl.create_default_context()
    # create_connection() applies the timeout to connect and, through the
    # wrapped socket, to the handshake; it also tries every address the host
    # resolves to instead of being limited to IPv4.
    with socket.create_connection((domain, 443), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=domain) as ssock:
            cert = ssock.getpeercert()

    # Parse the certificate expiry date.
    return datetime.datetime.strptime(cert['notAfter'], "%b %d %H:%M:%S %Y %Z")

def _send_alert_if_due(alert_key, message):
    """Send *message* unless *alert_key* was alerted within SSL_ALTER_INTERVAL seconds."""
    now = time.time()
    # float() keeps the comparison valid even if the interval is still the
    # raw string taken from the environment.
    if now - last_alert_times.get(alert_key, 0) <= float(SSL_ALTER_INTERVAL):
        return
    send_telegram_alert(message)
    # Record the time only when an alert was actually sent, so a quiet check
    # never suppresses a later real alert.
    last_alert_times[alert_key] = now


def check_certificate_expiry(url, remark):
    """Check the certificate behind an ``https://`` URL and alert when needed.

    URLs that do not start with ``https://`` are ignored. An alert is sent
    when fewer than 7 days remain before expiry, or when the certificate
    cannot be fetched. Both kinds of alert are throttled per
    ``"<remark>_<domain>"`` key to one per ``SSL_ALTER_INTERVAL`` seconds.

    Args:
        url: URL to check, e.g. ``"https://example.com"``.
        remark: Label identifying the target's group, used in the alert text
            and in the throttle key.

    Returns:
        None.
    """
    scheme = "https://"
    if not url.startswith(scheme):
        return

    # Keep only the host part so a trailing path does not end up in the
    # connection target.
    domain = url[len(scheme):].split("/", 1)[0]
    alert_key = f"{remark}_{domain}"

    try:
        expiry_date = get_cert_expiry(domain)
    except Exception as e:
        # Boundary of a worker thread: report any failure instead of dying.
        print("An error occurred:", e)
        _send_alert_if_due(alert_key, f"Content: {url} {e}")
        return

    if not expiry_date:
        return

    # Certificate times are expressed in GMT, so compare against naive UTC
    # "now" rather than local time.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    days_remaining = (expiry_date - now_utc).days

    # Notify once the certificate is within a week of expiring.
    if days_remaining < 7:
        _send_alert_if_due(
            alert_key,
            f"Content: 🤦Certificate will expire in {days_remaining} days \n{remark}: {domain}",
        )

    current_time = get_current_time()
    print(f"{current_time} {domain} Certificate expiration date:{expiry_date}, left until expiration {days_remaining} day")




def main():
    """Run the certificate check loop forever.

    Each round re-reads ``websites.yml``, starts one thread per configured
    host (in batches of at most ``allocate_threads()`` threads), waits for
    all of them, then sleeps ``SSL_CHECK_INTERVAL`` seconds.
    """
    scheme = "https://"
    # float() keeps time.sleep() working even if the interval is still the
    # raw string taken from the environment.
    check_interval = float(SSL_CHECK_INTERVAL)

    while True:              # keep checking until the process is stopped
        config = read_config_yaml('websites.yml')  # re-read every round
        max_threads = allocate_threads()

        current_time = get_current_time()
        cpu_use = get_cpu_use()
        print('='*70)

        # NOTE(review): psutil.cpu_percent() reports a 0-100 percentage, so
        # this branch looks unreachable - confirm the intended threshold.
        if cpu_use > 100:
            msg = f"{current_time} cpu load is high ({cpu_use}%), continue, sleep {SSL_CHECK_INTERVAL} s......."
            print(msg)
            send_telegram_alert(f"Content: {msg}")
            time.sleep(check_interval)
            continue

        # Tolerate an empty file, a missing "websites" key and empty groups
        # or lists (YAML parses those as None).
        websites = (config or {}).get('websites') or {}
        batch = []
        for website, types in websites.items():
            for type_name, domains in (types or {}).items():
                for domain in domains or []:
                    worker = threading.Thread(
                        target=check_certificate_expiry,
                        args=(f"{scheme}{domain}", f"{website}_{type_name}"),
                    )
                    worker.start()
                    batch.append(worker)

                    # Cap concurrency: wait for the current batch before
                    # starting more threads.
                    if len(batch) >= max_threads:
                        for running in batch:
                            running.join()
                        batch = []

        # Wait for the final, partially filled batch as well, so a round is
        # complete before the sleep starts.
        for running in batch:
            running.join()

        time.sleep(check_interval)  # sleep, then check again


if __name__ == "__main__":
    main()

效果

Docker 容器方式

root@monitor-1:/data/monitor/check# cat ssl_Dockerfile 
FROM python:3.9-slim

# MAINTAINER is deprecated; the same metadata is expressed as a label.
LABEL maintainer="ck"

WORKDIR /app
# The script is copied in as main.py, which is what CMD runs.
COPY check_ssl.py main.py


RUN pip install requests psutil pyyaml python-whois --no-cache-dir


# Use Asia/Shanghai as the container's local time zone.
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# Disable stdout buffering so the script's print() output shows up in
# `docker logs` immediately.
ENV PYTHONUNBUFFERED=1


CMD ["python", "main.py"]
docker build --no-cache -t check_ssl -f ssl_Dockerfile .
docker-compose up -d

docker-compose.yaml

root@monitor-1:/data/monitor/check# cat docker-compose.yml 
version: "3"
services:
  check_ssl:
    # Image built from ssl_Dockerfile with the tag check_ssl.
    image: check_ssl:latest
    container_name: check_ssl
    network_mode: host
    dns:
      - 8.8.8.8
    environment:
      # Address reported as "Collector" in every Telegram message.
      - PUBLIC_NETWORK=47.57.142.166
      # NOTE(review): the bot token and chat id below are credentials stored
      # in plain text; consider moving them to an env_file or a secret and
      # rotating the token if this file has been shared.
      - TELEGRAM_BOT_TOKEN=6685540055:AAHTc5hXy77YV9TPck72HYhQcU53FH82YhM
      - TELEGRAM_CHAT_ID=-1002013511378
    volumes:
      # The script re-reads this file on every check round.
      -  ${PWD}/websites.yml:/app/websites.yml

Last updated