我们是ELK和Exchange的朋友。第2部分





我继续讲述如何结交Exchange和ELK朋友的故事(从此处开始)。让我提醒您,这种组合能够毫不犹豫地处理大量日志。这次,我们将讨论如何使Exchange与Logstash和Kibana组件一起使用。



ELK堆栈中的Logstash用于智能处理日志,并准备将其以文档形式放置在Elastic中,在此基础上,可以方便地在Kibana中构建各种可视化文件。



安装



包括两个阶段:



  • 安装和配置OpenJDK软件包。
  • 安装和配置Logstash软件包。


安装和配置OpenJDK



软件包必须下载OpenJDK软件包并将其解压缩到特定目录中。然后,必须将此目录的路径输入Windows操作系统的$ env:Path和$ env:JAVA_HOME变量中:











检查Java版本:



PS C:\> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)


安装和配置Logstash软件包



从此处 下载带有Logstash发行版的存档文件必须将归档文件解压缩到磁盘的根目录。C:\Program Files您不应将其解压缩到文件夹中,Logstash将拒绝正常启动。然后,您需要对jvm.options负责为Java进程分配RAM的文件进行更改。我建议指定服务器RAM的一半。如果他板上有16 GB的RAM,则默认密钥为:



-Xms1g
-Xmx1g


必须替换为:



-Xms8g
-Xmx8g


另外,建议将这一行注释掉 -XX:+UseConcMarkSweepGC在此处了解更多信息下一步是在logstash.conf文件中创建默认配置:



input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}


使用此配置,Logstash从控制台读取数据,将其通过空过滤器传递,然后写回到控制台。应用此配置将测试Logstash的功能。为此,以交互方式运行它:



PS C:\...\bin> .\logstash.bat -f .\logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}


Logstash在端口9600上成功启动。



安装的最后一步是将Logstash作为Windows服务启动。例如,可以使用NSSM软件包来完成此操作



PS C:\...\bin> .\nssm.exe install logstash
Service "logstash" installed successfully!


容错



持久队列机制可确保从源服务器传输日志期间的安全性。



它是如何工作的



日志处理期间的队列布局:输入→队列→过滤器+输出。



输入插件从日志源接收数据,将其写入队列,然后将确认接收数据的消息发送到源。



Logstash处理队列中的消息,并通过过滤器和输出插件。收到来自发送日志的输出的确认后,Logstash将从队列中删除已处理的日志。如果Logstash停止,则所有未处理的消息和尚未收到发送确认的消息仍保留在队列中,Logstash在下次启动时将继续对其进行处理。



配置



由文件中的键调节 C:\Logstash\config\logstash.yml:



  • queue.type:(可能的值是persistedmemory (default))
  • path.queue:(带有队列文件的文件夹的路径,默认情况下存储在C:\ Logstash \ queue中)。
  • queue.page_capacity:(队列的最大页面大小,默认为64mb)。
  • queue.drain:(是/否-启用/禁用在关闭Logstash之前停止对队列的处理。我不建议将其打开,因为这将直接影响关闭服务器的速度)。
  • queue.max_events:(队列中的最大事件数,默认-0(无限制))。
  • queue.max_bytes:(最大队列大小(以字节为单位,默认值为1024mb(1gb)))。


如果配置了queue.max_eventsqueue.max_bytes,则在达到任何这些设置的值时,将停止在队列中接收消息。在此处阅读有关持久队列的更多信息



logstash.yml部分负责设置队列的示例:



queue.type: persisted
queue.max_bytes: 10gb


配置



Logstash配置通常由三个部分组成,分别负责处理传入日志的不同阶段:接收(输入部分),解析(过滤器部分)和发送到Elastic(输出部分)。下面我们将仔细研究它们中的每一个。



输入项



从文件信号代理接收带有原始日志的传入流。我们在输入部分指定的就是这个插件:



input {
  beats {
    port => 5044
  }
}


完成此设置后,Logstash开始侦听端口5044,并且在接收日志时,将根据过滤器部分中的设置对其进行处理。如有必要,您可以包装用于从SSL中的文件位接收日志的通道。在此处阅读有关Beats插件设置的更多信息



过滤



Exchange生成用于处理的所有有趣的文本日志均采用csv格式,并在日志文件本身中描述了字段。解析CSV记录,Logstash为我们提供了三个插件:解剖,CSV和神交。第一个是最快的,但是它只能解析最简单的日志。

例如,它将以下记录分成两部分(由于字段中存在逗号),这将导致日志分析不正确:



…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…


在解析日志(例如IIS)时可以使用它。在这种情况下,过滤器部分可能如下所示:



filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 


Logstash配置允许使用条件语句,因此我们只能将标有filebeat标签的日志发送到dissect插件IIS。在插件内部,我们将字段值与其名称匹配,message从日志中删除包含条目的原始字段,然后我们可以添加一个任意字段,例如包含从中收集日志的应用程序的名称。



在跟踪日志的情况下,最好使用csv插件,它可以正确处理复杂的字段:



filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}


在插件内部,我们将字段值与其名称匹配,从日志中删除包含该条目的原始字段message(以及tenant-idand字段schema-version),然后我们可以添加一个任意字段,例如包含从中收集日志的应用程序的名称。



在过滤阶段的出口,我们将获得近似的文档,准备在Kibana中进行渲染。我们将错过以下内容:



  • 数字字段将被识别为文本,从而阻止对其执行操作。即,time-takenIIS日志字段以及“跟踪”字段recipient-counttotal-bites日志。
  • 标准文档时间戳将包含日志处理时间,而不是服务器端记录时间。
  • 该字段recipient-address看起来像一个单一的结构,不允许进行计数来计算接收者的数量。


现在是时候在日志处理过程中添加一些魔术了。



转换数字字段



dissect插件具有一个选项convert_datatype,可用于将文本字段转换为数字格式。例如,像这样:



dissect {
  convert_datatype => { "time-taken" => "int" }
}


值得记住的是,该方法仅在字段肯定包含字符串的情况下才适用。该选项不处理字段中的空值,并引发异常。



对于跟踪日志,最好不要使用类似的方法转换,因为该领域recipient-count,并total-bites可以为空。最好使用mutate插件来转换这些字段



mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}


将收件人地址拆分为单个收件人



也可以使用mutate插件解决此任务:



mutate {
  split => ["recipient_address", ";"]
}


更改时间戳



对于跟踪日志,可以通过date插件轻松解决该任务,该插件将以timestamp所需格式从字段写入日期和时间date-time



date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}


对于IIS日志,我们将需要使用mutate插件将字段数据date结合起来time,注册所需的时区,并timestamp使用date插件将该时间戳记放置在其中:



mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}


输出量



输出部分用于将已处理的日志发送到日志接收器。在直接发送到Elastic的情况下,将使用elasticsearch插件,该插件指定服务器地址和索引名称的模板以发送生成的文档:



output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}


最终配置



最终配置如下所示:



input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}


有用的链接:






All Articles