Skip to content
Open
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
main
.vscode
.vscode/
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"os"
"time"

"github.com/SystemBuilders/LocKey/internal/lockservice"
"github.com/SystemBuilders/LocKey/internal/lockservice/node"
Expand All @@ -12,7 +13,7 @@ func main() {
zerolog.New(os.Stdout).With()

log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())
ls := lockservice.NewSimpleLockService(log)
ls := lockservice.NewSimpleLockService(log, 5*time.Second)

scfg := lockservice.NewSimpleConfig("127.0.0.1", "1234")
node.Start(ls, *scfg)
Expand Down
10 changes: 7 additions & 3 deletions internal/lockclient/simple_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ type SimpleClient struct {
// whether the process owning the lock has an active session
// or not, this guarantee has to be ensured by the client.
sessionAcquisitions map[id.ID][]lockservice.Descriptors

sessionDuration time.Duration
}

// NewSimpleClient returns a new SimpleClient of the given parameters.
// This client works with or without the existance of a cache.
func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache *cache.LRUCache) *SimpleClient {
func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache *cache.LRUCache,
sessionDuration time.Duration) *SimpleClient {
clientID := id.Create()
sessions := make(map[id.ID]session.Session)
sessionTimers := make(map[id.ID]chan struct{})
Expand All @@ -55,6 +58,7 @@ func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache
sessions: sessions,
sessionTimers: sessionTimers,
sessionAcquisitions: sessionAcquisitions,
sessionDuration: sessionDuration,
}
}

