diff --git a/.gitignore b/.gitignore index 5242542..693daf8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ main -.vscode \ No newline at end of file +.vscode/ \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 865607d..314edee 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,7 @@ package main import ( "os" + "time" "github.com/SystemBuilders/LocKey/internal/lockservice" "github.com/SystemBuilders/LocKey/internal/lockservice/node" @@ -12,7 +13,7 @@ func main() { zerolog.New(os.Stdout).With() log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) - ls := lockservice.NewSimpleLockService(log) + ls := lockservice.NewSimpleLockService(log, 5*time.Second) scfg := lockservice.NewSimpleConfig("127.0.0.1", "1234") node.Start(ls, *scfg) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 411e997..7ad57e3 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -38,11 +38,14 @@ type SimpleClient struct { // whether the process owning the lock has an active session // or not, this guarantee has to be ensured by the client. sessionAcquisitions map[id.ID][]lockservice.Descriptors + + sessionDuration time.Duration } // NewSimpleClient returns a new SimpleClient of the given parameters. // This client works with or without the existance of a cache. -func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache *cache.LRUCache) *SimpleClient { +func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache *cache.LRUCache, + sessionDuration time.Duration) *SimpleClient { clientID := id.Create() sessions := make(map[id.ID]session.Session) sessionTimers := make(map[id.ID]chan struct{}) @@ -55,6 +58,7 @@ func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache sessions: sessions, sessionTimers: sessionTimers, sessionAcquisitions: sessionAcquisitions, + sessionDuration: sessionDuration, } } @@ -434,8 +438,8 @@ func (sc *SimpleClient) startSession(processID id.ID) { sc.mu.Lock() sc.sessionTimers[processID] = timerChan sc.mu.Unlock() - // Sessions last for 200ms. - time.Sleep(200 * time.Millisecond) + // Sessions last for user configured duration. + time.Sleep(sc.sessionDuration) sc.mu.Lock() sc.sessionTimers[processID] <- struct{}{} diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index ef5e36d..e60e3f8 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -17,7 +17,9 @@ func TestLockService(t *testing.T) { log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") - ls := lockservice.NewSimpleLockService(log) + duration := 2 * time.Second // 2 second expiry + sessionDuration := 5 * time.Second + ls := lockservice.NewSimpleLockService(log, duration) quit := make(chan bool, 1) go func() { @@ -42,7 +44,7 @@ func TestLockService(t *testing.T) { t.Run("acquire test release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, log, cache) + sc := NewSimpleClient(scfg, log, cache, sessionDuration) session := sc.Connect() @@ -76,7 +78,7 @@ func TestLockService(t *testing.T) { t.Run("acquire test, acquire test, release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, log, cache) + sc := NewSimpleClient(scfg, log, cache, sessionDuration) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") @@ -104,7 +106,7 @@ func TestLockService(t *testing.T) { t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { size := 2 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, log, cache) + sc := NewSimpleClient(scfg, log, cache, sessionDuration) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") @@ -144,7 +146,7 @@ func TestLockService(t *testing.T) { }) t.Run("acquire test and release after session expiry", func(t *testing.T) { - sc := NewSimpleClient(scfg, log, nil) + sc := NewSimpleClient(scfg, log, nil, sessionDuration) session := sc.Connect() d := lockservice.NewObjectDescriptor("test3") @@ -154,14 +156,34 @@ func TestLockService(t *testing.T) { t.Errorf("acquire: got %q want %q", got, want) } - // Wait for the session to expire - time.Sleep(500 * time.Millisecond) + // Wait for the lock's lease to expire + time.Sleep(3 * time.Second) + got = sc.Release(d, session) - want = ErrSessionNonExistent + want = lockservice.ErrCantReleaseFile if got != want { t.Errorf("release: got %q want %q", got, want) } }) + t.Run("try acquiring after lock expiry; should succeed", func(t *testing.T) { + sc := NewSimpleClient(scfg, log, nil, sessionDuration) + session := sc.Connect() + d := lockservice.NewObjectDescriptor("test2") + + got := sc.Acquire(d, session) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + // Wait for the lock's lease to expire + time.Sleep(3 * time.Second) + + got = sc.Acquire(d, session) + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + }) quit <- true return @@ -173,7 +195,7 @@ func BenchmarkLocKeyWithoutCache(b *testing.B) { log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") - ls := lockservice.NewSimpleLockService(log) + ls := lockservice.NewSimpleLockService(log, 5) quit := make(chan bool, 1) go func() { @@ -188,7 +210,8 @@ func BenchmarkLocKeyWithoutCache(b *testing.B) { }() time.Sleep(100 * time.Millisecond) - sc := NewSimpleClient(scfg, log, nil) + sessionDuration := 5 * time.Second + sc := NewSimpleClient(scfg, log, nil, sessionDuration) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") for n := 0; n < b.N; n++ { @@ -211,7 +234,8 @@ func BenchmarkLocKeyWithCache(b *testing.B) { log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") - ls := lockservice.NewSimpleLockService(log) + duration := 2 * time.Second // 2 second expiry + ls := lockservice.NewSimpleLockService(log, duration) quit := make(chan bool, 1) go func() { @@ -228,7 +252,8 @@ func BenchmarkLocKeyWithCache(b *testing.B) { size := 5 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, log, cache) + sessionDuration := 5 * time.Second + sc := NewSimpleClient(scfg, log, cache, sessionDuration) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") for n := 0; n < b.N; n++ { diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index dc2fa9f..6d06e19 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -1,15 +1,27 @@ package lockservice import ( + "fmt" "sync" + "time" "github.com/rs/zerolog" ) // SafeLockMap is the lockserver's data structure type SafeLockMap struct { - LockMap map[string]string - Mutex sync.Mutex + LockMap map[string]*LockMapEntry + LeaseDuration time.Duration + Mutex sync.Mutex +} + +// LockMapEntry defines the structure for objects placed +// in the LockMap. It consists of the owner of the lock +// that is acquired and the timestamp at which the +// acquisition took place. +type LockMapEntry struct { + owner string + timestamp time.Time } // SimpleConfig implements Config. @@ -87,6 +99,14 @@ func (sd *LockDescriptor) Owner() string { return sd.UserID } +// NewLockMapEntry returns an instance of a LockMapEntry +func NewLockMapEntry(owner string, timestamp time.Time) *LockMapEntry { + return &LockMapEntry{ + owner: owner, + timestamp: timestamp, + } +} + // NewSimpleConfig returns an instance of the SimpleConfig. func NewSimpleConfig(IPAddr, PortAddr string) *SimpleConfig { return &SimpleConfig{ @@ -111,64 +131,108 @@ func NewObjectDescriptor(ObjectID string) *ObjectDescriptor { } // NewSimpleLockService creates and returns a new lock service ready to use. -func NewSimpleLockService(log zerolog.Logger) *SimpleLockService { +func NewSimpleLockService(log zerolog.Logger, leaseDuration time.Duration) *SimpleLockService { safeLockMap := &SafeLockMap{ - LockMap: make(map[string]string), + LockMap: make(map[string]*LockMapEntry), } + safeLockMap.LeaseDuration = leaseDuration return &SimpleLockService{ log: log, lockMap: safeLockMap, } } +func fmtDuration(d time.Duration) string { + d = d.Round(time.Minute) + h := d / time.Hour + d -= h * time.Hour + ms := d / time.Microsecond + return fmt.Sprintf("%02d:%02d", h, ms) +} + +// hasLeaseExpired returns true if the lease for a lock has expired and +// false if the lease is still valid +func hasLeaseExpired(timestamp time.Time, lease time.Duration) bool { + if time.Now().Sub(timestamp) > lease { + return true + } + return false +} // Acquire function lets a client acquire a lock on an object. +// This lock is valid for a fixed duration that is set in the SafeLockMap.LeaseDuration +// field. Beyond this duration, the lock has expired and the entity that owned the lock +// for this period can no longer release it. The lock is open for acquisition after it +// has expired. func (ls *SimpleLockService) Acquire(sd Descriptors) error { ls.lockMap.Mutex.Lock() - if _, ok := ls.lockMap.LockMap[sd.ID()]; ok { + + timestamp := ls.lockMap.LockMap[sd.ID()].timestamp + duration := ls.lockMap.LeaseDuration + // If the lock is not present in the LockMap or + // the lock has expired, then allow the acquisition + if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok || hasLeaseExpired(timestamp, duration) { + ls.lockMap.LockMap[sd.ID()] = NewLockMapEntry(sd.Owner(), time.Now()) ls.lockMap.Mutex.Unlock() ls. log. Debug(). Str("descriptor", sd.ID()). - Msg("can't acquire, already been acquired") - return ErrFileacquired + Str("owner", ls.lockMap.LockMap[sd.ID()].owner). + Time("timestamp", ls.lockMap.LockMap[sd.ID()].timestamp). + Msg("locked") + return nil } - ls.lockMap.LockMap[sd.ID()] = sd.Owner() ls.lockMap.Mutex.Unlock() + // Since the lock is already acquired, return an error ls. log. Debug(). Str("descriptor", sd.ID()). - Str("owner", sd.Owner()). - Msg("locked") - return nil + Msg("can't acquire, already been acquired") + return ErrFileacquired + } // Release lets a client to release a lock on an object. func (ls *SimpleLockService) Release(sd Descriptors) error { ls.lockMap.Mutex.Lock() + timestamp := ls.lockMap.LockMap[sd.ID()].timestamp + duration := ls.lockMap.LeaseDuration // Only the entity that posseses the lock for this object // is allowed to release the lock - if ls.lockMap.LockMap[sd.ID()] == sd.Owner() { - delete(ls.lockMap.LockMap, sd.ID()) + if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok { + // trying to release an unacquired lock ls. log. Debug(). Str("descriptor", sd.ID()). - Str("owner", sd.Owner()). - Msg("released") + Msg("can't release, hasn't been acquired") ls.lockMap.Mutex.Unlock() - return nil - } else if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok { + return ErrCantReleaseFile + + } else if hasLeaseExpired(timestamp, duration) { + // lease expired ls. log. Debug(). Str("descriptor", sd.ID()). - Msg("can't release, hasn't been acquired") + Msg("can't release, lease of lock has expired") ls.lockMap.Mutex.Unlock() return ErrCantReleaseFile + } else if ls.lockMap.LockMap[sd.ID()].owner == sd.Owner() { + // conditions satisfied, lock is released + delete(ls.lockMap.LockMap, sd.ID()) + ls. + log. + Debug(). + Str("descriptor", sd.ID()). + Str("owner", sd.Owner()). + Msg("released") + ls.lockMap.Mutex.Unlock() + return nil } else { + // trying to release a lock that you don't own ls. log. Debug(). @@ -186,14 +250,14 @@ func (ls *SimpleLockService) Release(sd Descriptors) error { func (ls *SimpleLockService) CheckAcquired(sd Descriptors) (string, bool) { ls.lockMap.Mutex.Lock() id := sd.ID() - if owner, ok := ls.lockMap.LockMap[id]; ok { + if entry, ok := ls.lockMap.LockMap[id]; ok { ls.lockMap.Mutex.Unlock() ls. log. Debug(). Str("descriptor", id). Msg("checkacquire success") - return owner, true + return entry.owner, true } ls. log.