go语言zap日志自定义输出

前篇文章中介绍了zap日志包的基本用法. 在zap.Config的配置中, OutputPaths 和 ErrorOutputPaths 分别用来指定日志的输出目标; 一般情况下, 日志会输出到 stdout, stderr 或本地文本文件. 本篇文章介绍如何实现自定义协议的日志输出目标.

基础配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// logLevel enables entries at Debug level.
logLevel := zap.NewAtomicLevelAt(zapcore.DebugLevel)

// zc is the baseline logger configuration used throughout this article:
// JSON-encoded entries, with OutputPaths pointing at stdout and
// ErrorOutputPaths pointing at stderr.
var zc = zap.Config{
Level: logLevel,
Development: false,
DisableCaller: false,
DisableStacktrace: false,
Sampling: nil,
Encoding: "json",
// The *Key fields name the JSON keys used for the built-in parts of an
// entry; the Encode* fields choose how each part is rendered.
EncoderConfig: zapcore.EncoderConfig{
MessageKey: "message",
LevelKey: "level",
TimeKey: "time",
NameKey: "name",
CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
EncodeName: zapcore.FullNameEncoder,
},
// Each element is a file path or URL; "stdout" and "stderr" are the
// special names for the process's standard streams.
OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stderr"},
// Fields attached to every entry (here "app":"apdex").
InitialFields: map[string]interface{}{"app": "apdex"},
}

OutputPaths 定义

源码截取

1
2
3
// OutputPaths is a list of URLs or file paths to write logging output to.
// See Open for details.
OutputPaths []string `json:"outputPaths" yaml:"outputPaths"`
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Open is a high-level wrapper that takes a variadic number of URLs, opens or
// creates each of the specified resources, and combines them into a locked
// WriteSyncer. It also returns any error encountered and a function to close
// any opened files.
//
// Passing no URLs returns a no-op WriteSyncer. Zap handles URLs without a
// scheme and URLs with the "file" scheme. Third-party code may register
// factories for other schemes using RegisterSink.
//
// URLs with the "file" scheme must use absolute paths on the local
// filesystem. No user, password, port, fragments, or query parameters are
// allowed, and the hostname must be empty or "localhost".
//
// Since it's common to write logs to the local filesystem, URLs without a
// scheme (e.g., "/var/log/foo.log") are treated as local file paths. Without
// a scheme, the special paths "stdout" and "stderr" are interpreted as
// os.Stdout and os.Stderr. When specified without a scheme, relative file
// paths also work.
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
// Resolve every path to a sink; if any of them fails, the caller gets
// only the error (no writer and no close function).
writers, close, err := open(paths)
if err != nil {
return nil, nil, err
}

// Merge the individual sinks into the single WriteSyncer that is returned.
writer := CombineWriteSyncers(writers...)
return writer, close, nil
}

// open creates one sink per path via newSink. It returns the opened sinks, a
// function that closes all of them, and the combined error of every path that
// could not be opened. When any path fails, the sinks that did open are closed
// before returning and the returned close function is nil.
func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
writers := make([]zapcore.WriteSyncer, 0, len(paths))
closers := make([]io.Closer, 0, len(paths))
// close captures the closers slice, so it closes whatever has been
// appended by the time it is called. Errors from Close are ignored.
close := func() {
for _, c := range closers {
c.Close()
}
}

var openErr error
for _, path := range paths {
sink, err := newSink(path)
if err != nil {
// Keep going so that every failing path is reported, not just the first.
openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
continue
}
// The same sink is tracked both as a writer and as a closer.
writers = append(writers, sink)
closers = append(closers, sink)
}
if openErr != nil {
// Release the sinks that were opened successfully before reporting failure.
close()
return writers, nil, openErr
}

return writers, close, nil
}

实现自定义输出协议, 注释中最重要的一句话是: Third-party code may register factories for other schemes using RegisterSink. 也就是说, 你可以使用 RegisterSink 函数来注册新的输出方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// RegisterSink registers a user-supplied factory for all sinks with a
// particular scheme.
//
// All schemes must be ASCII, valid under section 3.1 of RFC 3986
// (https://tools.ietf.org/html/rfc3986#section-3.1), and must not already
// have a factory registered. Zap automatically registers a factory for the
// "file" scheme.
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
// The registry is accessed under _sinkMutex for the whole call.
_sinkMutex.Lock()
defer _sinkMutex.Unlock()

