监控端口可用性并发送Telegram通知
配置文件 config.ini
[IP1]
ip = 192.168.0.1
ports = 80,443
remarks = Http,Https
[IP2]
ip = 192.168.0.2
ports = 80,443
remarks = Http,Https
程序 main.py
文件热加载
import socket
import threading
import requests
import time
import configparser
import os
import datetime
# Telegram bot token and chat ID -- replace with your own values.
TELEGRAM_BOT_TOKEN = 'you-bot-token'
TELEGRAM_CHAT_ID = 'you-chat-id'
# Number of scan threads started before the running batch is joined.
THREADS = 4
# Time of the last alert that was sent, keyed by (ip, port); values are
# time.time() timestamps.
last_alert_times = {}
# When DOCKER_CONTAINER is set, environment variables override the defaults
# above.
if os.environ.get('DOCKER_CONTAINER'):
    TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", TELEGRAM_BOT_TOKEN)
    TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", TELEGRAM_CHAT_ID)
    # Environment values are strings; THREADS is compared against
    # len(threads), so it must be converted to an int.
    try:
        THREADS = int(os.environ.get("THREADS", THREADS))
    except ValueError:
        print(f"Invalid THREADS value {os.environ.get('THREADS')!r}; using default {THREADS}")
def send_telegram_alert(message, timeout=10):
    """Send *message* to the configured Telegram chat via the Bot API.

    Args:
        message: Text of the notification.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True if the API answered with HTTP 200, False otherwise (including
        network errors and timeouts).
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
    try:
        # Without a timeout a stalled connection would block the calling
        # scan thread indefinitely.
        response = requests.post(url, data=data, timeout=timeout)
    except requests.RequestException as e:
        # Log only the exception class: the exception text can contain the
        # request URL, which embeds the bot token.
        print(f"Failed to send message. Error: {type(e).__name__}")
        return False
    if response.status_code == 200:
        print("Message sent successfully!")
        return True
    print(f"Failed to send message. Status code: {response.status_code}")
    return False
def check_port(ip, port, timeout=1):
    """Return True if a TCP connection to (ip, port) can be established.

    Args:
        ip: IPv4 address or host name to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True when the connection succeeds; False when it is refused, times
        out, or any error occurs while trying.
    """
    # Deliberately broad: a probe must never take down the scanning thread,
    # so every failure is reported as "port closed".
    try:
        # The context manager closes the socket on every path and, unlike a
        # try/finally around a possibly-unbound name, cannot fail when
        # socket creation itself raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an errno value otherwise.
            return sock.connect_ex((ip, port)) == 0
    except Exception as e:
        print(f"Error occurred: {e}")
        return False
def port_scan(ip, ports, remarks):
    """Check every port of one host and alert on closed ports.

    Each port's state is printed with a timestamp. For a closed port a
    Telegram alert is sent unless one was already sent for the same
    (ip, port) within the last 60 seconds.

    Args:
        ip: Address of the host to probe.
        ports: Port numbers to check.
        remarks: Human-readable labels, matched to ``ports`` by position.
    """
    remarks = list(remarks)
    for index, port in enumerate(ports):
        # Iterate over the ports rather than zip(ports, remarks): zip would
        # silently skip every port that has no matching remark.
        remark = remarks[index] if index < len(remarks) else "n/a"
        formatted_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if check_port(ip, port):
            print(f"{formatted_datetime} Port {port} on {ip} is open. ({remark})")
            continue
        print(f"{formatted_datetime} Port {port} on {ip} is closed. ({remark})")
        # Time of the previous alert for this endpoint (0 if never alerted).
        last_alert_time = last_alert_times.get((ip, port), 0)
        current_time = time.time()
        # Rate-limit: at most one alert per endpoint per minute.
        if current_time - last_alert_time > 60:
            send_telegram_alert(f"🤦🤦🤦 Port {port} on {ip} is closed. ({remark})")
            last_alert_times[(ip, port)] = current_time
def read_config(filename):
    """Parse the INI file *filename* and return the ConfigParser.

    The file is decoded as UTF-8 so that non-ASCII remarks are read the same
    way regardless of the locale. A missing file is not an error: the
    returned parser simply has no sections.
    """
    config = configparser.ConfigParser()
    config.read(filename, encoding="utf-8")
    return config
def main():
    """Run the monitoring loop forever.

    Every cycle re-reads config.ini (so edits take effect without a
    restart), scans each configured host in its own thread with at most
    THREADS scans running per batch, waits for all scans to finish, then
    sleeps 5 seconds.
    """
    config_file = 'config.ini'
    # THREADS may be a string when it was taken from the environment.
    max_threads = int(THREADS)
    while True:
        # Reload the configuration on every cycle. The file may be caught
        # half-written while it is being edited, so a parse failure skips
        # this cycle instead of stopping the monitor.
        try:
            config = read_config(config_file)
        except (configparser.Error, UnicodeDecodeError) as e:
            print(f"Failed to read {config_file}: {e}")
            time.sleep(5)
            continue
        threads = []
        for section in config.sections():
            try:
                ip = config[section]['ip']
                ports = [int(port) for port in config[section]['ports'].split(',')]
                remarks = config[section]['remarks'].split(',')
            except (KeyError, ValueError) as e:
                # One malformed section must not stop monitoring the others.
                print(f"Skipping section [{section}]: invalid configuration ({e})")
                continue
            # One thread per host.
            t = threading.Thread(target=port_scan, args=(ip, ports, remarks))
            t.start()
            threads.append(t)
            # Cap concurrency: once a batch is full, wait for it to finish.
            if len(threads) >= max_threads:
                for t in threads:
                    t.join()
                threads = []
        # Wait for the final, possibly partial batch so that scans from this
        # cycle cannot overlap with the next one.
        for t in threads:
            t.join()
        # Pause 5 seconds before the next cycle.
        time.sleep(5)


# Start the monitor only when run as a script, not when imported.
if __name__ == "__main__":
    main()
效果
docker 打包(可选)
Dockerfile
# Image for the port monitor, based on Alpine Linux.
FROM alpine:latest
# Install Python 3, pip, the requests library and the timezone database.
RUN apk add --no-cache python3 py3-pip py3-requests tzdata
# Timezone applied by the next instruction.
ENV TZ=Asia/Shanghai
# Point /etc/localtime at the chosen zone and record its name.
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Make `python` resolve to python3 so the CMD below works.
RUN ln -sf /usr/bin/python3 /usr/bin/python
WORKDIR /app
# Copy the whole build context into /app (expected to contain main.py).
COPY . .
CMD ["python", "main.py"]
打包镜像
docker build --no-cache -t check_port .
docker-compose 启动
[root@elk docker_data]# cat docker-compose.yaml
version: "3"
services:
  check_port:
    image: check_port:latest
    container_name: check_port
    cpus: 0.1  # at most 10% of one CPU core; adjust as needed
    mem_limit: 512m  # limit memory to 512 MB; adjust as needed
    environment:
      #- TZ=Asia/Shanghai
      - TELEGRAM_BOT_TOKEN=you-telegram-token
      - TELEGRAM_CHAT_ID=you-telegram-id
      - DOCKER_CONTAINER=true
    volumes:
      - ${PWD}/check_port/config.ini:/app/config.ini
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "5"
Last updated