在上一篇文章中我实现了一个支持Debug、Info、Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手。有兴趣的可以通过这个链接前往:https://github.com/bosima/ylog/releases/tag/v1.0.1
工程实践中,我们往往还需要对日志进行采集,将日志归集到一起,然后用于各种处理分析,比如生产环境上的错误分析、异常告警等等。在日志消息系统领域,Kafka久负盛名,这篇文章就以将日志发送到Kafka来实现日志的采集;同时考虑到日志分析时对结构化数据的需求,这篇文章还会提供一种输出Json格式日志的方法。
这个升级版的日志库还要保持向前兼容,即还能够使用普通文本格式,以及写日志到磁盘文件,这两个特性和要新增的两个功能分别属于同类处理,因此我这里对它们进行抽象,形成两个接口:格式化接口、写日志接口。
所谓格式化,就是日志的格式处理。这个日志库目前要支持两种格式:普通文本和Json。
为了在不同格式之上提供一个统一的抽象,ylog中定义 logEntry 来代表一条日志:
type logEntry struct {
Ts time.Time `json:"ts"`
File string `json:"file"`
Line int `json:"line"`
Level LogLevel `json:"level"`
Msg string `json:"msg"`
}
格式化接口的能力就是将日志从logEntry格式转化为其它某种数据格式。ylog中对它的定义是:
type LoggerFormatter interface {
Format(*logEntry, *[]byte) error
}
第1个参数是一个logEntry实例,也就是要被格式化的日志,第2个参数是日志格式化之后要写入的容器。
其实现是这样的:
type textFormatter struct {
}
func NewTextFormatter() *textFormatter {
return &textFormatter{}
}
func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error {
formatTime(buf, entry.Ts)
*buf = append(*buf, ' ')
file := toShort(entry.File)
*buf = append(*buf, file...)
*buf = append(*buf, ':')
itoa(buf, entry.Line, -1)
*buf = append(*buf, ' ')
*buf = append(*buf, levelNames[entry.Level]...)
*buf = append(*buf, ' ')
*buf = append(*buf, entry.Msg...)
return nil
}
可以看到它的主要功能就是将logEntry中的各个字段按照某种顺序平铺开来,中间用空格分隔。
其中的很多数据处理方法参考了Golang标准日志库中的数据格式化处理代码,有兴趣的可以去Github中详细查看。
这里对日期时间格式化为字符串做了特别的优化,在标准日志库中为了将年、月、日、时、分、秒、毫秒、微秒等格式化指定长度的字符串,使用了一个函数:
func itoa(buf *[]byte, i int, wid int) {
// Assemble decimal in reverse order.
var b [20]byte
bp := len(b) - 1
for i >= 10 || wid > 1 {
wid--
q := i / 10
b[bp] = byte('0' + i - q*10)
bp--
i = q
}
// i < 10
b[bp] = byte('0' + i)
*buf = append(*buf, b[bp:]...)
}
其逻辑大概就是将数字中的每一位转换为字符并存入byte中,注意这里初始化byte数组的时候是20位,这是int64最大的数字位数。
其实时间字符串中的每个部分位数都是固定的,比如年是4位、月日时分秒都是2位,根本不需要20位,所以这个空间可以节省;还有这里用了循环,这对于CPU的分支预测可能有那么点影响,所以我这里分别对不同位数写了专门的格式化方法,以2位数为例:
func itoa2(buf *[]byte, i int) {
q := i / 10
s := byte('0' + i - q*10)
f := byte('0' + q)
*buf = append(*buf, f, s)
}
其实现是这样的:
type jsonFormatter struct {
}
func NewJsonFormatter() *jsonFormatter {
return &jsonFormatter{}
}
func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) {
entry.File = toShortFile(entry.File)
jsonBuf, err := json.Marshal(entry)
*buf = append(*buf, jsonBuf...)
return
}
代码也很简单,使用标准库的json序列化方法将logEntry实例转化为Json格式的数据。
对于Json格式,后续考虑支持用户自定义Json字段,这里暂时先简单处理。
写日志就是将日志输出到别的目标,比如ylog要支持的输出到磁盘文件、输出到Kafka等。
前边格式化接口将格式化后的数据封装到了 []byte 中,写日志接口就是将格式化处理的输出 []byte 写到某种输出目标中。参考Golang中各种Writer的定义,ylog中对它的定义是:
type LoggerWriter interface {
Ensure(*logEntry) error
Write([]byte) error
Sync() error
Close() error
}
这里有4个方法:
这里定义一个名为fileWriter的类型,它需要实现LoggerWriter的接口。
先看类型的定义:
type fileWriter struct {
file *os.File
lastHour int64
Path string
}
包含四个字段:
再看其实现的接口:
func (w *fileWriter) Ensure(entry *logEntry) (err error) {
if w.file == nil {
f, err := w.createFile(w.Path, entry.Ts)
if err != nil {
return err
}
w.lastHour = w.getTimeHour(entry.Ts)
w.file = f
return nil
}
currentHour := w.getTimeHour(entry.Ts)
if w.lastHour != currentHour {
_ = w.file.Close()
f, err := w.createFile(w.Path, entry.Ts)
if err != nil {
return err
}
w.lastHour = currentHour
w.file = f
}
return
}
func (w *fileWriter) Write(buf []byte) (err error) {
buf = append(buf, '\n')
_, err = w.file.Write(buf)
return
}
func (w *fileWriter) Sync() error {
return w.file.Sync()
}
func (w *fileWriter) Close() error {
return w.file.Close()
}
Ensure 中的主要逻辑是创建当前要写入的文件对象,如果小时数变了,先把之前的关闭,再创建一个新的文件。
Write 把数据写入到文件对象,这里加了一个换行符,也就是说对于文件日志,其每条日志最后都会有一个换行符,这样比较方便阅读。
Sync 调用文件对象的Sync方法,将日志从操作系统缓存刷到磁盘。
Close 关闭当前文件对象。
这里定义一个名为kafkaWriter的类型,它也需要实现LoggerWriter的接口。
先看其结构体定义:
type kafkaWriter struct {
Topic string
Address string
writer *kafka.Writer
batchSize int
}
这里包含四个字段:
Topic 写Kafka时需要一个主题,这里默认当前Logger中所有日志使用同一个主题。
Address Kafka的访问地址。
writer 向Kafka写数据时使用的Writer,这里集成的是:github.com/segmentio/kafka-go,支持自动重试和重连。
batchSize Kafka写日志的批次大小,批量写可以提高日志的写效率。
再看其实现的接口:
func (w *kafkaWriter) Ensure(curTime time.Time) (err error) {
if w.writer == nil {
w.writer = &kafka.Writer{
Addr: kafka.TCP(w.Address),
Topic: w.Topic,
BatchSize: w.batchSize,
Async: true,
}
}
return
}
func (w *kafkaWriter) Write(buf []byte) (err error) {
// buf will be reused by ylog when this method return,
// with aysnc write, we need copy data to a new slice
kbuf := append([]byte(nil), buf...)
err = w.writer.WriteMessages(context.Background(),
kafka.Message{Value: kbuf},
)
return
}
func (w *kafkaWriter) Sync() error {
return nil
}
func (w *kafkaWriter) Close() error {
return w.writer.Close()
}
这里采用的是异步发送到Kafka的方式,WriteMessages方法不会阻塞,因为传入的buf要被ylog重用,所以这里copy了一下。异步还会存在的一个问题就是不会返回错误,可能丢失数据,不过对于日志这种数据,没有那么严格的要求,也可以接受。
如果采用同步发送,因为批量发送比较有效率,这里可以攒几条再发,但日志比较稀疏时,可能短时间很难攒够,就会出现长时间等不到日志的情况,所以还要有个超时机制,这有点麻烦,不过我也写了一个版本,有兴趣的可以去看看:https://github.com/bosima/ylog/blob/main/examples/kafka-writer.go
有了格式化接口和写日志接口,下一步就是将它们组装起来,以实现相应的处理能力。
首先是创建它们,因为我这里也没有动态配置的需求,所以就放到创建Logger实例的时候了,这样比较简单。
func NewYesLogger(opts ...Option) (logger *YesLogger) {
logger = &YesLogger{}
...
logger.writer = NewFileWriter("logs")
logger.formatter = NewTextFormatter()
for _, opt := range opts {
opt(logger)
}
...
return
}
可以看到默认的formatter是textFormatter,默认的writer是fileWriter。这个函数传入的Option其实是个函数,在下边的opt(logger)中会执行它们,所以使用其它的Formatter或者Writer可以这样做:
logger := ylog.NewYesLogger(
...
ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)),
ylog.Formatter(ylog.NewJsonFormatter()),
)
这里 ylog.Writer 和 ylog.Formatter 就是符合Option类型的函数,调用它们可以设置不同的Formatter和Writer。
然后怎么使用它们呢?
...
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry)
err := l.writer.Write(buf)
...
当 logEntry 进入消息处理环节后,首先调用formatter的Format方法格式化logEntry;然后调用了writer的Ensure方法确保writer已经准备好,最后调用writer的Write方法将格式化之后的数据输出到对应的目标。
为什么不将Ensure方法放到Write中呢?这是因为目前写文本日志的时候需要根据logEntry中的日志时间创建日志文件,这样就需要给Writer传递两个参数,有点别扭,所以这里将它们分开了。
Kafka的吞吐量是很高的,那么如果放到ylog自身来说,如何提高它的吞吐量呢?
首先想到的就是Channel,可以使用有缓冲的Channel模拟一个队列,生产者不停的向Channel发送数据,如果Writer可以一直在缓冲被填满之前将数据取走,那么理论上说生产者就是非阻塞的,相比同步输出到某个Writer,没有直接磁盘IO、网络IO,日志处理的吞吐量必将大幅提升。
定义一个Channel,其容量默认为当前机器逻辑处理器的数量:
logger.pipe = make(chan *logEntry, runtime.NumCPU())
发送数据的代码:
entry := &logEntry{
Level: level,
Msg: s,
File: file,
Line: line,
Ts: now,
}
l.pipe <- entry
接收数据的代码:
for {
select {
case entry := <-l.pipe:
// reuse the slice memory
buf = buf[:0]
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry.Ts)
err := l.writer.Write(buf)
...
}
}
实际效果怎么样呢?看下Benchmark:
goos: darwin
goarch: amd64
pkg: github.com/bosima/ylog
cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
BenchmarkInfo-8 1332333 871.6 ns/op 328 B/op 4 allocs/op
这个结果可以和zerolog、zap等高性能日志库一较高下了,当然目前可以做的事情要比它们简单很多。
如果对Java有所了解的同学应该听说过log4j,在log4j2中引入了一个名为Disruptor的组件,它让日志处理飞快了起来,受到很多Java开发者的追捧。Disruptor之所以这么厉害,是因为它使用了无锁并发、环形队列、缓存行填充等多种高级技术。
相比之下,Golang的Channel虽然也使用了环形缓冲,但是还是使用了锁,作为队列来说性能并不是最优的。
Golang中有没有类似的东西呢?最近出来的ZenQ可能是一个不错的选择,不过看似还不太稳定,过段时间再尝试下。有兴趣的可以去看看:https://github.com/alphadose/ZenQ 。
好了,以上就是本文的主要内容。关于ylog的介绍也告一段落了,后续会在Github上持续更新,增加更多有用的功能,并不断优化处理性能,欢迎关注:https://github.com/bosima/ylog 。
收获更多架构知识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
为了将Cucumber用于命令行脚本,我按照提供的说明安装了arubagem。它在我的Gemfile中,我可以验证是否安装了正确的版本并且我已经包含了require'aruba/cucumber'在'features/env.rb'中为了确保它能正常工作,我写了以下场景:@announceScenario:Testingcucumber/arubaGivenablankslateThentheoutputfrom"ls-la"shouldcontain"drw"假设事情应该失败。它确实失败了,但失败的原因是错误的:@announceScenario:Testingcucumber/ar
我正在使用puppet为ruby程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这
在我的Controller中,我通过以下方式在我的index方法中支持HTML和JSON:respond_todo|format|format.htmlformat.json{renderjson:@user}end在浏览器中拉起它时,它会自然地以HTML呈现。但是,当我对/user资源进行内容类型为application/json的curl调用时(因为它是索引方法),我仍然将HTML作为响应。如何获取JSON作为响应?我还需要说明什么? 最佳答案 您应该将.json附加到请求的url,提供的格式在routes.rb的路径中定义。这
这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][
这个问题在这里已经有了答案:Railsformattingdate(4个答案)关闭4年前。我想格式化Time.Now函数以显示YYYY-MM-DDHH:MM:SS而不是:“2018-03-0909:47:19+0000”该函数需要放在时间中.现在功能。require‘roo’require‘roo-xls’require‘byebug’file_name=ARGV.first||“Template.xlsx”excel_file=Roo::Spreadsheet.open(“./#{file_name}“,extension::xlsx)xml=Nokogiri::XML::Build
我喜欢使用Textile或Markdown为我的项目编写自述文件,但是当我生成RDoc时,自述文件被解释为RDoc并且看起来非常糟糕。有没有办法让RDoc通过RedCloth或BlueCloth而不是它自己的格式化程序运行文件?它可以配置为自动检测文件后缀的格式吗?(例如README.textile通过RedCloth运行,但README.mdown通过BlueCloth运行) 最佳答案 使用YARD直接代替RDoc将允许您包含Textile或Markdown文件,只要它们的文件后缀是合理的。我经常使用类似于以下Rake任务的东西:
给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最
我有一个非常简单的RubyRack服务器,例如:app=Proc.newdo|env|req=Rack::Request.new(env).paramspreq.inspect[200,{'Content-Type'=>'text/plain'},['Somebody']]endRack::Handler::Thin.run(app,:Port=>4001,:threaded=>true)每当我使用JSON对象向服务器发送POSTHTTP请求时:{"session":{"accountId":String,"callId":String,"from":Object,"headers":
我想使用spawn(针对多个并发子进程)在Ruby中执行一个外部进程,并将标准输出或标准错误收集到一个字符串中,其方式类似于使用Python的子进程Popen.communicate()可以完成的操作。我尝试将:out/:err重定向到一个新的StringIO对象,但这会生成一个ArgumentError,并且临时重新定义$stdxxx会混淆子进程的输出。 最佳答案 如果你不喜欢popen,这是我的方法:r,w=IO.pipepid=Process.spawn(command,:out=>w,:err=>[:child,:out])