ELK-Kafka-Filebeat
Docker environment
# Run on the Docker node
# Tencent Cloud Docker Hub mirror
# export REGISTRY_MIRROR="https://mirror.ccs.tencentyun.com"
# DaoCloud mirror
# export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"
# Alibaba Cloud Docker Hub mirror
export REGISTRY_MIRROR=https://registry.cn-hangzhou.aliyuncs.com
# Install Docker
# Reference documentation:
# https://docs.docker.com/install/linux/docker-ce/centos/
# https://docs.docker.com/install/linux/linux-postinstall/
# Remove old versions
yum remove -y docker \
             docker-client \
             docker-client-latest \
             docker-ce-cli \
             docker-common \
             docker-latest \
             docker-latest-logrotate \
             docker-logrotate \
             docker-selinux \
             docker-engine-selinux \
             docker-engine
# Set up the yum repository
yum install -y yum-utils \
               device-mapper-persistent-data \
               lvm2
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# Install and start Docker
yum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13
mkdir -p /etc/docker
cat > /etc/docker/daemon.json <<EOF
{
  "registry-mirrors": ["${REGISTRY_MIRROR}"],
  "exec-opts": ["native.cgroupdriver=systemd"],
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "100m"
  },
  "storage-driver": "overlay2",
  "storage-opts": [
    "overlay2.override_kernel_check=true"
  ]
}
EOF
mkdir -p /etc/systemd/system/docker.service.d
# Restart Docker
systemctl daemon-reload
systemctl enable docker
systemctl restart docker
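A quick sanity check that the daemon picked up daemon.json:
# Should list the mirror configured above and the systemd cgroup driver
docker info | grep -i -A1 "registry mirrors"
docker info | grep -i "cgroup driver"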
# Disable the firewall
systemctl stop firewalld
systemctl disable firewalld
# Disable SELinux
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config
# Disable swap
swapoff -a
yes | cp /etc/fstab /etc/fstab_bak
grep -v swap /etc/fstab_bak > /etc/fstab
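A quick check that swap is really gone:
# Swap totals should now read 0
free -h | grep -i swap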
Docker Compose environment
# Download Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# Make the binary executable
chmod +x /usr/local/bin/docker-compose
# Verify the installation
docker-compose --version
Environment initialization
# The kernel parameter below must be raised, otherwise ES will refuse to
# start because vm.max_map_count is too low
# Apply the setting immediately
sysctl -w vm.max_map_count=262144
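The sysctl -w setting does not survive a reboot; to make it permanent, also record it in /etc/sysctl.conf (a minimal sketch):
# Persist the setting across reboots, then reload
echo "vm.max_map_count=262144" >> /etc/sysctl.conf
sysctl -p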
# Create the logstash directory and copy the Logstash config file logstash.conf into it
mkdir -p /mydata/logstash
# The elasticsearch/data directory must exist and be owned by the container
# user (uid 1000), otherwise ES fails to start with a permission error
mkdir -p /mydata/elasticsearch/data/
chown 1000:1000 /mydata/elasticsearch/data/
docker-compose file
version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.6.2
    container_name: elasticsearch
    user: root
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "cluster.name=elasticsearch" # set the cluster name to elasticsearch
      - "discovery.type=single-node" # start in single-node mode
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      - ./mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins # plugin files
      - ./mydata/elasticsearch/data:/usr/share/elasticsearch/data # data files
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - elastic
  logstash:
    image: logstash:7.6.2
    container_name: logstash
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf # mount the logstash pipeline config
    depends_on:
      - elasticsearch # start after elasticsearch is up
    links:
      - elasticsearch:es # elasticsearch is reachable under the hostname es
    ports:
      - 5044:5044
    networks:
      - elastic
  kibana:
    image: kibana:7.6.2
    container_name: kibana
    links:
      - elasticsearch:es # elasticsearch is reachable under the hostname es
    depends_on:
      - elasticsearch # start after elasticsearch is up
    volumes:
      - ./kibana.yml:/usr/share/kibana/config/kibana.yml # sets the elasticsearch address
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - 5601:5601
    networks:
      - elastic
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    volumes:
      - ./mydata/zookeeper/data:/data
      - ./mydata/zookeeper/log:/data/log
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    networks:
      - elastic
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    volumes:
      - ./mydata/kafka:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    links:
      - zookeeper
    ports:
      - "9092:9092"
    networks:
      - elastic
    environment:
      - KAFKA_LISTENERS=INTERNAL://kafka:9092,OUT://kafka:29092
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUT://kafka:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=OUT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_MESSAGE_MAX_BYTES=2000000
      - KAFKA_CREATE_TOPICS=logs:1:1
networks:
  elastic:
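Optionally, validate the compose file before bringing the stack up; docker-compose parses it and reports YAML or schema errors:
# Prints the fully resolved configuration, or an error if the YAML is malformed
docker-compose config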
elasticsearch configuration file
# elasticsearch.yml
# Name and network; in a cluster every node shares this file and only the
# node name differs
cluster.name: "docker-cluster"
network.host: 0.0.0.0
# Note: ES 7.x does not allow these two discovery settings together with the
# discovery.type=single-node set in docker-compose; enable them only for a
# real multi-node cluster
#discovery.seed_hosts: ["es:9300"]
#cluster.initial_master_nodes: ["es:9300"]
# Data and logs
path.data: /usr/share/elasticsearch/data
path.logs: /usr/share/elasticsearch/logs
# CORS
http.cors.enabled: true
http.cors.allow-origin: "*"
# Needed for es-head-master to reach ES once authentication is enabled
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
# Enable authentication
xpack.security.enabled: true
# Transport-layer TLS
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/config/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/config/elastic-certificates.p12
logstash configuration file
# cat logstash.conf
input {
  kafka {
    bootstrap_servers => ["kafka:29092"]
    group_id => "nginx_access"
    topics => ["nginx_access"]
    codec => json
  }
  kafka {
    bootstrap_servers => ["kafka:29092"]
    group_id => "nginx_error"
    topics => ["nginx_error"]
    codec => json
  }
  kafka {
    bootstrap_servers => ["kafka:29092"]
    group_id => "mysql_error"
    topics => ["mysql_error"]
    codec => json
  }
  kafka {
    bootstrap_servers => ["kafka:29092"]
    group_id => "mysql_slow"
    topics => ["mysql_slow"]
    codec => json
  }
}
output {
  elasticsearch {
    hosts => ["es:9200"]
    #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #index => "logs-%{+YYYY.MM.dd}"
    #index => "kafka-%{+YYYY.MM.dd}"
    index => "kafka-%{[fields][kafka_topic]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "elastic"
  }
}
kibana configuration file
# cat kibana.yml
i18n.locale: "zh-CN"
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://es:9200" ]
# Needed once xpack.security is enabled on ES; the kibana built-in user's
# password is whatever was chosen during elasticsearch-setup-passwords
# (elastic in this walkthrough)
elasticsearch.username: "kibana"
elasticsearch.password: "elastic"
monitoring.ui.container.elasticsearch.enabled: true
Startup
[root@es2 data]# docker-compose up -d
[root@es2 data]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
541e4f5421b9 logstash:7.6.2 "/usr/local/bin/dock…" 23 hours ago Up 2 minutes 0.0.0.0:5044->5044/tcp, :::5044->5044/tcp, 9600/tcp logstash
573f14a5750e wurstmeister/kafka "start-kafka.sh" 23 hours ago Up 2 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
af7405513781 kibana:7.6.2 "/usr/local/bin/dumb…" 23 hours ago Up 2 minutes 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana
693ad300c571 elasticsearch:7.6.2 "/usr/local/bin/dock…" 23 hours ago Up 2 minutes 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch
327580f65c76 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 23 hours ago Up 2 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
Plugin installation
# elasticsearch needs the IKAnalyzer Chinese tokenizer installed, followed by a restart.
#docker exec -it elasticsearch /bin/bash
# The following command must be run inside the container
#/usr/share/elasticsearch/bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
# Generate certificates
#/usr/share/elasticsearch/bin/elasticsearch-certutil ca
#/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
# (elasticsearch.yml above expects the certificate under
# /usr/share/elasticsearch/config/, so copy it there as well if you keep it in data/)
#mv elastic-* /usr/share/elasticsearch/data/
# Initialize the passwords (set to: elastic)
#/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
#docker restart elasticsearch
# logstash needs the json_lines plugin installed, followed by a restart.
docker exec -it logstash /bin/bash
logstash-plugin install logstash-codec-json_lines
docker restart logstash
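With the passwords initialized, a quick curl confirms authentication works (assumes the password elastic chosen above):
# Returns the cluster info JSON; without -u the request is rejected with 401
curl -u elastic:elastic http://localhost:9200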
Filebeat client installation
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gz
tar xzvf filebeat-7.4.2-linux-x86_64.tar.gz
cd filebeat-7.4.2-linux-x86_64
filebeat.yml configuration
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /usr/local/nginx/logs/access*.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    kafka_topic: nginx_access # custom field
- type: log
  enabled: true
  paths:
    - /usr/local/nginx/logs/error*.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    kafka_topic: nginx_error # custom field
- type: log
  enabled: true
  paths:
    - /var/log/mariadb/mariadb.log
  fields:
    kafka_topic: mysql_error # custom field
- type: log
  enabled: true
  paths:
    - /var/log/mariadb/slow.log
  json.keys_under_root: true
  json.overwrite_keys: true
  #tags: ["mysql_slow"]
  fields:
    kafka_topic: mysql_slow # custom field
#filebeat.config.modules:
#  path: ${path.config}/modules.d/*.yml
#  reload.enabled: false
#  reload.period: 1s
setup.kibana:
  host: "http://kafka:5601" # the "kafka" hosts entry points at the Docker host, where Kibana listens on 5601
output.kafka:
  hosts: ["kafka:9092"]
  topic: "%{[fields.kafka_topic]}" # which Kafka topic events are produced to
  version: 1.0.0 # Kafka protocol version
  partition.round_robin: # how events are spread across Kafka partitions (round robin here)
    reachable_only: false
  required_acks: 1 # Kafka acknowledgement mode
  compression: gzip # output compression codec: one of none, snappy, lz4, gzip (default gzip)
  compression_level: 4 # gzip compression level; 0 disables compression, valid levels range from 1 (best speed) to 9 (best compression); higher levels reduce network usage but increase CPU usage
  max_message_bytes: 100000 # events larger than this are dropped
  codec.json:
    pretty: false
Filebeat parameter notes
type: the input type.
enabled: whether this input is active; true enables it, false disables it
paths: log file paths to monitor; each additional path goes on its own line
hosts: Kafka instance endpoint(s)
topic: the Kafka topic logs are written to; specify a topic that has already been created
Specifying the host
[root@vm-1 ~]# cat /etc/hosts
172.16.62.179 kafka
# Start the service on the client
[root@vm-1 ~]# ./filebeat &
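Before leaving Filebeat running in the background, the configuration and the Kafka connectivity can be checked with its built-in test subcommands (run from the filebeat directory):
# Validate filebeat.yml syntax
./filebeat test config
# Verify the output defined in filebeat.yml (the Kafka endpoint) is reachable
./filebeat test output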
Further configuration
See the official documentation for Filebeat's log input: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
See the official documentation for Filebeat's Kafka output: https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
Configuring the Logstash pipeline
The pipeline's input and output blocks are the logstash.conf shown above. Between them, one or more analysis/filter plugins can be added, for example:
# filter {
#   grok {
#     match => { "message" => "%{COMBINEDAPACHELOG}"}
#   }
#   geoip {
#     source => "clientip"
#   }
# }
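After editing the pipeline, it can be syntax-checked inside the container before a restart; this spawns a second short-lived Logstash process just for validation:
# Exits 0 if the pipeline config parses cleanly
docker exec -it logstash bin/logstash --config.test_and_exit -f /usr/share/logstash/pipeline/logstash.conf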
Logstash parameter notes
input
bootstrap_servers: Kafka instance endpoint(s)
group_id: name of an existing consumer group
topics: topic name(s) to consume (created automatically if missing); must match the topic names configured in Filebeat
codec: set to json to parse JSON-formatted fields
output
hosts: ES address
user: ES username
password: ES password
index: index name
See the official documentation for Logstash's kafka input: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
See the official documentation for Logstash's grok filter: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
See the official documentation for Logstash's elasticsearch output: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
Checking Kafka consumption status
# Enter the container
docker exec -it kafka bash
# Kafka is installed under /opt/kafka by default
cd /opt/kafka
# Consumption can only be queried per consumer group; list the groups first
bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --list
logstash
# List topics
bash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181
__consumer_offsets
logs
# Show consumption status for a group
bash-5.1# bin/kafka-consumer-groups.sh --describe --bootstrap-server 172.16.62.179:9092 --group logstash
GROUP    TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID                                     HOST        CLIENT-ID
logstash logs  0         107335         107335         0   logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90 /172.19.0.2 logstash-0
# Options:
# --describe          show detailed information
# --bootstrap-server  Kafka connection address
# --group             consumer group to inspect
# Columns:
# TOPIC:          topic name
# PARTITION:      partition ID
# CURRENT-OFFSET: messages consumed so far
# LOG-END-OFFSET: total messages
# LAG:            messages not yet consumed
# CONSUMER-ID:    consumer ID
# HOST:           host IP
# CLIENT-ID:      client ID
# The output shows that all 107335 messages on topic logs have been consumed
# and 0 remain, i.e. consumption is keeping up with no backlog.
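Beyond offsets, the actual messages on a topic can be inspected with the console consumer that ships with Kafka, from the same /opt/kafka directory:
# Dump messages from the nginx_access topic from the beginning (Ctrl-C to stop)
bin/kafka-console-consumer.sh --bootstrap-server 172.16.62.179:9092 --topic nginx_access --from-beginning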
Kibana index
In Kibana, open Create index pattern; the indices written by Logstash appear there, e.g.:
kafka-mysql_slow-2021.05.13
kafka-nginx_access-2021.05.13
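If an expected index does not show up here, querying ES directly narrows down whether the problem is in the pipeline or in Kibana (assumes the elastic password set earlier):
# List all kafka-* indices with doc counts and sizes
curl -u elastic:elastic "http://localhost:9200/_cat/indices/kafka-*?v"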