Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"cSpell.words": [
"ardrive",
"arweave",
"m3ters",
"Emmo00",
"ccip",
"Mauchly",
]
}
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,68 @@
npm install
npm run dev
```


# Complete Hook Lifecycle for M3tering Console

## Initialization Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onBeforeInit` | Called before any initialization begins | None |
| `onDatabaseSetup` | Called after SQLite tables/jobs are initialized | None |
| `onAfterInit` | Called after all initialization completes successfully | None |
| `onInitError` | Called when an error occurs during initialization | `error: any` |

## MQTT Connection Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onMqttConnect` | Called when MQTT client successfully connects to ChirpStack | `client: MqttClient` |
| `onMqttSubscribed` | Called after subscribing to the device uplink topic | `client: MqttClient`, `topic: string` |
| `onMqttError` | Called when an MQTT connection error occurs | `error: any`, `client: MqttClient` |
| `onMqttReconnect` | Called when attempting to reconnect to the MQTT broker | `client: MqttClient` |

## Message Ingestion Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onMessageReceived` | Called when a raw MQTT message is received (before parsing) | `blob: Buffer` |
| `onMessageDropped` | Called when a message is dropped (e.g., device already locked) | `reason: string`, `devEui: string` |

## Meter Discovery & Registration Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onMeterCreated` | Called after a new meter is saved to the database | `newMeter: MeterRecord` |

## Nonce Synchronization Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onSyncEpochReached` | Called when the sync interval is reached for blockchain synchronization | None |

## Downstream Distribution Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onTransactionDistribution` | Called before sending transactions to Arweave, prover node, Streamr, or other stores/loggers | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` |

