Distributed Tracing

最近在研究分布式trace相关的东西,随着微服务和容器技术的不断普及,服务之前的调用关系变得越来越复杂,如何让这些关系变得更加可视化,并且能发现其中的短板性能问题显得越来越重要。

Jaeger

jaeger是uber开源的一套分布式trace系统,并且完全遵循opentracing标准。zipkin也是trace领域非常火的项目,但是试用下来,从性能和后期优化方面考虑,我们最终选择了jaeger。当然基于jaeger我们做了些优化和调整,可以和大家简单分享一下:

agent 合并

从架构图上可以看到 jaeger 需要一个本机的agent用于转发数据,目前我们内网已经有一个监控agent,再维护一个agent成本还是很高的,不如直接简单合并一下,避免改动太多,版本还是跟着社区走。新建一个trace package, 代码如下:


package trace

import (
	"fmt"
	"path/filepath"

	"github.com/jaegertracing/jaeger/cmd/agent/app"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

const (
	defaultMetricsBackend = "prometheus"
	defaultMetricsRoute   = "/metrics"
)

var defaultProcessors = []struct {
	model    app.Model
	protocol app.Protocol
	hostPort string
}{
	{model: "jaeger", protocol: "compact", hostPort: ":6831"},
	{model: "jaeger", protocol: "binary", hostPort: ":6832"},
}

const (
	defaultQueueSize     = 1000
	defaultMaxPacketSize = 65000
	defaultServerWorkers = 10
	defaultMinPeers      = 3

	defaultHTTPServerHostPort = ":5778"
)

// Tracer custom struct
type Tracer struct {
	agent *app.Agent
}

// New trace service
func New(collector []string, logDir string) (*Tracer, error) {
	t := &Tracer{}
	if len(collector) < 1 {
		return t, fmt.Errorf("config collector address first: %d", len(collector))
	}

	conf := zap.NewProductionConfig()
	var level zapcore.Level
	err := (&level).UnmarshalText([]byte("info"))
	if err != nil {
		return t, err
	}
	conf.Level = zap.NewAtomicLevelAt(level)
	conf.OutputPaths = []string{filepath.Join(logDir, "INFO.log")}
	logger, err := conf.Build()
	if err != nil {
		return t, err
	}

	builder := &app.Builder{}
	builder.Metrics.Backend = defaultMetricsBackend
	builder.Metrics.HTTPRoute = defaultMetricsRoute

	for _, processor := range defaultProcessors {
		p := &app.ProcessorConfiguration{Model: processor.model, Protocol: processor.protocol}
		p.Workers = defaultServerWorkers
		p.Server.QueueSize = defaultQueueSize
		p.Server.MaxPacketSize = defaultMaxPacketSize
		p.Server.HostPort = processor.hostPort
		builder.Processors = append(builder.Processors, *p)
	}

	builder.CollectorHostPorts = collector

	builder.HTTPServer.HostPort = defaultHTTPServerHostPort
	builder.DiscoveryMinPeers = defaultMinPeers

	t.agent, err = builder.CreateAgent(logger)
	if err != nil {
		return t, fmt.Errorf("Unable to initialize Jaeger Agent: %s", err)
	}
	return t, nil
}

// Start trace service
func (t *Tracer) Start() error {
	if err := t.agent.Run(); err != nil {
		return fmt.Errorf("Failed to run the trace module: %s", err)
	}
	return nil
}

在包外调用 NewStart就可以了,非常简单。

service name filter

另外我们不希望任何人随随便便都可以往trace系统里面打数据,可能会有一些恶意写入,另外我们还想做到脏数据的清洗和控流,这样的需求最好在collector组件中添加,阅读了代码后发现collector内置了spanfilter,只是没有开放使用而已,这样就方便很多了,我们建立一个第三方库用于维护规则,然后在collector中引用,这样对collector的改动就非常小,但是我们仍然想直接集成到collector中,但是像这样的逻辑代码以一种什么样的形式合并进去还需要再考虑。

jaeger 仓库中的默认 spanfilter:


func defaultSpanFilter(*model.Span) bool {
	return true
}

只要在函数中添加过滤规则即可:


import "github.com/ifeng/tracefilter"

func defaultSpanFilter(span *model.Span) bool {
	return tracefilter.Check(span)
}


先整理到这里。