# 监控网站可用性并发送Telegram通知

### 实现功能 <a href="#e5-ae-9e-e7-8e-b0-e5-8a-9f-e8-83-bd" id="e5-ae-9e-e7-8e-b0-e5-8a-9f-e8-83-bd"></a>

* 检查时间策略
* 告警时间策略
* 通知时间策略
* 连续失败策略
* 采集器公网地址汇报

### `websites.yml` <a href="#websites.yml" id="websites.yml"></a>

```yaml
websites_schems: "https"
websites:
    group_1:
        api:
            - xxxx.com
            - xxxx.com
        app:
            - xxxx.com
            - xxxx.com
    group_2:
        api:
            - xxxx.com
            - xxxx.com
        app:
            - xxxx.com
            - xxxx.com
```

### check\_websites.py <a href="#check_websites.py" id="check_websites.py"></a>

```python
# pip install requests psutil pyyaml httpx

import configparser
import threading
import requests
import time
import os
import datetime
import sys
import yaml
import psutil
import warnings
import httpx
import concurrent.futures


# Suppress warnings whose message begins with "Unverified HTTPS request"
# (i.e. the InsecureRequestWarning raised for unverified HTTPS requests).
warnings.filterwarnings("ignore", message="Unverified HTTPS request")




# Address included in every alert so the recipient can tell which collector sent it.
PUBLIC_NETWORK = os.environ.get("PUBLIC_NETWORK", '8.8.8.8')
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", '')
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", '')
# Environment values are always strings, so convert the intervals to int;
# otherwise the `>` comparison against elapsed seconds and time.sleep() fail
# with TypeError as soon as the variable is set.
WEB_ALTER_INTERVAL = int(os.environ.get("WEB_ALTER_INTERVAL", 300))      # minimum seconds between two alerts for the same site
WEB_CHECK_INTERVAL = int(os.environ.get("WEB_CHECK_INTERVAL", 30))       # seconds to sleep between check rounds
#ENABLE_VERIFY = bool(os.environ.get("ENABLE_VERIFY", True))             # whether to verify certificates (disabled)

last_alert_times = {}              # "<remark>_<url>" -> time.time() of the last alert sent for that site
retry_time = 5   # number of request attempts per URL



# The settings default to '' (never None), so test for emptiness: without both
# values no alert can be delivered and the monitor would run silently.
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
    print("TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID can't be empty")
    sys.exit(1)


def get_cpu_count():
    """Return the CPU count reported by ``os.cpu_count()`` (may be ``None``)."""
    detected = os.cpu_count()
    return detected



def allocate_threads():
    """Pick the worker-thread count from the CPU count.

    Returns 1 when the CPU count is unknown or exactly one; otherwise three
    threads per CPU, capped at 32.
    """
    cpus = get_cpu_count()
    # Unknown or single CPU: stay single-threaded.
    if cpus is None or cpus == 1:
        return 1
    # min() enforces the upper bound of 32 threads.
    return min(cpus * 3, 32)


def get_cpu_use():
    """Return the CPU usage percentage measured by psutil over a 1-second interval."""
    return psutil.cpu_percent(interval=1)


def read_config_ini(filename):
    """Parse the INI file at ``filename`` and return the ``ConfigParser``.

    A path that cannot be read yields a parser with no sections, because
    ``ConfigParser.read`` skips files it cannot open.
    """
    parser = configparser.ConfigParser()
    parser.read(filename)
    return parser


def read_config_yaml(filename):
    """Load the YAML file at ``filename`` and return the parsed document.

    The file is decoded as UTF-8 explicitly so parsing does not depend on the
    locale of the host or container running the script.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

# 创建线程局部变量
# thread_local = threading.local()
# thread_local.success = False
# thread_local.code = 0

def fetch_status(url, code=None):
    """Send a HEAD request to ``url``, retrying up to ``retry_time`` times.

    Args:
        url: Full URL (scheme included) to probe.
        code: Unused; kept so existing callers that pass it keep working.

    Returns:
        A ``(url, status_code)`` tuple. ``status_code`` is the HTTP status of
        the first attempt that got a response, or the sentinel ``999`` when
        every attempt raised.
    """
    # Passed to httpx as the client timeout; presumably (connect, read)
    # seconds -- confirm against the httpx timeout documentation.
    timeout = (3, 5)

    for attempt in range(retry_time):
        try:
            print(f'{attempt} time。。。。。。。。。。。。。。。。')
            with httpx.Client(timeout=timeout) as client:
                response = client.head(url)
            return url, response.status_code
        except Exception:
            # Best effort: any failure (DNS, connect, TLS, timeout, ...) just
            # consumes one attempt; the sentinel below reports total failure.
            continue

    # All attempts failed. Deciding this after the loop (instead of at a fixed
    # attempt index) keeps the retry count in step with retry_time and
    # guarantees a tuple is always returned.
    return url, 999
                    




def _alert_is_due(alert_key, now):
    """Return True when the alert interval has elapsed since the last alert for ``alert_key``."""
    # float() keeps the comparison valid even if the interval came in as a
    # string from the environment.
    return now - last_alert_times.get(alert_key, 0) > float(WEB_ALTER_INTERVAL)


def check_website(urls, remark, schems, num_threads):
    """Probe every host in ``urls`` concurrently and alert on failures.

    Args:
        urls: Iterable of host names (without scheme).
        remark: Label identifying the group, used in alerts and as part of
            the per-site alert key.
        schems: URL scheme to prepend, e.g. ``"https"``.
        num_threads: Maximum number of worker threads.

    A Telegram alert is sent for each URL whose status code is >= 400 (999
    means the request never succeeded), at most once per alert interval per
    ``remark``/URL pair. Any unexpected exception is reported the same way.
    """
    # Bound before the try block so the except handler can always use them,
    # even when the failure happens before any result has been processed.
    current_time = get_current_time()
    url = "N/A"
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(fetch_status, f"{schems}://{host}") for host in urls]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]

        for result in results:
            if not isinstance(result, tuple):
                continue
            url, status_code = result[0], result[-1]

            print(f"{current_time} URL: {url}, 状态码: {status_code}")
            if status_code < 400:
                continue

            alert_key = f"{remark}_{url}"
            c_time = time.time()
            if not _alert_is_due(alert_key, c_time):
                continue

            # 999 is the sentinel fetch_status returns when no attempt succeeded.
            m_msg = "此節點無法訪問此域名" if status_code == 999 else status_code
            send_telegram_alert(f"消息内容: 🤦request not in between 0~399, Please check\n返回状态: {status_code} {m_msg}\n商户代理: {remark} \n商户域名: {url} ")
            last_alert_times[alert_key] = c_time

    except Exception as e:
        print(f"{current_time} Exception occurred while checking {remark}  {url}: {e}")
        alert_key = f"{remark}_{url}"
        c_time = time.time()
        if _alert_is_due(alert_key, c_time):
            send_telegram_alert(f"消息内容: 🤦The request could not be completed, RequestTimeout 3s; ReadTimeout 5s; ( {e} ) \n商户代理: {remark} \n商户域名: {url} ")
            last_alert_times[alert_key] = c_time

def send_telegram_alert(message):
    """Send ``message`` to the configured Telegram chat.

    The text is prefixed with the node address and the retry count. Delivery
    is best effort: failures are printed and never raised, so a Telegram
    outage cannot stop the monitoring loop.
    """
    current_time = get_current_time()
    print(f"{current_time} Sending Telegram alert for: {message}")

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {"chat_id": TELEGRAM_CHAT_ID, "text": f"节点地址: {PUBLIC_NETWORK}\n重试频率: {retry_time}次\n{message}"}
    try:
        # Without a timeout requests can block indefinitely on a stalled connection.
        response = requests.post(url, data=data, timeout=10)
    except requests.RequestException as exc:
        # Print only the exception type: the full message can contain the
        # request URL, which embeds the bot token.
        print(f"{current_time} Failed to send message. Error: {type(exc).__name__}")
        return

    if response.status_code == 200:
        print(f"{current_time} Message sent successfully!")
    else:
        print(f"{current_time} Failed to send message. Status code: {response.status_code}")





def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")



def main():
    """Run the monitoring loop forever.

    Every round re-reads ``websites.yml`` (so edits apply without a restart),
    samples the CPU load, then checks each URL list found under
    ``websites -> <group> -> <type>`` and sleeps for the check interval.
    """
    while True:
        config = read_config_yaml('websites.yml')
        num_threads = allocate_threads()
        # time.sleep() needs a number; the interval may be a string when it
        # was supplied through the environment.
        check_interval = float(WEB_CHECK_INTERVAL)

        current_time = get_current_time()
        cpu_use = get_cpu_use()
        print('='*70)

        # NOTE(review): get_cpu_use() returns psutil's overall CPU percentage,
        # which presumably never exceeds 100, so this branch looks
        # unreachable -- confirm the intended threshold.
        if cpu_use > 100:
            msg = f"{current_time} cpu load is high ({cpu_use}%), continue, sleep {WEB_CHECK_INTERVAL}s......."
            print(msg)
            send_telegram_alert(f"Content: {msg}")
            time.sleep(check_interval)
            continue

        print(f"{current_time} use {num_threads} threading......")

        schems = config['websites_schems']
        for website, groups in config['websites'].items():
            for type_name, urls in groups.items():
                check_website(urls, f"{website} {type_name}", schems, num_threads)
        time.sleep(check_interval)  # wait before the next round





# Start the monitoring loop only when the file is executed as a script.
if __name__ == "__main__":
    main()
```