if scheme == "" {
return errors.New("can't register a sink factory for empty string")
}
// The factory is stored under the normalized form of the scheme, not the
// raw string passed in. NOTE(review): normalizeScheme is not shown here;
// presumably it validates and canonicalizes the scheme — confirm.
normalized, err := normalizeScheme(scheme)
if err != nil {
return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
}
// Only one factory per scheme: a second registration is an error rather
// than a silent overwrite.
if _, ok := _sinkFactories[normalized]; ok {
return fmt.Errorf("sink factory already registered for scheme %q", normalized)
}
_sinkFactories[normalized] = factory
return nil
}

该函数的入参之一为一个自定义的工厂函数, 该工厂函数接收一个*url.URL类型的指针, 返回一个Sink类型的接口和一个错误. 简化来看, 其实我们需要重点处理的就是这个自定义工厂函数.

1
2
3
// xxx is a placeholder that only illustrates the signature a sink factory
// must have: it takes a *url.URL and returns a Sink or an error. The body is
// intentionally left empty in this illustration.
func xxx (*url.URL) (Sink, error) {

}

该工厂函数需要返回的最重要的值为一个实现了Sink接口的对象, Sink接口的定义如下:

1
2
3
4
5
// Sink defines the interface to write to and close logger destinations.
type Sink interface {
zapcore.WriteSyncer // contributes Write and Sync
io.Closer // contributes Close
}

Sink接口可以简单理解为是用来写日志和关闭日志对象的, Sink接口中又包含两个接口, 两个接口的源码定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// A WriteSyncer is an io.Writer that can also flush any buffered data. Note
// that *os.File (and thus, os.Stderr and os.Stdout) implement WriteSyncer.
type WriteSyncer interface {
io.Writer
// Sync flushes any buffered data.
Sync() error
}

// Writer is the interface that wraps the basic Write method.
//
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
//
// Implementations must not retain p.
type Writer interface {
// Write reports how many bytes of p were written and the error, if any,
// that stopped the write early.
Write(p []byte) (n int, err error)
}

// Closer is the interface that wraps the basic Close method.
//
// The behavior of Close after the first call is undefined.
// Specific implementations may document their own behavior.
type Closer interface {
// Close returns an error value; calling it more than once has undefined
// behavior unless the implementation says otherwise.
Close() error
}

展开所有接口的定义后, 可以清晰地看出, 我们只需要实现三个方法, 就可以实现Sink接口:

  • Sync() error // 如果写日志用到了缓存的话, 你需要定义sync的逻辑, 以便在程序关闭时, 保证缓存中的日志可以正确写入到目标对象中
  • Write(p []byte) (n int, err error) // 这个是写日志的函数, 也是三个函数中, 必须要定义逻辑的一个函数, 它定义了日志内容p如何写入到目标对象中
  • Close() error // 关闭日志对象, 在程序关闭时, 你可能需要先正确关闭日志对象. 例如你的日志对象是个文件时, 你需要close掉这个文件描述符; 例如你的日志对象是个远程TCP连接时, 你需要close掉这条连接

实现自定义日志输出协议

上面已经明确了自定义日志输出协议需要定义的函数, 那么接下来我们就来定义一个最简单的日志输出协议, 当程序中调用日志打印时, 我将日志以全部红色的字体打印在控制台的标准输出, 需要用到一个特殊的库 github.com/gookit/color 该库可以实现带颜色的控制台输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import (
"github.com/gookit/color"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/url"
)

// Color is a stateless sink type: it carries no fields and exists only to
// hang the three Sink methods (Sync, Close, Write) on.
type Color struct{}

// Sync is part of the Sink interface. This type only prints to the console
// and keeps no buffer of its own, so there is nothing to flush.
func (Color) Sync() error {
	return nil
}

// Close is part of the Sink interface. This type holds no resource that
// needs releasing, so it simply reports success.
func (Color) Close() error {
	return nil
}

// Write is part of the Sink interface. It prints the entry p to the console
// in red, preceded by a fixed prefix, and reports the whole of p as written.
func (Color) Write(p []byte) (n int, err error) {
	entry := string(p)
	color.Red.Println("我是自定义的日志输出协议: ", entry)
	return len(p), nil
}

// colorSink is the sink factory handed to zap.RegisterSink. The factory
// signature requires a *url.URL argument, but this sink needs nothing from
// it, so the argument is ignored and a fresh Color value is returned.
func colorSink(url *url.URL) (sink zap.Sink, err error) {
	return Color{}, nil
}

