我继续讲述如何结交Exchange和ELK朋友的故事(从此处开始)。让我提醒您,这种组合能够毫不犹豫地处理大量日志。这次,我们将讨论如何使Exchange与Logstash和Kibana组件一起使用。
ELK堆栈中的Logstash用于智能处理日志,并准备将其以文档形式放置在Elastic中,在此基础上,可以方便地在Kibana中构建各种可视化文件。
安装
包括两个阶段:
- 安装和配置OpenJDK软件包。
- 安装和配置Logstash软件包。
安装和配置OpenJDK
软件包必须下载OpenJDK软件包并将其解压缩到特定目录中。然后,必须将此目录的路径输入Windows操作系统的$ env:Path和$ env:JAVA_HOME变量中:
检查Java版本:
PS C:\> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)
安装和配置Logstash软件包
从此处 下载带有Logstash发行版的存档文件。必须将归档文件解压缩到磁盘的根目录。
C:\Program Files
您不应将其解压缩到文件夹中,Logstash将拒绝正常启动。然后,您需要对jvm.options
负责为Java进程分配RAM的文件进行更改。我建议指定服务器RAM的一半。如果他板上有16 GB的RAM,则默认密钥为:
-Xms1g
-Xmx1g
必须替换为:
-Xms8g
-Xmx8g
另外,建议将这一行注释掉
-XX:+UseConcMarkSweepGC
。在此处了解更多信息。下一步是在logstash.conf文件中创建默认配置:
input {
stdin{}
}
filter {
}
output {
stdout {
codec => "rubydebug"
}
}
使用此配置,Logstash从控制台读取数据,将其通过空过滤器传递,然后写回到控制台。应用此配置将测试Logstash的功能。为此,以交互方式运行它:
PS C:\...\bin> .\logstash.bat -f .\logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Logstash在端口9600上成功启动。
安装的最后一步是将Logstash作为Windows服务启动。例如,可以使用NSSM软件包来完成此操作:
PS C:\...\bin> .\nssm.exe install logstash
Service "logstash" installed successfully!
容错
持久队列机制可确保从源服务器传输日志期间的安全性。
它是如何工作的
日志处理期间的队列布局:输入→队列→过滤器+输出。
输入插件从日志源接收数据,将其写入队列,然后将确认接收数据的消息发送到源。
Logstash处理队列中的消息,并通过过滤器和输出插件。收到来自发送日志的输出的确认后,Logstash将从队列中删除已处理的日志。如果Logstash停止,则所有未处理的消息和尚未收到发送确认的消息仍保留在队列中,Logstash在下次启动时将继续对其进行处理。
配置
由文件中的键调节
C:\Logstash\config\logstash.yml:
queue.type
:(可能的值是persisted
和memory (default))
。path.queue
:(带有队列文件的文件夹的路径,默认情况下存储在C:\ Logstash \ queue中)。queue.page_capacity
:(队列的最大页面大小,默认为64mb)。queue.drain
:(是/否-启用/禁用在关闭Logstash之前停止对队列的处理。我不建议将其打开,因为这将直接影响关闭服务器的速度)。queue.max_events
:(队列中的最大事件数,默认-0(无限制))。queue.max_bytes
:(最大队列大小(以字节为单位,默认值为1024mb(1gb)))。
如果配置了
queue.max_events
和queue.max_bytes
,则在达到任何这些设置的值时,将停止在队列中接收消息。在此处阅读有关持久队列的更多信息。
logstash.yml部分负责设置队列的示例:
queue.type: persisted
queue.max_bytes: 10gb
配置
Logstash配置通常由三个部分组成,分别负责处理传入日志的不同阶段:接收(输入部分),解析(过滤器部分)和发送到Elastic(输出部分)。下面我们将仔细研究它们中的每一个。
输入项
从文件信号代理接收带有原始日志的传入流。我们在输入部分指定的就是这个插件:
input {
beats {
port => 5044
}
}
完成此设置后,Logstash开始侦听端口5044,并且在接收日志时,将根据过滤器部分中的设置对其进行处理。如有必要,您可以包装用于从SSL中的文件位接收日志的通道。在此处阅读有关Beats插件设置的更多信息。
过滤
Exchange生成用于处理的所有有趣的文本日志均采用csv格式,并在日志文件本身中描述了字段。解析CSV记录,Logstash为我们提供了三个插件:解剖,CSV和神交。第一个是最快的,但是它只能解析最简单的日志。
例如,它将以下记录分成两部分(由于字段中存在逗号),这将导致日志分析不正确:
…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…
在解析日志(例如IIS)时可以使用它。在这种情况下,过滤器部分可能如下所示:
filter {
if "IIS" in [tags] {
dissect {
mapping => {
"message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
}
remove_field => ["message"]
add_field => { "application" => "exchange" }
}
}
}
Logstash配置允许使用条件语句,因此我们只能将标有filebeat标签的日志发送到dissect插件
IIS
。在插件内部,我们将字段值与其名称匹配,message
从日志中删除包含条目的原始字段,然后我们可以添加一个任意字段,例如包含从中收集日志的应用程序的名称。
在跟踪日志的情况下,最好使用csv插件,它可以正确处理复杂的字段:
filter {
if "Tracking" in [tags] {
csv {
columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
remove_field => ["message", "tenant-id", "schema-version"]
add_field => { "application" => "exchange" }
}
}
在插件内部,我们将字段值与其名称匹配,从日志中删除包含该条目的原始字段
message
(以及tenant-id
and字段schema-version
),然后我们可以添加一个任意字段,例如包含从中收集日志的应用程序的名称。
在过滤阶段的出口,我们将获得近似的文档,准备在Kibana中进行渲染。我们将错过以下内容:
- 数字字段将被识别为文本,从而阻止对其执行操作。即,
time-taken
IIS日志字段以及“跟踪”字段recipient-count
和total-bites
日志。 - 标准文档时间戳将包含日志处理时间,而不是服务器端记录时间。
- 该字段
recipient-address
看起来像一个单一的结构,不允许进行计数来计算接收者的数量。
现在是时候在日志处理过程中添加一些魔术了。
转换数字字段
dissect插件具有一个选项
convert_datatype
,可用于将文本字段转换为数字格式。例如,像这样:
dissect {
…
convert_datatype => { "time-taken" => "int" }
…
}
值得记住的是,该方法仅在字段肯定包含字符串的情况下才适用。该选项不处理字段中的空值,并引发异常。
对于跟踪日志,最好不要使用类似的方法转换,因为该领域
recipient-count
,并total-bites
可以为空。最好使用mutate插件来转换这些字段:
mutate {
convert => [ "total-bytes", "integer" ]
convert => [ "recipient-count", "integer" ]
}
将收件人地址拆分为单个收件人
也可以使用mutate插件解决此任务:
mutate {
split => ["recipient_address", ";"]
}
更改时间戳
对于跟踪日志,可以通过date插件轻松解决该任务,该插件将以
timestamp
所需格式从字段中写入日期和时间date-time
:
date {
match => [ "date-time", "ISO8601" ]
timezone => "Europe/Moscow"
remove_field => [ "date-time" ]
}
对于IIS日志,我们将需要使用mutate插件将字段数据
date
和结合起来time
,注册所需的时区,并timestamp
使用date插件将该时间戳记放置在其中:
mutate {
add_field => { "data-time" => "%{date} %{time}" }
remove_field => [ "date", "time" ]
}
date {
match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
timezone => "UTC"
remove_field => [ "data-time" ]
}
输出量
输出部分用于将已处理的日志发送到日志接收器。在直接发送到Elastic的情况下,将使用elasticsearch插件,该插件指定服务器地址和索引名称的模板以发送生成的文档:
output {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
manage_template => false
index => "Exchange-%{+YYYY.MM.dd}"
}
}
最终配置
最终配置如下所示:
input {
beats {
port => 5044
}
}
filter {
if "IIS" in [tags] {
dissect {
mapping => {
"message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
}
remove_field => ["message"]
add_field => { "application" => "exchange" }
convert_datatype => { "time-taken" => "int" }
}
mutate {
add_field => { "data-time" => "%{date} %{time}" }
remove_field => [ "date", "time" ]
}
date {
match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
timezone => "UTC"
remove_field => [ "data-time" ]
}
}
if "Tracking" in [tags] {
csv {
columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
remove_field => ["message", "tenant-id", "schema-version"]
add_field => { "application" => "exchange" }
}
mutate {
convert => [ "total-bytes", "integer" ]
convert => [ "recipient-count", "integer" ]
split => ["recipient_address", ";"]
}
date {
match => [ "date-time", "ISO8601" ]
timezone => "Europe/Moscow"
remove_field => [ "date-time" ]
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
manage_template => false
index => "Exchange-%{+YYYY.MM.dd}"
}
}
有用的链接: