监控端口可用性并发送Telegram通知

配置文件 config.ini

[IP1]
ip = 192.168.0.1
ports = 80,443
remarks = Http,Https

[IP2]
ip = 192.168.0.2
ports = 80,443
remarks = Http,Https

程序 main.py

文件热加载

import socket
import threading
import requests
import time
import configparser
import os
import datetime

# Telegram bot token和chat ID,请替换成你自己的
TELEGRAM_BOT_TOKEN = 'you-bot-token'
TELEGRAM_CHAT_ID = 'you-chat-id'

# 线程数
THREADS = 4

# 记录上一次告警时间的字典,以 (IP, Port) 为键,时间戳为值
last_alert_times = {}



if os.environ.get('DOCKER_CONTAINER'):
    TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", TELEGRAM_BOT_TOKEN)
    TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", TELEGRAM_CHAT_ID)
    THREADS = os.environ.get("THREADS", THREADS)



def send_telegram_alert(message):
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
    response = requests.post(url, data=data)
    if response.status_code == 200:
        print("Message sent successfully!")
    else:
        print(f"Failed to send message. Status code: {response.status_code}")


def check_port(ip, port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return True
        else:
            return False
    except Exception as e:
        print(f"Error occurred: {e}")
        return False
    finally:
        sock.close()

def port_scan(ip, ports, remarks):
    for port, remark in zip(ports, remarks):
        current_datetime  = datetime.datetime.now()
        formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
        if check_port(ip, port):
            print(f"{formatted_datetime} Port {port} on {ip} is open. ({remark})")
        else:
            print(f"{formatted_datetime} Port {port} on {ip} is closed. ({remark})")
            # # 获取上一次告警时间
            last_alert_time = last_alert_times.get((ip, port), 0)
            current_time = time.time()
            # 如果距离上一次告警时间超过1分钟,则发送新的告警
            if current_time - last_alert_time > 60:
                # 发送告警
                send_telegram_alert(f"🤦🤦🤦 Port {port} on {ip} is closed. ({remark})")
                # 更新告警时间
                last_alert_times[(ip, port)] = current_time


def read_config(filename):
    config = configparser.ConfigParser()
    config.read(filename)
    return config


def main():
    config_file = 'config.ini'
    config = read_config(config_file)

    while True:
        # 每次循环,取一次最新的配置文件
        config = read_config(config_file)

        # 创建线程列表
        threads = []

        # 遍历配置文件中的IP、端口和备注,并创建线程
        for section in config.sections():
            ip = config[section]['ip']
            ports = [int(port) for port in config[section]['ports'].split(',')]
            remarks = config[section]['remarks'].split(',')
            # 每个IP一个线程
            t = threading.Thread(target=port_scan, args=(ip, ports, remarks))
            t.start()
            threads.append(t)

            # 控制线程数
            if len(threads) >= THREADS:
                for t in threads:
                    t.join()
                threads = []

        # 等待5秒后再次执行
        time.sleep(5)

if __name__ == "__main__":
    main()



效果

docker 打包(可选)

Dockefile

FROM alpine:latest

RUN apk add --no-cache python3 py3-pip  py3-requests tzdata

ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN ln -sf /usr/bin/python3 /usr/bin/python

WORKDIR /app
COPY . .

CMD ["python", "main.py"]

打包镜像


docker build --no-cache -t check_port .

docker-compose 启动

[root@elk docker_data]# cat docker-compose.yaml 
version: "3"
services:
  check_port:
    image: check_port:latest
    container_name: check_port
    cpus: 0.1       # 表示使用系统的 50% CPU,可以根据需要调整
    mem_limit: 512m # # 限制内存为512MB,可以根据需要调整
    environment:
      #- TZ=Asia/Shanghai
      - TELEGRAM_BOT_TOKEN=you-telegram-token
      - TELEGRAM_CHAT_ID=you-telegram-id
      - DOCKER_CONTAINER=true
    volumes:
      -  ${PWD}/check_port/config.ini:/app/config.ini
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "5"

Last updated