// main registers the Color sink under the custom "Color" scheme, builds a
// logger that writes to both stdout and that sink, and emits one entry
// through the structured API and one through the sugared API.
func main() {
	// Register the colorSink factory with zap under the custom scheme "Color".
	// Registration failing means the example cannot run at all, so panic.
	if err := zap.RegisterSink("Color", colorSink); err != nil {
		panic(err)
	}

	logLevel := zap.NewAtomicLevelAt(zapcore.DebugLevel)
	var zc = zap.Config{
		Level:             logLevel,
		Development:       false,
		DisableCaller:     false,
		DisableStacktrace: false,
		Sampling:          nil,
		Encoding:          "json",
		EncoderConfig: zapcore.EncoderConfig{
			MessageKey:     "message",
			LevelKey:       "level",
			TimeKey:        "time",
			NameKey:        "name",
			CallerKey:      "caller",
			StacktraceKey:  "stacktrace",
			LineEnding:     zapcore.DefaultLineEnding,
			EncodeLevel:    zapcore.LowercaseLevelEncoder,
			EncodeTime:     zapcore.ISO8601TimeEncoder,
			EncodeDuration: zapcore.StringDurationEncoder,
			EncodeCaller:   zapcore.ShortCallerEncoder,
			EncodeName:     zapcore.FullNameEncoder,
		},
		// Besides the regular console output, the log output list also
		// contains our custom Color scheme.
		// Note that the factory always receives a *url.URL even though this
		// sink never uses it, so the output path must still be written in a
		// form that carries the scheme. Following the style of http URLs it
		// is written as "Color://127.0.0.1" here; "Color:127.0.0.1" and
		// "Color:" are accepted as well, but a bare "Color" is a wrong
		// configuration.
		OutputPaths:      []string{"stdout", "Color://127.0.0.1"},
		ErrorOutputPaths: []string{"stderr"},
		InitialFields:    map[string]interface{}{"app": "apdex"},
	}

	logger, err := zc.Build()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	logger.Info("logger construction succeeded",
		zap.String("key", "value"),
		zap.Int("number", 9),
	)

	sugarLogger := logger.Sugar()

	sugarLogger.Infow("logger construction succeeded",
		"key", "value", "number", 9,
	)
}

执行结果:

实现自定义日志输出协议-http

接下来, 再实现一个稍微复杂一些的例子, 通过http协议, 将日志以json格式发送到远端.

首先来实现接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
)

// logRequest prints the method, URL and body of an incoming request to
// stdout. If the body cannot be read, the failure is logged and the client
// receives 400 Bad Request instead of a silent success.
func logRequest(w http.ResponseWriter, r *http.Request) {
	fmt.Println("请求方法:", r.Method)
	fmt.Println("请求地址:", r.URL)
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("reading request body: %v", err)
		http.Error(w, "failed to read request body", http.StatusBadRequest)
		return
	}
	fmt.Println("Body体内容: ", bytes.NewBuffer(body).String())
	fmt.Println("===========")
}

// main starts the log receiver: an HTTP server on 0.0.0.0:5001 that routes
// every path to logRequest. It only returns (via log.Fatal) if the server
// fails.
func main() {
	http.HandleFunc("/", logRequest)

	log.Fatal(http.ListenAndServe("0.0.0.0:5001", nil))
}

服务端/日志接收端代码很简单, 只是启动了一个http服务, 用来接收http请求, 会把接收到的请求的基本信息打印出来. 执行 go run logServer.go 启动日志接收服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// httpSinkTimeout bounds a single log delivery so that an unresponsive
// receiver cannot block the logging call forever.
const httpSinkTimeout = 5 * time.Second

// httpSinkClient is shared by every Http sink so that connections can be
// reused across writes instead of building a new client per log entry.
var httpSinkClient = &http.Client{Timeout: httpSinkTimeout}

// Http is a sink that delivers each log entry to a remote endpoint with an
// HTTP POST.
type Http struct {
	// Remote is the URL every entry is posted to.
	Remote *url.URL
}

// Sync is part of the Sink interface. Entries are sent immediately by Write
// and nothing is buffered, so there is nothing to flush.
func (h Http) Sync() error {
	return nil
}

// Close is part of the Sink interface. The sink itself owns no connection
// (requests go through the shared client), so there is nothing to release.
func (h Http) Close() error {
	return nil
}

