一、日志收集及告警項目背景
近來安全測試項目較少,想著把安全設(shè)備、nginx日志收集起來并告警, 話不多說,直接說重點(diǎn),搭建背景:
1. 日志源:安全設(shè)備日志(Imperva WAF、綠盟WAF、paloalto防火墻)、nginx日志等;
2. 日志分析開源軟件:ELK,告警插件:Sentinl 或elastalert,告警方式:釘釘和郵件;
3. 安全設(shè)備日志->logstash->es,nginx日志由于其他部門已有一份(flume->kafka)我們通過kafka->logstash->es再輸出一份,其中l(wèi)ogstash的正則過濾規(guī)則需要配置正確,不然比較消耗性能,建議寫之前使用grokdebug先測試好再放入配置文件;
4. 搭建系統(tǒng):centos 7 , JDK 1.8, Python 2.7
5. ELK統(tǒng)一版本為5.5.2
由于es和kibana的安裝都比較簡單,就不在下文中說明安裝及配置方法了。相關(guān)軟件的下載鏈接如下:
Es:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz
Kibana:https://artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz
Logstash:https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz
測試鏈接Grokdebug:http://grokdebug.herokuapp.com
二、安全設(shè)備日志收集
2.1 Imperva WAF配置
策略->操作集->新增日志告警規(guī)則


