diff --git a/point/check.go b/point/check.go index 02291db7..c7110326 100644 --- a/point/check.go +++ b/point/check.go @@ -45,8 +45,8 @@ func (c *checker) checkMeasurement(m string) string { if c.cfg.maxMeasurementLen > 0 && len(m) > c.cfg.maxMeasurementLen { c.addWarn(WarnInvalidMeasurement, - fmt.Sprintf("exceed max measurement length(%d), got length %d, trimmed", - c.cfg.maxMeasurementLen, len(m))) + fmt.Sprintf("%s: exceed max measurement length(%d), got length %d, trimmed", + m, c.cfg.maxMeasurementLen, len(m))) return m[:c.cfg.maxMeasurementLen] } else { return m @@ -151,8 +151,8 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { if c.cfg.maxTagValLen > 0 && len(x.S) > c.cfg.maxTagValLen { c.addWarn(WarnMaxTagValueLen, - fmt.Sprintf("exceed max tag value length(%d), got %d, value truncated", - c.cfg.maxTagValLen, len(x.S))) + fmt.Sprintf("%s: exceed max tag value length(%d), got %d, value truncated", + x, c.cfg.maxTagValLen, len(x.S))) x.S = x.S[:c.cfg.maxTagValLen] f.Val = x @@ -160,7 +160,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // check tag key '\', '\n' if strings.HasSuffix(f.Key, `\`) || strings.Contains(f.Key, "\n") { - c.addWarn(WarnInvalidTagKey, fmt.Sprintf("invalid tag key `%s'", f.Key)) + c.addWarn(WarnInvalidTagKey, fmt.Sprintf("%s: invalid tag key `%s'", f, f.Key)) newKey := adjustKV(f.Key) if c.keyConflict(newKey, kvs) { @@ -172,7 +172,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // check tag value: '\', '\n' if strings.HasSuffix(f.GetS(), `\`) || strings.Contains(f.GetS(), "\n") { - c.addWarn(WarnInvalidTagValue, fmt.Sprintf("invalid tag value %q", f.GetS())) + c.addWarn(WarnInvalidTagValue, fmt.Sprintf("%s: invalid tag value %q", f, f.GetS())) x.S = adjustKV(x.S) f.Val = x @@ -180,7 +180,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // replace `.' 
with `_' in tag keys if strings.Contains(f.Key, ".") && !c.cfg.enableDotInKey { - c.addWarn(WarnInvalidTagKey, fmt.Sprintf("invalid tag key `%s': found `.'", f.Key)) + c.addWarn(WarnInvalidTagKey, fmt.Sprintf("%s: invalid tag key `%s': found `.'", f, f.Key)) newKey := strings.ReplaceAll(f.Key, ".", "_") if c.keyConflict(newKey, kvs) { @@ -191,7 +191,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { } if c.keyDisabled(f.Key) { - c.addWarn(WarnTagDisabled, fmt.Sprintf("tag key `%s' disabled", f.Key)) + c.addWarn(WarnTagDisabled, fmt.Sprintf("%s: tag key `%s' disabled", f, f.Key)) return f, false } @@ -204,8 +204,8 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { // trim key if c.cfg.maxFieldKeyLen > 0 && len(f.Key) > c.cfg.maxFieldKeyLen { c.addWarn(WarnMaxFieldKeyLen, - fmt.Sprintf("exceed max field key length(%d), got %d, key truncated to %s", - c.cfg.maxFieldKeyLen, len(f.Key), f.Key)) + fmt.Sprintf("%s: exceed max field key length(%d), got %d, key truncated to %s", + f, c.cfg.maxFieldKeyLen, len(f.Key), f.Key)) newKey := f.Key[:c.cfg.maxFieldKeyLen] @@ -218,7 +218,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if strings.Contains(f.Key, ".") && !c.cfg.enableDotInKey { c.addWarn(WarnDotInkey, - fmt.Sprintf("invalid field key `%s': found `.'", f.Key)) + fmt.Sprintf("%s: invalid field key `%s': found `.'", f, f.Key)) newKey := strings.ReplaceAll(f.Key, ".", "_") if c.keyConflict(newKey, kvs) { @@ -230,7 +230,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if c.keyDisabled(f.Key) { c.addWarn(WarnFieldDisabled, - fmt.Sprintf("field key `%s' disabled, value: %v", f.Key, f.Raw())) + fmt.Sprintf("%s: field key `%s' disabled, value: %v", f, f.Key, f.Raw())) return nil, false } @@ -239,7 +239,8 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if !c.cfg.enableU64Field { if x.U > uint64(math.MaxInt64) { c.addWarn(WarnMaxFieldValueInt, - fmt.Sprintf("too large int field: 
key=%s, value=%d(> %d)", f.Key, x.U, uint64(math.MaxInt64))) + fmt.Sprintf("%s: too large int field: key=%s, value=%d(> %d)", + f, f.Key, x.U, uint64(math.MaxInt64))) return f, false } else { // Force convert uint64 to int64: to disable line proto like @@ -265,14 +266,14 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if !c.cfg.enableStrField { c.addWarn(WarnInvalidFieldValueType, - fmt.Sprintf("field(%s) dropped with string value, when [DisableStringField] enabled", f.Key)) + fmt.Sprintf("%s: field(%s) dropped with string value, when [DisableStringField] enabled", f, f.Key)) return f, false } if c.cfg.maxFieldValLen > 0 && len(x.D) > c.cfg.maxFieldValLen { c.addWarn(WarnMaxFieldValueLen, - fmt.Sprintf("field (%s) exceed max field value length(%d), got %d, value truncated", - f.Key, c.cfg.maxFieldValLen, len(x.D))) + fmt.Sprintf("%s: field (%s) exceed max field value length(%d), got %d, value truncated", + f, f.Key, c.cfg.maxFieldValLen, len(x.D))) x.D = x.D[:c.cfg.maxFieldValLen] f.Val = x @@ -282,14 +283,14 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { if !c.cfg.enableStrField { c.addWarn(WarnInvalidFieldValueType, - fmt.Sprintf("field(%s) dropped with string value, when [DisableStringField] enabled", f.Key)) + fmt.Sprintf("%s: field(%s) dropped with string value, when [DisableStringField] enabled", f, f.Key)) return f, false } if c.cfg.maxFieldValLen > 0 && len(x.S) > c.cfg.maxFieldValLen { c.addWarn(WarnMaxFieldValueLen, - fmt.Sprintf("field (%s) exceed max field value length(%d), got %d, value truncated", - f.Key, c.cfg.maxFieldValLen, len(x.S))) + fmt.Sprintf("%s: field (%s) exceed max field value length(%d), got %d, value truncated", + f, f.Key, c.cfg.maxFieldValLen, len(x.S))) x.S = x.S[:c.cfg.maxFieldValLen] f.Val = x @@ -297,8 +298,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { default: c.addWarn(WarnInvalidFieldValueType, - fmt.Sprintf("invalid field (%s), value: %s, type: %s", - f.Key, 
f.Val, reflect.TypeOf(f.Val))) + fmt.Sprintf("%s: invalid field (%s), value: %s, type: %s", f, f.Key, f.Val, reflect.TypeOf(f.Val))) return f, false } diff --git a/point/encode.go b/point/encode.go index 1a2010eb..6d12a99c 100644 --- a/point/encode.go +++ b/point/encode.go @@ -317,14 +317,14 @@ func (e *Encoder) doEncodeProtobuf(buf []byte) ([]byte, bool) { // e.pbpts size larger than buf, we must trim some of points // until size fit ok or MarshalTo will panic. - if curSize >= len(buf) { + if curSize >= cap(buf) { if len(e.pbpts.Arr) <= 1 { // nothing to trim e.lastErr = errTooSmallBuffer return nil, false } for { - if pbptsSize = e.pbpts.Size(); pbptsSize > len(buf) { + if pbptsSize = e.pbpts.Size(); pbptsSize > cap(buf) { e.pbpts.Arr = e.pbpts.Arr[:len(e.pbpts.Arr)-trimmed] e.lastPtsIdx -= trimmed trimmed *= 2 diff --git a/point/encode_test.go b/point/encode_test.go index 1bc12e66..ddfac849 100644 --- a/point/encode_test.go +++ b/point/encode_test.go @@ -500,7 +500,7 @@ func TestV2Encode(t *T.T) { var ( decodePts []*Point round int - buf = make([]byte, 1<<20) // KB + buf = make([]byte, 0, 1<<20) // 1 MiB ) for { diff --git a/point/point.go b/point/point.go index 50c74b25..cc3fe17a 100644 --- a/point/point.go +++ b/point/point.go @@ -466,16 +466,7 @@ func (p *Point) LPSize() int { // PBSize get point protobuf size. 
func (p *Point) PBSize() int { - pbpt := p.PBPoint() - - m := protojson.Marshaler{} - buf := bytes.Buffer{} - - if err := m.Marshal(&buf, pbpt); err != nil { - return 0 - } - - return buf.Len() + return p.pt.Size() } func b64(x []byte) string { diff --git a/point/ptpool_test.go b/point/ptpool_test.go index be2033aa..96f1f15a 100644 --- a/point/ptpool_test.go +++ b/point/ptpool_test.go @@ -491,15 +491,60 @@ func TestReservedCapPointPool(t *T.T) { }) } -func TestPoolKVResuable(t *T.T) { - type Foo struct { +type Foo struct { + Measurement string + + TS int64 + + // tags + T1Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + T2Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + T3Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + + T1 string + T2 string + T3 string + + SKey, S string `fake:"{regex:[a-zA-Z0-9]{128}}"` + + I8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I64 int64 + I8 int8 + I16 int16 + I32 int32 + + U8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + + U8 uint8 + U16 uint16 + U32 uint32 + U64 uint64 + + BKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + DKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + F64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + F32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + B bool + D []byte + F64 float64 + F32 float32 +} + +func TestPoolKVResuableConcurrently(t *T.T) { + type foo struct { Measurement string TS int64 // tags - T1Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - T2Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + T1Key string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + T2Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` T3Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` T1 string @@ -508,18 +553,18 @@ func TestPoolKVResuable(t *T.T) { SKey, S string 
`fake:"{regex:[a-zA-Z0-9]{128}}"` - I8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - I16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - I32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + I8Key string `fake:"{regex:[a-zA-Z0-9_]{8}}"` + I16Key string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + I32Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` I64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` I64 int64 I8 int8 I16 int16 I32 int32 - U8Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - U16Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - U32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + U8Key string `fake:"{regex:[a-zA-Z0-9_]{8}}"` + U16Key string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + U32Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` U64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` U8 uint8 @@ -527,9 +572,9 @@ func TestPoolKVResuable(t *T.T) { U32 uint32 U64 uint64 - BKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - DKey string `fake:"{regex:[a-zA-Z0-9_]{64}}"` - F64Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` + BKey string `fake:"{regex:[a-zA-Z0-9_]{8}}"` + DKey string `fake:"{regex:[a-zA-Z0-9_]{16}}"` + F64Key string `fake:"{regex:[a-zA-Z0-9_]{32}}"` F32Key string `fake:"{regex:[a-zA-Z0-9_]{64}}"` B bool D []byte @@ -537,6 +582,154 @@ func TestPoolKVResuable(t *T.T) { F32 float32 } + fn := func() *Point { + var f foo + assert.NoError(t, gofakeit.Struct(&f)) + var kvs KVs + kvs = kvs.AddTag("T_"+f.T1Key, f.T1) + kvs = kvs.AddTag("T_"+f.T2Key, f.T2) + kvs = kvs.AddTag("T_"+f.T3Key, f.T3) + + kvs = kvs.AddV2("S_"+f.SKey, f.S, true) + + kvs = kvs.AddV2("I8_"+f.I8Key, f.I8, true) + kvs = kvs.AddV2("I16_"+f.I16Key, f.I16, true) + kvs = kvs.AddV2("I32_"+f.I32Key, f.I32, true) + kvs = kvs.AddV2("I64_"+f.I64Key, f.I64, true) + + kvs = kvs.AddV2("U8_"+f.U8Key, f.U8, true) + kvs = kvs.AddV2("U16_"+f.U16Key, f.U16, true) + kvs = kvs.AddV2("U32_"+f.U32Key, f.U32, true) + kvs = kvs.AddV2("U64_"+f.U64Key, f.U64, true) + + kvs = kvs.AddV2("F32_"+f.F32Key, f.F32, true) + kvs = kvs.AddV2("F64_"+f.F64Key, f.F64, 
true) + + kvs = kvs.AddV2("B_"+f.BKey, f.B, true) + kvs = kvs.AddV2("D_"+f.DKey, f.D, true) + + if f.TS < 0 { + f.TS = 0 + } + + pt := NewPointV2(f.Measurement, kvs, WithTimestamp(f.TS)) + + require.Equal(t, f.T1, pt.Get("T_"+f.T1Key)) + require.Equal(t, f.T2, pt.Get("T_"+f.T2Key)) + require.Equal(t, f.T3, pt.Get("T_"+f.T3Key)) + + require.Equal(t, f.S, pt.Get("S_"+f.SKey)) + + require.Equal(t, int64(f.I8), pt.Get("I8_"+f.I8Key)) + require.Equalf(t, int64(f.I16), pt.Get("I16_"+f.I16Key), "got %s", pt.Pretty()) + require.Equal(t, int64(f.I32), pt.Get("I32_"+f.I32Key)) + require.Equal(t, f.I64, pt.Get("I64_"+f.I64Key)) + + require.Equal(t, uint64(f.U8), pt.Get("U8_"+f.U8Key)) + require.Equal(t, uint64(f.U16), pt.Get("U16_"+f.U16Key)) + require.Equal(t, uint64(f.U32), pt.Get("U32_"+f.U32Key)) + require.Equal(t, f.U64, pt.Get("U64_"+f.U64Key)) + + require.Equal(t, f.B, pt.Get("B_"+f.BKey), "got %s", pt.Pretty()) + require.Equal(t, f.D, pt.Get("D_"+f.DKey)) + require.Equalf(t, float64(f.F32), pt.Get("F32_"+f.F32Key), "got %s", pt.Pretty()) + require.Equal(t, f.F64, pt.Get("F64_"+f.F64Key)) + + require.Equal(t, f.TS, pt.Time().UnixNano(), "got %s", pt.Pretty()) + + return pt + } + + pp := NewReservedCapPointPool(1 << 14) // 16kb + SetPointPool(pp) + + metrics.MustRegister(pp) + + t.Cleanup(func() { + ClearPointPool() + metrics.Unregister(pp) + }) + + nworkers := 32 + npts := (1 << 16) + + wg := sync.WaitGroup{} + wg.Add(nworkers) + ch := make(chan int) + chExit := make(chan any) + batchSize := npts / nworkers + + for i := 0; i < nworkers; i++ { + go func(idx int) { + defer wg.Done() + + subBatch := (nworkers + (batchSize % (i + 1))) + + pts := make([]*Point, 0, subBatch) + encBuffer := make([]byte, 1<<14) + + for { + select { + case <-ch: // new point coming + pts = append(pts, fn()) + + if len(pts) == subBatch { + // try encoding these points + func() { + enc := GetEncoder(WithEncEncoding(Protobuf)) + + defer PutEncoder(enc) + enc.EncodeV2(pts) + parts := 0 + + for { + 
enc.pbpts.Size() + if x, ok := enc.Next(encBuffer); !ok { + if enc.LastErr() != nil { + t.Errorf("encode error: %s", enc.LastErr()) + } else { + assert.Equal(t, enc.pbpts.Size(), len(x)) + break + } + } else { + parts++ + } + } + }() + + for _, pt := range pts { + pp.Put(pt) + } + pts = pts[:0] //clear slice + } + + case <-chExit: + return + } + } + }(i) + } + + // send jobs + for i := 0; i < npts; i++ { + ch <- i + if i%(npts/nworkers) == 0 { + t.Logf("job %d done", i) + } + } + + // signal exit and wait + close(chExit) + wg.Wait() + + // show p8s metrics + mfs, err := metrics.Gather() + assert.NoError(t, err) + + t.Logf("\n%s", metrics.MetricFamily2Text(mfs)) +} + +func TestPoolKVResuable(t *T.T) { cases := []struct { name string pp PointPool