k8s与日志--journalbeat源码解读

简介: 前言 对于日志系统的重要性不言而喻,参照沪江的一篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程 日志+大数据+AI的确有很多想象空间。

前言

对于日志系统的重要性不言而喻,参照沪江的一 篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用:

  • 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案
  • 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态
  • 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程

日志+大数据+AI的确有很多想象空间。
而对于收集系统,流行的技术stack有之前的elk,到现在的efk。logstash换成了filebeat。当然日志收集agent,也有flume和fluentd,尤其fluentd属于cncf组织的产品,在k8s中有着广泛的应用。但是fluentd是ruby写的,不利于深入源码了解。当然今天我们重点讲的是另外一个agent--journalbeat。望文生义,隶属于efk stack 中beats系列中的一员,专门用于收集journald日志。

journalbeat源码解读

journald日志简介

长久以来 syslog 是每一个 Unix 系统中的重要部件。在漫长的历史中在各种 Linux 发行版中都有不同的实现去完成类似的工作,它们采取的是逻辑相近,并使用基本相同的文件格式。但是 syslog 也存在诸多的问题,随着新设备的出现以及对安全的重视,这些缺点越发显得突出,例如日志消息内容无法验证、数据格式松散、日志检索低效、有限的元数据保存、无法记录二进制数据等。
Journald是针对以上需求的解决方案。受udev事件启发,Journal 条目与环境组块相似。一个键值域,按照换行符分开,使用大写的变量名。除了支持ASCII 格式的字符串外,还能够支持二进制数据,如 ATA SMART 健康信息、SCSI 数据。应用程序和服务可以通过将项目域传递给systemd journald服务来生成项目。该服务可以为项目增加一定数量的元数据。这些受信任域的值由 Journal 服务来决定且无法由客户端来伪造。在Journald中,可以把日志数据导出,在异地读取,并不受处理器架构的影响。这对嵌入式设备是很有用的功能,方便维护人员分析设备运行状况。
大致总结就是

  • journald日志是新的linux系统的具备的
  • journald区别于传统的文件存储方式,是二进制存储。需要用journalctl查看。

docker对于journald的支持

The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已经增加了journald驱动。

目前本司使用场景

我们的k8s集群,所有的docker输出的日志格式都采用journald,这样主机centos系统日志和docker的日志都用journalbeat来收集。

journalbeat实现关键

journalbeat整个实现过程,基本上两点:

  • 与其他社区贡献的beats系列,比如packetbeat,mysqlbeat类似,遵循了beats的框架和约定,journalbeat实现了run和stop等方法即可,然后作为一个客户端,将收集到的数据,publish到beats中。
  • 读取journald日志,采用了coreos开源的go-systemd库中sdjournal部分。其实sdjournal是一个利用cgo 对于journald日志c接口的封装。

源码解读

程序入口:

package main

import (
 "log" "github.com/elastic/beats/libbeat/beat" "github.com/mheese/journalbeat/beater"
)

func main() {
 err := beat.Run("journalbeat", "", beater.New)
 if err != nil {
 log.Fatal(err)
 }
}

整个journalbeat共实现了3个方法即可。run,stop,和new。
run和stop顾名思义,就是beats控制journalbeat的运行和停止。
而new:
需要按照

// Creator initializes and configures a new Beater instance used to execute // the beat its run-loop. type Creator func(*Beat, *common.Config) (Beater, error)

实现Creator方法,返回的Beater实例,交由beats控制。
具体实现:

// New creates beater func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
 config := config.DefaultConfig
 var err error
 if err = cfg.Unpack(&config); err != nil {
 return nil, fmt.Errorf("Error reading config file: %v", err)
 }

 jb := &Journalbeat{
 config: config,
 done: make(chan struct{}),
 cursorChan: make(chan string),
 pending: make(chan *eventReference),
 completed: make(chan *eventReference, config.PendingQueue.CompletedQueueSize),
 }

 if err = jb.initJournal(); err != nil {
 logp.Err("Failed to connect to the Systemd Journal: %v", err)
 return nil, err
 }

 jb.client = b.Publisher.Connect()
 return jb, nil
}

