Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/fusekernel/fuse_kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ const (
NotifyCodePoll int32 = 1
NotifyCodeInvalInode int32 = 2
NotifyCodeInvalEntry int32 = 3
NotifyCodeStore int32 = 4
)

type NotifyInvalInodeOut struct {
Expand All @@ -817,6 +818,13 @@ type NotifyInvalEntryOut struct {
padding uint32
}

type NotifyStoreOut struct {
NodeID uint64
Offset int64
Size uint32
padding uint32
}

type SyncFSIn struct {
Padding uint64
}
41 changes: 41 additions & 0 deletions notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
type Notifier struct {
inodeInvalidations chan invalidateInodeCommand
dentryInvalidations chan invalidateEntryCommand
stores chan storeCommand
}

func NewNotifier() *Notifier {
return &Notifier{
inodeInvalidations: make(chan invalidateInodeCommand),
dentryInvalidations: make(chan invalidateEntryCommand),
stores: make(chan storeCommand),
}
}

Expand All @@ -37,6 +39,13 @@ type invalidateEntryCommand struct {
done chan<- error
}

type storeCommand struct {
nodeid fuseops.InodeID
offset int64
data []byte
done chan<- error
}

// InvalidateInode notifies the kernel to invalidate an inode cache entry. See
// the libfuse documentation at
// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#a9cb974af9745294ff446d11cba2422f1
Expand Down Expand Up @@ -65,6 +74,19 @@ func (n *Notifier) InvalidateEntry(parent fuseops.InodeID, name string) error {
return <-done
}

// Store notifies the kernel to store the given data at the offset for the
// target inode. See the libfuse documentation at
// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#af856725ed4a13ed7c17512554043edbc
// for more details. Note that data must be no more than 4GiB in length.
//
// Store blocks until the kernel write completes, and returns the error from the
// kernel, if any. ENOSYS indicates that the kernel does not support stores.
func (n *Notifier) Store(nodeid fuseops.InodeID, offset int64, data []byte) error {
done := make(chan error)
n.stores <- storeCommand{nodeid, offset, data, done}
return <-done
}

func serviceInodeInvalidation(c *Connection, inode fuseops.InodeID, offset, length int64) error {
outMsg := c.getOutMessage()
defer c.putOutMessage(outMsg)
Expand Down Expand Up @@ -101,13 +123,32 @@ func serviceEntryInval(c *Connection, parent fuseops.InodeID, name string) error
return c.writeOutMessage(outMsg)
}

func serviceStore(c *Connection, nodeid fuseops.InodeID, offset int64, data []byte) error {
outMsg := c.getOutMessage()
defer c.putOutMessage(outMsg)

cmd := fusekernel.NotifyStoreOut{
NodeID: uint64(nodeid),
Offset: offset,
Size: uint32(len(data)),
}
outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd))))
outMsg.Append(data)

outMsg.OutHeader().Error = fusekernel.NotifyCodeStore
outMsg.OutHeader().Len = uint32(outMsg.Len())
return c.writeOutMessage(outMsg)
}

