diff --git a/internal/fusekernel/fuse_kernel.go b/internal/fusekernel/fuse_kernel.go index 8c47b524..7bbf8fb3 100644 --- a/internal/fusekernel/fuse_kernel.go +++ b/internal/fusekernel/fuse_kernel.go @@ -803,6 +803,7 @@ const ( NotifyCodePoll int32 = 1 NotifyCodeInvalInode int32 = 2 NotifyCodeInvalEntry int32 = 3 + NotifyCodeStore int32 = 4 ) type NotifyInvalInodeOut struct { @@ -817,6 +818,13 @@ type NotifyInvalEntryOut struct { padding uint32 } +type NotifyStoreOut struct { + NodeID uint64 + Offset int64 + Size uint32 + padding uint32 +} + type SyncFSIn struct { Padding uint64 } diff --git a/notifier.go b/notifier.go index e9ea38bc..d976a366 100644 --- a/notifier.go +++ b/notifier.go @@ -13,12 +13,14 @@ import ( type Notifier struct { inodeInvalidations chan invalidateInodeCommand dentryInvalidations chan invalidateEntryCommand + stores chan storeCommand } func NewNotifier() *Notifier { return &Notifier{ inodeInvalidations: make(chan invalidateInodeCommand), dentryInvalidations: make(chan invalidateEntryCommand), + stores: make(chan storeCommand), } } @@ -37,6 +39,13 @@ type invalidateEntryCommand struct { done chan<- error } +type storeCommand struct { + nodeid fuseops.InodeID + offset int64 + data []byte + done chan<- error +} + // InvalidateInode notifies the kernel to invalidate an inode cache entry. See // the libfuse documentation at // https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#a9cb974af9745294ff446d11cba2422f1 @@ -65,6 +74,19 @@ func (n *Notifier) InvalidateEntry(parent fuseops.InodeID, name string) error { return <-done } +// Store notifies the kernel to store the given data at the offset for the +// target inode. See the libfuse documentation at +// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#af856725ed4a13ed7c17512554043edbc +// for more details. Note that data must be no more than 4GiB in length. +// +// Store blocks until the kernel write completes, and returns the error from the +// kernel, if any. ENOSYS indicates that the kernel does not support stores. +func (n *Notifier) Store(nodeid fuseops.InodeID, offset int64, data []byte) error { + done := make(chan error) + n.stores <- storeCommand{nodeid, offset, data, done} + return <-done +} + func serviceInodeInvalidation(c *Connection, inode fuseops.InodeID, offset, length int64) error { outMsg := c.getOutMessage() defer c.putOutMessage(outMsg) @@ -101,6 +123,23 @@ func serviceEntryInval(c *Connection, parent fuseops.InodeID, name string) error return c.writeOutMessage(outMsg) } +func serviceStore(c *Connection, nodeid fuseops.InodeID, offset int64, data []byte) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + + cmd := fusekernel.NotifyStoreOut{ + NodeID: uint64(nodeid), + Offset: offset, + Size: uint32(len(data)), + } + outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd)))) + outMsg.Append(data) + + outMsg.OutHeader().Error = fusekernel.NotifyCodeStore + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeOutMessage(outMsg) +} + func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) { for { select { @@ -108,6 +147,8 @@ func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) { i.done <- serviceInodeInvalidation(c, i.inode, i.offset, i.length) case e := <-n.dentryInvalidations: e.done <- serviceEntryInval(c, e.parent, e.name) + case s := <-n.stores: + s.done <- serviceStore(c, s.nodeid, s.offset, s.data) case <-terminate: return } diff --git a/samples/mount_notify_store/mount.go b/samples/mount_notify_store/mount.go new file mode 100644 index 00000000..2402b282 --- /dev/null +++ b/samples/mount_notify_store/mount.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/samples/notify_store" +) + +var mountPoint = flag.String("mountpoint", "", "directory to mount the filesystem") + +type ticker struct { + *time.Ticker +} + +func (t *ticker) Ticks() <-chan time.Time { + return t.Ticker.C +} + +func (t *ticker) Tocks() chan<- time.Time { return nil } + +func main() { + flag.Parse() + + if *mountPoint == "" { + log.Fatalf("--mountpoint is required") + } + + t := &ticker{time.NewTicker(time.Second)} + server := notify_store.NewNotifyStoreFS(t) + mfs, err := fuse.Mount(*mountPoint, server, &fuse.MountConfig{}) + if err != nil { + panic(err) + } + if err := mfs.Join(context.Background()); err != nil { + panic(err) + } +} diff --git a/samples/notify_inval/notify_inval.go b/samples/notify_inval/notify_inval.go index 32f52a53..e510b58c 100644 --- a/samples/notify_inval/notify_inval.go +++ b/samples/notify_inval/notify_inval.go @@ -33,6 +33,9 @@ type NotifyTimer interface { // // Unlike package dynamicfs, this implementation does _not_ depend on direct IO. // The invalidations allow file operations to eventually observe the changes. +// +// Note that there is overlap with package notify_store, so that each is a +// self-contained example. func NewNotifyInvalFS(t NotifyTimer) fuse.Server { n := fuse.NewNotifier() fs := ¬ifyInvalInodeFS{ diff --git a/samples/notify_store/notify_store.go b/samples/notify_store/notify_store.go new file mode 100644 index 00000000..3dd6cc5a --- /dev/null +++ b/samples/notify_store/notify_store.go @@ -0,0 +1,182 @@ +package notify_store + +import ( + "context" + "fmt" + "os" + "sync" + "syscall" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/fuseutil" +) + +var timeLen = len(time.Time{}.Format(time.RFC3339)) + +// NotifyTimer may emit times on Ticks() to trigger filesystem changes. The +// fuse.Server emits the same times in the same order on Tocks(), if not nil, to +// indicate that invalidation is complete. +type NotifyTimer interface { + Ticks() <-chan time.Time + Tocks() chan<- time.Time +} + +// Create a file system with a single file named 'current_time' which always +// contains the current time. +// +// This filesystem is an analog to the libfuse example here: +// https://github.com/libfuse/libfuse/blob/master/example/notify_store_retrieve.c +// +// Unlike package dynamicfs, this implementation does _not_ depend on direct IO. +// The filesystem directly modifies the page cache so file operations eventually +// observe the changes. +// +// Note that there is overlap with package notify_inval, so that each is a +// self-contained example. +func NewNotifyStoreFS(t NotifyTimer) fuse.Server { + n := fuse.NewNotifier() + fs := ¬ifyStoreFS{ + notifier: n, + teardown: make(chan struct{}), + } + + ticks := t.Ticks() + tocks := t.Tocks() + go func() { + for { + select { + case t := <-ticks: + fs.mu.Lock() + fs.currentTime = t + fs.mu.Unlock() + fs.store(t) + if tocks != nil { + tocks <- t + } + case <-fs.teardown: + return + } + } + }() + + return fuse.NewServerWithNotifier(n, fuseutil.NewFileSystemServer(fs)) +} + +type notifyStoreFS struct { + fuseutil.NotImplementedFileSystem + + notifier *fuse.Notifier + teardown chan struct{} + + mu sync.Mutex + currentTime time.Time +} + +const ( + currentTimeFilename = "current_time" + + currentTimeInode = fuseops.RootInodeID + iota +) + +func (fs *notifyStoreFS) store(t time.Time) { + if err := fs.notifier.Store(currentTimeInode, 0, []byte(t.Format(time.RFC3339)+"\n")); err != nil { + fmt.Printf("error storing current_time inode %v: %v\n", currentTimeInode, err) + } +} + +func (fs *notifyStoreFS) fillStat(ino fuseops.InodeID, attrs *fuseops.InodeAttributes) error { + switch ino { + case fuseops.RootInodeID: + attrs.Nlink = 1 + attrs.Mode = 0555 | os.ModeDir + case currentTimeInode: + attrs.Nlink = 1 + attrs.Mode = 0444 + attrs.Size = uint64(timeLen + 1) // with newline + default: + return fuse.ENOENT + } + return nil +} + +func (fs *notifyStoreFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { + if op.Parent != fuseops.RootInodeID { + return fuse.ENOENT + } + + switch op.Name { + case currentTimeFilename: + op.Entry.Child = currentTimeInode + fs.fillStat(currentTimeInode, &op.Entry.Attributes) + default: + return fuse.ENOENT + } + + distantFuture := time.Now().Add(time.Hour * 300) + op.Entry.AttributesExpiration = distantFuture + op.Entry.EntryExpiration = distantFuture + return nil +} + +func (fs *notifyStoreFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error { + return fs.fillStat(op.Inode, &op.Attributes) +} + +func (fs *notifyStoreFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { + if op.Inode != fuseops.RootInodeID { + return fuse.ENOTDIR + } + + if op.Offset <= 0 { + op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{ + Offset: fuseops.DirOffset(1), + Inode: currentTimeInode, + Name: currentTimeFilename, + }) + } + return nil +} + +func (fs *notifyStoreFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { + if op.Inode == fuseops.RootInodeID { + return syscall.EISDIR + } + if op.Inode != currentTimeInode { + // This should not happen + return fuse.EIO + } + if !op.OpenFlags.IsReadOnly() { + return syscall.EACCES + } + + // Make cache persistent even if the file is closed. This makes it easier to + // see the effects of invalidation. + op.KeepPageCache = true + + return nil +} + +func (fs *notifyStoreFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error { + if op.Inode != currentTimeInode { + return fuse.EIO + } + + fmt.Print("Direct read received, bypassing page cache") + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + contents := t.Format(time.RFC3339) + "\n" + + if op.Offset < int64(len(contents)) { + op.BytesRead = copy(op.Dst, contents[op.Offset:]) + } + return nil +} + +func (fs *notifyStoreFS) Destroy() { + close(fs.teardown) +} diff --git a/samples/notify_store/notify_store_test.go b/samples/notify_store/notify_store_test.go new file mode 100644 index 00000000..d2a3e473 --- /dev/null +++ b/samples/notify_store/notify_store_test.go @@ -0,0 +1,76 @@ +package notify_store_test + +import ( + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/jacobsa/fuse/fusetesting" + "github.com/jacobsa/fuse/samples" + "github.com/jacobsa/fuse/samples/notify_store" + . "github.com/jacobsa/ogletest" +) + +func TestNotifyStoreFS(t *testing.T) { RunTests(t) } + +func (t *NotifyStoreFSTest) setTime(tv time.Time) { + t.ticker.tickchan <- tv + t.expectedTime = <-t.ticker.tockchan +} + +func init() { + RegisterTestSuite(&NotifyStoreFSTest{}) +} + +type manualTicker struct { + tickchan chan time.Time + tockchan chan time.Time +} + +func (t *manualTicker) Ticks() <-chan time.Time { return t.tickchan } +func (t *manualTicker) Tocks() chan<- time.Time { return t.tockchan } + +type NotifyStoreFSTest struct { + samples.SampleTest + + ticker *manualTicker + expectedTime time.Time +} + +func (t *NotifyStoreFSTest) SetUp(ti *TestInfo) { + t.ticker = &manualTicker{ + tickchan: make(chan time.Time), + tockchan: make(chan time.Time), + } + t.Server = notify_store.NewNotifyStoreFS(t.ticker) + t.SampleTest.SetUp(ti) +} + +func (t *NotifyStoreFSTest) ReadDir_Root() { + entries, err := fusetesting.ReadDirPicky(t.Dir) + AssertEq(nil, err) + AssertEq(1, len(entries)) + + var fi os.FileInfo + fi = entries[0] + ExpectEq("current_time", fi.Name()) + ExpectEq(len(time.Time{}.Format(time.RFC3339))+1, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) +} + +func (t *NotifyStoreFSTest) ObserveTimeUpdate() { + oldTime := t.expectedTime.Format(time.RFC3339) + + slice, err := ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectEq(err, nil) + ExpectEq(oldTime+"\n", string(slice)) + + t.setTime(t.expectedTime.Add(time.Minute)) + + slice, err = ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectEq(err, nil) + ExpectNe(oldTime+"\n", string(slice)) +}