Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
bfca951
custom cirbuf impl for streammanager
sawka Jan 9, 2026
741a931
add a SetEffectiveWindow to cirbuf
sawka Jan 9, 2026
61dcc8d
fixes for cirbuf
sawka Jan 9, 2026
3d7566c
checkpoint, streammanager
sawka Jan 9, 2026
de588eb
integrate streammanager with broker
sawka Jan 10, 2026
5386109
rename session => job
sawka Jan 12, 2026
fe3644f
working on job manager cmd/interface
sawka Jan 12, 2026
7e6b9b8
jobmanager checkpoint
sawka Jan 12, 2026
30c8756
checkpoint
sawka Jan 12, 2026
c5743ee
checkpoint
sawka Jan 12, 2026
e09d900
job manager start/connect sync fixed, checkpoint
sawka Jan 13, 2026
bada6d9
split up jobmanager file
sawka Jan 13, 2026
93bc1df
set up auth token when starting jobmanager not on jobstart
sawka Jan 14, 2026
19e79e8
daemonization, better signal handling
sawka Jan 14, 2026
cbf39ff
update router to allow routing messages by linkid. also allow respon…
sawka Jan 14, 2026
82ca024
new mode to run router over domain socket
sawka Jan 14, 2026
b4dc321
updates to conncontroller to support new router over domainsocket mode
sawka Jan 14, 2026
cfebfa0
begin work on remotejobstart command
sawka Jan 14, 2026
34f9afd
checkpoint -- finish RemoteStartJobCommand (untested)
sawka Jan 14, 2026
28c9ed4
hook up streambroker to wshrpc
sawka Jan 14, 2026
bb9f51d
update termination flow for jobmanager
sawka Jan 14, 2026
1db5667
better exit info, exitts, store exit data separate from stream eof/error
sawka Jan 15, 2026
caf7fdc
add job termination
sawka Jan 15, 2026
da8b38d
add installid to client
sawka Jan 15, 2026
79eca5c
fix platform specific funcs
sawka Jan 15, 2026
6763b5b
working through bugs, checkpoint
sawka Jan 15, 2026
70273ed
job manager runs now, working more on control
sawka Jan 15, 2026
8fca82f
route:up added as an event from wshrouter
sawka Jan 15, 2026
02534e5
fix bare client rpc
sawka Jan 15, 2026
c4c2168
debugging, lots of new logging, but finally capturing output from job…
sawka Jan 16, 2026
09c4e69
mostly working start/getoutput with data streaming
sawka Jan 16, 2026
51aa95e
reorg job fields, split error, better jobdebug list output
sawka Jan 16, 2026
bce0747
checkpoint on reconnect
sawka Jan 17, 2026
c627a4d
checkpoint, spilt up wshremote
sawka Jan 19, 2026
991769a
working on jobcontroller/remote reconnect/disconnect job manager comm…
sawka Jan 19, 2026
f04cf0d
return bool saying if jobmanager is no longer running (from reconnect)
sawka Jan 19, 2026
5c5dfac
implement terminate and a new SIGHUP handler
sawka Jan 19, 2026
e99d03d
reconnect jobs for conn
sawka Jan 19, 2026
0ec269b
remove barerpc nil checks
sawka Jan 19, 2026
02e76a0
update termination, make more consistent
sawka Jan 19, 2026
b04894c
working on the ability to restart a job stream
sawka Jan 19, 2026
5569778
fix restartstream to not recreate the stream (use the same streamid)
sawka Jan 19, 2026
5421c03
working on reconnect
sawka Jan 19, 2026
1975d67
fix many bugs, get reconnect logic working/tested
sawka Jan 20, 2026
8a89c24
get job input through block working. remove old webcmd code
sawka Jan 20, 2026
dad67c6
stream reader now handles out of order packerts
sawka Jan 20, 2026
a22788c
remove test coverage UX
sawka Jan 20, 2026
609db36
remove extra go routine for streaming as the broker was designed to b…
sawka Jan 20, 2026
861dcec
working through bugs, tightening semantics
sawka Jan 20, 2026
2968757
more updates, tightening things up
sawka Jan 21, 2026
f483add
clean up more semantics
sawka Jan 21, 2026
d2b9595
stub jobid change RPC call to FE block
sawka Jan 21, 2026
30fe82e
force cirbuf shrinking if seteffectivewindow lowers the window size i…
sawka Jan 21, 2026
43a4af1
fix context for go routine
sawka Jan 21, 2026
d5ffc4d
sync.once the cleanup code
sawka Jan 21, 2026
b298c3f
dont allow binding link routes
sawka Jan 21, 2026
827d447
fix fd 3 check
sawka Jan 21, 2026
d17bdb7
do not create conn when checking if it is connected (update misleadin…
sawka Jan 22, 2026
d62ef54
fix json tag for attachedblockid
sawka Jan 22, 2026
d590d1a
comment why we can ignore the error from RegisterTrustedLeaf
sawka Jan 22, 2026
1b35d17
fix nits
sawka Jan 22, 2026
0b839fc
exitcode is optional
sawka Jan 22, 2026
4866dc0
close readPipeWrite after start
sawka Jan 22, 2026
15e35a7
fix nit, add todo
sawka Jan 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@
},
"directoryFilters": ["-tsunami/frontend/scaffold", "-dist", "-make"]
},
"tailwindCSS.lint.suggestCanonicalClasses": "ignore"
"tailwindCSS.lint.suggestCanonicalClasses": "ignore",
"go.coverageDecorator": {
"type": "gutter"
}
}
4 changes: 3 additions & 1 deletion cmd/server/main-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/filebackup"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/jobcontroller"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
Expand Down Expand Up @@ -391,7 +392,7 @@ func createMainWshClient() {
wshfs.RpcClient = rpc
wshutil.DefaultRouter.RegisterTrustedLeaf(rpc, wshutil.DefaultRoute)
wps.Broker.SetClient(wshutil.DefaultRouter)
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, &wshremote.ServerImpl{}, "conn:local")
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, wshremote.MakeRemoteRpcServerImpl(nil, wshutil.DefaultRouter, wshclient.GetBareRpcClient(), true), "conn:local")
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
wshutil.DefaultRouter.RegisterTrustedLeaf(localConnWsh, wshutil.MakeConnectionRouteId(wshrpc.LocalConnName))
}
Expand Down Expand Up @@ -572,6 +573,7 @@ func main() {
go backupCleanupLoop()
go startupActivityUpdate(firstLaunch) // must be after startConfigWatcher()
blocklogger.InitBlockLogger()
jobcontroller.InitJobController()
go func() {
defer func() {
panichandler.PanicHandler("GetSystemSummary", recover())
Expand Down
133 changes: 130 additions & 3 deletions cmd/wsh/cmd/wshcmd-connserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ var serverCmd = &cobra.Command{
}

var connServerRouter bool
var connServerRouterDomainSocket bool
var connServerConnName string
var connServerDev bool
var ConnServerWshRouter *wshutil.WshRouter

func init() {
serverCmd.Flags().BoolVar(&connServerRouter, "router", false, "run in local router mode")
serverCmd.Flags().BoolVar(&connServerRouter, "router", false, "run in local router mode (stdio upstream)")
serverCmd.Flags().BoolVar(&connServerRouterDomainSocket, "router-domainsocket", false, "run in local router mode (domain socket upstream)")
serverCmd.Flags().StringVar(&connServerConnName, "conn", "", "connection name")
serverCmd.Flags().BoolVar(&connServerDev, "dev", false, "enable dev mode with file logging and PID in logs")
rootCmd.AddCommand(serverCmd)
Expand Down Expand Up @@ -123,14 +126,20 @@ func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter) (*wshutil.Wsh
RouteId: routeId,
Conn: connServerConnName,
}
connServerClient := wshutil.MakeWshRpc(rpcCtx, &wshremote.ServerImpl{LogWriter: os.Stdout}, routeId)

bareRouteId := wshutil.MakeRandomProcRouteId()
bareClient := wshutil.MakeWshRpc(wshrpc.RpcContext{}, &wshclient.WshServer{}, bareRouteId)
router.RegisterTrustedLeaf(bareClient, bareRouteId)

connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false), routeId)
router.RegisterTrustedLeaf(connServerClient, routeId)
return connServerClient, nil
}