## State Encoding & Device Response Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `isOnStateCompute` | Called to determine the device's on/off state (returns boolean) | `m3terId: number` |
| `onIsOnStateComputed` | Called after the on/off state has been computed | `m3terId: number`, `isOn: boolean` |
| `onIsOnStateComputeError` | Called when an error occurs during state computation | `m3terId: number`, `error: any` |
| `onStateEnqueued` | Called after the state is enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` |

## Error Handling & Cleanup Phase

| Hook Name | Description | Parameters |
|-----------|-------------|------------|
| `onMessageError` | Called when any error occurs during message processing | `error: any` |
| `onDeviceUnlocked` | Called when a device lock is released (regardless of outcome) | `devEui: string` |
| `onMessageProcessingComplete` | Called when message processing finishes (success or error) | None |

---
8 changes: 8 additions & 0 deletions console.config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"modules": [
"core/arweave",
"core/prover",
"core/streamr",
"core/is_on"
]
}
26 changes: 14 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
import "dotenv/config";
import { handleUplinks } from "./logic/mqtt";
import { handleUplinks } from "./services/mqtt";
import { Request, Response } from "express";
import { app } from "./logic/context";
import { app } from "./services/context";
import { loadExtensionsFromConfig, runHook } from "./lib/utils";
import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite";
import { initializeVerifiersCache } from "./logic/sync";
import { publishHeartbeatToStream } from "./logic/streamr";

// Async initialization function
async function initializeApp() {
try {
console.log("[info] Starting application initialization...");

// Load extensions from config
await loadExtensionsFromConfig();
console.log("[info] Extensions loaded successfully");

runHook("onBeforeInit");

// Initialize database tables and jobs
setupDatabase();
console.log("[info] Database setup completed");

// Initialize verifiers cache on startup
// (disable ccip read initialization)
// await initializeVerifiersCache();
// console.log("[info] Verifiers cache initialized successfully");
runHook("onDatabaseSetup")

// Start MQTT handling
handleUplinks();
console.log("[info] MQTT uplinks handler started");

await publishHeartbeatToStream();
await handleUplinks();

console.log("[info] Application initialization completed successfully");

runHook("onAfterInit");
} catch (error) {
console.error("[fatal] Failed to initialize application:", error);
runHook("onInitError", error);
process.exit(1);
}
}
Expand Down
64 changes: 64 additions & 0 deletions src/lib/core/arweave/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk";
import { Readable } from "stream";
import Arweave from "arweave";
import type { DecodedPayload, Hooks } from "../../../types";

export default class implements Hooks {
async onTransactionDistribution(m3terId: number, decoded: DecodedPayload) {
// encode transaction into standard format (payload[0])
// format: nonce | energy | signature | voltage | device_id | longitude | latitude
const transactionHex = decoded.buf;

const arweave = Arweave.init({
host: "arweave.net",
protocol: "https",
port: 443,
});

const key = await arweave.wallets.generate();
const signer = new ArweaveSigner(key);
const turbo = TurboFactory.authenticated({ signer });

const contractLabel = process.env.CONTRACT_LABEL || "M3ters";

const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8");

await turbo.uploadFile({
fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }),
fileSizeFactory: () => byteLength,
dataItemOpts: {
paidBy: await arweave.wallets.jwkToAddress(key),
tags: [
{ name: "Contract-Label", value: contractLabel },
{ name: "Contract-Use", value: "M3tering Protocol Test" },
{ name: "Content-Type", value: "text/plain" },
{ name: "M3ter-ID", value: m3terId.toString() },
{ name: "Timestamp", value: Date.now().toString() },
{ name: "Nonce", value: decoded.nonce.toString() },
{ name: "Energy", value: decoded.energy.toString() },
{ name: "Signature", value: decoded.signature },
{ name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" },
{ name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" },
{ name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" },
{ name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" },
],
},
events: {
onUploadProgress: (progress) => {
console.log("[arweave] Upload progress:", progress);
},
onError: (error) => {
console.error("[arweave] Upload error:", error);
},
onSuccess(event) {
console.log("[arweave] Upload successful! Transaction ID:", event);
},
onUploadSuccess(event) {
console.log("[arweave] Upload completed! Transaction ID:", event);
},
},
});

console.log(`[arweave] Uploaded transaction ${decoded.nonce} for M3ter ID ${m3terId} to Arweave.`);
}
}
7 changes: 7 additions & 0 deletions src/lib/core/is_on/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { Hooks } from "../../../types";

export default class implements Hooks {
isOnStateCompute(m3terId: number) {
return true;
}
}
66 changes: 66 additions & 0 deletions src/lib/core/prover/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { buildBatchPayload } from "../../utils";
import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types";

const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing";

export default class implements Hooks {
async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) {
// send pending transactions to prover node
try {
const proverURL = await getProverURL();
console.info(`Sending pending transactions to prover: ${proverURL}`);

const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions);

console.info("done sending to prover");
console.info(`Prover response (text): ${await response?.text()}`);
} catch (error) {
console.error(`Error sending pending transactions to prover: ${error}`);
}
}
}

/**
* Send transactions to prover node for verification
*/
export async function sendTransactionsToProver(
proverURL: string,
transactionData: BatchTransactionPayload[]
): Promise<Response | null> {
try {
const response = await fetch(`${proverURL}/batch-payloads`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(transactionData),
});

console.log("[info] received", response.status, "from the prover");

if (!response.ok) {
throw new Error(`Prover responded with status: ${response.status}`);
}
return response;
} catch (err: any) {
console.error("Failed to send transactions to prover:", err.message);
return null;
}
}

export async function getProverURL(): Promise<string | null> {
return PREFERRED_PROVER_NODE;
}

export async function sendPendingTransactionsToProver(
proverURL: string,
pendingTransactions: TransactionRecord[]
) {
console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL);

const requestPayload = buildBatchPayload(pendingTransactions);

console.log("[info] Request payload:", requestPayload);

return await sendTransactionsToProver(proverURL, requestPayload);
}
41 changes: 41 additions & 0 deletions src/lib/core/streamr/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { StreamrClient } from "@streamr/sdk";
import { buildBatchPayload, retry } from "../../utils";
import type { Hooks, TransactionRecord } from "../../../types";

const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env;

if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) {
throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables");
}

export default class implements Hooks {
async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) {
// send pending transactions to streamr
try {
console.info(`Sending pending transactions to streamr`);

await retry(() => publishPendingTransactionsToStreamr(pendingTransactions), 3);

console.info(`Successfully sent pending transactions to streamr`);
} catch (error) {
console.error(`Error sending pending transactions to streamr: ${error}`);
}
}
}

async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) {
const streamrClient = new StreamrClient({
auth: {
privateKey: ETHEREUM_PRIVATE_KEY!,
},
});

const stream = await streamrClient.getStream(STREAMR_STREAM_ID!);

const batchPayload = buildBatchPayload(pendingTransactions);

await stream.publish(batchPayload);

// destroy the client to free resources
await streamrClient.destroy();
}
File renamed without changes.
File renamed without changes.
12 changes: 6 additions & 6 deletions src/logic/sync.ts → src/lib/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import {
rollup as rollupContract,
ccipRevenueReader as ccipRevenueReaderContract,
priceContext as priceContextContract,
} from "./context";
} from "../services/context";
import { JsonRpcProvider, Contract, ZeroAddress } from "ethers";
import { retry } from "../utils";
import { retry } from "./utils";
import type { VerifierInfo } from "../types";

// Cache for verifiers - populated once on startup
Expand Down Expand Up @@ -79,11 +79,11 @@ export async function initializeVerifiersCache(): Promise<void> {
/**
* Get cached verifiers, throws error if cache is not initialized
*/
function getCachedVerifiers(): VerifierInfo[] {
async function getCachedVerifiers(): Promise<VerifierInfo[]> {
if (!isCacheInitialized || !verifiersCache) {
throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first.");
await initializeVerifiersCache();
}
return verifiersCache;
return verifiersCache!;
}

/**
Expand Down Expand Up @@ -147,7 +147,7 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis
export async function getCrossChainRevenue(tokenId: number): Promise<number> {
try {
// Use cached verifiers instead of fetching them each time
const verifiers = getCachedVerifiers();
const verifiers = await getCachedVerifiers();

let totalRevenue = 0;

Expand Down
Loading