From f0d197862a7ee2a594c6e2afda33eba6f8f0ad93 Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 3 Jun 2024 11:02:44 +0800 Subject: [PATCH] refact(point): refact point protobuf encoding --- point/encode.go | 6 ++---- point/encode_test.go | 28 +++++++++++++++++++++++++--- point/encode_v2.go | 38 ++++++++++++-------------------------- point/point.go | 17 ++++++++--------- 4 files changed, 47 insertions(+), 42 deletions(-) diff --git a/point/encode.go b/point/encode.go index f4f1509b..cc262935 100644 --- a/point/encode.go +++ b/point/encode.go @@ -40,7 +40,6 @@ type Encoder struct { pts []*Point lastPtsIdx, - trimmed, parts int lastErr error @@ -90,7 +89,6 @@ func (e *Encoder) reset() { e.lastPtsIdx = 0 e.lastErr = nil e.parts = 0 - e.trimmed = 0 e.pbpts.Arr = e.pbpts.Arr[:0] e.lpPointBuf = e.lpPointBuf[:0] } @@ -229,8 +227,8 @@ func (e *Encoder) LastErr() error { } func (e *Encoder) String() string { - return fmt.Sprintf("encoding: %s, parts: %d, byte size: %d, e.batchSize: %d, lastPtsIdx: %d, trimmed: %d", - e.enc, e.parts, e.bytesSize, e.batchSize, e.lastPtsIdx, e.trimmed, + return fmt.Sprintf("encoding: %s, parts: %d, byte size: %d, e.batchSize: %d, lastPtsIdx: %d", + e.enc, e.parts, e.bytesSize, e.batchSize, e.lastPtsIdx, ) } diff --git a/point/encode_test.go b/point/encode_test.go index 8dce8b79..51daafb6 100644 --- a/point/encode_test.go +++ b/point/encode_test.go @@ -549,27 +549,49 @@ func TestGoGoPBDecodePB(t *T.T) { t.Logf("gogopts:\n%s", string(j)) } +func TestPBArraySize(t *T.T) { + r := NewRander(WithFixedTags(true), WithRandText(3)) + randPts := r.Rand(10000) + + total := 0 + for i := range randPts { + ptsize := randPts[i].pt.Size() + total += (1 + ptsize + sovPoint(uint64(ptsize))) + } + + pbpts := PBPoints{} + for i := range randPts { + pbpts.Arr = append(pbpts.Arr, randPts[i].pt) + } + + assert.Equal(t, pbpts.Size(), total) +} + func BenchmarkV2Encode(b *T.B) { r := NewRander(WithFixedTags(true), WithRandText(3)) randPts := r.Rand(10000) buf := 
make([]byte, 1<<20) + _ = buf + + var arr [][]byte + _ = arr b.Logf("start...") b.ResetTimer() - b.Run("encode-v1", func(b *T.B) { for i := 0; i < b.N; i++ { enc := GetEncoder(WithEncEncoding(Protobuf)) - enc.Encode(randPts) + arr, _ = enc.Encode(randPts) assert.NoError(b, enc.LastErr()) PutEncoder(enc) } }) - b.Run("Next", func(b *T.B) { + b.Run("encode-v2", func(b *T.B) { + b.ResetTimer() for i := 0; i < b.N; i++ { enc := GetEncoder(WithEncEncoding(Protobuf)) enc.EncodeV2(randPts) diff --git a/point/encode_v2.go b/point/encode_v2.go index f6159768..75e4f30b 100644 --- a/point/encode_v2.go +++ b/point/encode_v2.go @@ -29,53 +29,39 @@ func (e *Encoder) Next(buf []byte) ([]byte, bool) { } func (e *Encoder) doEncodeProtobuf(buf []byte) ([]byte, bool) { - var ( - curSize, - pbptsSize int - trimmed = 1 - ) + var curSize int + + // clear points before current package + if len(e.pbpts.Arr) > 0 { + e.pbpts.Arr = e.pbpts.Arr[:0] + } for _, pt := range e.pts[e.lastPtsIdx:] { if pt == nil { continue } - curSize += pt.Size() + curSize += pt.PBArraySize() - // e.pbpts size larger than buf, we must trim some of points - // until size fit ok or MarshalTo will panic. if curSize >= len(buf) { - if len(e.pbpts.Arr) <= 1 { // nothing to trim + // no point added, means current point(not added) is a + // huge point that can't fit into buf. 
+ if len(e.pbpts.Arr) == 0 { e.lastErr = errTooSmallBuffer return nil, false } - for { - if pbptsSize = e.pbpts.Size(); pbptsSize > len(buf) { - e.pbpts.Arr = e.pbpts.Arr[:len(e.pbpts.Arr)-trimmed] - e.lastPtsIdx -= trimmed - trimmed *= 2 - } else { - goto __doEncode - } - } + break // break current package, the remaining points will be encoded in the next package } else { e.pbpts.Arr = append(e.pbpts.Arr, pt.pt) e.lastPtsIdx++ } } -__doEncode: - e.trimmed = trimmed - - if len(e.pbpts.Arr) == 0 { + if len(e.pbpts.Arr) == 0 { // no points available, it's done return nil, false } - defer func() { - e.pbpts.Arr = e.pbpts.Arr[:0] - }() - if n, err := e.pbpts.MarshalTo(buf); err != nil { e.lastErr = err return nil, false diff --git a/point/point.go b/point/point.go index 99f7a95e..4bf9a892 100644 --- a/point/point.go +++ b/point/point.go @@ -444,7 +444,7 @@ func (p *Point) Size() int { return n } -// LPSize get point line-protocol size. +// LPSize get point size in line-protocol. func (p *Point) LPSize() int { lppt, err := p.LPPoint() if err != nil { @@ -456,16 +456,15 @@ func (p *Point) LPSize() int { // PBSize get point protobuf size. func (p *Point) PBSize() int { - pbpt := p.PBPoint() - - m := protojson.Marshaler{} - buf := bytes.Buffer{} + return p.pt.Size() +} - if err := m.Marshal(&buf, pbpt); err != nil { - return 0 - } +// PBArraySize get protobuf size when points are encoded as an array. +func (p *Point) PBArraySize() int { + l := p.PBSize() - return buf.Len() + // For sovPoint(), see PBPoints.Size() + return (1 + l + sovPoint(uint64(l))) } func b64(x []byte) string {