// Write posts p to h.Remote as an application/json body.
//
// It returns len(p) and a nil error only when the receiver answers 200 OK.
// In every other case it returns 0 together with a non-nil error: a nil
// Remote, a request that cannot be built, a transport failure (including
// the timeout), or any other response status.
func (h Http) Write(p []byte) (n int, err error) {
	if h.Remote == nil {
		return 0, errors.New("http sink: remote URL is nil")
	}

	req, err := http.NewRequest(http.MethodPost, h.Remote.String(), bytes.NewReader(p))
	if err != nil {
		return 0, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := httpSinkClient.Do(req)
	if err != nil {
		return 0, err
	}
	// Drain before closing so the underlying connection can be reused by the
	// shared client; errors here cannot change the outcome of the write.
	defer func() {
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		_ = resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("request failed: unexpected status %s", resp.Status)
	}
	return len(p), nil
}

// httpSink is the sink factory handed to zap.RegisterSink. It wraps the URL
// it is given in an Http value, which then serves as the Sink for that
// output path.
func httpSink(url *url.URL) (sink zap.Sink, err error) {
	return Http{Remote: url}, nil
}

// main registers the custom http sink, builds a logger that writes to both
// stdout and the remote receiver, and emits one entry through the structured
// API and one through the sugared API.
func main() {
// Register the custom http log output with zap.
// NOTE(review): the scheme is registered as "Http" but referenced below as
// lowercase "http://..."; this relies on the scheme being normalized at
// registration (see normalizeScheme in RegisterSink) — confirm.
if err := zap.RegisterSink("Http", httpSink); err != nil {
fmt.Println(err)
return
}

logLevel := zap.NewAtomicLevelAt(zapcore.DebugLevel)

var zc = zap.Config{
Level: logLevel,
Development: false,
DisableCaller: false,
DisableStacktrace: false,
Sampling: nil,
Encoding: "json",
EncoderConfig: zapcore.EncoderConfig{
MessageKey: "message",
LevelKey: "level",
TimeKey: "time",
NameKey: "name",
CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
EncodeName: zapcore.FullNameEncoder,
},
// When using the custom http log output, the path must still be written
// in the proper scheme form:
// xxx://ooo
// The receiver was started locally on port 5001, so the address is
// 127.0.0.1:5001. The receiver handles the root "/", so it accepts any
// request path; "/log" is used here purely for demonstration and any
// path may be chosen.
OutputPaths: []string{"stdout", "http://127.0.0.1:5001/log"},
ErrorOutputPaths: []string{"stderr"},
InitialFields: map[string]interface{}{"app": "apdex"},
}

logger, err := zc.Build()
if err != nil {
panic(err)
}
// Sync the logger on exit; the returned error is not checked.
defer logger.Sync()

logger.Info("logger construction succeeded",
zap.String("key", "value"),
zap.Int("number", 9),
)

sugarLogger := logger.Sugar()

sugarLogger.Infow("logger construction succeeded",
"key", "value", "number", 9,
)
}

执行结果:

远程日志接收服务端:

1
2
3
4
5
6
7
8
9
10
请求方法: POST
请求地址: /log
Body体内容: {"level":"info","time":"2020-03-25T14:48:29.221+0800","caller":"log/log2.go:117","message":"logger construction succeeded","app":"apdex","key":"value","number":9}

===========
请求方法: POST
请求地址: /log
Body体内容: {"level":"info","time":"2020-03-25T14:48:29.221+0800","caller":"log/log2.go:124","message":"logger construction succeeded","app":"apdex","key":"value","number":9}

===========

执行写日志的客户端:

1
2
{"level":"info","time":"2020-03-25T14:48:29.221+0800","caller":"log/log2.go:117","message":"logger construction succeeded","app":"apdex","key":"value","number":9}
{"level":"info","time":"2020-03-25T14:48:29.221+0800","caller":"log/log2.go:124","message":"logger construction succeeded","app":"apdex","key":"value","number":9}

注意: 该http用法仅仅是用来抛砖引玉, 该代码无法在实际生产环境中使用, 因为日志量太大的话, http短连接会消耗相当大的主机资源, 如果确实需要以这种方式传输日志的话, 可以考虑使用全局的http连接池, 并做好Close方法的实现. 也可以开辟一块内存, 专门用来做日志缓存, 定期发起http请求, 发送到远端, 避免频繁的单次调用, 同时也需要做好Sync方法的实现

最佳实践

自定义日志传输协议的最佳实践, 应该是直接将json日志发送到远端的kafka服务器中. 以kafka的吞吐能力, 正常的日志输出量很难让kafka达到瓶颈😆

一般情况下的日志处理流程是, 程序将日志打入到本地文件, 然后由filebeat或logstash之类的日志收集客户端进行收集, 收集后发送到kafka, 然后再由logstash消费, 处理格式后发送给elasticsearch.

程序直接对接kafka的话, 依然要注意日志的输出量. 好在zap本身就可以设置日志输出速率(参见 zap.Config 的 Sampling 配置), 因为kafka扛住一个疯狂打印日志的“疯子”很容易, 但要扛住千千万万个乱打日志的“疯子”, kafka也是受不了的~

我这里暂时没有kafka环境, 后续有机会的话会单独实现zap下kafka的日志传输协议