
Centralized Management and Real-Time Auditing: Best Practices for Building a Log Platform for a 1,300-Server Linux Cluster

Introduction

As enterprise IT infrastructure grows, so does the number of Linux servers, and traditional per-host log management can no longer meet the need for centralized management, auditing, and analysis of log data. In large cluster environments especially, collecting, storing, and analyzing logs efficiently becomes a significant technical challenge.

Background

This article describes how to implement centralized log management and auditing for a large Linux cluster, focusing on collecting rsyslog output and auditing operator command logs and login logs. A central rsyslog server collects the system logs of more than 1,300 Linux servers in one place. Filebeat then ships the aggregated logs to Logstash for cleaning and transformation, the results are stored in Elasticsearch, and Kibana provides real-time visualization. This approach not only simplifies log management but also improves monitoring efficiency and security.

Requirements

Centrally manage the Linux system logs of 1,300 servers, with timely problem detection and alerting.

Solution

  • Centralized management: a single server collects log data from all Linux servers, eliminating the need to configure each host individually.
  • Log auditing: operator command logs, login logs, and similar records are audited so that system activity remains traceable.
  • Data cleaning and analysis: logs are cleaned and converted to a standard format, making later analysis and visualization straightforward.
  • Real-time display: Kibana visualizes the cleaned data in real time, helping operations staff spot potential problems quickly.

Overall architecture

To achieve these goals, we designed the following system architecture:

  • rsyslog: the core collection component; it forwards the various Linux system logs (including /var/log/messages) to the centralized log server.
  • Filebeat: a lightweight log shipper; in this deployment it runs on the central rsyslog server and forwards the aggregated log files to Logstash.
  • Logstash: cleans, parses, and transforms the collected logs so the data matches the expected format before it is stored in Elasticsearch.
  • Elasticsearch: stores the cleaned and transformed log data and provides powerful full-text search and querying.
  • Kibana: dashboards display the log data stored in Elasticsearch in real time, supporting analysis and visualization by operations staff.

rsyslog aggregation

rsyslog server

First configure the rsyslog server so that it can collect logs from the whole cluster:

# Load the local system log module (e.g. logs sent via the logger command)
$ModLoad imuxsock
# Load the kernel log module (formerly handled by klogd)
$ModLoad imklog
# Load the UDP module to receive logs over UDP
$ModLoad imudp
# Run a UDP listener on port 514
$UDPServerRun 514
# Load the TCP module to receive logs over TCP
$ModLoad imtcp
# Run a TCP listener on port 514
$InputTCPServerRun 514
# Define a custom log template named 'myFormat'
$template myFormat,"%timestamp:::date-rfc3339% %fromhost-ip% %HOSTNAME% [%programname%] %syslogseverity-text%:%msg%\n"
# Use the traditional rsyslog format as the default file template
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
# Include all configuration files under /etc/rsyslog.d/
$IncludeConfig /etc/rsyslog.d/*.conf
# Log info level and above, excluding mail, authpriv, and cron, to /var/log/messages using myFormat
*.info;mail.none;authpriv.none;cron.none /var/log/messages;myFormat
# Log all authpriv messages (typically authentication related) to /var/log/secure using myFormat
authpriv.* /var/log/secure;myFormat
# Log all mail messages to /var/log/maillog; the leading '-' enables asynchronous writes
mail.* -/var/log/maillog
# Log all cron (scheduled task) messages to /var/log/cron
cron.* /var/log/cron
# Send all emergency (emerg) messages to every logged-in user
*.emerg :omusrmsg:*
# Log crit-level messages from uucp and news to /var/log/spooler
uucp,news.crit /var/log/spooler
# Log all local7 messages to /var/log/boot.log
local7.* /var/log/boot.log

Restart the server:

systemctl restart rsyslog
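To verify the server side before touching the clients, a minimal check (assuming the local firewall already permits 514/tcp and 514/udp):

# Confirm rsyslog is listening on both protocols
ss -tulnp | grep ':514'
# Send a local test message and confirm it lands in /var/log/messages
logger -p user.info "rsyslog-server-selftest"
tail -n 1 /var/log/messages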


rsyslog clients

On every client in the cluster, add the server IP as the last line of the configuration, and all logs will be forwarded to the central server:

[root@zabbix ~]# cat /etc/rsyslog.conf | tail -n 2
#authpriv.* @10.10.10.17
*.* @@192.168.102.20 # IP of the rsyslog server (@@ = TCP, @ = UDP)

Restart the clients:

systemctl restart rsyslog.service
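A quick end-to-end check from any client (the test string is arbitrary; run the grep on the server):

# On the client: send a message through the normal syslog path
logger -p authpriv.notice "client-forwarding-test"
# On the server: authpriv messages are routed to /var/log/secure
grep client-forwarding-test /var/log/secure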

Logs now appear on the rsyslog server in /var/log/messages, /var/log/secure, and the other configured files.


Filebeat collection

Install Filebeat on the rsyslog server and start it with the following configuration:

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

filebeat.inputs:
- type: log
  enabled: true
  tail_files: true
  paths:
    - /var/log/messages

# Send the logs to Logstash
output.logstash:
  hosts: ["192.168.1.100:5514"]
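To try the configuration in the foreground before installing it as a service (a minimal sketch; the config file path is an assumption):

filebeat -e -c /etc/filebeat/filebeat.yml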

Logstash configuration

[root@game logstash]# cat config/rsyslog.conf
input {
  beats {
    port => 5514
    type => syslog
  }
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:time} %{IP:client_ip} %{HOSTNAME:host_name} \[%{DATA:type}\] %{GREEDYDATA:info}"
    }
    overwrite => ["message"]
  }
  mutate {
    split => ["type", ","]
  }
  mutate {
    add_field => {
      "types" => "%{[type][1]}"
    }
  }
  mutate { remove_field => ["tags", "agent", "host", "log", "ecs", "type"] }

  date {
    # The rsyslog template emits an RFC3339 timestamp, which parses as ISO8601
    match => ["time", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "message"
  }
}
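Before wiring it into production, the pipeline can be syntax-checked and then run in the foreground (run from the Logstash install directory, as in the prompt above):

bin/logstash -f config/rsyslog.conf --config.test_and_exit
bin/logstash -f config/rsyslog.conf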

Sample data

2024-12-03T18:35:30+08:00 192.168.102.30 master01 [kubelet] info: E1203 18:35:30.827608 1065 summary_sys_containers.go:83] "Failed to get system container stats" err="failed to get cgroup stats for \"/system.slice/docker.service\": failed to get container info for \"/system.slice/docker.service\": unknown container \"/system.slice/docker.service\"" containerName="/system.slice/docker.service"

Grok pattern

%{TIMESTAMP_ISO8601:time} %{IP:client_ip} %{HOSTNAME:host_name} \[%{DATA:type}\] %{GREEDYDATA:info}

Parsed output

{
  "client_ip": "192.168.102.30",
  "time": "2024-12-03T18:35:30+08:00",
  "type": "kubelet",
  "host_name": "master01",
  "info": "info: E1203 18:35:30.827608 1065 summary_sys_containers.go:83] \"Failed to get system container stats\" err=\"failed to get cgroup stats for \\\"/system.slice/docker.service\\\": failed to get container info for \\\"/system.slice/docker.service\\\": unknown container \\\"/system.slice/docker.service\\\"\" containerName=\"/system.slice/docker.service\"\r"
}
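A quick way to confirm documents are reaching Elasticsearch is to query the index directly (host and index name as configured above):

curl -s 'http://127.0.0.1:9200/message/_search?size=1&pretty'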


OS command auditing

On Linux, commands are generally executed in a terminal. By writing every command into the messages log and then filtering it, we can build a record of all operator activity.

Terminal configuration

[root@zabbix ~]# cat /etc/profile | tail -n 2
unset MAILCHECK
export PROMPT_COMMAND='{ msg=$(history 1 | { read x y; echo $y; });logger -p local2.info "euid=$(whoami)" $(who am i) `pwd` "$msg"; }'

Here, "history 1 | { read x y; echo $y; }" strips the history index from the most recent command, and logger writes that command, together with the effective user (euid), the login session from "who am i", and the current working directory, to syslog at local2.info.
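Because the commands are logged to the local2 facility, the rsyslog server can optionally route them into a dedicated file instead of mixing them into /var/log/messages (a sketch; the file names are assumptions):

# /etc/rsyslog.d/cmd-audit.conf
local2.* /var/log/cmd_audit.log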

Log format

Oct 11 17:32:41 zabbix root: euid=root root pts/0 2021-10-11 15:13 (10.10.10.3) /root cat /etc/profile
Oct 11 17:32:47 zabbix root: euid=root root pts/0 2021-10-11 15:13 (10.10.10.3) /root cat /etc/profile | tail -n 2


filebeat configuration

[root@logserver01 filebeat]# cat system_messages.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  tail_files: true
  paths:
    - /var/log/messages
#=========================== Filebeat outputs ============================
# Ship to the Logstash beats listener configured below (172.17.9.200:5046)
output.logstash:
  hosts: ["172.17.9.200:5046"]

Logstash configuration

[root@logserver01 config]# cat system_userlog_FromKafkaInES.conf
input {
  beats {
    host => '172.17.9.200'
    port => 5046
  }

  #kafka {
  #  bootstrap_servers => ["172.17.8.232:6667"]
  #  topics => ["sys_os_exe"]
  #  codec => "json"
  #  group_id => "ELK_SYSTEM_EXE_GROUP"
  #  consumer_threads => 3
  #  client_id => "logstash"
  #  decorate_events => false
  #  auto_offset_reset => "earliest"
  #  request_timeout_ms => "300000"
  #  session_timeout_ms => "20000"
  #  max_poll_interval_ms => "600000"
  #}
}
filter {
  if ([message] =~ "euid") {
    grok {
      match => { "message" => '^(?<exetime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s)(?<deshost>[^ ]+)(?:\s)(?<name>[^ ]+)(?:\s\[)(?<loginuser>[^ |\]]*)(?:\]\s[^ ]+\seuid=)(?<exeuser>[^ ]+)(?:\s+)(?<userinfo>[^\(]+)(?:\s\()(?<srchost>[^\)]+)(?:\)\s)(?<exepath>[^ ]+)(\s+)(?<exeinfo>.*)' }
    }
    if "_grokparsefailure" in [tags] { drop { } }
    mutate {
      add_field => ["tmp_exeinfo", "%{exeinfo}"]
    }
    mutate {
      split => ["exetime", "-"]
      split => ["tmp_exeinfo", " "]
    }
    mutate {
      add_field => ["indextime", "%{[exetime][0]}%{[exetime][1]}"]
      add_field => ["evtTime", "%{[exetime][0]}-%{[exetime][1]}-%{[exetime][2]} %{hhmmss}"]
      add_field => ["cmd", "%{[tmp_exeinfo][0]}"]
    }
    # Keep the time the event was inserted into ES
    ruby { code => "event.set('inserttime', event.get('@timestamp').time.to_i)" }
    # Replace @timestamp with evtTime ("yyyy-MM-dd HH:mm:ss", e.g. 2020-06-29 09:24:29)
    date {
      match => ["evtTime", "yyyy-MM-dd HH:mm:ss"]
      # Kibana uses this time
      target => "@timestamp"
    }
    mutate { replace => ["evtTime", "%{evtTime} +0800"] }
    date {
      match => ["evtTime", "yyyy-MM-dd HH:mm:ss +0800"]
      timezone => "UTC"
      # Timestamp of the log event itself
      target => "logtimestamp"
    }
    # Event time as a long integer
    ruby { code => "event.set('longtime', event.get('logtimestamp').time.to_i)" }

    mutate { remove_field => ["tmp_exeinfo", "evtTime", "host", "ecs", "log", "hhmmss", "input", "agent", "exetime"] }
  }
  else {
    drop { }
  }
}
output {
  stdout { codec => rubydebug }
  if [indextime] !~ "index" {
    elasticsearch {
      hosts => ["http://172.17.9.176:9200"]
      #hosts => "172.17.9.176"
      index => "sys_os_userlog_%{[indextime]}"
      user => "*********"
      password => "*********"
    }
  }
}

Sample data

2024-12-03T18:39:05+08:00 192.168.102.30 master01 [root] info: euid=root root pts/0 2024-12-03 18:21 (192.168.96.19) /root [2024-12-03 18:39:05]ip a

Grok pattern

^(?<exetime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s)(?<deshost>[^ ]+)(?:\s)(?<name>[^ ]+)(?:\s\[)(?<loginuser>[^ |\]]*)(?:\]\s[^ ]+\seuid=)(?<exeuser>[^ ]+)(?:\s+)(?<userinfo>[^\(]+)(?:\s\()(?<srchost>[^\)]+)(?:\)\s)(?<exepath>[^ ]+)(\s+)(?<exeinfo>.*)

Parsed output

{
  "loginuser": "root",
  "hhmmss": "18:39:05",
  "exepath": "/root",
  "deshost": "192.168.102.30",
  "srchost": "192.168.96.19",
  "name": "master01",
  "exeinfo": "[2024-12-03 18:39:05]ip a\r",
  "exeuser": "root",
  "userinfo": "root pts/0 2024-12-03 18:21",
  "exetime": "2024-12-03"
}
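To confirm the daily indices are being created (index name pattern as in the Logstash output above; host and credentials likewise):

curl -s -u elastic 'http://172.17.9.176:9200/_cat/indices/sys_os_userlog_*?v'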


System login auditing

filebeat configuration

[root@logserver01 filebeat]# cat system_secure.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  tail_files: true
  paths:
    - /var/log/secure
#=========================== Filebeat outputs ============================
output.logstash:
  hosts: ["172.17.9.200:5045"]

Logstash configuration

[root@logserver01 config]# cat system_login_FromKafkaInES.conf
input {
  beats {
    host => '172.17.9.200'
    port => 5045
  }

  #kafka {
  #  bootstrap_servers => ["172.17.8.232:6667"]
  #  topics => ["sys_os_login"]
  #  codec => "json"
  #  group_id => "ELK_SYSTEM_LOGIN_GROUP"
  #  consumer_threads => 3
  #  client_id => "logstash"
  #  decorate_events => false
  #  auto_offset_reset => "earliest"
  #  request_timeout_ms => "300000"
  #  session_timeout_ms => "20000"
  #  max_poll_interval_ms => "600000"
  #}
}
filter {
  # Successful logins
  if ([message] =~ "Accepted") {
    grok {
      match => { "message" => '^(?<atime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s+)(?<deshost>\d+\.\d+\.\d+\.\d+)(?:\s)(?<name>[^ ]+)(?:[\S\s]*Accepted\spassword\sfor\s)(?<loginuser>[^ ]+)(?:\sfrom\s)(?<srchost>[\d.]+)(?:\s\w+\s\d+\s)(?<loginmode>\w*)' }
    }
    if "_grokparsefailure" in [tags] { drop { } }
    mutate {
      add_field => ["type", "systemlogin"]
      split => ["atime", "-"]
    }
    mutate {
      add_field => ["indextime", "%{[atime][0]}%{[atime][1]}"]
      add_field => ["evtTime", "%{[atime][0]}-%{[atime][1]}-%{[atime][2]} %{hhmmss}"]
    }
    # Keep the time the event was inserted into ES
    ruby { code => "event.set('inserttime', event.get('@timestamp').time.to_i)" }

    # Replace @timestamp with evtTime ("yyyy-MM-dd HH:mm:ss", e.g. 2020-06-29 09:24:29)
    date {
      match => ["evtTime", "yyyy-MM-dd HH:mm:ss"]
      # Kibana uses this time
      target => "@timestamp"
    }
    mutate { replace => ["evtTime", "%{evtTime} +0800"] }
    date {
      match => ["evtTime", "yyyy-MM-dd HH:mm:ss +0800"]
      timezone => "UTC"
      # Timestamp of the log event itself
      target => "logtimestamp"
    }
    # Event time as a long integer
    ruby { code => "event.set('longtime', event.get('logtimestamp').time.to_i)" }
    mutate { remove_field => ["evtTime", "host", "ecs", "log", "hhmmss", "input", "agent", "atime"] }
  }
  # Failed logins
  else if ([message] =~ "Failed password for") {
    grok {
      match => { "message" => '^(?<atime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s+)(?<deshost>\d+\.\d+\.\d+\.\d+)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?<loginuser>[^ ]+)(?:\sfrom\s)(?<srchost>[\d.]+)(?:\s\w+\s\d+\s)(?<loginmode>\w*)' }
    }
    if "_grokparsefailure" in [tags] { drop { } }
    mutate {
      add_field => ["type", "systemloginfailed"]
      split => ["atime", "-"]
    }
    mutate {
      add_field => ["indextime", "%{[atime][0]}%{[atime][1]}"]
      add_field => ["evtTime", "%{[atime][0]}-%{[atime][1]}-%{[atime][2]} %{hhmmss}"]
    }
    # Keep the time the event was inserted into ES
    ruby { code => "event.set('inserttime', event.get('@timestamp').time.to_i)" }

    # Replace @timestamp with evtTime
    date {
      match => ["evtTime", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
    mutate { replace => ["evtTime", "%{evtTime} +0800"] }
    date {
      match => ["evtTime", "yyyy-MM-dd HH:mm:ss +0800"]
      timezone => "UTC"
      target => "logtimestamp"
    }
    ruby { code => "event.set('longtime', event.get('logtimestamp').time.to_i)" }

    mutate { remove_field => ["evtTime", "host", "ecs", "log", "hhmmss", "input", "agent", "atime"] }
  }
  # Drop everything else
  else {
    drop { }
  }
}
output {
  if [type] == "systemlogin" {
    if [indextime] !~ "index" {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => "172.17.9.176"
        index => "sys_os_systemlogin_%{[indextime]}"
        user => "elastic"
        password => "*********"
      }
    }
  }
  else if [type] == "systemloginfailed" {
    if [indextime] !~ "index" {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => "172.17.9.176"
        index => "sys_os_systemloginfailed_%{[indextime]}"
        user => "elastic"
        password => "*********"
      }
    }
  }
}
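The three Logstash configurations above listen on different ports, so they can run side by side in a single Logstash process using the multiple-pipelines feature (a sketch; the pipeline ids and install paths are assumptions):

# config/pipelines.yml
- pipeline.id: syslog
  path.config: "/usr/share/logstash/config/rsyslog.conf"
- pipeline.id: cmd-audit
  path.config: "/usr/share/logstash/config/system_userlog_FromKafkaInES.conf"
- pipeline.id: login-audit
  path.config: "/usr/share/logstash/config/system_login_FromKafkaInES.conf"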

Parsing successful logins

Log format

2024-12-03T18:45:09+08:00 192.168.102.42 node03 [sshd] info: Accepted password for root from 192.168.96.19 port 14347 ssh2

Grok pattern

%{TIMESTAMP_ISO8601:atime} %{IP:deshost} %{HOSTNAME:name} \[%{WORD:type}\] %{DATA:loglevel}: Accepted password for %{WORD:loginuser} from %{IP:srchost} port %{NUMBER:port} %{WORD:loginmode}

Parsed output

{
  "loginuser": "root",
  "atime": "2024-12-03T18:45:09+08:00",
  "type": "sshd",
  "deshost": "192.168.102.42",
  "srchost": "192.168.96.19",
  "port": "14347",
  "loglevel": "info",
  "name": "node03",
  "loginmode": "ssh2"
}


Parsing failed logins

Log format

2024-12-03T19:01:57+08:00 192.168.102.42 node03 [sshd] info: Failed password for invalid user 123 from 192.168.96.19 port 12103 ssh2

Grok pattern

^(?<atime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s+)(?<deshost>\d+\.\d+\.\d+\.\d+)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?<loginuser>[^ ]+)(?:\sfrom\s)(?<srchost>[\d.]+)(?:\s\w+\s\d+\s)(?<loginmode>\w*)

Parsed output

{
  "loginuser": "123",
  "atime": "2024-12-03",
  "hhmmss": "19:01:57",
  "deshost": "192.168.102.42",
  "srchost": "192.168.96.19",
  "loginmode": "ssh2"
}

