From 6ed358813fb2b61f3f5a1a90d07bedef779426a3 Mon Sep 17 00:00:00 2001 From: coanor Date: Thu, 23 May 2024 11:54:39 +0800 Subject: [PATCH 1/6] save --- point/point.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/point/point.go b/point/point.go index 50c74b25..cc3fe17a 100644 --- a/point/point.go +++ b/point/point.go @@ -466,16 +466,7 @@ func (p *Point) LPSize() int { // PBSize get point protobuf size. func (p *Point) PBSize() int { - pbpt := p.PBPoint() - - m := protojson.Marshaler{} - buf := bytes.Buffer{} - - if err := m.Marshal(&buf, pbpt); err != nil { - return 0 - } - - return buf.Len() + return p.pt.Size() } func b64(x []byte) string { From e66ed3e3f2befa0b93b4f6688fb911fa6e4adecb Mon Sep 17 00:00:00 2001 From: coanor Date: Thu, 23 May 2024 17:58:16 +0800 Subject: [PATCH 2/6] use cap to detect encode buffer size --- point/encode.go | 4 ++-- point/encode_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/point/encode.go b/point/encode.go index 1a2010eb..6d12a99c 100644 --- a/point/encode.go +++ b/point/encode.go @@ -317,14 +317,14 @@ func (e *Encoder) doEncodeProtobuf(buf []byte) ([]byte, bool) { // e.pbpts size larger than buf, we must trim some of points // until size fit ok or MarshalTo will panic. - if curSize >= len(buf) { + if curSize >= cap(buf) { if len(e.pbpts.Arr) <= 1 { // nothing to trim e.lastErr = errTooSmallBuffer return nil, false } for { - if pbptsSize = e.pbpts.Size(); pbptsSize > len(buf) { + if pbptsSize = e.pbpts.Size(); pbptsSize > cap(buf) { e.pbpts.Arr = e.pbpts.Arr[:len(e.pbpts.Arr)-trimmed] e.lastPtsIdx -= trimmed trimmed *= 2 diff --git a/point/encode_test.go b/point/encode_test.go index 1bc12e66..ddfac849 100644 --- a/point/encode_test.go +++ b/point/encode_test.go @@ -500,7 +500,7 @@ func TestV2Encode(t *T.T) { var ( decodePts []*Point round int - buf = make([]byte, 1<<20) // KB + buf = make([]byte, 0, 1<<20) // KB ) for { From bde7a8be289a99c66daad8b454d7033fb5793677 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 24 May 2024 15:00:56 +0800 Subject: [PATCH 3/6] update point check warnning --- point/check.go | 42 ++++----- point/ptpool_test.go | 220 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 228 insertions(+), 34 deletions(-) diff --git a/point/check.go b/point/check.go index 02291db7..c7110326 100644 --- a/point/check.go +++ b/point/check.go @@ -45,8 +45,8 @@ func (c *checker) checkMeasurement(m string) string { if c.cfg.maxMeasurementLen > 0 && len(m) > c.cfg.maxMeasurementLen { c.addWarn(WarnInvalidMeasurement, - fmt.Sprintf("exceed max measurement length(%d), got length %d, trimmed", - c.cfg.maxMeasurementLen, len(m))) + fmt.Sprintf("%s: exceed max measurement length(%d), got length %d, trimmed", + m, c.cfg.maxMeasurementLen, len(m))) return m[:c.cfg.maxMeasurementLen] } else { return m @@ -151,8 +151,8 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { if c.cfg.maxTagValLen > 0 && len(x.S) > c.cfg.maxTagValLen { c.addWarn(WarnMaxTagValueLen, - fmt.Sprintf("exceed max tag value length(%d), got %d, value truncated", - c.cfg.maxTagValLen, len(x.S))) + fmt.Sprintf("%s: exceed max tag value length(%d), got %d, value truncated", + x, c.cfg.maxTagValLen, len(x.S))) x.S = x.S[:c.cfg.maxTagValLen] f.Val = x @@ -160,7 +160,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // check tag key '\', '\n' if strings.HasSuffix(f.Key, `\`) || strings.Contains(f.Key, "\n") { - c.addWarn(WarnInvalidTagKey, fmt.Sprintf("invalid tag key `%s'", f.Key)) + c.addWarn(WarnInvalidTagKey, fmt.Sprintf("%s: invalid tag key `%s'", f, f.Key)) newKey := adjustKV(f.Key) if c.keyConflict(newKey, kvs) { @@ -172,7 +172,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // check tag value: '\', '\n' if strings.HasSuffix(f.GetS(), `\`) || strings.Contains(f.GetS(), "\n") { - c.addWarn(WarnInvalidTagValue, fmt.Sprintf("invalid tag value %q", f.GetS())) + c.addWarn(WarnInvalidTagValue, fmt.Sprintf("%s: invalid tag value %q", f, f.GetS())) x.S = adjustKV(x.S) f.Val = x @@ -180,7 +180,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // replace `.' with `_' in tag keys if strings.Contains(f.Key, ".") && !c.cfg.enableDotInKey { - c.addWarn(WarnInvalidTagKey, fmt.Sprintf("invalid tag key `%s': found `.'", f.Key)) + c.addWarn(WarnInvalidTagKey, fmt.Sprintf("%s: invalid tag key `%s': found `.'", f, f.Key)) newKey := strings.ReplaceAll(f.Key, ".", "_") if c.keyConflict(newKey, kvs) { @@ -191,7 +191,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { } if c.keyDisabled(f.Key) { - c.addWarn(WarnTagDisabled, fmt.Sprintf("tag key `%s' disabled", f.Key)) + c.addWarn(WarnTagDisabled, fmt.Sprintf("%s: tag key `%s' disabled", f, f.Key)) return f, false } @@ -204,8 +204,8 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { // trim key if c.cfg.maxFieldKeyLen > 0 && len(f.Key) > c.cfg.maxFieldKeyLen { c.addWarn(WarnMaxFieldKeyLen, - fmt.Sprintf("exceed max field key length(%d), got %d, key truncated to %s", - c.cfg.maxFieldKeyLen, len(f.Key), f.Key)) + fmt.Sprintf("%s: exceed max field key length(%d), got %d, key truncated to %s", + f, c.cfg.maxFieldKeyLen, len(f.Key), f.Key)) newKey := f.Key[:c.cfg.maxFieldKeyLen] @@ -218,7 +218,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if strings.Contains(f.Key, ".") && !c.cfg.enableDotInKey { c.addWarn(WarnDotInkey, - fmt.Sprintf("invalid field key `%s': found `.'", f.Key)) + fmt.Sprintf("%s: invalid field key `%s': found `.'", f, f.Key)) newKey := strings.ReplaceAll(f.Key, ".", "_") if c.keyConflict(newKey, kvs) { @@ -230,7 +230,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if c.keyDisabled(f.Key) { c.addWarn(WarnFieldDisabled, - fmt.Sprintf("field key `%s' disabled, value: %v", f.Key, f.Raw())) + fmt.Sprintf("%s: field key `%s' disabled, value: %v", f, f.Key, f.Raw())) return nil, false } @@ -239,7 +239,8 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if !c.cfg.enableU64Field { if x.U > uint64(math.MaxInt64) { c.addWarn(WarnMaxFieldValueInt, - fmt.Sprintf("too large int field: key=%s, value=%d(> %d)", f.Key, x.U, uint64(math.MaxInt64))) + fmt.Sprintf("%s: too large int field: key=%s, value=%d(> %d)", + f, f.Key, x.U, uint64(math.MaxInt64))) return f, false } else { // Force convert uint64 to int64: to disable line proto like @@ -265,14 +266,14 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if !c.cfg.enableStrField { c.addWarn(WarnInvalidFieldValueType, - fmt.Sprintf("field(%s) dropped with string value, when [DisableStringField] enabled", f.Key)) + fmt.Sprintf("%s: field(%s) dropped with string value, when [DisableStringField] enabled", f, f.Key)) return f, false } if c.cfg.maxFieldValLen > 0 && len(x.D) > c.cfg.maxFieldValLen { c.addWarn(WarnMaxFieldValueLen, - fmt.Sprintf("field (%s) exceed max field value length(%d), got %d, value truncated", - f.Key, c.cfg.maxFieldValLen, len(x.D))) + fmt.Sprintf("%s: field (%s) exceed max field value length(%d), got %d, value truncated", + f, f.Key, c.cfg.maxFieldValLen, len(x.D))) x.D = x.D[:c.cfg.maxFieldValLen] f.Val = x @@ -282,14 +283,14 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if !c.cfg.enableStrField { c.addWarn(WarnInvalidFieldValueType, - fmt.Sprintf("field(%s) dropped with string value, when [DisableStringField] enabled", f.Key)) + fmt.Sprintf("%s: field(%s) dropped with string value, when [DisableStringField] enabled", f, f.Key)) return f, false } if c.cfg.maxFieldValLen > 0 && len(x.S) > c.cfg.maxFieldValLen { c.addWarn(WarnMaxFieldValueLen, - fmt.Sprintf("field (%s) exceed max field value length(%d), got %d, value truncated", - f.Key, c.cfg.maxFieldValLen, len(x.S))) + fmt.Sprintf("%s: field (%s) exceed max field value length(%d), got %d, value truncated", + f, f.Key, c.cfg.maxFieldValLen, len(x.S))) x.S = x.S[:c.cfg.maxFieldValLen] f.Val = x @@ -297,8 +298,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { default: c.addWarn(WarnInvalidFieldValueType, - fmt.Sprintf("invalid field (%s), value: %s, type: %s", - f.Key, f.Val, reflect.TypeOf(f.Val))) + fmt.Sprintf("%s: invalid field (%s), value: %s, type: %s", f, f.Key, f.Val, reflect.TypeOf(f.Val))) return f, false } diff --git a/point/ptpool_test.go b/point/ptpool_test.go index be2033aa..71a75584 100644 --- a/point/ptpool_test.go +++ b/point/ptpool_test.go @@ -491,15 +491,61 @@ func TestReservedCapPointPool(t *T.T) { }) } -func TestPoolKVResuable(t *T.T) { - type Foo struct { +type Foo struct { + Measurement string + + TS int64 + + // tags + T1Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + T2Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + T3Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + + T1 string + T2 string + T3 string + + SKey, S string `fake:"{regex:[a-zA-Z0-9]{128}}"` + + I8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I64 int64 + I8 int8 + I16 int16 + I32 int32 + + U8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + + U8 uint8 + U16 uint16 + U32 uint32 + U64 uint64 + + BKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + DKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + F64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + F32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + B bool + D []byte + F64 float64 + F32 float32 +} + +func TestPoolKVResuableConcurrently(t *T.T) { + + type foo struct { Measurement string TS int64 // tags - T1Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - T2Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + T1Key string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + T2Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` T3Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` T1 string @@ -508,18 +554,18 @@ func TestPoolKVResuable(t *T.T) { SKey, S string `fake:"{regex:[a-zA-Z0-9]{128}}"` - I8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - I16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - I32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I8Key string `fake:"{regex:[a-zA-Z0-9_]{8}}"` + I16Key string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + I32Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` I64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` I64 int64 I8 int8 I16 int16 I32 int32 - U8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - U16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - U32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U8Key string `fake:"{regex:[a-zA-Z0-9_]{8}}"` + U16Key string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + U32Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` U64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` U8 uint8 @@ -527,9 +573,9 @@ func TestPoolKVResuable(t *T.T) { U32 uint32 U64 uint64 - BKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - DKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - F64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + BKey string `fake:"{regex:[a-zA-Z0-9_]{8}}"` + DKey string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + F64Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` F32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` B bool D []byte @@ -537,6 +583,154 @@ func TestPoolKVResuable(t *T.T) { F32 float32 } + fn := func() *Point { + var f foo + assert.NoError(t, gofakeit.Struct(&f)) + var kvs KVs + kvs = kvs.AddTag("T_"+f.T1Key, f.T1) + kvs = kvs.AddTag("T_"+f.T2Key, f.T2) + kvs = kvs.AddTag("T_"+f.T3Key, f.T3) + + kvs = kvs.AddV2("S_"+f.SKey, f.S, true) + + kvs = kvs.AddV2("I8_"+f.I8Key, f.I8, true) + kvs = kvs.AddV2("I16_"+f.I16Key, f.I16, true) + kvs = kvs.AddV2("I32_"+f.I32Key, f.I32, true) + kvs = kvs.AddV2("I64_"+f.I64Key, f.I64, true) + + kvs = kvs.AddV2("U8_"+f.U8Key, f.U8, true) + kvs = kvs.AddV2("U16_"+f.U16Key, f.U16, true) + kvs = kvs.AddV2("U32_"+f.U32Key, f.U32, true) + kvs = kvs.AddV2("U64_"+f.U64Key, f.U64, true) + + kvs = kvs.AddV2("F32_"+f.F32Key, f.F32, true) + kvs = kvs.AddV2("F64_"+f.F64Key, f.F64, true) + + kvs = kvs.AddV2("B_"+f.BKey, f.B, true) + kvs = kvs.AddV2("D_"+f.DKey, f.D, true) + + if f.TS < 0 { + f.TS = 0 + } + + pt := NewPointV2(f.Measurement, kvs, WithTimestamp(f.TS)) + + require.Equal(t, f.T1, pt.Get("T_"+f.T1Key)) + require.Equal(t, f.T2, pt.Get("T_"+f.T2Key)) + require.Equal(t, f.T3, pt.Get("T_"+f.T3Key)) + + require.Equal(t, f.S, pt.Get("S_"+f.SKey)) + + require.Equal(t, int64(f.I8), pt.Get("I8_"+f.I8Key)) + require.Equalf(t, int64(f.I16), pt.Get("I16_"+f.I16Key), "got %s", pt.Pretty()) + require.Equal(t, int64(f.I32), pt.Get("I32_"+f.I32Key)) + require.Equal(t, f.I64, pt.Get("I64_"+f.I64Key)) + + require.Equal(t, uint64(f.U8), pt.Get("U8_"+f.U8Key)) + require.Equal(t, uint64(f.U16), pt.Get("U16_"+f.U16Key)) + require.Equal(t, uint64(f.U32), pt.Get("U32_"+f.U32Key)) + require.Equal(t, f.U64, pt.Get("U64_"+f.U64Key)) + + require.Equal(t, f.B, pt.Get("B_"+f.BKey), "got %s", pt.Pretty()) + require.Equal(t, f.D, pt.Get("D_"+f.DKey)) + require.Equalf(t, float64(f.F32), pt.Get("F32_"+f.F32Key), "got %s", pt.Pretty()) + require.Equal(t, f.F64, pt.Get("F64_"+f.F64Key)) + + require.Equal(t, f.TS, pt.Time().UnixNano(), "got %s", pt.Pretty()) + + return pt + } + + pp := NewReservedCapPointPool(1 << 14) // 16kb + SetPointPool(pp) + + metrics.MustRegister(pp) + + t.Cleanup(func() { + ClearPointPool() + metrics.Unregister(pp) + }) + + nworkers := 32 + npts := (1 << 30) + + wg := sync.WaitGroup{} + wg.Add(nworkers) + ch := make(chan int) + chExit := make(chan any) + batchSize := npts / nworkers + + for i := 0; i < nworkers; i++ { + go func(idx int) { + defer wg.Done() + + subBatch := (nworkers + (batchSize % (i + 1))) + + pts := make([]*Point, 0, subBatch) + encBuffer := make([]byte, 1<<14) + + for { + select { + case <-ch: // new point comming + pts = append(pts, fn()) + + if len(pts) == subBatch { + // try encoding these points + func() { + enc := GetEncoder(WithEncEncoding(Protobuf)) + + defer PutEncoder(enc) + enc.EncodeV2(pts) + parts := 0 + + for { + enc.pbpts.Size() + if x, ok := enc.Next(encBuffer); !ok { + if enc.LastErr() != nil { + t.Errorf("encode error: %s", enc.LastErr()) + } else { + assert.Equal(t, enc.pbpts.Size(), len(x)) + break + } + } else { + parts++ + } + } + }() + + for _, pt := range pts { + pp.Put(pt) + } + pts = pts[:0] //clear slice + } + + case <-chExit: + return + } + } + }(i) + } + + // send jobs + for i := 0; i < npts; i++ { + ch <- i + if i%(npts/nworkers) == 0 { + t.Logf("job %d done", i) + } + } + + // signal exit and wait + close(chExit) + wg.Wait() + + // show p8s metrics + mfs, err := metrics.Gather() + assert.NoError(t, err) + + t.Logf("\n%s", metrics.MetricFamily2Text(mfs)) +} + +func TestPoolKVResuable(t *T.T) { cases := []struct { name string pp PointPool From 6b3f46455ac583f7fd6779a21c32b622dcaf0be1 Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 27 May 2024 09:17:22 +0800 Subject: [PATCH 4/6] add new error when protobuf encoding index out of range --- point/point.pb.go | 51 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/point/point.pb.go b/point/point.pb.go index 42d3301e..ebcfb549 100644 --- a/point/point.pb.go +++ b/point/point.pb.go @@ -1,21 +1,19 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: point.proto - package point import ( bytes "bytes" encoding_binary "encoding/binary" fmt "fmt" - proto "github.com/gogo/protobuf/proto" - github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" - types "github.com/gogo/protobuf/types" io "io" math "math" math_bits "math/bits" reflect "reflect" strconv "strconv" strings "strings" + + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" + types "github.com/gogo/protobuf/types" ) // Reference imports to suppress errors if they are not otherwise used. @@ -1765,6 +1763,9 @@ func (m *BasicTypes) MarshalToSizedBuffer(dAtA []byte) (int, error) { { size := m.X.Size() i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } if _, err := m.X.MarshalTo(dAtA[i:]); err != nil { return 0, err } @@ -1885,6 +1886,9 @@ func (m *Array) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -1925,6 +1929,9 @@ func (m *Map) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -1967,6 +1974,12 @@ func (m *Field) MarshalToSizedBuffer(dAtA []byte) (int, error) { { size := m.Val.Size() i -= size + if i < 0 { + return -1, + fmt.Errorf("%w: kv: %+#v, size: %d, buf size: %d", + ErrPBMarshalOutOfRange, m, size, len(dAtA)) + } + if _, err := m.Val.MarshalTo(dAtA[i:]); err != nil { return 0, err } @@ -2088,6 +2101,10 @@ func (m *Field_A) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } + i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2174,6 +2191,9 @@ func (m *PBPoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2187,7 +2207,12 @@ func (m *PBPoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { if err != nil { return 0, err } + i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } + i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2207,6 +2232,9 @@ func (m *PBPoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2235,7 +2263,11 @@ func (m *PBPoints) Marshal() (dAtA []byte, err error) { func (m *PBPoints) MarshalTo(dAtA []byte) (int, error) { size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if n != size { + return 0, fmt.Errorf("%w: expect %d, got %d", ErrPBSizeNotEqual, size, n) + } + return n, err } func (m *PBPoints) MarshalToSizedBuffer(dAtA []byte) (int, error) { @@ -2251,6 +2283,9 @@ func (m *PBPoints) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size + if i < 0 { + return -1, ErrPBMarshalOutOfRange + } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -4267,4 +4302,6 @@ var ( ErrInvalidLengthPoint = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowPoint = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroupPoint = fmt.Errorf("proto: unexpected end of group") + ErrPBMarshalOutOfRange = fmt.Errorf("marshal out of range") + ErrPBSizeNotEqual = fmt.Errorf("encode size not equal to data size") ) From b99b311e04be7a255a29c0afdf473fa25f4b6021 Mon Sep 17 00:00:00 2001 From: coanor Date: Sat, 1 Jun 2024 15:13:05 +0800 Subject: [PATCH 5/6] recover proto generated file --- point/point.pb.go | 51 +++++++---------------------------------------- 1 file changed, 7 insertions(+), 44 deletions(-) diff --git a/point/point.pb.go b/point/point.pb.go index ebcfb549..42d3301e 100644 --- a/point/point.pb.go +++ b/point/point.pb.go @@ -1,19 +1,21 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: point.proto + package point import ( bytes "bytes" encoding_binary "encoding/binary" fmt "fmt" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" + types "github.com/gogo/protobuf/types" io "io" math "math" math_bits "math/bits" reflect "reflect" strconv "strconv" strings "strings" - - proto "github.com/gogo/protobuf/proto" - github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" - types "github.com/gogo/protobuf/types" ) // Reference imports to suppress errors if they are not otherwise used. @@ -1763,9 +1765,6 @@ func (m *BasicTypes) MarshalToSizedBuffer(dAtA []byte) (int, error) { { size := m.X.Size() i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } if _, err := m.X.MarshalTo(dAtA[i:]); err != nil { return 0, err } @@ -1886,9 +1885,6 @@ func (m *Array) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -1929,9 +1925,6 @@ func (m *Map) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -1974,12 +1967,6 @@ func (m *Field) MarshalToSizedBuffer(dAtA []byte) (int, error) { { size := m.Val.Size() i -= size - if i < 0 { - return -1, - fmt.Errorf("%w: kv: %+#v, size: %d, buf size: %d", - ErrPBMarshalOutOfRange, m, size, len(dAtA)) - } - if _, err := m.Val.MarshalTo(dAtA[i:]); err != nil { return 0, err } @@ -2101,10 +2088,6 @@ func (m *Field_A) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } - i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2191,9 +2174,6 @@ func (m *PBPoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2207,12 +2187,7 @@ func (m *PBPoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { if err != nil { return 0, err } - i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } - i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2232,9 +2207,6 @@ func (m *PBPoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -2263,11 +2235,7 @@ func (m *PBPoints) Marshal() (dAtA []byte, err error) { func (m *PBPoints) MarshalTo(dAtA []byte) (int, error) { size := m.Size() - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if n != size { - return 0, fmt.Errorf("%w: expect %d, got %d", ErrPBSizeNotEqual, size, n) - } - return n, err + return m.MarshalToSizedBuffer(dAtA[:size]) } func (m *PBPoints) MarshalToSizedBuffer(dAtA []byte) (int, error) { @@ -2283,9 +2251,6 @@ func (m *PBPoints) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - if i < 0 { - return -1, ErrPBMarshalOutOfRange - } i = encodeVarintPoint(dAtA, i, uint64(size)) } i-- @@ -4302,6 +4267,4 @@ var ( ErrInvalidLengthPoint = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowPoint = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroupPoint = fmt.Errorf("proto: unexpected end of group") - ErrPBMarshalOutOfRange = fmt.Errorf("marshal out of range") - ErrPBSizeNotEqual = fmt.Errorf("encode size not equal to data size") ) From bac55ba59ecdc8d43da5a0e404703265b7d582d1 Mon Sep 17 00:00:00 2001 From: coanor Date: Sat, 1 Jun 2024 15:49:59 +0800 Subject: [PATCH 6/6] save --- point/ptpool_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/point/ptpool_test.go b/point/ptpool_test.go index 71a75584..96f1f15a 100644 --- a/point/ptpool_test.go +++ b/point/ptpool_test.go @@ -537,7 +537,6 @@ type Foo struct { } func TestPoolKVResuableConcurrently(t *T.T) { - type foo struct { Measurement string @@ -652,7 +651,7 @@ func TestPoolKVResuableConcurrently(t *T.T) { }) nworkers := 32 - npts := (1 << 30) + npts := (1 << 16) wg := sync.WaitGroup{} wg.Add(nworkers)