一般的beats中,都会有一些共同属性。例如下面的done和client属性。

// Journalbeat is the main Journalbeat struct type Journalbeat struct {
 done chan struct{}
 config config.Config
 client publisher.Client

 journal *sdjournal.Journal

 cursorChan chan string
 pending, completed chan *eventReference
 wg sync.WaitGroup
}

done是一个控制整个beater启停的信号量。
而client 是与beats平台通信的client。注意在初始化的时候,

jb.client = b.Publisher.Connect()

建立链接。
然后在收集到数据,发送的时候,也是通过该client

select {
 case <-jb.done:
 return nil
 default:
 // we need to clone to avoid races since map is a pointer...
 jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
 }

注意上边的发送姿势和对于刚才提到的done信号量使用。
其他方法都是业务相关不再详细解读了。

journalbeat如何保证发送失败的日志重新发送

关于这点,个人感觉是最优雅的部分

所有发送失败的日志是会在程序结束之前以json格式保存到文件,完成持久化。

 // on exit fully consume both queues and flush to disk the pending queue defer func() {
 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
 defer wg.Done()
 for evRef := range jb.pending {
 pending[evRef.cursor] = evRef.body
 }
 }()

 go func() {
 defer wg.Done()
 for evRef := range jb.completed {
 completed[evRef.cursor] = evRef.body
 }
 }()
 wg.Wait()

 logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
 if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
 }
 }()

程序启动以后首先会读取之前持久化的发送失败的日志,重新发送

// load the previously saved queue of unsent events and try to publish them if any
 if err := jb.publishPending(); err != nil {
 logp.Warn("could not read the pending queue: %s", err)
 }

client publish收集到的日志到beats,设置了publisher.Guaranteed模式,成功和失败都有反馈

jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)

其中publisher.Signal(&eventSignal{ref, jb.completed})类似于一个回调,凡是成功的都会写成功的ref到jb.completed中。方便客户端控制。

维护了两个chan,一个存放客户端发送的日志,一个存放服务端接受成功的日志,精确对比,可获取发送失败的日志,进入重发动作

journalbeat struct中有下面两个属性

pending, completed chan *eventReference

每次客户端发送一条日志,都会写到pending。

case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed):
 if published := <-publishedChan; published {
 jb.pending <- ref // save cursor if jb.config.WriteCursorState {
 jb.cursorChan <- rawEvent.Cursor
 }
 }
 }

publisher.Signal(&eventSignal{ref, jb.completed}),回调会将成功的写到completed。
整个程序同时会启动一个
go jb.managePendingQueueLoop()
协程,专门用来定时重发失败日志。

 // managePendingQueueLoop runs the loop which manages the set of events waiting to be acked func (jb *Journalbeat) managePendingQueueLoop() {
 jb.wg.Add(1)
 defer jb.wg.Done()
 pending := map[string]common.MapStr{}
 completed := map[string]common.MapStr{}

 // diff returns the difference between this map and the other.
 diff := func(this, other map[string]common.MapStr) map[string]common.MapStr {
 result := map[string]common.MapStr{}
 for k, v := range this {
 if _, ok := other[k]; !ok {
 result[k] = v
 }
 }
 return result
 }

 // flush saves the map[string]common.MapStr to the JSON file on disk
 flush := func(source map[string]common.MapStr, dest string) error {
 tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
 if err != nil {
 return err
 }

 if err = json.NewEncoder(tempFile).Encode(source); err != nil {
 _ = tempFile.Close()
 return err
 }

 _ = tempFile.Close()
 return os.Rename(tempFile.Name(), dest)
 }

 // on exit fully consume both queues and flush to disk the pending queue defer func() {
 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
 defer wg.Done()
 for evRef := range jb.pending {
 pending[evRef.cursor] = evRef.body
 }
 }()

 go func() {
 defer wg.Done()
 for evRef := range jb.completed {
 completed[evRef.cursor] = evRef.body
 }
 }()
 wg.Wait()

 logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
 if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
 }
 }()

 // flush the pending queue to disk periodically
 tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
 for {
 select {
 case <-jb.done:
 return case p, ok := <-jb.pending:
 if ok {
 pending[p.cursor] = p.body
 }
 case c, ok := <-jb.completed:
 if ok {
 completed[c.cursor] = c.body
 }
 case <-tick:
 result := diff(pending, completed)
 if err := flush(result, jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err)
 }
 pending = result
 completed = map[string]common.MapStr{}
 }
 }
}

