安装elk    

    安装Java 1.8环境
        解压源码安装包:

        tar xf jdk-8u121-linux-x64.tar.gz         ll        mkdir /work/opt -p        mv  jdk1.8.0_121  /work/opt/jdk        ll /work/opt/jdk/        chown -R root.root  /work/opt                 vim /etc/profile :    //添加        export JAVA_HOME=/work/opt/jdk        export JAVA_BIN=$JAVA_HOME/bin        export JRE_HOME=${JAVA_HOME}/jre        export CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar        export PATH=$JAVA_BIN:$PATH

        

    安装 elasticsearch-5.3.0

    tar xf elasticsearch-5.3.0.tar.gz     mv elasticsearch-5.3.0 /work/opt/elasticsearch            ubuntu@ip-172-31-1-79:/work/opt/elasticsearch/config$ egrep -v '#|^$' elasticsearch.yml    cluster.name: lvnian-elk    node.name: lvnian-elk-node1    path.data: /data  #由于是普通用户启动elasticsearch,所以这个目录的属主需要改为普通用户    path.logs: /work/opt/elasticsearch/logs     #由于是普通用户启动elasticsearch,所以这个目录的属主需要改为普通用户    bootstrap.memory_lock: false    network.host: 0.0.0.0    http.port: 9200    http.cors.enabled: true     http.cors.allow-origin: "*"     ####缓存     index.cache.field.expire: 10m        index.cache.field.max_size: 500000        index.cache.field.type: soft    ubuntu@ip-172-31-1-79:/work/opt/elasticsearch/config$     nohup /work/opt/elasticsearch/bin/elasticsearch >> /tmp/elasticsearch.log  &

   ##ES 5.1.1 安装 head:

    (5.1.1版本的 elasticsearch 没有提供直接插件安装方法,但在该github上该插件作者给出了方法)

        下载二进制源码包:        wget https://nodejs.org/dist/v4.2.2/node-v6.2.0-linux-x64.tar.gz        解压:        tar xf node-v6.2.0-linux-x64.tar.gz -C /work/opt/        设置环境变量:        vim /etc/profile:             export NODE_HOME=/work/opt/node/            export PATH=$PATH:$NODE_HOME/bin                 root@ip-172-31-1-79:/work/source# node --version        v6.10.1            npm config set registry https://registry.npm.taobao.org    //设置代理镜像源,加速下载        cd /home/stt/node-v4.2.2-linux-x64/lib/node_modules        npm install grunt    //显示的2条warn可以忽略

        测试grunt是否生效:

        $ grunt -version        grunt-cli v1.2.0        grunt v1.0.1

        安装head插件:

          下载:git clone git://github.com/mobz/elasticsearch-head.git        cd /home/stt/elasticsearch-head        npm install (提示:如果遇到网络的瓶颈,将预先下载的源码包放在对应的位置效果一样,目录为/tmp/phantomjs/,确定位置后可自行创建并上传包)        修改 elasticsearch-head/ _site/app.js        // 把localhost改为ip        找到:        this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://localhost:9200";        改为:        this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://192.168.8.116:9200";        修改 elasticsearch-head/Gruntfile.js        connect: {            server: {                options: {                    hostname: "0.0.0.0",     //增加这个配置                    port: 9100,                    base: '.',                    keepalive: true                }            }        }

        启动服务:(后台运行)

        grunt server &    //需要在 /home/stt/elasticsearch-head 下执行,因为我的 grunt 没有进行全局的安装
        
    ##安装Logstash(这个软件在你需要读取的日志服务上安装,用logstash读取你的日志,上传给elasticsearch的):
        也需要安装java1.8环境

        tar xf logstash-5.3.0.tar.gz         mkdir /work/opt        mv logstash-5.3.0 /work/opt/        cd /work/opt/

    vim /work/opt/logstash-5.3.0/conf/central.conf    #(处理基于 FILE 方式输入的日志信息,这里是简单的举个例子,日后继续学习补充)

        input {            file {                path => "/tmp/*.log"            }        }        output {            elasticsearch {                hosts => "192.168.8.116:9200"                index => "nginx-access"            }            stdout {                codec => rubydebug            }        }

    

    
    
    ##安装Kibana:
    解压源码包:

    tar zxf kibana-5.1.1-linux-x86_64.tar.gz -C /home/stt/server/    vim config/kibana.yml    //修改        server.port: 5601    //打开注释而已,不用可以去效果,请使用默认端口        server.host: "0.0.0.0"    //打开监听地址,让别的机器也能访问这个 kibana        elasticsearch.url: "http://127.0.0.1:9200"      //这个url要根据实质情况,添加访问 elasticsearch 的url    启动服务: (后台运行)    /home/stt/server/kibana-5.1.1-linux-x86_64/bin/kibana &

        

    安装nginx 反向代理,
    apt-gei install nginx
    nginx放心代理配置文件如下:

 ##文件名kibana.conf    upstream backend {                 server 172.31.6.155:5601;             }      server {            listen       80;            server_name  kibana.lvnian.co;        access_log /tmp/kibana-access.log;        error_log /tmp/kibana-error.log;            location / {                #设置主机头和客户端真实地址,以便服务器获取客户端真实IP                 proxy_set_header Host $host;                 proxy_set_header X-Real-IP $remote_addr;                 proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;                 #禁用缓存                 proxy_buffering off;                 #反向代理的地址                 proxy_pass http://backend;                 }        }

        
        
        
    logstash 读取nginx的访问日志以及error日志,上传到logstash的配置文件文件。

   用下面命令运行即可

