Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions point/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type Encoder struct {

pts []*Point
lastPtsIdx,
trimmed,
parts int
lastErr error

Expand Down Expand Up @@ -90,7 +89,6 @@ func (e *Encoder) reset() {
e.lastPtsIdx = 0
e.lastErr = nil
e.parts = 0
e.trimmed = 0
e.pbpts.Arr = e.pbpts.Arr[:0]
e.lpPointBuf = e.lpPointBuf[:0]
}
Expand Down Expand Up @@ -229,8 +227,8 @@ func (e *Encoder) LastErr() error {
}

func (e *Encoder) String() string {
return fmt.Sprintf("encoding: %s, parts: %d, byte size: %d, e.batchSize: %d, lastPtsIdx: %d, trimmed: %d",
e.enc, e.parts, e.bytesSize, e.batchSize, e.lastPtsIdx, e.trimmed,
return fmt.Sprintf("encoding: %s, parts: %d, byte size: %d, e.batchSize: %d, lastPtsIdx: %d",
e.enc, e.parts, e.bytesSize, e.batchSize, e.lastPtsIdx,
)
}

Expand Down
28 changes: 25 additions & 3 deletions point/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,27 +549,49 @@ func TestGoGoPBDecodePB(t *T.T) {
t.Logf("gogopts:\n%s", string(j))
}

func TestPBArraySize(t *T.T) {
r := NewRander(WithFixedTags(true), WithRandText(3))
randPts := r.Rand(10000)

total := 0
for i := range randPts {
ptsize := randPts[i].pt.Size()
total += (1 + ptsize + sovPoint(uint64(ptsize)))
}

pbpts := PBPoints{}
for i := range randPts {
pbpts.Arr = append(pbpts.Arr, randPts[i].pt)
}

assert.Equal(t, pbpts.Size(), total)
}

func BenchmarkV2Encode(b *T.B) {
r := NewRander(WithFixedTags(true), WithRandText(3))
randPts := r.Rand(10000)

buf := make([]byte, 1<<20)
_ = buf

var arr [][]byte
_ = arr

b.Logf("start...")

b.ResetTimer()

b.Run("encode-v1", func(b *T.B) {
for i := 0; i < b.N; i++ {
enc := GetEncoder(WithEncEncoding(Protobuf))
enc.Encode(randPts)
arr, _ = enc.Encode(randPts)

assert.NoError(b, enc.LastErr())
PutEncoder(enc)
}
})

b.Run("Next", func(b *T.B) {
b.Run("encode-v2", func(b *T.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
enc := GetEncoder(WithEncEncoding(Protobuf))
enc.EncodeV2(randPts)
Expand Down
38 changes: 12 additions & 26 deletions point/encode_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,53 +29,39 @@ func (e *Encoder) Next(buf []byte) ([]byte, bool) {
}

func (e *Encoder) doEncodeProtobuf(buf []byte) ([]byte, bool) {
var (
curSize,
pbptsSize int
trimmed = 1
)
var curSize int

// clear points before current package
if len(e.pbpts.Arr) > 0 {
e.pbpts.Arr = e.pbpts.Arr[:0]
}

for _, pt := range e.pts[e.lastPtsIdx:] {
if pt == nil {
continue
}

curSize += pt.Size()
curSize += pt.PBArraySize()

// e.pbpts size larger than buf, we must trim some of points
// until size fit ok or MarshalTo will panic.
if curSize >= len(buf) {
if len(e.pbpts.Arr) <= 1 { // nothing to trim
// no point added, means current point(not added) is a
// huge point that can't fit into buf.
if len(e.pbpts.Arr) == 0 {
e.lastErr = errTooSmallBuffer
return nil, false
}

for {
if pbptsSize = e.pbpts.Size(); pbptsSize > len(buf) {
e.pbpts.Arr = e.pbpts.Arr[:len(e.pbpts.Arr)-trimmed]
e.lastPtsIdx -= trimmed
trimmed *= 2
} else {
goto __doEncode
}
}
break // break current package, the remaining points will encoded in next package
} else {
e.pbpts.Arr = append(e.pbpts.Arr, pt.pt)
e.lastPtsIdx++
}
}

__doEncode:
e.trimmed = trimmed

if len(e.pbpts.Arr) == 0 {
if len(e.pbpts.Arr) == 0 { // no points available, it's done
return nil, false
}

defer func() {
e.pbpts.Arr = e.pbpts.Arr[:0]
}()

if n, err := e.pbpts.MarshalTo(buf); err != nil {
e.lastErr = err
return nil, false
Expand Down
17 changes: 8 additions & 9 deletions point/point.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (p *Point) Size() int {
return n
}

// LPSize get point line-protocol size.
// LPSize get point size in line-protocol.
func (p *Point) LPSize() int {
lppt, err := p.LPPoint()
if err != nil {
Expand All @@ -456,16 +456,15 @@ func (p *Point) LPSize() int {

// PBSize get point protobuf size.
func (p *Point) PBSize() int {
pbpt := p.PBPoint()

m := protojson.Marshaler{}
buf := bytes.Buffer{}
return p.pt.Size()
}

if err := m.Marshal(&buf, pbpt); err != nil {
return 0
}
// PBArraySize get protobuf size when points encoded as array.
func (p *Point) PBArraySize() int {
l := p.PBSize()

return buf.Len()
// For sovPoint(), see PBPoints.Size()
return (1 + l + sovPoint(uint64(l)))
}

func b64(x []byte) string {
Expand Down