func serverRunRouter() error {
log.Printf("starting connserver router")
router := wshutil.NewWshRouter()
ConnServerWshRouter = router
termProxy := wshutil.MakeRpcProxy("connserver-term")
rawCh := make(chan []byte, wshutil.DefaultOutputChSize)
go func() {
Expand Down Expand Up @@ -209,8 +218,112 @@ func serverRunRouter() error {
select {}
}

func serverRunRouterDomainSocket(jwtToken string) error {
log.Printf("starting connserver router (domain socket upstream)")

// extract socket name from JWT token (unverified - we're on the client side)
sockName, err := wshutil.ExtractUnverifiedSocketName(jwtToken)
if err != nil {
return fmt.Errorf("error extracting socket name from JWT: %v", err)
}

// connect to the forwarded domain socket
sockName = wavebase.ExpandHomeDirSafe(sockName)
conn, err := net.Dial("unix", sockName)
if err != nil {
return fmt.Errorf("error connecting to domain socket %s: %v", sockName, err)
}

// create router
router := wshutil.NewWshRouter()
ConnServerWshRouter = router

// create proxy for the domain socket connection
upstreamProxy := wshutil.MakeRpcProxy("connserver-upstream")

// goroutine to write to the domain socket
go func() {
defer func() {
panichandler.PanicHandler("serverRunRouterDomainSocket:WriteLoop", recover())
}()
writeErr := wshutil.AdaptOutputChToStream(upstreamProxy.ToRemoteCh, conn)
if writeErr != nil {
log.Printf("error writing to upstream domain socket: %v\n", writeErr)
}
}()

// goroutine to read from the domain socket
go func() {
defer func() {
panichandler.PanicHandler("serverRunRouterDomainSocket:ReadLoop", recover())
}()
defer func() {
log.Printf("upstream domain socket closed, shutting down")
wshutil.DoShutdown("", 0, true)
}()
wshutil.AdaptStreamToMsgCh(conn, upstreamProxy.FromRemoteCh)
}()

// register the domain socket connection as upstream
router.RegisterUpstream(upstreamProxy)

// setup the connserver rpc client (leaf)
client, err := setupConnServerRpcClientWithRouter(router)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client

// authenticate with the upstream router using the JWT
_, err = wshclient.AuthenticateCommand(client, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
if err != nil {
return fmt.Errorf("error authenticating with upstream: %v", err)
}
log.Printf("authenticated with upstream router")

// fetch and set JWT public key
log.Printf("trying to get JWT public key")
jwtPublicKeyB64, err := wshclient.GetJwtPublicKeyCommand(client, nil)
if err != nil {
return fmt.Errorf("error getting jwt public key: %v", err)
}
jwtPublicKeyBytes, err := base64.StdEncoding.DecodeString(jwtPublicKeyB64)
if err != nil {
return fmt.Errorf("error decoding jwt public key: %v", err)
}
err = wavejwt.SetPublicKey(jwtPublicKeyBytes)
if err != nil {
return fmt.Errorf("error setting jwt public key: %v", err)
}
log.Printf("got JWT public key")

// set up the local domain socket listener for local wsh commands
unixListener, err := MakeRemoteUnixListener()
if err != nil {
return fmt.Errorf("cannot create unix listener: %v", err)
}
log.Printf("unix listener started")
go func() {
defer func() {
panichandler.PanicHandler("serverRunRouterDomainSocket:runListener", recover())
}()
runListener(unixListener, router)
}()

// run the sysinfo loop
go func() {
defer func() {
panichandler.PanicHandler("serverRunRouterDomainSocket:RunSysInfoLoop", recover())
}()
wshremote.RunSysInfoLoop(client, connServerConnName)
}()

log.Printf("running server (router-domainsocket mode), successfully started")
select {}
}

func serverRunNormal(jwtToken string) error {
err := setupRpcClient(&wshremote.ServerImpl{LogWriter: os.Stdout}, jwtToken)
err := setupRpcClient(wshremote.MakeRemoteRpcServerImpl(os.Stdout, nil, nil, false), jwtToken)
if err != nil {
return err
}
Expand Down Expand Up @@ -283,6 +396,20 @@ func serverRun(cmd *cobra.Command, args []string) error {
}
return err
}
if connServerRouterDomainSocket {
jwtToken, err := askForJwtToken()
if err != nil {
if logFile != nil {
fmt.Fprintf(logFile, "askForJwtToken error: %v\n", err)
}
return err
}
err = serverRunRouterDomainSocket(jwtToken)
if err != nil && logFile != nil {
fmt.Fprintf(logFile, "serverRunRouterDomainSocket error: %v\n", err)
}
return err
}
jwtToken, err := askForJwtToken()
if err != nil {
if logFile != nil {
Expand Down
Loading
Loading