### 效果 <a href="#e6-95-88-e6-9e-9c" id="e6-95-88-e6-9e-9c"></a>

### Docker 容器方式 <a href="#docker-e5-ae-b9-e5-99-a8-e6-96-b9-e5-bc-8f" id="docker-e5-ae-b9-e5-99-a8-e6-96-b9-e5-bc-8f"></a>

* website\_Dockerfile

```
FROM python:3.9-slim

# MAINTAINER is deprecated; LABEL is the supported way to record it.
LABEL maintainer="ck"

WORKDIR /app
# The script is saved as check_websites.py (see the section above).
COPY check_websites.py main.py


RUN pip install requests psutil pyyaml httpx --no-cache-dir


ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone


CMD ["python", "main.py"]
```

```shell
docker build --no-cache -t check_website -f website_Dockerfile .
docker-compose up -d
```

### Docker-compose.yaml <a href="#docker-compose.yaml" id="docker-compose.yaml"></a>

```
root@monitor-1:/data/monitor/check# cat docker-compose.yml 
version: "3"
services:
  check_website:
    image: check_website:latest
    container_name: check_website
    network_mode: host
    dns:
      - 8.8.8.8
    environment:
      - PUBLIC_NETWORK=8.8.8.8
      - TELEGRAM_BOT_TOKEN=xxxxxxxx
      - TELEGRAM_CHAT_ID=xxxxx
        #- ENABLE_VERIFY=False
    volumes:
      -  ${PWD}/websites.yml:/app/websites.yml
```

\`


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://close.gitbook.io/yun-wei-bi-ji/python/python-xiao-ji-qiao/jian-kong-wang-zhan-ke-yong-xing-bing-fa-song-telegram-tong-zhi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
