Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkgs/edge-worker/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions pkgs/edge-worker/src/core/Queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,25 @@ export class Queries {
`;
return result.result;
}

/**
* Registers an edge function for monitoring by ensure_workers() cron.
* Called by workers on startup. Sets last_invoked_at to prevent cron from
* pinging during startup (debounce).
*/
async trackWorkerFunction(functionName: string): Promise<void> {
await this.sql`
SELECT pgflow.track_worker_function(${functionName})
`;
}

/**
* Marks a worker as stopped for graceful shutdown signaling.
* Called by workers on beforeunload to allow ensure_workers() to detect death immediately.
*/
async markWorkerStopped(workerId: string): Promise<void> {
await this.sql`
SELECT pgflow.mark_worker_stopped(${workerId}::uuid)
`;
}
}
4 changes: 4 additions & 0 deletions pkgs/edge-worker/src/core/WorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export class WorkerLifecycle<IMessage extends Json> implements ILifecycle {
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
this.workerState.transitionTo(States.Starting);

// Register this edge function for monitoring by ensure_workers() cron.
// Must be called early to set last_invoked_at (debounce) before heartbeat timeout.
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);

this.logger.debug(`Ensuring queue '${this.queue.queueName}' exists...`);
await this.queue.safeCreate();

Expand Down
4 changes: 4 additions & 0 deletions pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
// Store workerId for supplier pattern
this._workerId = workerBootstrap.workerId;

// Register this edge function for monitoring by ensure_workers() cron.
// Must be called early to set last_invoked_at (debounce) before heartbeat timeout.
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);

// Compile/verify flow as part of Starting (before registering worker)
if (this.ensureCompiledOnStartup) {
await this.ensureFlowCompiled();
Expand Down
39 changes: 9 additions & 30 deletions pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
resolveConnectionString,
resolveSqlConnection,
} from './resolveConnection.js';
import { Queries } from '../core/Queries.js';

/**
* Supabase Edge Runtime type (without global augmentation to comply with JSR)
Expand Down Expand Up @@ -47,6 +48,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
private _platformResources: SupabaseResources;
private validatedEnv: SupabaseEnv;
private _connectionString: string | undefined;
private queries: Queries;

// Logging factory with dynamic workerId support
private loggingFactory = createLoggingFactory();
Expand Down Expand Up @@ -79,6 +81,9 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
sql: resolveSqlConnection(env, options),
supabase: createServiceSupabaseClient(this.validatedEnv)
};

// Create Queries instance for shutdown handler
this.queries = new Queries(this._platformResources.sql);
}

/**
Expand Down Expand Up @@ -158,35 +163,6 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
return this._platformResources;
}

private async spawnNewEdgeFunction(): Promise<void> {
if (!this.edgeFunctionName) {
throw new Error('functionName cannot be null or empty');
}

const supabaseUrl = this.validatedEnv.SUPABASE_URL;

this.logger.debug('Spawning a new Edge Function...');

const response = await fetch(
`${supabaseUrl}/functions/v1/${this.edgeFunctionName}`,
{
method: 'POST',
headers: {
Authorization: `Bearer ${this.validatedEnv.SUPABASE_ANON_KEY}`,
'Content-Type': 'application/json',
},
}
);

this.logger.debug('Edge Function spawned successfully!');

if (!response.ok) {
throw new Error(
`Edge function returned non-OK status: ${response.status} ${response.statusText}`
);
}
}

private extractFunctionName(req: Request): string {
return new URL(req.url).pathname.replace(/^\/+|\/+$/g, '');
}
Expand All @@ -196,7 +172,10 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
this.logger.debug('Shutting down...');

if (this.worker) {
await this.spawnNewEdgeFunction();
// Signal death to ensure_workers() cron by setting stopped_at.
// This allows the cron to immediately ping for a replacement worker.
const workerId = this.validatedEnv.SB_EXECUTION_ID;
await this.queries.markWorkerStopped(workerId);
}

await this.stopWorker();
Expand Down
128 changes: 128 additions & 0 deletions pkgs/edge-worker/supabase/functions/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions pkgs/edge-worker/supabase/functions/stopped_at_test/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { EdgeWorker } from '@pgflow/edge-worker';
import { crypto } from 'jsr:@std/crypto';
import { sql } from '../utils.ts';

async function cpuIntensiveTask(payload: { debug?: boolean }) {
let data = new TextEncoder().encode('burn');
const timeId = `stopped_at_test_${Math.random()}`;

if (payload.debug) {
console.time(timeId);
}

for (let i = 0; i < 10000; i++) {
data = new Uint8Array(await crypto.subtle.digest('SHA-256', data));
}

if (payload.debug) {
console.timeEnd(timeId);
console.log(
'[stopped_at_test] last_val = ',
await sql`SELECT nextval('stopped_at_test_seq')`
);
} else {
await sql`SELECT nextval('stopped_at_test_seq')`;
}
}

EdgeWorker.start(cpuIntensiveTask, { queueName: 'stopped_at_test' });
Loading
Loading