总结

当然还有一些其他的细节,不再一一讲述了。比如定时写Cursor的功能和日志格式转换等。具体的大家可以看源码。主要是讲了我认为其优雅的部分和为beats编写beater的要点。

本文转自SegmentFault-k8s与日志--journalbeat源码解读

相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
4月前
|
Kubernetes 容器
k8s学习-CKS真题-日志审计 log audit
k8s学习-CKS真题-日志审计 log audit
74 0
|
4月前
|
Kubernetes 容器
在Kubernetes(k8s)中部署Higress时,查看Wasm插件日志的方法如下
在Kubernetes(k8s)中部署Higress时,查看Wasm插件日志的方法如下
73 1
|
6月前
|
消息中间件 Kubernetes 监控
【K8S系列】如何高效查看 k8s日志
【K8S系列】如何高效查看 k8s日志
930 0
|
16天前
|
Kubernetes Linux Windows
kubectl 本地远程链接k8s多个集群,远程管控多集群,查看日志 部署服务(windows版)
kubectl 本地远程链接k8s多个集群,远程管控多集群,查看日志 部署服务(windows版)
226 0
|
4月前
|
Kubernetes Shell Linux
linux|shell脚本|有趣的知识---格式化输出日志和脚本调试方法以及kubernetes集群核心服务重启和集群证书备份脚本
linux|shell脚本|有趣的知识---格式化输出日志和脚本调试方法以及kubernetes集群核心服务重启和集群证书备份脚本
60 0
|
4月前
|
Kubernetes 监控 容器
k8s学习-CKA真题-监控Pod日志
k8s学习-CKA真题-监控Pod日志
78 0
|
1月前
|
Prometheus 监控 Kubernetes
Kubernetes 集群监控与日志管理实践
【2月更文挑战第29天】 在微服务架构日益普及的当下,Kubernetes 已成为容器编排的事实标准。然而,随着集群规模的扩大和业务复杂度的提升,有效的监控和日志管理变得至关重要。本文将探讨构建高效 Kubernetes 集群监控系统的策略,以及实施日志聚合和分析的最佳实践。通过引入如 Prometheus 和 Fluentd 等开源工具,我们旨在为运维专家提供一套完整的解决方案,以保障系统的稳定性和可靠性。
|
1月前
|
Prometheus 监控 Kubernetes
Kubernetes 集群的监控与日志管理实践
【2月更文挑战第31天】 在微服务架构日益普及的今天,容器编排工具如Kubernetes已成为部署、管理和扩展容器化应用的关键平台。然而,随着集群规模的扩大和业务复杂性的增加,如何有效监控集群状态、及时响应系统异常,以及管理海量日志信息成为了运维人员面临的重要挑战。本文将深入探讨 Kubernetes 集群监控的最佳实践和日志管理的高效策略,旨在为运维团队提供一套系统的解决思路和操作指南。
27 0
|
4月前
|
存储 Kubernetes Cloud Native
云原生|kubernetes|apiserver审计日志的开启
云原生|kubernetes|apiserver审计日志的开启
49 0
|
4月前
|
Kubernetes Cloud Native 网络协议
云原生|kubernetes|搭建部署一个稳定高效的EFK日志系统
云原生|kubernetes|搭建部署一个稳定高效的EFK日志系统
75 0