diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7d1e594 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,10 @@ +{ + "cSpell.words": [ + "ardrive", + "arweave", + "m3ters", + "Emmo00", + "ccip", + "Mauchly", + ] +} \ No newline at end of file diff --git a/README.md b/README.md index 4074dda..076c822 100644 --- a/README.md +++ b/README.md @@ -54,3 +54,68 @@ npm install npm run dev ``` + + +# Complete Hook Lifecycle for M3tering Console + +## Initialization Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onBeforeInit` | Called before any initialization begins | None | +| `onDatabaseSetup` | Called after SQLite tables/jobs are initialized | None | +| `onAfterInit` | Called after all initialization completes successfully | None | +| `onInitError` | Called when an error occurs during initialization | `error: any` | + +## MQTT Connection Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMqttConnect` | Called when MQTT client successfully connects to ChirpStack | `client: MqttClient` | +| `onMqttSubscribed` | Called after subscribing to the device uplink topic | `client: MqttClient`, `topic: string` | +| `onMqttError` | Called when an MQTT connection error occurs | `error: any`, `client: MqttClient` | +| `onMqttReconnect` | Called when attempting to reconnect to the MQTT broker | `client: MqttClient` | + +## Message Ingestion Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMessageReceived` | Called when a raw MQTT message is received (before parsing) | `blob: Buffer` | +| `onMessageDropped` | Called when a message is dropped (e.g., device already locked) | `reason: string`, `devEui: string` | + +## Meter Discovery & Registration Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMeterCreated` | Called after a new meter is saved to the database | `newMeter: MeterRecord` | + +## Nonce Synchronization Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onSyncEpochReached` | Called when the sync interval is reached for blockchain synchronization | None | + +## Downstream Distribution Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onTransactionDistribution` | Called before sending transactions to Arweave, prover node, Streamr, or other stores/loggers | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` | + +## State Encoding & Device Response Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `isOnStateCompute` | Called to determine the device's on/off state (returns boolean) | `m3terId: number` | +| `onIsOnStateComputed` | Called after the on/off state has been computed | `m3terId: number`, `isOn: boolean` | +| `onIsOnStateComputeError` | Called when an error occurs during state computation | `m3terId: number`, `error: any` | +| `onStateEnqueued` | Called after the state is enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` | + +## Error Handling & Cleanup Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMessageError` | Called when any error occurs during message processing | `error: any` | +| `onDeviceUnlocked` | Called when a device lock is released (regardless of outcome) | `devEui: string` | +| `onMessageProcessingComplete` | Called when message processing finishes (success or error) | None | + +--- diff --git a/console.config.json b/console.config.json new file mode 100644 index 0000000..2503635 --- /dev/null +++ b/console.config.json @@ -0,0 +1,8 @@ +{ + "modules": [ + "core/arweave", + "core/prover", + "core/streamr", + "core/is_on" + ] +} diff --git a/src/index.ts b/src/index.ts index c2e4d79..3e54d04 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,34 +1,36 @@ import "dotenv/config"; -import { handleUplinks } from "./logic/mqtt"; +import { handleUplinks } from "./services/mqtt"; import { Request, Response } from "express"; -import { app } from "./logic/context"; +import { app } from "./services/context"; +import { loadExtensionsFromConfig, runHook } from "./lib/utils"; import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite"; -import { initializeVerifiersCache } from "./logic/sync"; -import { publishHeartbeatToStream } from "./logic/streamr"; // Async initialization function async function initializeApp() { try { console.log("[info] Starting application initialization..."); + // Load extensions from config + await loadExtensionsFromConfig(); + console.log("[info] Extensions loaded successfully"); + + runHook("onBeforeInit"); + // Initialize database tables and jobs setupDatabase(); console.log("[info] Database setup completed"); - // Initialize verifiers cache on startup - // (disable ccip read initialization) - // await initializeVerifiersCache(); - // console.log("[info] Verifiers cache initialized successfully"); + runHook("onDatabaseSetup") // Start MQTT handling - handleUplinks(); - console.log("[info] MQTT uplinks handler started"); - - await publishHeartbeatToStream(); + await handleUplinks(); console.log("[info] Application initialization completed successfully"); + + runHook("onAfterInit"); } catch (error) { console.error("[fatal] Failed to initialize application:", error); + runHook("onInitError", error); process.exit(1); } } diff --git a/src/lib/core/arweave/index.ts b/src/lib/core/arweave/index.ts new file mode 100644 index 0000000..d0e076a --- /dev/null +++ b/src/lib/core/arweave/index.ts @@ -0,0 +1,64 @@ +import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; +import { Readable } from "stream"; +import Arweave from "arweave"; +import type { DecodedPayload, Hooks } from "../../../types"; + +export default class implements Hooks { + async onTransactionDistribution(m3terId: number, decoded: DecodedPayload) { + // encode transaction into standard format (payload[0]) + // format: nonce | energy | signature | voltage | device_id | longitude | latitude + const transactionHex = decoded.buf; + + const arweave = Arweave.init({ + host: "arweave.net", + protocol: "https", + port: 443, + }); + + const key = await arweave.wallets.generate(); + const signer = new ArweaveSigner(key); + const turbo = TurboFactory.authenticated({ signer }); + + const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; + + const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); + + await turbo.uploadFile({ + fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), + fileSizeFactory: () => byteLength, + dataItemOpts: { + paidBy: await arweave.wallets.jwkToAddress(key), + tags: [ + { name: "Contract-Label", value: contractLabel }, + { name: "Contract-Use", value: "M3tering Protocol Test" }, + { name: "Content-Type", value: "text/plain" }, + { name: "M3ter-ID", value: m3terId.toString() }, + { name: "Timestamp", value: Date.now().toString() }, + { name: "Nonce", value: decoded.nonce.toString() }, + { name: "Energy", value: decoded.energy.toString() }, + { name: "Signature", value: decoded.signature }, + { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, + { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, + { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, + { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, + ], + }, + events: { + onUploadProgress: (progress) => { + console.log("[arweave] Upload progress:", progress); + }, + onError: (error) => { + console.error("[arweave] Upload error:", error); + }, + onSuccess(event) { + console.log("[arweave] Upload successful! Transaction ID:", event); + }, + onUploadSuccess(event) { + console.log("[arweave] Upload completed! Transaction ID:", event); + }, + }, + }); + + console.log(`[arweave] Uploaded transaction ${decoded.nonce} for M3ter ID ${m3terId} to Arweave.`); + } +} diff --git a/src/lib/core/is_on/index.ts b/src/lib/core/is_on/index.ts new file mode 100644 index 0000000..040e0cf --- /dev/null +++ b/src/lib/core/is_on/index.ts @@ -0,0 +1,7 @@ +import type { Hooks } from "../../../types"; + +export default class implements Hooks { + isOnStateCompute(m3terId: number) { + return true; + } +} diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts new file mode 100644 index 0000000..fcb5383 --- /dev/null +++ b/src/lib/core/prover/index.ts @@ -0,0 +1,66 @@ +import { buildBatchPayload } from "../../utils"; +import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; + +const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to prover node + try { + const proverURL = await getProverURL(); + console.info(`Sending pending transactions to prover: ${proverURL}`); + + const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); + + console.info("done sending to prover"); + console.info(`Prover response (text): ${await response?.text()}`); + } catch (error) { + console.error(`Error sending pending transactions to prover: ${error}`); + } + } +} + +/** + * Send transactions to prover node for verification + */ +export async function sendTransactionsToProver( + proverURL: string, + transactionData: BatchTransactionPayload[] +): Promise { + try { + const response = await fetch(`${proverURL}/batch-payloads`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(transactionData), + }); + + console.log("[info] received", response.status, "from the prover"); + + if (!response.ok) { + throw new Error(`Prover responded with status: ${response.status}`); + } + return response; + } catch (err: any) { + console.error("Failed to send transactions to prover:", err.message); + return null; + } +} + +export async function getProverURL(): Promise { + return PREFERRED_PROVER_NODE; +} + +export async function sendPendingTransactionsToProver( + proverURL: string, + pendingTransactions: TransactionRecord[] +) { + console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); + + const requestPayload = buildBatchPayload(pendingTransactions); + + console.log("[info] Request payload:", requestPayload); + + return await sendTransactionsToProver(proverURL, requestPayload); +} diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts new file mode 100644 index 0000000..a16d375 --- /dev/null +++ b/src/lib/core/streamr/index.ts @@ -0,0 +1,41 @@ +import { StreamrClient } from "@streamr/sdk"; +import { buildBatchPayload, retry } from "../../utils"; +import type { Hooks, TransactionRecord } from "../../../types"; + +const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; + +if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); +} + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to streamr + try { + console.info(`Sending pending transactions to streamr`); + + await retry(() => publishPendingTransactionsToStreamr(pendingTransactions), 3); + + console.info(`Successfully sent pending transactions to streamr`); + } catch (error) { + console.error(`Error sending pending transactions to streamr: ${error}`); + } + } +} + +async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { + const streamrClient = new StreamrClient({ + auth: { + privateKey: ETHEREUM_PRIVATE_KEY!, + }, + }); + + const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); + + const batchPayload = buildBatchPayload(pendingTransactions); + + await stream.publish(batchPayload); + + // destroy the client to free resources + await streamrClient.destroy(); +} diff --git a/src/logic/decode.ts b/src/lib/decode.ts similarity index 100% rename from src/logic/decode.ts rename to src/lib/decode.ts diff --git a/src/logic/encode.ts b/src/lib/encode.ts similarity index 100% rename from src/logic/encode.ts rename to src/lib/encode.ts diff --git a/src/logic/sync.ts b/src/lib/sync.ts similarity index 96% rename from src/logic/sync.ts rename to src/lib/sync.ts index 909e397..3ca21e5 100644 --- a/src/logic/sync.ts +++ b/src/lib/sync.ts @@ -11,9 +11,9 @@ import { rollup as rollupContract, ccipRevenueReader as ccipRevenueReaderContract, priceContext as priceContextContract, -} from "./context"; +} from "../services/context"; import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; -import { retry } from "../utils"; +import { retry } from "./utils"; import type { VerifierInfo } from "../types"; // Cache for verifiers - populated once on startup @@ -79,11 +79,11 @@ export async function initializeVerifiersCache(): Promise { /** * Get cached verifiers, throws error if cache is not initialized */ -function getCachedVerifiers(): VerifierInfo[] { +async function getCachedVerifiers(): Promise { if (!isCacheInitialized || !verifiersCache) { - throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); + await initializeVerifiersCache(); } - return verifiersCache; + return verifiersCache!; } /** @@ -147,7 +147,7 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis export async function getCrossChainRevenue(tokenId: number): Promise { try { // Use cached verifiers instead of fetching them each time - const verifiers = getCachedVerifiers(); + const verifiers = await getCachedVerifiers(); let totalRevenue = 0; diff --git a/src/utils.ts b/src/lib/utils.ts similarity index 60% rename from src/utils.ts rename to src/lib/utils.ts index 0fbacdd..8fb7d56 100644 --- a/src/utils.ts +++ b/src/lib/utils.ts @@ -1,6 +1,38 @@ -import { TransactionRecord, BatchTransactionPayload } from "./types"; +import fs from "fs"; +import path from "path"; import { createPublicKey, verify } from "crypto"; -import os from "os"; +import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig } from "../types"; + +const extensions: Hooks[] = []; + +export async function loadExtensionsFromConfig(configPath: string = "console.config.json"): Promise { + const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); + + for (const modulePath of config.modules) { + const resolved = path.resolve(__dirname, modulePath); + + const mod = await import(resolved); + extensions.push(new mod.default()); + } + + return extensions; +} + +export async function runHook(hook: K, ...args: Parameters>) { + let result: ReturnType> | boolean = true; + + for (const ext of extensions) { + const fn = ext[hook]; + let functionReturn; + if (fn) functionReturn = await (fn as any)(...args); + + if (typeof functionReturn === "boolean" && hook === "isOnStateCompute") { + result = result && functionReturn; + } + } + + return result; +} /** * Retries a function up to 5 times with exponential backoff @@ -9,11 +41,7 @@ import os from "os"; * @param baseDelay Base delay in milliseconds (default: 1000) * @returns Promise that resolves with the function result or rejects with the last error */ -export async function retry( - fn: () => Promise, - maxRetries: number = 5, - baseDelay: number = 1000 -): Promise { +export async function retry(fn: () => Promise, maxRetries: number = 5, baseDelay: number = 1000): Promise { let lastError: Error; for (let attempt = 0; attempt <= maxRetries; attempt++) { diff --git a/src/logic/arweave.ts b/src/logic/arweave.ts deleted file mode 100644 index a75f0d8..0000000 --- a/src/logic/arweave.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; -import { Readable } from "stream"; -import Arweave from "arweave"; -import type { DecodedPayload } from "../types"; - -export async function interact(m3terId: number, decoded: DecodedPayload) { - // encode transaction into standard format (payload[0]) - // format: nonce | energy | signature | voltage | device_id | longitude | latitude - const transactionHex = decoded.buf; - - const arweave = Arweave.init({ - host: "arweave.net", - protocol: "https", - port: 443, - }); - - const key = await arweave.wallets.generate(); - const signer = new ArweaveSigner(key); - const turbo = TurboFactory.authenticated({ signer }); - - const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; - - const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); - - return await turbo.uploadFile({ - fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), - fileSizeFactory: () => byteLength, - dataItemOpts: { - paidBy: await arweave.wallets.jwkToAddress(key), - tags: [ - { name: "Contract-Label", value: contractLabel }, - { name: "Contract-Use", value: "M3tering Protocol Test" }, - { name: "Content-Type", value: "text/plain" }, - { name: "M3ter-ID", value: m3terId.toString() }, - { name: "Timestamp", value: Date.now().toString() }, - { name: "Nonce", value: decoded.nonce.toString() }, - { name: "Energy", value: decoded.energy.toString() }, - { name: "Signature", value: decoded.signature }, - { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, - { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, - { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, - { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, - ], - }, - events: { - onUploadProgress: (progress) => { - console.log("[arweave] Upload progress:", progress); - }, - onError: (error) => { - console.error("[arweave] Upload error:", error); - }, - onSuccess(event) { - console.log("[arweave] Upload successful! Transaction ID:", event); - }, - onUploadSuccess(event) { - console.log("[arweave] Upload completed! Transaction ID:", event); - }, - }, - }); -} diff --git a/src/logic/gps.ts b/src/logic/gps.ts deleted file mode 100644 index f7d44c3..0000000 --- a/src/logic/gps.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { readFileSync } from "fs"; -import { join } from "path"; - -export function getGPS(): number[] { - try { - const data = readFileSync(join("gps_service", "data.json"), "utf-8"); - const gpsData = JSON.parse(data); - if (gpsData.onBoard.lat && gpsData.onBoard.lon) { - return [gpsData.onBoard.lat.toFixed(2), gpsData.onBoard.lon.toFixed(2)]; - } else { - return [gpsData.wifi.lat.toFixed(2), gpsData.wifi.lon.toFixed(2)]; - } - } catch (err) { - return [0.00, 0.00]; - } -} diff --git a/src/logic/prover.ts b/src/logic/prover.ts deleted file mode 100644 index c4f828a..0000000 --- a/src/logic/prover.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { BatchTransactionPayload, TransactionRecord } from "../types"; -import { rollup } from "./context"; -import { buildBatchPayload } from "../utils"; -import { getAllTransactionRecords } from "../store/sqlite"; - -const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; - -// Prover node structure -export interface ProverNode { - id: string; - url: string; - isActive: boolean; - lastSeen: number; -} - -/** - * Get prover node list from smart contract - * Makes a request to a smart contract to get an array of node structs - */ -export async function getProverNodeList(): Promise { - try { - // TODO: Implement smart contract call to get prover nodes - console.log("Fetching prover node list from smart contract..."); - return []; - } catch (err: any) { - console.error("Failed to get prover node list:", err.message); - return []; - } -} - -/** - * Choose a prover node from the fetched list - * Picks one prover node from the available nodes (e.g., random selection, load balancing, etc.) - */ -export function chooseProverNode(nodes: ProverNode[]): ProverNode | null { - try { - // TODO: Implement node selection logic - // Filter active nodes - const activeNodes = nodes.filter((node) => node.isActive); - - if (activeNodes.length === 0) { - console.warn("No active prover nodes available"); - return null; - } - const randomIndex = Math.floor(Math.random() * activeNodes.length); - const selectedNode = activeNodes[randomIndex]; - - console.log("Selected prover node:", { - id: selectedNode.id, - url: selectedNode.url, - }); - return selectedNode; - } catch (err: any) { - console.error("❌ Failed to choose prover node:", err.message); - return null; - } -} - -/** - * Send transactions to prover node for verification - */ -export async function sendTransactionsToProver( - proverURL: string, - transactionData: BatchTransactionPayload[] -): Promise { - try { - const response = await fetch(`${proverURL}/batch-payloads`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(transactionData), - }); - - console.log("[info] received", response.status, "from the prover"); - - if (!response.ok) { - throw new Error(`Prover responded with status: ${response.status}`); - } - return response; - } catch (err: any) { - console.error("Failed to send transactions to prover:", err.message); - return null; - } -} - -/** - * Check prover node status - */ -export async function checkProverNodeStatus(proverURL: string) { - try { - const response = await fetch(`${proverURL}/status`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - }); - - if (!response.ok) { - return false; - } - - return true; - } catch (err: any) { - console.error("❌ Failed to check prover node status:", err); - return false; - } -} - -/** - * Checks the nonce for a meter id onchain - * - * @returns {Promise} meter nonce onchain - */ -export async function checkNonceOnchain(meterId: string): Promise { - try { - const nonce = await rollup.nonce(meterId); - return nonce; - } catch (err: any) { - console.error("Failed to check nonce onchain:", err); - throw err; - } -} - -export async function getProverURL(): Promise { - return PREFERRED_PROVER_NODE; -} - -export async function sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { - console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); - - const requestPayload = buildBatchPayload(pendingTransactions); - - console.log("[info] Request payload:", requestPayload); - - return await sendTransactionsToProver(proverURL, requestPayload); -} diff --git a/src/logic/streamr.ts b/src/logic/streamr.ts deleted file mode 100644 index 45ba21c..0000000 --- a/src/logic/streamr.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { StreamrClient } from "@streamr/sdk"; -import { TransactionRecord } from "../types"; -import { buildBatchPayload } from "../utils"; - -const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; - -if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { - throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); -} - -export const streamrClient = new StreamrClient({ - auth: { - privateKey: ETHEREUM_PRIVATE_KEY, - }, -}); - -const stream = streamrClient.getStream(STREAMR_STREAM_ID); - -stream.then((stream) => { - console.log(`[Streamr] Connected to stream: ${stream.id}`); -}).catch((error) => { - console.error("[Streamr] Error connecting to stream:", error); -}); - -async function getStream() { - return await stream; -} - -export async function publishHeartbeatToStream() { - const stream = await getStream(); - const heartbeatPayload = { - timestamp: new Date().toISOString(), - }; - await stream.publish(heartbeatPayload); - console.log("[Streamr] Published heartbeat:", heartbeatPayload); -} - -async function publishToStream(data: any) { - const stream = await getStream(); - await stream.publish(data); -} - -export async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { - await publishToStream(buildBatchPayload(pendingTransactions)); -} \ No newline at end of file diff --git a/src/logic/context.ts b/src/services/context.ts similarity index 100% rename from src/logic/context.ts rename to src/services/context.ts diff --git a/src/logic/grpc.ts b/src/services/grpc.ts similarity index 100% rename from src/logic/grpc.ts rename to src/services/grpc.ts diff --git a/src/logic/mqtt.ts b/src/services/mqtt.ts similarity index 58% rename from src/logic/mqtt.ts rename to src/services/mqtt.ts index 537a9af..9c22601 100644 --- a/src/logic/mqtt.ts +++ b/src/services/mqtt.ts @@ -1,8 +1,7 @@ import { connect } from "mqtt"; import { enqueue } from "./grpc"; -import { interact } from "./arweave"; -import { encode } from "./encode"; -import { m3ter as m3terContract } from "./context"; +import { encode } from "../lib/encode"; +import { app, m3ter as m3terContract } from "./context"; import { deleteMeterByPublicKey, getAllTransactionRecords, @@ -14,64 +13,67 @@ import { updateMeterDevEui, updateMeterNonce, } from "../store/sqlite"; -import { State, TransactionRecord } from "../types"; -import { getProverURL, sendPendingTransactionsToProver } from "./prover"; -import { decodePayload } from "./decode"; -import { verifyPayloadSignature } from "../utils"; -import { - getLatestTransactionNonce, - pruneAndSyncOnchain, - getCrossChainRevenue, - getOwedFromPriceContext, -} from "./sync"; -import { createMeterLogger, MeterLogger } from "../utils/logger"; -import { publishPendingTransactionsToStreamr } from "./streamr"; +import type { State, TransactionRecord } from "../types"; +import { decodePayload } from "../lib/decode"; +import { runHook, verifyPayloadSignature } from "../lib/utils"; +import { getLatestTransactionNonce, pruneAndSyncOnchain, getCrossChainRevenue, getOwedFromPriceContext } from "../lib/sync"; +import { createMeterLogger } from "../utils/logger"; const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing -export function handleUplinks() { - const client = connect({ - host: CHIRPSTACK_HOST, - port: 1883, - clean: true, - connectTimeout: 9000, - reconnectPeriod: 1000, - }); +export function handleUplinks(): Promise { + return new Promise(function (resolve, reject) { + const client = connect({ + host: CHIRPSTACK_HOST, + port: 1883, + clean: true, + connectTimeout: 9000, + reconnectPeriod: 1000, + }); - client.on("connect", () => { - client.subscribe(`application/${process.env.APPLICATION_ID}/device/+/event/up`, () => { - console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); + client.on("reconnect", () => { + runHook("onMqttReconnect", client); }); - }); - client.on("error", (err) => { - console.error("Connection error: ", err); - client.end(); - process.exit(1); - }); + client.on("message", async (_, blob) => { + return await handleMessage(blob); + }); - client.on("reconnect", () => { - console.log("Reconnecting..."); - }); + client.on("connect", () => { + const topic = `application/${process.env.APPLICATION_ID}/device/+/event/up`; + client.subscribe(topic, () => { + console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); + + runHook("onMqttSubscribed", client, topic); + resolve(true); + }); + + runHook("onMqttConnect", client); + }); + + client.on("error", (err) => { + console.error("Connection error: ", err); - client.on("message", async (_, blob) => { - return await handleMessage(blob); + runHook("onMqttError", err, client); + + client.end(); + reject(err); + }); }); } export async function handleMessage(blob: Buffer) { - let devEui: string | undefined; - let logger: MeterLogger = createMeterLogger({}); // Default logger for early errors + runHook("onMessageReceived", blob); - try { - const message = JSON.parse(blob.toString()); - devEui = message["deviceInfo"]["devEui"]; + const message = JSON.parse(blob.toString()); + const devEui = message["deviceInfo"]["devEui"] || null; - // Initialize logger with devEui context - logger = createMeterLogger({ devEui }); + // Create a logger with devEui context + const logger = createMeterLogger({ devEui }); + try { if (!devEui) { console.log("[warn] Message dropped - no devEui found in message"); return; @@ -80,6 +82,7 @@ export async function handleMessage(blob: Buffer) { // Check if this specific device is already being processed if (deviceLocks.get(devEui)) { logger.warn(`Message dropped - device is already being processed`); + runHook("onMessageDropped", "locked", devEui); return; } @@ -90,7 +93,7 @@ export async function handleMessage(blob: Buffer) { logger.info(`Received uplink from device: ${JSON.stringify(message)}`); - // encode transaction into standard format (payload is hex string) + // encoded transaction in standard format (payload is hex string) // format: nonce | energy | signature | voltage | device_id | longitude | latitude const transactionHex = Buffer.from(message["data"], "base64"); const decoded = decodePayload(transactionHex); @@ -128,12 +131,7 @@ export async function handleMessage(blob: Buffer) { const latestNonce = await getLatestTransactionNonce(tokenId); - // Update logger with tokenId context now that we have it - logger = createMeterLogger({ devEui, tokenId, publicKey: `0x${publicKey}` }); - - logger.info( - `Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}` - ); + logger.info(`Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}`); // save new meter with devEui const newMeter = { @@ -145,21 +143,16 @@ export async function handleMessage(blob: Buffer) { const existingMeter = getMeterByTokenId(tokenId); - // incase of the public key being updated + // in-case of the public key being updated if (existingMeter && existingMeter.publicKey !== `0x${publicKey}`) { deleteMeterByPublicKey(`0x${publicKey}`); } saveMeter(newMeter); logger.info(`Saved new meter: ${JSON.stringify(newMeter)}`); - } else { - // Update logger with existing meter context - logger = createMeterLogger({ - devEui, - tokenId: existingMeter.tokenId, - publicKey: `0x${publicKey}`, - }); + runHook("onMeterCreated", newMeter); + } else { // update existing meter with devEui if not already set if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); @@ -181,9 +174,6 @@ export async function handleMessage(blob: Buffer) { throw new Error("Meter not found for public key: " + publicKey); } - // Update logger with complete meter context - logger = createMeterLogger({ devEui, tokenId: m3ter.tokenId, publicKey: `0x${publicKey}` }); - logger.info(`Found meter: ${JSON.stringify(m3ter)}`); // If both latest nonce and received nonce are 0, enqueue 0 immediately @@ -191,9 +181,9 @@ export async function handleMessage(blob: Buffer) { logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); + is_on = true; // Always on + // (await getCrossChainRevenue(m3ter.tokenId)) >= + // (await getOwedFromPriceContext(m3ter.tokenId)); } catch (error) { logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); } @@ -214,6 +204,8 @@ export async function handleMessage(blob: Buffer) { // sync with blockchain every SYNC_EPOCH transactions await pruneAndSyncOnchain(m3ter.tokenId); + runHook("onSyncEpochReached"); + logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; @@ -225,25 +217,16 @@ export async function handleMessage(blob: Buffer) { const expectedNonce = m3ter.latestNonce + 1; - logger.info( - `Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}` - ); + logger.info(`Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}`); if (decoded.nonce !== expectedNonce && decoded.nonce !== 0) { - throw new Error( - `Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}` - ); + throw new Error(`Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}`); } // if device nonce is correct if (decoded.nonce === expectedNonce) { logger.info(`Nonce is valid: ${decoded.nonce}`); - // Upload to arweave - await interact(m3ter.tokenId, decoded); - - logger.info(`Uploaded transaction to Arweave for meter ${m3ter.tokenId}`); - // save transaction to local store const transactionRecord = { nonce: decoded.nonce, @@ -251,59 +234,42 @@ export async function handleMessage(blob: Buffer) { receivedAt: Date.now(), raw: transactionHex.toString("hex"), } as TransactionRecord; - insertTransaction(transactionRecord); - updateMeterNonce(`0x${publicKey}`, expectedNonce); logger.info(`Updated meter nonce to: ${expectedNonce}`); const pendingTransactions = getAllTransactionRecords(); + runHook("onTransactionDistribution", m3ter.tokenId, decoded, pendingTransactions); - // send pending transactions to prover node try { - const proverURL = await getProverURL(); - logger.info(`Sending pending transactions to prover: ${proverURL}`); - - const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); - - logger.info("done sending to prover"); - logger.info(`Prover response (text): ${await response?.text()}`); + is_on = await runHook("isOnStateCompute", m3ter.tokenId); } catch (error) { - logger.error(`Error sending pending transactions to prover: ${error}`); + runHook("onIsOnStateComputeError", m3ter.tokenId, error); + logger.error(`Error in isOnStateCompute hook: ${error}`); } - // send pending transactions to streamr - try { - logger.info(`Sending pending transactions to streamr`); - await publishPendingTransactionsToStreamr(pendingTransactions); - logger.info(`Successfully sent pending transactions to streamr`); - } catch (error) { - logger.error(`Error sending pending transactions to streamr: ${error}`); - } - } + runHook("onIsOnStateComputed", m3ter.tokenId, is_on); - try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); - } catch (error) { - logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); - } - const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; + const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; - logger.info(`Enqueuing state: ${JSON.stringify(state)}`); + logger.info(`Enqueuing state: ${JSON.stringify(state)}`); - enqueue( - message["deviceInfo"]["devEui"], - encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) - ); + enqueue( + message["deviceInfo"]["devEui"], + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) + ); + runHook("onStateEnqueued", state, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0); + } } catch (error) { logger.error(`Error handling MQTT message: ${error}`); + runHook("onMessageError", error); } finally { // Release lock for this specific device if (devEui) { deviceLocks.delete(devEui); + runHook("onDeviceUnlocked", devEui); } + runHook("onMessageProcessingComplete"); } } diff --git a/src/types.ts b/src/types.ts index 8082331..f80f4fd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,45 @@ +import { MqttClient } from "mqtt/*"; + +// Application configuration type (console.config.json) +export type AppConfig = { + modules: string[]; +}; + +// Hooks type for lifecycle events +export type Hooks = { + onBeforeInit?: () => void | Promise; + onDatabaseSetup?: () => void | Promise; + onAfterInit?: () => void | Promise; + onInitError?: (error: any) => void | Promise; + + onMqttConnect?: (client: MqttClient) => void | Promise; + onMqttSubscribed?: (client: MqttClient, topic: string) => void | Promise; + onMqttError?: (error: any, client: MqttClient) => void | Promise; + onMqttReconnect?: (client: MqttClient) => void | Promise; + + onMessageReceived?: (blob: Buffer) => void | Promise; + onMessageDropped?: (reason: string, devEui: string) => void | Promise; + + onMeterCreated?: (newMeter: MeterRecord) => void | Promise; + + onSyncEpochReached?: () => void | Promise; + + onTransactionDistribution?: ( + tokenId: number, + decodedPayload: DecodedPayload, + pendingTransactions: TransactionRecord[] + ) => void | Promise; + + isOnStateCompute?: (m3terId: number) => boolean | Promise; + onIsOnStateComputed?: (m3terId: number, isOn: boolean) => void | Promise; + onIsOnStateComputeError?: (m3terId: number, error: any) => void | Promise; + onStateEnqueued?: (state: any, latitude: number, longitude: number) => void | Promise; + + onMessageError?: (error: any) => void | Promise; + onDeviceUnlocked?: (devEui: string) => void | Promise; + onMessageProcessingComplete?: () => void | Promise; +}; + // Meter interface for database operations export interface MeterRecord { publicKey: string; @@ -50,4 +92,4 @@ export interface VerifierInfo { ensName: string; targetAddress: string; verifierAddress: string; -} \ No newline at end of file +} diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 815b8a1..90b0598 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -8,18 +8,10 @@ export interface MeterLogger { export interface MeterContext { devEui?: string; - tokenId?: number; - publicKey?: string; } export function createMeterLogger(context: MeterContext): MeterLogger { - const prefix = context.tokenId - ? `[Meter-${context.tokenId}${context.devEui ? `|${context.devEui}` : ''}]` - : context.devEui - ? `[Device-${context.devEui}]` - : context.publicKey - ? `[PubKey-${context.publicKey.slice(-8)}]` - : '[Unknown-Meter]'; + const prefix = context.devEui ? `[Device-${context.devEui}]` : "[Unknown-Meter]"; return { info: (message: string) => console.log(`${prefix} [info] ${message}`), @@ -27,4 +19,4 @@ export function createMeterLogger(context: MeterContext): MeterLogger { error: (message: string) => console.error(`${prefix} [error] ${message}`), debug: (message: string) => console.log(`${prefix} [debug] ${message}`), }; -} \ No newline at end of file +} diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts index 4582f68..de1cb09 100644 --- a/tests/logic/sync.test.ts +++ b/tests/logic/sync.test.ts @@ -4,10 +4,10 @@ import { isVerifiersCacheInitialized, getCachedVerifiersCount, getCrossChainRevenue, -} from "../../src/logic/sync"; +} from "../../src/lib/sync"; // Mock the context module -jest.mock("../../src/logic/context", () => ({ +jest.mock("../../src/services/context", () => ({ provider: { resolveName: jest.fn(), }, @@ -19,7 +19,7 @@ jest.mock("../../src/logic/context", () => ({ })); // Mock the retry utility -jest.mock("../../src/utils", () => ({ +jest.mock("../../src/lib/utils", () => ({ retry: jest.fn((fn) => fn()), })); @@ -30,7 +30,7 @@ describe("Verifiers Cache", () => { }); it("should initialize cache successfully", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock successful responses ccipRevenueReader.verifierCount.mockResolvedValue(2n); @@ -55,7 +55,7 @@ describe("Verifiers Cache", () => { }); it("should throw error if ENS resolution fails (returns null)", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock responses with ENS resolution failure ccipRevenueReader.verifierCount.mockResolvedValue(1n); @@ -69,7 +69,7 @@ describe("Verifiers Cache", () => { }); it("should throw error if ENS resolution fails (returns zero address)", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock responses with ENS resolution failure ccipRevenueReader.verifierCount.mockResolvedValue(1n); @@ -82,17 +82,8 @@ describe("Verifiers Cache", () => { expect(getCachedVerifiersCount()).toBe(0); }); - it("should throw error if cache is not initialized when calling getCrossChainRevenue", async () => { - // Ensure cache is not initialized - expect(isVerifiersCacheInitialized()).toBe(false); - - await expect(getCrossChainRevenue(123)).rejects.toThrow( - "Verifiers cache not initialized. Call initializeVerifiersCache() first." - ); - }); - it("should use cached verifiers for getCrossChainRevenue", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock successful initialization ccipRevenueReader.verifierCount.mockResolvedValue(1n);