func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) {
for {
select {
case i := <-n.inodeInvalidations:
i.done <- serviceInodeInvalidation(c, i.inode, i.offset, i.length)
case e := <-n.dentryInvalidations:
e.done <- serviceEntryInval(c, e.parent, e.name)
case s := <-n.stores:
s.done <- serviceStore(c, s.nodeid, s.offset, s.data)
case <-terminate:
return
}
Expand Down
41 changes: 41 additions & 0 deletions samples/mount_notify_store/mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"context"
"flag"
"log"
"time"

"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/samples/notify_store"
)

var mountPoint = flag.String("mountpoint", "", "directory to mount the filesystem")

type ticker struct {
*time.Ticker
}

func (t *ticker) Ticks() <-chan time.Time {
return t.Ticker.C
}

func (t *ticker) Tocks() chan<- time.Time { return nil }

func main() {
flag.Parse()

if *mountPoint == "" {
log.Fatalf("--mountpoint is required")
}

t := &ticker{time.NewTicker(time.Second)}
server := notify_store.NewNotifyStoreFS(t)
mfs, err := fuse.Mount(*mountPoint, server, &fuse.MountConfig{})
if err != nil {
panic(err)
}
if err := mfs.Join(context.Background()); err != nil {
panic(err)
}
}
3 changes: 3 additions & 0 deletions samples/notify_inval/notify_inval.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type NotifyTimer interface {
//
// Unlike package dynamicfs, this implementation does _not_ depend on direct IO.
// The invalidations allow file operations to eventually observe the changes.
//
// Note that there is overlap with package notify_store, so that each is a
// self-contained example.
func NewNotifyInvalFS(t NotifyTimer) fuse.Server {
n := fuse.NewNotifier()
fs := &notifyInvalInodeFS{
Expand Down
182 changes: 182 additions & 0 deletions samples/notify_store/notify_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package notify_store

import (
"context"
"fmt"
"os"
"sync"
"syscall"
"time"

"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
)

var timeLen = len(time.Time{}.Format(time.RFC3339))

// NotifyTimer may emit times on Ticks() to trigger filesystem changes. The
// fuse.Server emits the same times in the same order on Tocks(), if not nil, to
// indicate that invalidation is complete.
type NotifyTimer interface {
Ticks() <-chan time.Time
Tocks() chan<- time.Time
}

// Create a file system with a single file named 'current_time' which always
// contains the current time.
//
// This filesystem is an analog to the libfuse example here:
// https://github.com/libfuse/libfuse/blob/master/example/notify_store_retrieve.c
//
// Unlike package dynamicfs, this implementation does _not_ depend on direct IO.
// The filesystem directly modifies the page cache so file operations eventually
// observe the changes.
//
// Note that there is overlap with package notify_inval, so that each is a
// self-contained example.
func NewNotifyStoreFS(t NotifyTimer) fuse.Server {
n := fuse.NewNotifier()
fs := &notifyStoreFS{
notifier: n,
teardown: make(chan struct{}),
}

ticks := t.Ticks()
tocks := t.Tocks()
go func() {
for {
select {
case t := <-ticks:
fs.mu.Lock()
fs.currentTime = t
fs.mu.Unlock()
fs.store(t)
if tocks != nil {
tocks <- t
}
case <-fs.teardown:
return
}
}
}()

return fuse.NewServerWithNotifier(n, fuseutil.NewFileSystemServer(fs))
}

type notifyStoreFS struct {
fuseutil.NotImplementedFileSystem

notifier *fuse.Notifier
teardown chan struct{}

mu sync.Mutex
currentTime time.Time
}

const (
currentTimeFilename = "current_time"

currentTimeInode = fuseops.RootInodeID + iota
)

func (fs *notifyStoreFS) store(t time.Time) {
if err := fs.notifier.Store(currentTimeInode, 0, []byte(t.Format(time.RFC3339)+"\n")); err != nil {
fmt.Printf("error storing current_time inode %v: %v\n", currentTimeInode, err)
}
}

func (fs *notifyStoreFS) fillStat(ino fuseops.InodeID, attrs *fuseops.InodeAttributes) error {
switch ino {
case fuseops.RootInodeID:
attrs.Nlink = 1
attrs.Mode = 0555 | os.ModeDir
case currentTimeInode:
attrs.Nlink = 1
attrs.Mode = 0444
attrs.Size = uint64(timeLen + 1) // with newline
default:
return fuse.ENOENT
}
return nil
}

func (fs *notifyStoreFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error {
if op.Parent != fuseops.RootInodeID {
return fuse.ENOENT
}

switch op.Name {
case currentTimeFilename:
op.Entry.Child = currentTimeInode
fs.fillStat(currentTimeInode, &op.Entry.Attributes)
default:
return fuse.ENOENT
}

distantFuture := time.Now().Add(time.Hour * 300)
op.Entry.AttributesExpiration = distantFuture
op.Entry.EntryExpiration = distantFuture
return nil
}

func (fs *notifyStoreFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error {
return fs.fillStat(op.Inode, &op.Attributes)
}

func (fs *notifyStoreFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error {
if op.Inode != fuseops.RootInodeID {
return fuse.ENOTDIR
}

if op.Offset <= 0 {
op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{
Offset: fuseops.DirOffset(1),
Inode: currentTimeInode,
Name: currentTimeFilename,
})
}
return nil
}

func (fs *notifyStoreFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error {
if op.Inode == fuseops.RootInodeID {
return syscall.EISDIR
}
if op.Inode != currentTimeInode {
// This should not happen
return fuse.EIO
}
if !op.OpenFlags.IsReadOnly() {
return syscall.EACCES
}

// Make cache persistent even if the file is closed. This makes it easier to
// see the effects of invalidation.
op.KeepPageCache = true

return nil
}

func (fs *notifyStoreFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error {
if op.Inode != currentTimeInode {
return fuse.EIO
}

fmt.Print("Direct read received, bypassing page cache")

fs.mu.Lock()
t := fs.currentTime
fs.mu.Unlock()

contents := t.Format(time.RFC3339) + "\n"

if op.Offset < int64(len(contents)) {
op.BytesRead = copy(op.Dst, contents[op.Offset:])
}
return nil
}

func (fs *notifyStoreFS) Destroy() {
close(fs.teardown)
}
Loading