Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ FlyDB is released under the Apache license. For details, see LICENSE file.

## Thanks To JetBrains

> Thanks to `JetBrains` for the free open source license.
> Thanks to `JetBrains` for the free open source license.

<img src="./assets/thanks-jetbrains.png" alt="FlyDB-logo" style="display: block; margin: 0 auto; width: 30%;" />

19 changes: 13 additions & 6 deletions engine/benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ByteStorage/FlyDB/flydb"
_const "github.com/ByteStorage/FlyDB/lib/const"
"math/rand"
"path/filepath"
"os"
"testing"
"time"
)
Expand All @@ -34,18 +34,17 @@ func GetValue() []byte {
return str.Bytes()
}

func init() {
func Benchmark_PutValue_FlyDB(b *testing.B) {
opts := config.DefaultOptions
opts.DirPath = filepath.Join("benchmark", "flydbtest")
dir, _ := os.MkdirTemp("", "flydbtest")
opts.DirPath = dir

FlyDB, err = flydb.NewFlyDB(opts)
defer FlyDB.Clean()
if err != nil {
panic(err)
}
}

func Benchmark_PutValue_FlyDB(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

Expand All @@ -58,6 +57,15 @@ func Benchmark_PutValue_FlyDB(b *testing.B) {
}

func Benchmark_GetValue_FlyDB(b *testing.B) {
opts := config.DefaultOptions
opts.DirPath = "/tmp/FlyDB"

FlyDB, err = flydb.NewFlyDB(opts)
defer FlyDB.Close()
if err != nil {
panic(err)
}

for i := 0; i < 500000; i++ {
err = FlyDB.Put(GetKey(i), GetValue())
if err != nil {
Expand All @@ -74,5 +82,4 @@ func Benchmark_GetValue_FlyDB(b *testing.B) {
panic(err)
}
}

}
3 changes: 1 addition & 2 deletions engine/data/data_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,10 @@ func (df *DataFile) ReadLogRecord(offset int64) (*LogRecord, int64, error) {
}

func (df *DataFile) Write(buf []byte) error {
size, err := df.IoManager.Write(buf)
_, err := df.IoManager.Write(buf)
if err != nil {
return err
}
df.WriteOff += int64(size)
return nil
}

Expand Down
15 changes: 8 additions & 7 deletions engine/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ func (db *DB) appendLogRecord(logRecord *data2.LogRecord) (*data2.LogRecordPst,

// Write data coding
encRecord, size := data2.EncodeLogRecord(logRecord)
if db.activeFile.WriteOff+size > db.options.DataFileSize {
var writeOff int64
var ok bool
for writeOff, ok = SingleOffset().CanWrite(db.activeFile.FileID, db.options.DataFileSize, size); !ok; writeOff, ok = SingleOffset().CanWrite(db.activeFile.FileID, db.options.DataFileSize, size) {
// Persisting data files to ensure that existing data is persisted to disk
if err := db.activeFile.Sync(); err != nil {
return nil, err
Expand All @@ -207,7 +209,6 @@ func (db *DB) appendLogRecord(logRecord *data2.LogRecord) (*data2.LogRecordPst,
}
}

writeOff := db.activeFile.WriteOff
if err := db.activeFile.Write(encRecord); err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,6 +244,9 @@ func (db *DB) setActiveDataFile() error {
return err
}
db.activeFile = dataFile

size, _ := dataFile.IoManager.Size()
SingleOffset().AddNew(initialFileID, size)
return nil
}

Expand Down Expand Up @@ -466,7 +470,7 @@ func (db *DB) loadIndexFromDataFiles() error {
var currentSeqNo = nonTransactionSeqNo

// Iterate through all file ids, processing records in the file
for i, fid := range db.fileIds {
for _, fid := range db.fileIds {
var fileID = uint32(fid)
// If the id is smaller than that of the file that did not participate in the merge recently,
// the hint file has been loaded
Expand Down Expand Up @@ -531,10 +535,7 @@ func (db *DB) loadIndexFromDataFiles() error {
offset += size
}

// If it is a current active file, update writeOff for this file
if i == len(db.fileIds)-1 {
db.activeFile.WriteOff = offset
}
SingleOffset().AddNew(fileID, offset)
}

// Update the transaction sequence number to the database field
Expand Down
110 changes: 16 additions & 94 deletions engine/fileio/mmap_io.go
Original file line number Diff line number Diff line change
@@ -1,107 +1,29 @@
package fileio

import (
"errors"
"github.com/edsrzf/mmap-go"
"os"
"sync"
)

type MMapIO struct {
fd *os.File // system file descriptor
data mmap.MMap // the mapping area corresponding to the file
dirty bool // has changed
offset int64 // next write location
fileSize int64 // max file size
// mmapFileController is a process-wide registry mapping a file path to its
// single shared *MMapIO, so multiple DB handles opening the same data file
// reuse one mapping instead of creating competing ones.
type mmapFileController struct {
	lock sync.Mutex // guards files
	files map[string]*MMapIO // open mappings keyed by file name
}

// NewMMapIOManager Initialize Mmap IO
func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) {
mmapIO := &MMapIO{fileSize: fileSize}

fd, err := os.OpenFile(
fileName,
os.O_CREATE|os.O_RDWR|os.O_APPEND,
DataFilePerm,
)
if err != nil {
return nil, err
}
info, _ := fd.Stat()

// Expand files to maximum file size, crop when saving
if err := fd.Truncate(fileSize); err != nil {
return nil, err
}

// Building mappings between memory and disk files
b, err := mmap.Map(fd, mmap.RDWR, 0)
if err != nil {
return nil, err
}

mmapIO.fd = fd
mmapIO.data = b
mmapIO.offset = info.Size()
return mmapIO, nil
}

// Read Copy data from the mapping area to byte slice
func (mio *MMapIO) Read(b []byte, offset int64) (int, error) {
return copy(b, mio.data[offset:]), nil
}

// Write Copy data from byte slice to the mapping area
func (mio *MMapIO) Write(b []byte) (int, error) {
oldOffset := mio.offset
newOffset := mio.offset + int64(len(b))
if newOffset > mio.fileSize {
return 0, errors.New("exceed file max content length")
}

mio.offset = newOffset
mio.dirty = true
return copy(mio.data[oldOffset:], b), nil
// controller is the package-level singleton registry used by
// NewMMapIOManager and MMapIO.Close to share and release mappings.
var controller = mmapFileController{
	lock: sync.Mutex{},
	files: map[string]*MMapIO{},
}

// Sync Synchronize data from memory to disk
func (mio *MMapIO) Sync() error {
if !mio.dirty {
return nil
}

if err := mio.data.Flush(); err != nil {
return err
}

mio.dirty = false
return nil
}
// NewMMapIOManager Initialize Mmap IO
func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) {
controller.lock.Lock()
defer controller.lock.Unlock()

// Close file
func (mio *MMapIO) Close() (err error) {
if err = mio.fd.Truncate(mio.offset); err != nil {
return err
}
if err = mio.Sync(); err != nil {
return err
if v, ok := controller.files[fileName]; ok {
v.AddCount()
return v, nil
}
if err = mio.UnMap(); err != nil {
panic(err)
}
return mio.fd.Close()
}

// Size return the size of current file
func (mio *MMapIO) Size() (int64, error) {
return mio.offset, nil
}

// UnMap Unmapping between memory and files
func (mio *MMapIO) UnMap() error {
if mio.data == nil {
return nil
}
err := mio.data.Unmap()
mio.data = nil
return err
manager, err := (&MMapIO{fileName: fileName, fileSize: fileSize}).Init()
return manager, err
}
125 changes: 125 additions & 0 deletions engine/fileio/mmap_io_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//go:build linux

package fileio

import (
"errors"
atomic2 "go.uber.org/atomic"
"os"
"syscall"
"unsafe"
)

// MMapIO performs file IO through a memory mapping. A single instance may
// be shared by several DB handles; count tracks the live references so the
// file is only truncated, synced, and unmapped when the last one closes.
type MMapIO struct {
	fd *os.File // system file descriptor
	data []byte // the mapping area corresponding to the file
	offset int64 // next write location
	fileSize int64 // max file size; the mapping spans this many bytes
	fileName string // path of the backing file; registry key in the controller
	count atomic2.Int32 // the count of dbs using this mmap io
}

// Init opens (creating if necessary) the backing file, extends it to the
// configured maximum size, and maps it read/write into memory. It returns
// the receiver so construction can be chained. On any failure the opened
// descriptor is closed before returning, so no fd is leaked.
func (mio *MMapIO) Init() (*MMapIO, error) {
	fd, err := os.OpenFile(mio.fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, DataFilePerm)
	if err != nil {
		return nil, err
	}
	mio.fd = fd

	info, err := fd.Stat()
	if err != nil {
		_ = fd.Close()
		return nil, err
	}
	// Resume writing where the existing file content ends.
	mio.offset = info.Size()

	// Expand the file to the maximum size up front; Close crops it back
	// to the real write offset when the last reference is released.
	if err := fd.Truncate(mio.fileSize); err != nil {
		_ = fd.Close()
		return nil, err
	}

	// Build the shared mapping between memory and the disk file.
	b, err := syscall.Mmap(int(mio.fd.Fd()), 0, int(mio.fileSize),
		syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		_ = fd.Close()
		return nil, err
	}
	mio.data = b

	return mio, nil
}

// unmap releases the memory mapping, if any, and clears the slice so a
// second call is a harmless no-op.
func (mio *MMapIO) unmap() error {
	if mio.data == nil {
		return nil
	}

	region := mio.data
	mio.data = nil
	return syscall.Munmap(region)
}

// Read copies data from the mapping area starting at offset into b and
// returns the number of bytes copied. A negative offset or one past the
// end of the mapping returns an error instead of panicking on the slice.
func (mio *MMapIO) Read(b []byte, offset int64) (int, error) {
	if offset < 0 || offset > int64(len(mio.data)) {
		return 0, errors.New("read offset out of range")
	}
	return copy(b, mio.data[offset:]), nil
}

// Write copies b into the mapping area at the current write offset and
// advances the offset. It fails when the write would run past the
// maximum file size the mapping was created with.
func (mio *MMapIO) Write(b []byte) (int, error) {
	start := mio.offset
	end := start + int64(len(b))
	if end > mio.fileSize {
		return 0, errors.New("exceed file max content length")
	}

	mio.offset = end
	return copy(mio.data[start:end], b), nil
}

// Sync flushes the written portion of the mapping ([0, offset)) to disk
// via msync(2) with MS_SYNC, blocking until the data is durable.
// NOTE(review): this dereferences &mio.data[0], so it assumes the mapping
// is live and non-empty — calling Sync after unmap would panic; confirm
// callers only Sync while mapped.
func (mio *MMapIO) Sync() error {
	// Raw syscall: the third return value is a syscall.Errno, where 0
	// means success.
	_, _, err := syscall.Syscall(syscall.SYS_MSYNC, uintptr(unsafe.Pointer(&mio.data[0])), uintptr(mio.offset), uintptr(syscall.MS_SYNC))
	if err != 0 {
		return err
	}
	return nil
}

// Size returns the current write offset, i.e. the number of bytes of real
// content — not the pre-extended maximum file size on disk.
func (mio *MMapIO) Size() (int64, error) {
	return mio.offset, nil
}

// Close drops one reference to the shared mapping. While other DB handles
// still use the mapping it does nothing; the last reference unregisters
// the file, crops it back to the real write offset, flushes the mapping,
// unmaps it, and closes the descriptor. Any failure is returned to the
// caller rather than panicking, as library code should.
func (mio *MMapIO) Close() (err error) {
	controller.lock.Lock()
	defer controller.lock.Unlock()

	mio.SubCount()
	if mio.GetCount() > 0 {
		// Other holders remain; keep the mapping alive.
		return nil
	}

	delete(controller.files, mio.fileName)

	// Crop the pre-extended file back to its real content length.
	if err = mio.fd.Truncate(mio.offset); err != nil {
		return err
	}
	if err = mio.Sync(); err != nil {
		return err
	}
	// Surface the unmap failure instead of panicking.
	if err = mio.unmap(); err != nil {
		return err
	}
	return mio.fd.Close()
}

// GetCount reports how many DB handles currently share this mapping.
func (mio *MMapIO) GetCount() int32 {
	return mio.count.Load()
}

// AddCount atomically takes one more reference to this mapping.
func (mio *MMapIO) AddCount() {
	mio.count.Add(1)
}

// SubCount atomically drops one reference to this mapping.
func (mio *MMapIO) SubCount() {
	mio.count.Add(-1)
}
Loading