nohup /work/opt/logstash-5.3.0/bin/logstash -f /work/opt/logstash-5.3.0/conf/elk-nginx-log.conf &

   文件名: elk-nginx-log.conf    input {    file {        path => "/data/logs/nginx/*.log"        start_position => beginning    }}filter {    if [path] =~ "access"  {        mutate { replace => { type => "nginx_access" } }        ruby {            init => "@kname = ['http_x_forwarded_for','http_host','time_local','request','status','body_bytes_sent','request_body','content_length','http_referer','http_user_agent','http_cookie','remote_addr','remote_port','hostname','upstream_addr','upstream_response_time','request_time']"            code => "                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split(' || '))])                new_event.remove('@timestamp')                event.append(new_event)            "        }        if [request] {            ruby {                init => "@kname = ['method','uri','verb']"                code => "                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])                    new_event.remove('@timestamp')                    event.append(new_event)                "            }            if [uri] {                ruby {                    init => "@kname = ['url_path','url_args']"                    code => "                        new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])                        new_event.remove('@timestamp')                        event.append(new_event)                    "                }            }        }        mutate {            convert => [                "body_bytes_sent" , "integer",                "content_length", "integer",                "upstream_response_time", "float",                "request_time", "float",                "http_x_forwarded_for", "string",                "http_host", "string"            ]            remove_field => [ "message","uri","request","path","verb" ]        }                if [http_x_forwarded_for] == '-' or '.' not  in [http_x_forwarded_for]  {                      mutate { replace => { http_x_forwarded_for => "%{remote_addr}" } } ##  http_x_forwarded_for 字段为空,则把remote_addr字段的内容赋值给它                }        if [remote_addr] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {                  geoip {                    source => "remote_addr"    #设置解析IP地址的字段                    target => "geoip"    #将geoip数据保存到一个字段内                    database => "/work/opt/logstash/conf/GeoLite2-City.mmdb"    #IP地址数据库                             add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]                            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]                    }                    mutate {                            convert => [ "[geoip][coordinates]", "float"]                    }            }        date {            match => [ "time_local", "dd/MMM/yyyy:hh:mm:ss Z" ]            locale => "zh"        }    }    else if [path] =~ "error" {        mutate { replace => { type => "nginx_error" } }        grok {            match => { "message" => "(?
\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?
\w+)\] \S+: \*\d+ (?
[^,]+), (?
.*)$" }        }        mutate {            rename => [ "host", "fromhost" ]            gsub => [ "errmsg", "too large body: \d+ bytes", "too large body" ]        }        if [errinfo]        {            ruby {                code => "                    new_event = LogStash::Event.new(Hash[event.get('errinfo').split(', ').map{|l| l.split(': ')}])                    new_event.remove('@timestamp')                    event.append(new_event)                "            }        }        grok {    #       match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}(?:\?%{NGX_URIPARAM:urlparam})?(?: HTTP/%{NUMBER:httpversion})"' }            match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}?(?: HTTP/%{NUMBER:httpversion})"' }            patterns_dir => ["/etc/logstash/patterns"]    #      remove_field => [ "message", "errinfo", "request" ]        }    }    else {            mutate { replace => { type => "random_logs" } }    }#################  时区问题解决     ruby {              code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"        }           ruby {             code => "event.set('@timestamp',event.get('timestamp'))"       }       mutate {             remove_field => ["timestamp"]       }###############}output {    elasticsearch {        hosts => "10.19.104.161:9200"        #index => "logstash-nginx"        index => "logstash-%{type}-%{+YYYY.MM.dd}"        document_type => "%{type}"        flush_size => 20000        idle_flush_time => 10        sniffing => true        template_overwrite => true    }        stdout {            codec => rubydebug        }}

