监控网站可用性并发送Telegram通知

实现功能

  • 检查时间策略

  • 告警时间策略

  • 通知时间策略

  • 连续失败策略

  • 采集器公网地址汇报

websites.yml

# URL scheme that check_website.py prefixes to every domain below ("<scheme>://<domain>").
websites_schems: "https"
# Layout: websites -> <group name> -> <type name> -> list of domains.
# Alerts are labelled "<group name> <type name>".
websites:
    group_1:
        api:
            - xxxx.com
            - xxxx.com
        app:
            - xxxx.com
            - xxxx.com
    group_2:
        api:
            - xxxx.com
            - xxxx.com
        app:
            - xxxx.com
            - xxxx.com

check_website.py

# pip install requests psutil pyyaml httpx

import configparser
import threading
import requests
import time
import os
import datetime
import sys
import yaml
import psutil
import warnings
import httpx
import concurrent.futures


# Suppress the InsecureRequestWarning ("Unverified HTTPS request") warning.
warnings.filterwarnings("ignore", message="Unverified HTTPS request")




def env_int(name, default):
    """Return environment variable *name* parsed as an int, or *default*.

    Environment values are always strings, but the intervals below are used
    in arithmetic comparisons and ``time.sleep``; they must be numeric.
    An unset, blank, or non-integer value falls back to *default*.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        print(f"Invalid integer for {name}: {raw!r}; using default {default}")
        return default


PUBLIC_NETWORK = os.environ.get("PUBLIC_NETWORK", '8.8.8.8')         # public IP address
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", '')
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", '')
WEB_ALTER_INTERVAL = env_int("WEB_ALTER_INTERVAL", 300)               # alert interval, in seconds
WEB_CHECK_INTERVAL = env_int("WEB_CHECK_INTERVAL", 30)                # website check interval, in seconds
#ENABLE_VERIFY = bool(os.environ.get("ENABLE_VERIFY", True))             # whether to verify certificates

last_alert_times = {}              # time the last alert was sent, per checked website
retry_time = 5   # number of retries



# Both variables default to '' when unset, so test for emptiness; an
# `is None` comparison can never be true here.
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
    print("TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID can't be empty")
    sys.exit(1)


def get_cpu_count():
    """Return the CPU count reported by ``os.cpu_count()`` (may be ``None``)."""
    cpu_total = os.cpu_count()
    return cpu_total



def allocate_threads():
    """Pick a worker-thread count from the CPU count.

    Returns 1 when the CPU count is unknown (``None``) or exactly 1;
    otherwise three threads per CPU, capped at 32.
    """
    cpus = get_cpu_count()
    if cpus is None or cpus == 1:
        # Unknown CPU count or a single CPU: use a single thread.
        return 1
    # min() keeps the smaller of the two values, i.e. at most 32 threads.
    return min(cpus * 3, 32)


def get_cpu_use():
    """Return the CPU load percentage from ``psutil.cpu_percent(interval=1)``."""
    return psutil.cpu_percent(interval=1)


def read_config_ini(filename):
    """Parse the INI file *filename* and return the ``ConfigParser`` object."""
    parser = configparser.ConfigParser()
    parser.read(filename)
    return parser


def read_config_yaml(filename):
    """Load the YAML file *filename* and return the parsed document.

    The file is opened explicitly as UTF-8 so that non-ASCII text in the
    configuration is decoded the same way regardless of the process locale.
    ``yaml.safe_load`` is used, so only plain YAML data types are
    constructed. Returns whatever ``safe_load`` yields (``None`` for an
    empty file).
    """
    with open(filename, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

# 创建线程局部变量
# thread_local = threading.local()
# thread_local.success = False
# thread_local.code = 0

def fetch_status(url, code=None):
    """Send a HEAD request to *url* and return ``(url, status_code)``.

    The request is attempted up to ``retry_time`` times (at least once).
    The first attempt that gets an HTTP response wins and its status code
    is returned. If every attempt raises, the sentinel status ``999`` is
    returned so callers always receive a tuple.

    Args:
        url: Full URL including scheme.
        code: Unused; kept for backward compatibility with existing callers.

    Returns:
        tuple: ``(url, status_code)`` where ``status_code`` is the HTTP
        status, or ``999`` when the URL could not be reached.
    """
    # Timeouts in seconds handed to httpx.Client
    # (original note: request timeout, connect timeout).
    timeout = (3, 5)

    # Honour the configured retry count instead of a hard-coded attempt
    # index, and never fall off the loop without a result.
    attempts = max(retry_time, 1)
    for i in range(attempts):
        print(f'{i} time。。。。。。。。。。。。。。。。')
        try:
            with httpx.Client(timeout=timeout) as client:
                return url, client.head(url).status_code
        except Exception:
            # Any failure (DNS, connect, TLS, timeout, bad URL, ...) just
            # means "try again"; exhaustion is reported as 999 below.
            continue
    return url, 999
                    




def _alert_if_due(key, message):
    """Send *message* unless an alert for *key* went out within WEB_ALTER_INTERVAL seconds."""
    now = time.time()
    if now - last_alert_times.get(key, 0) > WEB_ALTER_INTERVAL:
        send_telegram_alert(message)
        last_alert_times[key] = now


def check_website(urls, remark, schems, num_threads):
    """Check a group of domains concurrently and alert on failures.

    Each domain is fetched through ``fetch_status`` on a thread pool. Any
    result whose status code is >= 400 (999 marks "unreachable") triggers a
    Telegram alert, throttled per ``remark``/URL pair by WEB_ALTER_INTERVAL.

    Args:
        urls: Iterable of domain names (without scheme). Empty or ``None``
            is a no-op.
        remark: Label identifying the group; included in alerts and used
            in the throttling key.
        schems: URL scheme prepended to every domain, e.g. ``"https"``.
        num_threads: Maximum number of worker threads.
    """
    if not urls:
        return

    current_time = get_current_time()
    # Bound before the try block so the error handler can always refer to
    # it, even when the failure happens before any result was processed.
    url = None
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(fetch_status, f"{schems}://{domain}") for domain in urls]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]

        for result in results:
            if not isinstance(result, tuple):
                continue
            url, status_code = result[0], result[-1]

            print(f"{current_time} URL: {url}, 状态码: {status_code}")
            if status_code < 400:
                continue

            m_msg = "此節點無法訪問此域名" if status_code == 999 else status_code
            _alert_if_due(
                f"{remark}_{url}",
                f"消息内容: 🤦request not in between 0~399, Please check\n返回状态: {status_code} {m_msg}\n商户代理: {remark} \n商户域名: {url} ",
            )
    except Exception as e:
        target = url if url is not None else "N/A"
        print(f"{current_time} Exception occurred while checking {remark}  {target}: {e}")
        _alert_if_due(
            f"{remark}_{target}",
            f"消息内容: 🤦The request could not be completed, RequestTimeout 3s; ReadTimeout 5s; ( {e} ) \n商户代理: {remark} \n商户域名: {target} ",
        )

def send_telegram_alert(message):
    """Send *message* to the configured Telegram chat.

    The text is prefixed with the node address (PUBLIC_NETWORK) and the
    retry count. Delivery is best-effort: network failures and non-200
    responses are logged and never raised, so a Telegram outage cannot
    stop the monitoring loop.

    Args:
        message: Alert body to append after the node/retry header.
    """
    current_time = get_current_time()
    print(f"{current_time} Sending Telegram alert for: {message}")

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": f"节点地址: {PUBLIC_NETWORK}\n重试频率: {retry_time}\n{message}",
    }
    try:
        # Without a timeout requests waits indefinitely, which would stall
        # every subsequent check.
        response = requests.post(url, data=data, timeout=10)
    except requests.RequestException as e:
        # Log only the exception type: the exception text can contain the
        # request URL, which embeds the bot token.
        print(f"{current_time} Failed to send message. Error: {type(e).__name__}")
        return

    if response.status_code == 200:
        print(f"{current_time} Message sent successfully!")
    else:
        print(f"{current_time} Failed to send message. Status code: {response.status_code}")





def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")



def main():
    """Run the monitoring loop forever.

    Every cycle re-reads ``websites.yml`` (so edits take effect without a
    restart), samples the CPU load, checks every configured group of
    domains via ``check_website``, then sleeps WEB_CHECK_INTERVAL seconds.
    A missing, unreadable, or malformed configuration file is logged and
    retried on the next cycle instead of terminating the monitor.
    """
    while True:              # keep checking indefinitely
        try:
            config = read_config_yaml('websites.yml')  # reload the configuration
        except (OSError, yaml.YAMLError) as e:
            print(f"{get_current_time()} Failed to load websites.yml: {e}")
            time.sleep(WEB_CHECK_INTERVAL)
            continue
        if not isinstance(config, dict):
            # Empty file or a non-mapping document: nothing to check.
            config = {}

        num_threads = allocate_threads()

        current_time = get_current_time()
        cpu_use = get_cpu_use()
        print('='*70)

        # NOTE(review): get_cpu_use() returns psutil.cpu_percent(), a
        # percentage, so this "> 100" guard looks unreachable -- confirm the
        # intended threshold.
        if cpu_use > 100:
            msg = f"{current_time} cpu load is high ({cpu_use}%), continue, sleep {WEB_CHECK_INTERVAL}s......."
            print(msg)
            send_telegram_alert(f"Content: {msg}")
            time.sleep(WEB_CHECK_INTERVAL)
            continue

        print(f"{current_time} use {num_threads} threading......")

        # Tolerate missing or empty sections; the scheme defaults to https
        # when the key is absent.
        websites = config.get('websites') or {}
        schems = config.get('websites_schems', 'https')
        for website, groups in websites.items():
            for type_name, urls in (groups or {}).items():
                check_website(urls, f"{website} {type_name}", schems, num_threads)
        time.sleep(WEB_CHECK_INTERVAL)  # sleep, then check again





# Start the monitoring loop only when this file is executed as a script.
if __name__ == "__main__":
    main()

效果

Docker 容器方式

  • website_Dockerfile

# Image for the website availability monitor.
FROM python:3.9-slim

# NOTE(review): MAINTAINER is deprecated in current Dockerfile syntax; a
# LABEL maintainer=... is the usual replacement -- confirm before changing.
MAINTAINER ck

WORKDIR /app
# The script is stored in the image as main.py (see CMD below).
COPY check_website.py main.py


# Third-party packages imported by the script.
RUN pip install requests psutil pyyaml httpx --no-cache-dir


# Set the container time zone to Asia/Shanghai.
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone


CMD ["python", "main.py"]
docker build --no-cache -t check_website -f website_Dockerfile .
docker-compose up -d

Docker-compose.yaml

root@monitor-1:/data/monitor/check# cat docker-compose.yml 
# Runs the check_website image built from website_Dockerfile.
version: "3"
services:
  check_website:
    image: check_website:latest
    container_name: check_website
    network_mode: host
    dns:
      - 8.8.8.8
    # Read by the script through os.environ; PUBLIC_NETWORK is reported as the
    # node address in every alert.
    environment:
      - PUBLIC_NETWORK=8.8.8.8
      - TELEGRAM_BOT_TOKEN=xxxxxxxx
      - TELEGRAM_CHAT_ID=xxxxx
        #- ENABLE_VERIFY=False
    # The script opens "websites.yml" relative to its working directory (/app).
    volumes:
      -  ${PWD}/websites.yml:/app/websites.yml

`

Last updated