InfluxDB WAL

1. InfluxDB TSM

After three years of design, development, and testing, InfluxDB has finally reached version 1.0, and the company also closed a $16M Series B round of VC funding. Along the way InfluxDB has gone through a great many changes:

  • the clustering feature was removed
  • the write protocol switched from JSON to the line protocol
  • a WAL was introduced
  • a cache was introduced
  • InfluxQL

Of course, the most noteworthy change is that InfluxDB implemented its own underlying storage engine, InfluxDB TSM. The new engine improves write throughput and query efficiency, and greatly reduces disk usage. I recently decided to study the TSM engine, hence this article.

For details, see: https://www.influxdata.com/influxdb-1-0-ga-released-a-retrospective-and-whats-next/


2. WAL

What is WAL?

In computer science, write-ahead logging (WAL) is a family of techniques for providing atomicity and durability (two of the ACID properties) in database systems.

In a system using WAL, all modifications are written to a log before they are applied. Usually both redo and undo information is stored in the log.

The purpose of this can be illustrated by an example. Imagine a program that is in the middle of performing some operation when the machine it is running on loses power. Upon restart, that program might well need to know whether the operation it was performing succeeded, half-succeeded, or failed. If a write-ahead log is used, the program can check this log and compare what it was supposed to be doing when it unexpectedly lost power to what was actually done. On the basis of this comparison, the program could decide to undo what it had started, complete what it had started, or keep things as they are.

WAL allows updates of a database to be done in-place. Another way to implement atomic updates is with shadow paging, which is not in-place. The main advantage of doing updates in-place is that it reduces the need to modify indexes and block lists.

ARIES is a popular algorithm in the WAL family.

File systems typically use a variant of WAL for at least file system metadata called journaling. (From Wikipedia)

In short, WAL (Write-Ahead Logging) is an efficient logging technique used by databases. For databases that are not purely in-memory, disk I/O is a major performance bottleneck. For the same volume of data, a WAL-based system performs roughly half the disk writes at commit time compared with traditional rollback journaling, which significantly improves disk I/O efficiency and therefore overall database performance. (From Baidu)
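
To make this concrete, here is a minimal, hypothetical sketch (toy code, not InfluxDB's) of the write-ahead pattern: every mutation is appended to a log and fsynced before the in-memory state is touched, and on startup the state is rebuilt by replaying the log.

package toywal

import (
  "bufio"
  "fmt"
  "os"
  "strings"
)

// toyStore is a hypothetical key/value store protected by a write-ahead log.
type toyStore struct {
  log  *os.File          // append-only log file
  data map[string]string // in-memory state
}

// Set first appends the mutation to the log and fsyncs it, and only then
// mutates the in-memory state; a crash after the Sync can always be repaired
// by replaying the log.
func (s *toyStore) Set(k, v string) error {
  if _, err := fmt.Fprintf(s.log, "set %s %s\n", k, v); err != nil {
    return err
  }
  if err := s.log.Sync(); err != nil {
    return err
  }
  s.data[k] = v
  return nil
}

// replay rebuilds the in-memory state from the log on startup.
func replay(path string) (map[string]string, error) {
  data := make(map[string]string)
  f, err := os.Open(path)
  if os.IsNotExist(err) {
    return data, nil // no log yet: empty store
  } else if err != nil {
    return nil, err
  }
  defer f.Close()

  sc := bufio.NewScanner(f)
  for sc.Scan() {
    if parts := strings.SplitN(sc.Text(), " ", 3); len(parts) == 3 && parts[0] == "set" {
      data[parts[1]] = parts[2]
    }
  }
  return data, sc.Err()
}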

InfluxDB WAL

So what exactly does InfluxDB's WAL do?

  • Append-only log composed of fixed-size segment files.
  • Writes are appended to the current segment only.
  • Roll over to a new segment after the current segment fills up to the configured size (see the sketch after this list).
  • Closed segments are never modified; they are only used for startup recovery and for compactions.
  • There is a single WAL for the store as opposed to a WAL per shard (though in the code walked through below, each shard's engine holds its own WAL).
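
These segment rules can be sketched as follows. This is a simplified, self-contained illustration with made-up names such as segmentedLog, not the actual WAL code (which we look at next); the "_00001.wal" style file name is only in the spirit of the segment files you see on disk.

package toysegment

import (
  "fmt"
  "os"
  "path/filepath"
)

// segmentedLog illustrates the segment rules: writes only ever go to the
// current file, and once that file is large enough a new one is opened.
type segmentedLog struct {
  dir     string
  maxSize int64
  id      int
  cur     *os.File
}

// append writes to the current segment and rolls over when it is full.
func (s *segmentedLog) append(b []byte) error {
  if s.cur == nil {
    if err := s.roll(); err != nil {
      return err
    }
  }
  if _, err := s.cur.Write(b); err != nil {
    return err
  }
  st, err := s.cur.Stat()
  if err != nil {
    return err
  }
  if st.Size() >= s.maxSize {
    return s.roll() // segment is full: close it and start a new one
  }
  return nil
}

// roll closes the current segment (never to be modified again) and opens the
// next one with an incremented ID.
func (s *segmentedLog) roll() error {
  if s.cur != nil {
    if err := s.cur.Close(); err != nil {
      return err
    }
  }
  s.id++
  f, err := os.OpenFile(filepath.Join(s.dir, fmt.Sprintf("_%05d.wal", s.id)), os.O_CREATE|os.O_WRONLY, 0666)
  if err != nil {
    return err
  }
  s.cur = f
  return nil
}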

An InfluxDB instance starts a store to manage all of its shards. Each shard has an engine, and currently only one engine version exists, TSM1 (the older engines have been deprecated). Each engine owns a WAL, a cache, and a file store. In this post we dig into the WAL; its structure is shown below:

type WAL struct {
  mu            sync.RWMutex
  lastWriteTime time.Time

  path string

  // write variables
  currentSegmentID     int
  currentSegmentWriter *WALSegmentWriter

  // cache and flush variables
  closing chan struct{}

  // WALOutput is the writer used by the logger.
  logger       *log.Logger // Logger to be used for important messages
  traceLogger  *log.Logger // Logger to be used when trace-logging is on.
  logOutput    io.Writer   // Writer to be logger and traceLogger if active.
  traceLogging bool

  // SegmentSize is the file size at which a segment file will be rotated
  SegmentSize int

  // statistics for the WAL
  stats   *WALStatistics
  limiter limiter.Fixed
}


  • path is the directory that holds all of this shard's WAL segment files.
  • currentSegmentWriter is the currently open segment file; all incoming data is appended to it.
  • currentSegmentID is the ID of the current segment file, incremented on every roll-over.
  • limiter bounds the number of goroutines writing to this WAL concurrently, and therefore the disk write IO; the default is 10 (a sketch of such a limiter follows).
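
The limiter is essentially a counting semaphore. Below is a minimal sketch of such a fixed limiter and of how the WAL might use it; the Take/Release names follow InfluxDB's pkg/limiter package, but treat the details as an assumption rather than the actual implementation.

// Fixed is a counting semaphore backed by a buffered channel: at most
// cap(f) goroutines can hold a slot at the same time.
type Fixed chan struct{}

// NewFixed creates a limiter that allows up to limit concurrent holders.
func NewFixed(limit int) Fixed {
  return make(Fixed, limit)
}

func (f Fixed) Take()    { f <- struct{}{} } // blocks once the limit is reached
func (f Fixed) Release() { <-f }

// writeWithLimit is a hypothetical helper: acquire a slot, do the disk write,
// release the slot. With NewFixed(10) this caps concurrent WAL writes at 10.
func writeWithLimit(lim Fixed, write func() error) error {
  lim.Take()
  defer lim.Release()
  return write()
}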

3. InfluxDB WalEntryType

InfluxDB divides WAL entries into three kinds of operations:


// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains
type WalEntryType byte

const (
  WriteWALEntryType       WalEntryType = 0x01
  DeleteWALEntryType      WalEntryType = 0x02
  DeleteRangeWALEntryType WalEntryType = 0x03
)

Each of these operations implements the following interface:


// WALEntry is a record stored in each WAL segment. Each entry has a type
// and an opaque, type-dependent byte slice data attribute.
type WALEntry interface {
  Type() WalEntryType
  Encode(dst []byte) ([]byte, error)
  MarshalBinary() ([]byte, error)
  UnmarshalBinary(b []byte) error
}
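
The type byte written in front of each entry is what a reader uses to decide which concrete entry to decode. A hedged sketch of that dispatch follows; the Delete* types exist in the tsm1 package but are not shown in this post, and the helper itself is illustrative, not the actual reader code.

// decodeEntry dispatches on the entry-type byte and unmarshals the payload
// into the matching concrete WALEntry; the segment reader does essentially
// this when replaying a WAL file.
func decodeEntry(typ WalEntryType, b []byte) (WALEntry, error) {
  var entry WALEntry
  switch typ {
  case WriteWALEntryType:
    entry = &WriteWALEntry{Values: make(map[string][]Value)}
  case DeleteWALEntryType:
    entry = &DeleteWALEntry{}
  case DeleteRangeWALEntryType:
    entry = &DeleteRangeWALEntry{}
  default:
    return nil, fmt.Errorf("unknown wal entry type: %v", typ)
  }
  if err := entry.UnmarshalBinary(b); err != nil {
    return nil, err
  }
  return entry, nil
}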

Here we take the write operation as an example:


// WriteWALEntry represents a write of points.
type WriteWALEntry struct {
  Values map[string][]Value
}

// The map key here is measurement + tag set + field name.
// Each value is a slice of (time, value) pairs.
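
For instance, a single point such as cpu,host=server01 usage_idle=97.5 becomes one map entry. The sketch below is hypothetical; NewValue is the tsm1 constructor for timestamped values, and the "#!~#" key/field separator is my reading of the internal key format, so treat it as an assumption.

// buildEntry builds a hypothetical WriteWALEntry holding one float point for
// cpu,host=server01 usage_idle=97.5 at the given timestamp (ns since epoch).
func buildEntry(ts int64) *WriteWALEntry {
  // series key + assumed internal separator + field name
  key := "cpu,host=server01" + "#!~#" + "usage_idle"
  return &WriteWALEntry{
    Values: map[string][]Value{
      key: {NewValue(ts, 97.5)},
    },
  }
}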

WriteWALEntry, of course, also implements the interface above. Its MarshalBinary() calls Encode(), which type-asserts each value and encodes it in a fixed binary layout; this function is the core of the whole design:


// Encode converts the WriteWALEntry into a byte stream using dst if it
// is large enough.  If dst is too small, the slice will be grown to fit the
// encoded entry.
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
  // The entries values are encode as follows:
  //
  // For each key and slice of values, first a 1 byte type for the []Values
  // slice is written.  Following the type, the length and key bytes are written.
  // Following the key, a 4 byte count followed by each value as a 8 byte time
  // and N byte value.  The value is dependent on the type being encoded.  float64,
  // int64, use 8 bytes, boolean uses 1 byte, and string is similar to the key encoding,
  // except that string values have a 4-byte length, and keys only use 2 bytes.
  //
  // This structure is then repeated for each key and value slice.
  //
  // ┌────────────────────────────────────────────────────────────────────┐
  // │                           WriteWALEntry                            │
  // ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
  // │ Type │ Key Len │   Key  │ Count │  Time   │  Value  │...│ Type │...│
  // │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │   │1 byte│   │
  // └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘

  encLen := 7 * len(w.Values) // Type (1), Key Length (2), and Count (4) for each key

  // determine required length
  for k, v := range w.Values {
    encLen += len(k)
    if len(v) == 0 {
      return nil, errors.New("empty value slice in WAL entry")
    }

    encLen += 8 * len(v) // timestamps (8)

    switch v[0].(type) {
    case *FloatValue, *IntegerValue:
      encLen += 8 * len(v)
    case *BooleanValue:
      encLen += 1 * len(v)
    case *StringValue:
      for _, vv := range v {
        str, ok := vv.(*StringValue)
        if !ok {
          return nil, fmt.Errorf("non-string found in string value slice: %T", vv)
        }
        encLen += 4 + len(str.value)
      }
    default:
      return nil, fmt.Errorf("unsupported value type: %T", v[0])
    }
  }

  // allocate or re-slice to correct size
  if len(dst) < encLen {
    dst = make([]byte, encLen)
  } else {
    dst = dst[:encLen]
  }

  // Finally, encode the entry
  var n int
  var curType byte

  for k, v := range w.Values {
    switch v[0].(type) {
    case *FloatValue:
      curType = float64EntryType
    case *IntegerValue:
      curType = integerEntryType
    case *BooleanValue:
      curType = booleanEntryType
    case *StringValue:
      curType = stringEntryType
    default:
      return nil, fmt.Errorf("unsupported value type: %T", v[0])
    }
    dst[n] = curType
    n++

    binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
    n += 2
    n += copy(dst[n:], k)

    binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
    n += 4

    for _, vv := range v {
      binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
      n += 8

      switch vv := vv.(type) {
      case *FloatValue:
        if curType != float64EntryType {
          return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
        }
        binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))
        n += 8
      case *IntegerValue:
        if curType != integerEntryType {
          return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
        }
        binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
        n += 8
      case *BooleanValue:
        if curType != booleanEntryType {
          return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
        }
        if vv.value {
          dst[n] = 1
        } else {
          dst[n] = 0
        }
        n++
      case *StringValue:
        if curType != stringEntryType {
          return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
        }
        binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))
        n += 4
        n += copy(dst[n:], vv.value)
      default:
        return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)
      }
    }
  }

  return dst[:n], nil
}

After encoding, the entry is compressed with Google's snappy, and the WALSegmentWriter writes the entry type together with the compressed data to the current segment file; finally the file is fsynced and the write is complete.
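
Put together, the write path looks roughly like the sketch below. It assumes a [1-byte type][4-byte length][compressed block] framing, which is how I read WALSegmentWriter, and it flushes straight to an *os.File instead of going through the real writer's buffering.

import (
  "encoding/binary"
  "os"

  "github.com/golang/snappy"
)

// writeEntry sketches the write path: encode the entry, snappy-compress it,
// frame it as [type][length][compressed bytes], then fsync the segment file.
func writeEntry(f *os.File, entry WALEntry) error {
  encoded, err := entry.Encode(nil)
  if err != nil {
    return err
  }
  compressed := snappy.Encode(nil, encoded)

  var hdr [5]byte
  hdr[0] = byte(entry.Type())
  binary.BigEndian.PutUint32(hdr[1:], uint32(len(compressed)))
  if _, err := f.Write(hdr[:]); err != nil {
    return err
  }
  if _, err := f.Write(compressed); err != nil {
    return err
  }
  return f.Sync() // make the entry durable before acknowledging the write
}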

Lastly, the WAL also contains a WALSegmentReader, which mainly implements Read and Next. Presumably it is meant to be used by the compactor; we will dig into that later.

// WALSegmentReader reads WAL segments.
type WALSegmentReader struct {
  r     io.ReadCloser
  entry WALEntry
  n     int64
  err   error
}
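
Here is a hedged usage sketch of how a closed segment could be replayed with it; the NewWALSegmentReader/Next/Read/Close calls match what the struct suggests, but treat the exact iteration API as an assumption.

// replaySegment walks one closed segment file entry by entry and hands each
// decoded WALEntry to apply (e.g. to rebuild the cache on startup).
func replaySegment(path string, apply func(WALEntry) error) error {
  f, err := os.Open(path)
  if err != nil {
    return err
  }
  r := NewWALSegmentReader(f)
  defer r.Close()

  for r.Next() {
    entry, err := r.Read()
    if err != nil {
      return err
    }
    if err := apply(entry); err != nil {
      return err
    }
  }
  return nil
}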