使用上面的logstash配置文件,需要注意更改nginx日志格式,nginx日志格式应该改为如下:

       log_format elk  "$http_x_forwarded_for | $time_local | $request | $status | $body_bytes_sent | "                "$request_body | $content_length | $http_referer | $http_user_agent  | "                "$http_cookie | $remote_addr | $hostname | $upstream_addr | $upstream_response_time | $request_time|$gzip_ratio";    access_log /data/log/nginx/access.log elk;    error_log  /data/log/nginx/error.log;

监控5分钟内xxx 状态码 出现 阀值n 次进行的微信报警

#!/usr/bin/env python# -*- coding:utf-8 -*- #Author: gaogdimport weixin_alter as  weixinfrom elasticsearch import Elasticsearchimport json,time ,datetime,osclass Checkcode(object):    def __init__(self,statuscode,num=30):        self.host='172.31.1.79'        self.port=9200        self.statuscode=statuscode        self.es = Elasticsearch([{'host': self.host, 'port': self.port}])        #now_day = time.strftime('%Y.%m.%d', time.localtime(time.time()))        now_day = (datetime.datetime.now() + datetime.timedelta(hours=-8)).strftime("%Y.%m.%d")        ## 8 个小时差是为了解决logstash 导入日志到es中,索引回更加0时区来创建索引的,导致索引日期相差8个小时        self.index = 'logstash-nginx_access-%s' % now_day        self.file='./alter%s.log'%self.statuscode        self.num=int(num)    def Getdata(self):        body = {            "query": {                "bool": {                    "filter": [                        {"term":                             {"status": "%s"%self.statuscode                              }                         },                        {                            "range": {                                "@timestamp": {                                    "gt": "now-5m",                                    "lt": "now"                                },                            },                        },                    ]                }            },            "aggs": {                "api_status_total": {"terms": {"field": "url_path.keyword"}}            },            "size": 0        }        res = self.es.search(index=self.index, body=body)        num = res['hits']['total']        api_list=res['aggregations']['api_status_total']['buckets']        dict_api = {}        for l in api_list:            if type(l) is dict:                dict_api[l['key']]=l['doc_count']        dictlist = sorted(dict_api.iteritems(), key=lambda d: d[1], reverse=True)        content=''        for n,v in enumerate(dictlist):            if n > 9:                break            content = content + v[0].encode('utf-8') + " : " + str(v[1]) + "\n"        print 'code:', code, 'time', num        print content        return num,content    def JudgeAlter(self,num,content):        if not os.path.isfile(self.file):            with open(self.file, 'w') as f:                f.write('0')        if int(num) > self.num:            with open(self.file, 'r') as f:                time = f.read()            if len(time) == 0 or int(time) == 0 or int(time) >= 15:                content = u"""%s告警!!!最近5分钟内前端日志出现%s错误\n 5 分钟内有: %s  次%s"""% (self.statuscode,self.statuscode,num,content)                print content                weixin.WeixinSend(str(content))                with open(self.file, 'w+') as f:                    f.write('1')            elif 0 < int(time) <= 15:                time = int(int(time) + 1)                with open(self.file, 'w+') as f:                    f.write(str(time))                exit()        else:            with open(self.file, 'r') as f:                num1 = f.read()            if int(num1) > 0:                content = u"""%s 告警恢复!!!最近5分钟内日志出现%s 错误少于%s次现在错误次数为: %s  次%s"""% (self.statuscode,self.statuscode,self.num,num,content)                print content                weixin.WeixinSend(str(content))                with open(self.file, 'w+') as f:                    f.write('0')   if __name__ == "__main__":    print datetime.datetime.now()    code='500'    num=30    try:        import sys        code=sys.argv[1]        num=sys.argv[2]    except  Exception as err:        print str(err)        print "Usage: python %s code  num"%sys.argv[0]        exit()    obj=Checkcode(code,num)    num,dict_api=obj.Getdata()    obj.JudgeAlter(num,dict_api)      ### python /CheckStatusCode.py 500 20

logstash分析日志,导入到es中的时区问题解决

input { stdin {} }  output { stdout { codec => rubydebug } }  filter {    date {      match => ["message","UNIX_MS"]      target => "@timestamp"       }   ruby {      code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"    }   ruby {     code => "event.set('@timestamp',event.get('timestamp'))"   }   mutate {     remove_field => ["timestamp"]   }  }上面这种方法,实际上是把时间多加8小时,时区并没有发生变化,这样做有一个坏处,就是你的kibana默认是CST时区了,这个时候展示出来的数据就会在8个小时之后的地方了。有问题,不建议使用。市区问题其实不用修改,kibana能够很好识别。

## 参考 elk 框架

flume| ===èkafka==èlogstash收集==èes 存储==è kibana展示