Expand Down Expand Up @@ -434,8 +438,8 @@ func (sc *SimpleClient) startSession(processID id.ID) {
sc.mu.Lock()
sc.sessionTimers[processID] = timerChan
sc.mu.Unlock()
// Sessions last for 200ms.
time.Sleep(200 * time.Millisecond)
// Sessions last for user configured duration.
time.Sleep(sc.sessionDuration)

sc.mu.Lock()
sc.sessionTimers[processID] <- struct{}{}
Expand Down
49 changes: 37 additions & 12 deletions internal/lockclient/simple_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func TestLockService(t *testing.T) {

log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())
scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234")
ls := lockservice.NewSimpleLockService(log)
duration := 2 * time.Second // 2 second expiry
sessionDuration := 5 * time.Second
ls := lockservice.NewSimpleLockService(log, duration)

quit := make(chan bool, 1)
go func() {
Expand All @@ -42,7 +44,7 @@ func TestLockService(t *testing.T) {
t.Run("acquire test release test", func(t *testing.T) {
size := 5
cache := cache.NewLRUCache(size)
sc := NewSimpleClient(scfg, log, cache)
sc := NewSimpleClient(scfg, log, cache, sessionDuration)

session := sc.Connect()

Expand Down Expand Up @@ -76,7 +78,7 @@ func TestLockService(t *testing.T) {
t.Run("acquire test, acquire test, release test", func(t *testing.T) {
size := 5
cache := cache.NewLRUCache(size)
sc := NewSimpleClient(scfg, log, cache)
sc := NewSimpleClient(scfg, log, cache, sessionDuration)

session := sc.Connect()
d := lockservice.NewObjectDescriptor("test")
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestLockService(t *testing.T) {
t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) {
size := 2
cache := cache.NewLRUCache(size)
sc := NewSimpleClient(scfg, log, cache)
sc := NewSimpleClient(scfg, log, cache, sessionDuration)

session := sc.Connect()
d := lockservice.NewObjectDescriptor("test")
Expand Down Expand Up @@ -144,7 +146,7 @@ func TestLockService(t *testing.T) {
})

t.Run("acquire test and release after session expiry", func(t *testing.T) {
sc := NewSimpleClient(scfg, log, nil)
sc := NewSimpleClient(scfg, log, nil, sessionDuration)
session := sc.Connect()
d := lockservice.NewObjectDescriptor("test3")

Expand All @@ -154,14 +156,34 @@ func TestLockService(t *testing.T) {
t.Errorf("acquire: got %q want %q", got, want)
}

// Wait for the session to expire
time.Sleep(500 * time.Millisecond)
// Wait for the lock's lease to expire
time.Sleep(3 * time.Second)

got = sc.Release(d, session)
want = ErrSessionNonExistent
want = lockservice.ErrCantReleaseFile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So i changed the test that checks if session expiry works to instead check for lock expiry.

This test highlighted the redundancy for either lock expiry / session expiry. I feel like only one of them is needed. Having both is a bit redundant imo. What do you think @SUMUKHA-PK ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, ok

if got != want {
t.Errorf("release: got %q want %q", got, want)
}
})
t.Run("try acquiring after lock expiry; should succeed", func(t *testing.T) {
sc := NewSimpleClient(scfg, log, nil, sessionDuration)
session := sc.Connect()
d := lockservice.NewObjectDescriptor("test2")

got := sc.Acquire(d, session)
var want error
if got != want {
t.Errorf("acquire: got %q want %q", got, want)
}

// Wait for the lock's lease to expire
time.Sleep(3 * time.Second)

got = sc.Acquire(d, session)
if got != want {
t.Errorf("acquire: got %q want %q", got, want)
}
})

quit <- true
return
Expand All @@ -173,7 +195,7 @@ func BenchmarkLocKeyWithoutCache(b *testing.B) {

log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())
scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234")
ls := lockservice.NewSimpleLockService(log)
ls := lockservice.NewSimpleLockService(log, 5)

quit := make(chan bool, 1)
go func() {
Expand All @@ -188,7 +210,8 @@ func BenchmarkLocKeyWithoutCache(b *testing.B) {
}()
time.Sleep(100 * time.Millisecond)

sc := NewSimpleClient(scfg, log, nil)
sessionDuration := 5 * time.Second
sc := NewSimpleClient(scfg, log, nil, sessionDuration)
session := sc.Connect()
d := lockservice.NewObjectDescriptor("test")
for n := 0; n < b.N; n++ {
Expand All @@ -211,7 +234,8 @@ func BenchmarkLocKeyWithCache(b *testing.B) {

log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())
scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234")
ls := lockservice.NewSimpleLockService(log)
duration := 2 * time.Second // 2 second expiry
ls := lockservice.NewSimpleLockService(log, duration)

quit := make(chan bool, 1)
go func() {
Expand All @@ -228,7 +252,8 @@ func BenchmarkLocKeyWithCache(b *testing.B) {

size := 5
cache := cache.NewLRUCache(size)
sc := NewSimpleClient(scfg, log, cache)
sessionDuration := 5 * time.Second
sc := NewSimpleClient(scfg, log, cache, sessionDuration)
session := sc.Connect()
d := lockservice.NewObjectDescriptor("test")
for n := 0; n < b.N; n++ {
Expand Down
104 changes: 84 additions & 20 deletions internal/lockservice/simpleLockService.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package lockservice

import (
"fmt"
"sync"
"time"

"github.com/rs/zerolog"
)

// SafeLockMap is the lockserver's data structure
type SafeLockMap struct {
LockMap map[string]string
Mutex sync.Mutex
LockMap map[string]*LockMapEntry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SafeLockMap.LockMap seems weird.
Make the variable as map or something that doesnt repeat weirdly. (Kinda a Go standard)

LeaseDuration time.Duration
Mutex sync.Mutex
}

// LockMapEntry defines the structure for objects placed
// in the LockMap. It consists of the owner of the lock
// that is acquired and the timestamp at which the
// acquisition took place.
type LockMapEntry struct {
owner string
timestamp time.Time
}

// SimpleConfig implements Config.
Expand Down Expand Up @@ -87,6 +99,14 @@ func (sd *LockDescriptor) Owner() string {
return sd.UserID
}

// NewLockMapEntry returns an instance of a LockMapEntry
func NewLockMapEntry(owner string, timestamp time.Time) *LockMapEntry {
return &LockMapEntry{
owner: owner,
timestamp: timestamp,
}
}

// NewSimpleConfig returns an instance of the SimpleConfig.
func NewSimpleConfig(IPAddr, PortAddr string) *SimpleConfig {
return &SimpleConfig{
Expand All @@ -111,64 +131,108 @@ func NewObjectDescriptor(ObjectID string) *ObjectDescriptor {
}

// NewSimpleLockService creates and returns a new lock service ready to use.
func NewSimpleLockService(log zerolog.Logger) *SimpleLockService {
func NewSimpleLockService(log zerolog.Logger, leaseDuration time.Duration) *SimpleLockService {
safeLockMap := &SafeLockMap{
LockMap: make(map[string]string),
LockMap: make(map[string]*LockMapEntry),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have to be a pointer?

}
safeLockMap.LeaseDuration = leaseDuration
return &SimpleLockService{
log: log,
lockMap: safeLockMap,
}
}
func fmtDuration(d time.Duration) string {
d = d.Round(time.Minute)
h := d / time.Hour
d -= h * time.Hour
ms := d / time.Microsecond
return fmt.Sprintf("%02d:%02d", h, ms)
}

// hasLeaseExpired returns true if the lease for a lock has expired and
// false if the lease is still valid
func hasLeaseExpired(timestamp time.Time, lease time.Duration) bool {
if time.Now().Sub(timestamp) > lease {
return true
}
return false
}

// Acquire function lets a client acquire a lock on an object.
// This lock is valid for a fixed duration that is set in the SafeLockMap.LeaseDuration
// field. Beyond this duration, the lock has expired and the entity that owned the lock
// for this period can no longer release it. The lock is open for acquisition after it
// has expired.
func (ls *SimpleLockService) Acquire(sd Descriptors) error {
ls.lockMap.Mutex.Lock()
if _, ok := ls.lockMap.LockMap[sd.ID()]; ok {

timestamp := ls.lockMap.LockMap[sd.ID()].timestamp
duration := ls.lockMap.LeaseDuration
// If the lock is not present in the LockMap or
// the lock has expired, then allow the acquisition
if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok || hasLeaseExpired(timestamp, duration) {
ls.lockMap.LockMap[sd.ID()] = NewLockMapEntry(sd.Owner(), time.Now())
ls.lockMap.Mutex.Unlock()
ls.
log.
Debug().
Str("descriptor", sd.ID()).
Msg("can't acquire, already been acquired")
return ErrFileacquired
Str("owner", ls.lockMap.LockMap[sd.ID()].owner).
Time("timestamp", ls.lockMap.LockMap[sd.ID()].timestamp).
Msg("locked")
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the if condition has changed. The new if condition is essential NOT of the previous one. If you see the additions at like #188, it contains what has been removed here.

}
ls.lockMap.LockMap[sd.ID()] = sd.Owner()
ls.lockMap.Mutex.Unlock()
// Since the lock is already acquired, return an error
ls.
log.
Debug().
Str("descriptor", sd.ID()).
Str("owner", sd.Owner()).
Msg("locked")
return nil
Msg("can't acquire, already been acquired")
return ErrFileacquired

}

// Release lets a client to release a lock on an object.
func (ls *SimpleLockService) Release(sd Descriptors) error {
ls.lockMap.Mutex.Lock()
timestamp := ls.lockMap.LockMap[sd.ID()].timestamp
duration := ls.lockMap.LeaseDuration
// Only the entity that posseses the lock for this object
// is allowed to release the lock
if ls.lockMap.LockMap[sd.ID()] == sd.Owner() {
delete(ls.lockMap.LockMap, sd.ID())
if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok {
// trying to release an unacquired lock
ls.
log.
Debug().
Str("descriptor", sd.ID()).
Str("owner", sd.Owner()).
Msg("released")
Msg("can't release, hasn't been acquired")
ls.lockMap.Mutex.Unlock()
return nil
} else if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok {
return ErrCantReleaseFile

} else if hasLeaseExpired(timestamp, duration) {
// lease expired
ls.
log.
Debug().
Str("descriptor", sd.ID()).
Msg("can't release, hasn't been acquired")
Msg("can't release, lease of lock has expired")
ls.lockMap.Mutex.Unlock()
return ErrCantReleaseFile

} else if ls.lockMap.LockMap[sd.ID()].owner == sd.Owner() {
// conditions satisfied, lock is released
delete(ls.lockMap.LockMap, sd.ID())
ls.
log.
Debug().
Str("descriptor", sd.ID()).
Str("owner", sd.Owner()).
Msg("released")
ls.lockMap.Mutex.Unlock()
return nil
} else {
// trying to release a lock that you don't own
ls.
log.
Debug().
Expand All @@ -186,14 +250,14 @@ func (ls *SimpleLockService) Release(sd Descriptors) error {
func (ls *SimpleLockService) CheckAcquired(sd Descriptors) (string, bool) {
ls.lockMap.Mutex.Lock()
id := sd.ID()
if owner, ok := ls.lockMap.LockMap[id]; ok {
if entry, ok := ls.lockMap.LockMap[id]; ok {
ls.lockMap.Mutex.Unlock()
ls.
log.
Debug().
Str("descriptor", id).
Msg("checkacquire success")
return owner, true
return entry.owner, true
}
ls.
log.
Expand Down