我這邊沒有用設(shè)備自身的一些日志規(guī)則,而是根據(jù)手冊自定義了一些需要的日志字段,可在自定義策略的消息填入如下字段:
StartTime=$!{Alert.createTime}AlarmID=$!{Alert.dn} EventID=$!{Event.dn} AggregationInfo=$!{Alert.aggregationInfo.occurrences}Alert_level=$!{Alert.severity} RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType} Alert_description=$!{Alert.description}EventType=$!{Event.eventType} PolicyName=$!{Rule.parent.displayName}SrcIP=$!{Event.sourceInfo.sourceIp} SrcPort=$!{Event.sourceInfo.sourcePort}Proto=$!{Event.sourceInfo.ipProtocol} DstIP=$!{Event.destInfo.serverIp}DstPort=$!{Event.destInfo.serverPort} WebMethod=$!{Event.struct.httpRequest.url.method}Domain=$!{Alert.serverGroupName} URL=$!{Event.struct.httpRequest.url.path}ResponseCode=$!{Event.struct.httpResponse.responseCode}Alert_key=$!{Event.struct.httpRequest.url.queryString}Action=$!{Alert.immediateAction} ResponseTime=$!{Event.struct.responseTime}ResponseSize=$!{Event.struct.responseSize} Headers_value=$!{Event.struct.httpRequest.headers.value}Parameters_value=$!{Event.struct.httpRequest.parameters.value}
參數(shù)說明:告警開始時間、告警ID、事件ID、事情數(shù)量、告警級別…. HTTP返回碼、觸發(fā)告警字符串、響應(yīng)動作、響應(yīng)時間、響應(yīng)大小、http包頭的值,中間省略的部分請自行查看手冊。其實Imperva WAF的總?cè)罩咀侄螖?shù)不少于一兩百個,單從這一點(diǎn)可以看出確實好于國產(chǎn)WAF太多。
針對Imperva WAF的logstash配置如下:
input{ syslog{ type => "syslog" port => 514 } } filter { grok { match =>["message","%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID} EventID=%{NUMBER:EventID}AggregationInfo=%{NUMBER:AggregationInfo} Alert_level=%{DATA:Alert_level}RuleName=%{GREEDYDATA:RuleName} Category=%{GREEDYDATA:Category}Alert_description=%{GREEDYDATA:Alert_description} EventType=%{DATA:EventType}PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP} SrcPort=%{NUMBER:SrcPort}Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP} DstPort=%{NUMBER:DstPort}WebMethod=%{GREEDYDATA:WebMethod} Domain=%{DATA:Domain}URL=%{GREEDYDATA:URL} ResponseCode=%{GREEDYDATA:ResponseCode}Alert_key=%{GREEDYDATA:Alert_key} Action=%{DATA:Action}ResponseTime=%{GREEDYDATA:ResponseTime} ResponseSize=%{GREEDYDATA:ResponseSize}Headers_value=%{GREEDYDATA:Headers_value}Parameters_value=%{GREEDYDATA:Parameters_value}"] # 實際復(fù)制粘貼可能會有點(diǎn)格式問題,注意參數(shù)之間空一個空格即可 remove_field => ["message"] } geoip { source => "SrcIP" #IP歸屬地解析插件 } } output{ elasticsearch{ hosts => "es單機(jī)或集群地址:9200" index => "impervasyslog" } }
說明:其中g(shù)eoip為elk自帶插件,可以解析ip歸屬地,比如ip歸屬的國家及城市,在儀表盤配置“地圖炮”裝X、查看攻擊源地理位置的時候有點(diǎn)用,
2.2 綠盟WAF配置
日志報表->日志管理配置->Syslog配置&日志發(fā)生參數(shù)



針對綠盟WAF的logstash配置如下:
input和output參照imperva waf,貼出最要的grok部分,如下:
grok { match => ["message",%{DATA:syslog_flag} site_id:%{NUMBER:site_id} protect_id:%{NUMBER:protect_id} dst_ip:%{IPV4:dst_ip} dst_port:%{NUMBER:dst_port} src_ip:%{IPV4:src_ip}src_port:%{NUMBER:src_port} method:%{DATA:method} domain:%{DATA:domain} uri:%{DATA:uri} alertlevel:%{DATA:alert_level} event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time} policy_id:%{NUMBER:policy_id}rule_id:%{NUMBER:rule_id} action:%{DATA:action} block:%{DATA:block} block_info:%{DATA:block_info} http:%{GREEDYDATA:URL}
2.3 paloalto防火墻配置
(6.1版本,其他版本可能會有點(diǎn)差異)
新建syslog服務(wù)器->日志轉(zhuǎn)發(fā),具體看截圖





針對PA的logstash配置如下:
input和output參照imperva waf,貼出最要的grok部分,如下:
grok { match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{DATA:GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"]
貼兩張最終的效果圖:


三、Nginx日志收集
由于nginx日志已經(jīng)被其他大數(shù)據(jù)部門收集過一遍了,為避免重復(fù)讀取,我們從其他部門的kafka拉取過來即可,這里說一下nginx收集的方式,flume->kafka 示例配置方式如下:
Flume配置如下
配置掃描日志文件
log_analysis_test.conf配置文件
a1.sources=s1 #可以理解為輸入端,定義名稱為s1
a1.channels=c1 #傳輸頻道,類似隊列,定義為c1,設(shè)置為內(nèi)存模式
a1.sinks=k1 #可以理解為輸出端,定義為sk1
#source配置
a1.sources.s1.type=exec
a1.sources.s1.command=tail -F/data/log/nginx/crf_crm.access.log
a1.sources.s1.channels=c1
#channel配置
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink 設(shè)置Kafka接收器
a1.sinks.k1.channel=c1
a1.sinks.k1.topic=crm_nginx_log_topic #設(shè)置Kafka的Topic
a1.sinks.k1.brokerList=x.x.x.x:9092 #設(shè)置Kafka的broker地址和端口號
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20
kafka配置
kafka下載
wgethttp://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz
配置zookeeper,根據(jù)機(jī)器狀況更改jvm 內(nèi)存設(shè)置
vimbin/zookeeper-server-start.sh
配置kafka
vimbin/kafka-server-start.sh 根據(jù)機(jī)器狀況更改jvm 內(nèi)存設(shè)置
啟動zookeeper
nohupbin/zookeeper-server-start.sh config/zookeeper.properties &
啟動kafka
nohup bin/kafka-server-start.shconfig/server.properties &
創(chuàng)建應(yīng)用服務(wù)器的topic
kafka-topics.sh --create –zookeeper x.x.x.x:2181--replication-factor 2 --partitions 5 --topic crm_nginx_log_topic
分區(qū)及副本以自己的情況而定
查看Topic是否創(chuàng)建成功
./bin/kafka-topics.sh --list --zookeeperx.x.x.x:2181
啟動kafka生產(chǎn)者
bin/kafka-console-producer.sh --broker-listx.x.x.x:9092 --topic pay
啟動kafka消費(fèi)者
kafka-console-consumer.sh --zookeeperx.x.x.x:2181--topic crm_nginx_log_topic
logstash 配置文件
input{ kafka { codec => "plain" group_id => "logstash1" auto_offset_reset => "smallest" reset_beginning => true topic_id => "crm_nginx_log_topic" zk_connect => "x.x.x.x:2181" # zookeeper的地址 } grok {省略} } output{ elasticsearch{ hosts => "es單機(jī)或集群地址:9200" index => "impervasyslog" }
四、安全告警
安全告警可以是Sentinl或 elastalert,Sentinl是kibana插件,可以集成到kibana內(nèi)圖形化展示,但是寫規(guī)則時需要對JS較熟悉,elastalert 是es的插件,不支持集成到kibana界面進(jìn)行圖形化展示。下面分別對這2個插件進(jìn)行安裝及配置說明。
4.1 Sentinl
安裝如下:
kibana-plugin install https://github.comsirensolutionssentinlreleasesdownloadtag-
5.4sentinl-v5.4.1.zip
安裝后,以IP請求頻次告警設(shè)置為例:
每2分鐘去es查詢一次

針對需要監(jiān)控的IP字段
Input的body內(nèi)容即es查詢語法

編寫過濾條件
{ "script": { "script":"payload.json='超過閥值40';var match=false;var threshold=40;varfirst=payload.aggregations.code.buckets;for(var i=0;i

如果想對狀態(tài)碼做監(jiān)控,參考如下:


Action里面添加釘釘群機(jī)器人的webhook

釘釘報警如下

釘釘?shù)?a target="_blank">接口文檔鏈接:
https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.karFPe&treeId=257&articleId=105735&docType=1
郵件告警設(shè)置如下:

4.2 elastalert
前置條件:
JDK 1.8
python 2.7
easy_install -U setuptools (最新setuptools-39.2.0)
yum install gcc
yum install python-devel
yum install libffi-devel
安裝
git clonehttps://github.com/Yelp/elastalert.git cd elastalert python setup.py install pip install -r requirements.txt cp config.yaml.example config.yaml
下載https://github.com/xuyaoqiang/elastalert-dingtalk-plugin
elastalert-dingtalk-plugin中的elastalert_modules復(fù)制到elastalert下
創(chuàng)建索引
elastalert-create-index
輸入 es的IP 及 es的端口,其余根據(jù)自己的環(huán)境寫,一般默認(rèn)即可
配置config.yaml,以下為關(guān)鍵配置信息:
rules_folder: example_rules #指定rule的目錄
run_every:
minutes: 1 #每一分鐘去探測一次
buffer_time:
minutes: 15 #緩存15分鐘
es_host: x.x.x.x
es_port: 9200
writeback_index: elastalert_status #創(chuàng)建的告警索引
alert_time_limit:
days: 2 #失敗重試的時間限制
下面先以釘釘告警為例:
在example_rules里面新建釘釘告警配置文件,內(nèi)容如下
es_host: x.x.x.x
es_port: 9200
name: xxx安全告警
type: cardinality
index: nsfocuswaf_syslog
cardinality_field: src_ip
max_cardinality: 30
timeframe:
minutes: 5 #單IP 5分鐘內(nèi)訪問超過30次就會告警
aggregation_key: src_ip
summary_table_fields:
src_ip
dst_ip
dst_port
Attack_types
stat_time
alert:
-“elastalert_modules.dingtalk_alert.DingTalkAlerter”
dingtalk_webhook: “your webhook”
dingtalk_msgtype: text
群機(jī)器人的配置比較簡單,自己搜索一下即可
注意如果告警匹配了N條,卻只發(fā)出1條告警,修改elastalert.py代碼,在856行后面增加如下代碼:

修改dingtalk_alert.py的代碼,增加如下內(nèi)容:
src_ip = matches[0]['src_ip'] #src_ip為grok過濾后自定義的字段,以下相同 dst_ip = matches[0]['dst_ip'] dst_port = matches[0]['dst_port'] attack_types = matches[0]['Attack_types'] start_time = matches[0]['stat_time'] 修改payload初的代碼,如下: payload = { "msgtype": self.dingtalk_msgtype, "text": { "content": r"發(fā)現(xiàn)源IP地址:{}在5分鐘內(nèi),針對服務(wù)器IP:{}的{}端口發(fā)動了3次攻擊,攻擊類型是:{},攻擊時間:{},請及時處理,謝謝!".format(src_ip,dst_ip,dst_port,attack_types,start_time) },
如果沒有過濾除自定義的字段,只有message字段的話,可以新增如下代碼:
message = matches[0]['message'] src_ip_pattern =r"src_ip:(\d+\.\d+\.\d+\.\d+)" dst_ip_pattern =r"dst_ip:(\d+\.\d+\.\d+\.\d+)" dst_port_pattern =r"dst_port:(\d+)" time_pattern =r"stat_time:(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})" type_pattern =r"event_type:(\w+)" src_ip= re.findall(src_ip_pattern, message)[0] dst_ip= re.findall(dst_ip_pattern, message)[0] dst_port= re.findall(dst_port_pattern, message)[0] start_time= re.findall(time_pattern, message)[0] attack_types= re.findall(type_pattern, message)[0]
最終效果圖如下:

郵件的rule配置文件如下:
es_host: x.x.x.x es_port: 9200 name: 信息安全告警 type: cardinality index: nsfocuswaf_syslog cardinality_field: src_ip max_cardinality: 5 timeframe: minutes: 5 alert: - "email" smtp_host: mail.crfchina.com smtp_port: 25 smtp_auth_file:/opt/elastalert/smtp_auth_file.yaml email_reply_to: xxxxx@qq.com #回復(fù)給誰 from_addr: xxxxxx@qq.com #發(fā)件人 email: - xxxxx@qq.com #收件人 alert_subject: "信息安全告警" alert_text_type: alert_text_only alert_text: | XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 您好, 網(wǎng)站:{}正被人惡意攻擊,請及時處理?。? 當(dāng)前匹配XX規(guī)則數(shù):{} 發(fā)生時間:{} 攻擊者的IP:{} 攻擊類型:{} 請求的URL:{} alert_text_args: - domain -num_hits -stat_time -src_ip -Attack_types -URL
郵件告警最終效果如下:

五、總結(jié)
相關(guān)進(jìn)程運(yùn)行命令如下:
后臺啟動es
nohup su - elasticsearch -c'/opt/elasticsearch-5.5.2/bin/elasticsearch -d' &
后臺啟動logstash
nohup ./bin/logstash -f x.x.x.conf &
后臺啟動kibana
nohup ./kibana &
啟動flume
flume-ng agent --conf conf --conf-fileconf/log_analysis_test.conf --name a1 – Dflume.root.logger=INFO,console
啟動kafka
nohup bin/kafka-server-start.shconfig/server.properties &
后臺啟動elastalert
nohup python -m elastalert.elastalert--config ./config.yaml --verbose --rule ./example_rules/DD_rule.yaml &
常見的告警策略除了來自安全設(shè)備的正則之外,大量的IP請求、錯誤狀態(tài)碼、nginx的request請求中包含的特征碼也都是常見的告警規(guī)則。
-
自動化
+關(guān)注
關(guān)注
31文章
5990瀏覽量
90707 -
日志
+關(guān)注
關(guān)注
0文章
149瀏覽量
11098 -
運(yùn)維
+關(guān)注
關(guān)注
1文章
287瀏覽量
8731
原文標(biāo)題:自動化運(yùn)維之日志統(tǒng)一管理
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
誠聘高級運(yùn)維自動化工程師
銳捷助互聯(lián)網(wǎng)數(shù)據(jù)中心網(wǎng)絡(luò)自動化、可視化運(yùn)維
數(shù)據(jù)自動化運(yùn)維應(yīng)該注意哪些事項
配電自動化實用化運(yùn)維指標(biāo)研究
厲害了!山東電力運(yùn)維自動化平臺正式投運(yùn)
Ansible企業(yè)級自動化運(yùn)維探索的詳細(xì)資料說明
城域網(wǎng)自動化運(yùn)維實現(xiàn)的關(guān)鍵點(diǎn)、難點(diǎn)和解決方案研究
城域網(wǎng)是什么,其生命周期和自動化運(yùn)維應(yīng)用有哪些特點(diǎn)
使用Python腳本實現(xiàn)自動化運(yùn)維任務(wù)
網(wǎng)絡(luò)設(shè)備自動化運(yùn)維工具—ansible入門筆記介紹
Python腳本實現(xiàn)運(yùn)維工作自動化案例
容器化NPB + Ansible:自動化運(yùn)維方案
如何實現(xiàn)自動化運(yùn)維?自動化運(yùn)維之日志統(tǒng)一管理
評論