From 98bec22ddceb748fbc2da673f90ebd16f71160c4 Mon Sep 17 00:00:00 2001
From: Emmo00
Date: Thu, 8 Jan 2026 18:59:21 +0100
Subject: [PATCH 1/6] feat: refactor application structure to modularize components and improve initialization process

---
 console.config.json           |   8 +
 src/index.ts                  |  26 +--
 src/lib/core/arweave/index.ts |  64 +++++++
 src/lib/core/is_on/index.ts   |   7 +
 src/lib/core/prover/index.ts  |  66 ++++++++
 src/lib/core/streamr/index.ts |  51 ++++++
 src/lib/utils.ts              |  97 +++++++++++
 src/logic/mqtt.ts             | 309 ----------------------------------
 src/types.ts                  |  44 ++++-
 src/utils/logger.ts           |  12 +-
 tests/logic/sync.test.ts      |   4 +-
 11 files changed, 354 insertions(+), 334 deletions(-)
 create mode 100644 console.config.json
 create mode 100644 src/lib/core/arweave/index.ts
 create mode 100644 src/lib/core/is_on/index.ts
 create mode 100644 src/lib/core/prover/index.ts
 create mode 100644 src/lib/core/streamr/index.ts
 create mode 100644 src/lib/utils.ts
 delete mode 100644 src/logic/mqtt.ts

diff --git a/console.config.json b/console.config.json
new file mode 100644
index 0000000..2503635
--- /dev/null
+++ b/console.config.json
@@ -0,0 +1,8 @@
+{
+  "modules": [
+    "core/arweave",
+    "core/prover",
+    "core/streamr",
+    "core/is_on"
+  ]
+}
diff --git a/src/index.ts b/src/index.ts
index c2e4d79..3e54d04 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,34 +1,36 @@
 import "dotenv/config";
-import { handleUplinks } from "./logic/mqtt";
+import { handleUplinks } from "./services/mqtt";
 import { Request, Response } from "express";
-import { app } from "./logic/context";
+import { app } from "./services/context";
+import { loadExtensionsFromConfig, runHook } from "./lib/utils";
 import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite";
-import { initializeVerifiersCache } from "./logic/sync";
-import { publishHeartbeatToStream } from "./logic/streamr";
 
 // Async initialization function
 async function initializeApp() {
   try {
     console.log("[info] Starting application initialization...");
 
+    // Load extensions from config
+    await loadExtensionsFromConfig();
+    console.log("[info] Extensions loaded successfully");
+
+    runHook("onBeforeInit");
+
     // Initialize database tables and jobs
     setupDatabase();
     console.log("[info] Database setup completed");
 
-    // Initialize verifiers cache on startup
-    // (disable ccip read initialization)
-    // await initializeVerifiersCache();
-    // console.log("[info] Verifiers cache initialized successfully");
+    runHook("onDatabaseSetup")
 
     // Start MQTT handling
-    handleUplinks();
-    console.log("[info] MQTT uplinks handler started");
-
-    await publishHeartbeatToStream();
+    await handleUplinks();
 
     console.log("[info] Application initialization completed successfully");
+
+    runHook("onAfterInit");
   } catch (error) {
     console.error("[fatal] Failed to initialize application:", error);
+    runHook("onInitError", error);
     process.exit(1);
   }
 }
diff --git a/src/lib/core/arweave/index.ts b/src/lib/core/arweave/index.ts
new file mode 100644
index 0000000..d0e076a
--- /dev/null
+++ b/src/lib/core/arweave/index.ts
@@ -0,0 +1,64 @@
+import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk";
+import { Readable } from "stream";
+import Arweave from "arweave";
+import type { DecodedPayload, Hooks } from "../../../types";
+
+export default class implements Hooks {
+  async onTransactionDistribution(m3terId: number, decoded: DecodedPayload) {
+    // encode transaction into standard format (payload[0])
+    // format: nonce | energy | signature | voltage | device_id | longitude | latitude
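+    // decoded.buf is the raw uplink payload Buffer returned by decodePayload; handleMessage verifies
+    // its signature before invoking this onTransactionDistribution hook through runHook.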
const transactionHex = decoded.buf; + + const arweave = Arweave.init({ + host: "arweave.net", + protocol: "https", + port: 443, + }); + + const key = await arweave.wallets.generate(); + const signer = new ArweaveSigner(key); + const turbo = TurboFactory.authenticated({ signer }); + + const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; + + const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); + + await turbo.uploadFile({ + fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), + fileSizeFactory: () => byteLength, + dataItemOpts: { + paidBy: await arweave.wallets.jwkToAddress(key), + tags: [ + { name: "Contract-Label", value: contractLabel }, + { name: "Contract-Use", value: "M3tering Protocol Test" }, + { name: "Content-Type", value: "text/plain" }, + { name: "M3ter-ID", value: m3terId.toString() }, + { name: "Timestamp", value: Date.now().toString() }, + { name: "Nonce", value: decoded.nonce.toString() }, + { name: "Energy", value: decoded.energy.toString() }, + { name: "Signature", value: decoded.signature }, + { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, + { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, + { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, + { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, + ], + }, + events: { + onUploadProgress: (progress) => { + console.log("[arweave] Upload progress:", progress); + }, + onError: (error) => { + console.error("[arweave] Upload error:", error); + }, + onSuccess(event) { + console.log("[arweave] Upload successful! Transaction ID:", event); + }, + onUploadSuccess(event) { + console.log("[arweave] Upload completed! 
Transaction ID:", event); + }, + }, + }); + + console.log(`[arweave] Uploaded transaction ${decoded.nonce} for M3ter ID ${m3terId} to Arweave.`); + } +} diff --git a/src/lib/core/is_on/index.ts b/src/lib/core/is_on/index.ts new file mode 100644 index 0000000..040e0cf --- /dev/null +++ b/src/lib/core/is_on/index.ts @@ -0,0 +1,7 @@ +import type { Hooks } from "../../../types"; + +export default class implements Hooks { + isOnStateCompute(m3terId: number) { + return true; + } +} diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts new file mode 100644 index 0000000..fcb5383 --- /dev/null +++ b/src/lib/core/prover/index.ts @@ -0,0 +1,66 @@ +import { buildBatchPayload } from "../../utils"; +import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; + +const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to prover node + try { + const proverURL = await getProverURL(); + console.info(`Sending pending transactions to prover: ${proverURL}`); + + const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); + + console.info("done sending to prover"); + console.info(`Prover response (text): ${await response?.text()}`); + } catch (error) { + console.error(`Error sending pending transactions to prover: ${error}`); + } + } +} + +/** + * Send transactions to prover node for verification + */ +export async function sendTransactionsToProver( + proverURL: string, + transactionData: BatchTransactionPayload[] +): Promise { + try { + const response = await fetch(`${proverURL}/batch-payloads`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(transactionData), + }); + + console.log("[info] received", response.status, "from the prover"); + + if (!response.ok) { + throw new Error(`Prover responded with status: ${response.status}`); + } + return response; + } catch (err: any) { + console.error("Failed to send transactions to prover:", err.message); + return null; + } +} + +export async function getProverURL(): Promise { + return PREFERRED_PROVER_NODE; +} + +export async function sendPendingTransactionsToProver( + proverURL: string, + pendingTransactions: TransactionRecord[] +) { + console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); + + const requestPayload = buildBatchPayload(pendingTransactions); + + console.log("[info] Request payload:", requestPayload); + + return await sendTransactionsToProver(proverURL, requestPayload); +} diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts new file mode 100644 index 0000000..bddec0c --- /dev/null +++ b/src/lib/core/streamr/index.ts @@ -0,0 +1,51 @@ +import { StreamrClient } from "@streamr/sdk"; +import { buildBatchPayload } from "../../utils"; +import type { Hooks, TransactionRecord } from "../../../types"; + +const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; + +if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); +} + +export const streamrClient = new StreamrClient({ + auth: { + privateKey: ETHEREUM_PRIVATE_KEY, + }, +}); + +const stream = streamrClient.getStream(STREAMR_STREAM_ID); + +stream + .then((stream) => { + console.log(`[Streamr] Connected to stream: 
${stream.id}`); + }) + .catch((error) => { + console.error("[Streamr] Error connecting to stream:", error); + }); + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to streamr + try { + console.info(`Sending pending transactions to streamr`); + await publishPendingTransactionsToStreamr(pendingTransactions); + console.info(`Successfully sent pending transactions to streamr`); + } catch (error) { + console.error(`Error sending pending transactions to streamr: ${error}`); + } + } +} + +async function getStream() { + return await stream; +} + +async function publishToStream(data: any) { + const stream = await getStream(); + await stream.publish(data); +} + +async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { + await publishToStream(buildBatchPayload(pendingTransactions)); +} diff --git a/src/lib/utils.ts b/src/lib/utils.ts new file mode 100644 index 0000000..8fb7d56 --- /dev/null +++ b/src/lib/utils.ts @@ -0,0 +1,97 @@ +import fs from "fs"; +import path from "path"; +import { createPublicKey, verify } from "crypto"; +import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig } from "../types"; + +const extensions: Hooks[] = []; + +export async function loadExtensionsFromConfig(configPath: string = "console.config.json"): Promise { + const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); + + for (const modulePath of config.modules) { + const resolved = path.resolve(__dirname, modulePath); + + const mod = await import(resolved); + extensions.push(new mod.default()); + } + + return extensions; +} + +export async function runHook(hook: K, ...args: Parameters>) { + let result: ReturnType> | boolean = true; + + for (const ext of extensions) { + const fn = ext[hook]; + let functionReturn; + if (fn) functionReturn = await (fn as any)(...args); + + if (typeof functionReturn === "boolean" && hook === "isOnStateCompute") { + result = result && functionReturn; + } + } + + return result; +} + +/** + * Retries a function up to 5 times with exponential backoff + * @param fn Function to retry + * @param maxRetries Maximum number of retries (default: 5) + * @param baseDelay Base delay in milliseconds (default: 1000) + * @returns Promise that resolves with the function result or rejects with the last error + */ +export async function retry(fn: () => Promise, maxRetries: number = 5, baseDelay: number = 1000): Promise { + let lastError: Error; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await fn(); + } catch (error) { + lastError = error as Error; + + if (attempt === maxRetries) { + throw lastError; + } + + const delay = baseDelay * Math.pow(2, attempt); + console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); + + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + throw lastError!; +} + +export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { + return transactions.map((transaction) => ({ + m3ter_id: Number(transaction.identifier), + message: transaction.raw, + })); +} + +export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean { + try { + const message = transaction.subarray(0, 8); + const signature = transaction.subarray(8, 72); + + // Wrap raw key in SPKI DER + const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex"); + const derKey = Buffer.concat([new 
Uint8Array(spkiPrefix), new Uint8Array(rawPubKey)]); + + const publicKey = createPublicKey({ + key: derKey, + format: "der", + type: "spki", + }); + + // Verify + const ok = verify(null, new Uint8Array(message), publicKey, new Uint8Array(signature)); + + return ok; + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +} diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts deleted file mode 100644 index 537a9af..0000000 --- a/src/logic/mqtt.ts +++ /dev/null @@ -1,309 +0,0 @@ -import { connect } from "mqtt"; -import { enqueue } from "./grpc"; -import { interact } from "./arweave"; -import { encode } from "./encode"; -import { m3ter as m3terContract } from "./context"; -import { - deleteMeterByPublicKey, - getAllTransactionRecords, - getMeterByDevEui, - getMeterByPublicKey, - getMeterByTokenId, - insertTransaction, - saveMeter, - updateMeterDevEui, - updateMeterNonce, -} from "../store/sqlite"; -import { State, TransactionRecord } from "../types"; -import { getProverURL, sendPendingTransactionsToProver } from "./prover"; -import { decodePayload } from "./decode"; -import { verifyPayloadSignature } from "../utils"; -import { - getLatestTransactionNonce, - pruneAndSyncOnchain, - getCrossChainRevenue, - getOwedFromPriceContext, -} from "./sync"; -import { createMeterLogger, MeterLogger } from "../utils/logger"; -import { publishPendingTransactionsToStreamr } from "./streamr"; - -const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; -const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain -const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing - -export function handleUplinks() { - const client = connect({ - host: CHIRPSTACK_HOST, - port: 1883, - clean: true, - connectTimeout: 9000, - reconnectPeriod: 1000, - }); - - client.on("connect", () => { - client.subscribe(`application/${process.env.APPLICATION_ID}/device/+/event/up`, () => { - console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); - }); - }); - - client.on("error", (err) => { - console.error("Connection error: ", err); - client.end(); - process.exit(1); - }); - - client.on("reconnect", () => { - console.log("Reconnecting..."); - }); - - client.on("message", async (_, blob) => { - return await handleMessage(blob); - }); -} - -export async function handleMessage(blob: Buffer) { - let devEui: string | undefined; - let logger: MeterLogger = createMeterLogger({}); // Default logger for early errors - - try { - const message = JSON.parse(blob.toString()); - devEui = message["deviceInfo"]["devEui"]; - - // Initialize logger with devEui context - logger = createMeterLogger({ devEui }); - - if (!devEui) { - console.log("[warn] Message dropped - no devEui found in message"); - return; - } - - // Check if this specific device is already being processed - if (deviceLocks.get(devEui)) { - logger.warn(`Message dropped - device is already being processed`); - return; - } - - let is_on = true; - - // Set lock for this specific device - deviceLocks.set(devEui, true); - - logger.info(`Received uplink from device: ${JSON.stringify(message)}`); - - // encode transaction into standard format (payload is hex string) - // format: nonce | energy | signature | voltage | device_id | longitude | latitude - const transactionHex = Buffer.from(message["data"], "base64"); - const decoded = decodePayload(transactionHex); - let publicKey = decoded.extensions.deviceId; - let payloadHadPublicKey = !!publicKey; - - logger.info(`Decoded payload: 
${JSON.stringify(decoded)}`); - - if (!publicKey) { - // try to find public key by DevEui - const meterByDevEui = getMeterByDevEui(devEui); - - if (!meterByDevEui) { - throw new Error("Device EUI not associated with any meter: " + devEui); - } - - publicKey = meterByDevEui.publicKey.replace("0x", ""); - } - - // verify transaction signature - const isValid = verifyPayloadSignature(transactionHex, Buffer.from(publicKey!, "hex")); - if (!isValid) { - throw new Error("Invalid transaction signature for meter with public key: " + publicKey); - } - - logger.info("Verified signature"); - - if (payloadHadPublicKey) { - logger.info(`Payload contained public key: ${publicKey}`); - // save public key with device EUI mapping if not already saved - const existingMeter = getMeterByPublicKey(`0x${publicKey}`); - - if (!existingMeter) { - const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`)); - - const latestNonce = await getLatestTransactionNonce(tokenId); - - // Update logger with tokenId context now that we have it - logger = createMeterLogger({ devEui, tokenId, publicKey: `0x${publicKey}` }); - - logger.info( - `Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}` - ); - - // save new meter with devEui - const newMeter = { - publicKey: `0x${publicKey}`, - devEui: message["deviceInfo"]["devEui"], - tokenId, - latestNonce, - }; - - const existingMeter = getMeterByTokenId(tokenId); - - // incase of the public key being updated - if (existingMeter && existingMeter.publicKey !== `0x${publicKey}`) { - deleteMeterByPublicKey(`0x${publicKey}`); - } - - saveMeter(newMeter); - logger.info(`Saved new meter: ${JSON.stringify(newMeter)}`); - } else { - // Update logger with existing meter context - logger = createMeterLogger({ - devEui, - tokenId: existingMeter.tokenId, - publicKey: `0x${publicKey}`, - }); - - // update existing meter with devEui if not already set - if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { - logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); - updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); - } - - // fetch and update latest nonce from chain - const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId); - - logger.info(`Fetched latestNonce from chain and local state: ${latestNonce}`); - - updateMeterNonce(`0x${publicKey}`, latestNonce); - } - } - - let m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; - - if (!m3ter) { - throw new Error("Meter not found for public key: " + publicKey); - } - - // Update logger with complete meter context - logger = createMeterLogger({ devEui, tokenId: m3ter.tokenId, publicKey: `0x${publicKey}` }); - - logger.info(`Found meter: ${JSON.stringify(m3ter)}`); - - // If both latest nonce and received nonce are 0, enqueue 0 immediately - if (m3ter.latestNonce === 0 && decoded.nonce === 0) { - logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); - - try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); - } catch (error) { - logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); - } - - const state = { nonce: 0, is_on }; - - logger.info(`Enqueuing state: ${JSON.stringify(state)}`); - - enqueue( - message["deviceInfo"]["devEui"], - encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 
0) - ); - - return; // Exit early without processing the transaction - } - - if (m3ter.latestNonce % SYNC_EPOCH === 0) { - // sync with blockchain every SYNC_EPOCH transactions - await pruneAndSyncOnchain(m3ter.tokenId); - - logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); - - m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; - - if (!m3ter) { - throw new Error("Meter not found after sync for public key: " + publicKey); - } - } - - const expectedNonce = m3ter.latestNonce + 1; - - logger.info( - `Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}` - ); - - if (decoded.nonce !== expectedNonce && decoded.nonce !== 0) { - throw new Error( - `Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}` - ); - } - - // if device nonce is correct - if (decoded.nonce === expectedNonce) { - logger.info(`Nonce is valid: ${decoded.nonce}`); - - // Upload to arweave - await interact(m3ter.tokenId, decoded); - - logger.info(`Uploaded transaction to Arweave for meter ${m3ter.tokenId}`); - - // save transaction to local store - const transactionRecord = { - nonce: decoded.nonce, - identifier: m3ter.tokenId, - receivedAt: Date.now(), - raw: transactionHex.toString("hex"), - } as TransactionRecord; - - insertTransaction(transactionRecord); - - updateMeterNonce(`0x${publicKey}`, expectedNonce); - - logger.info(`Updated meter nonce to: ${expectedNonce}`); - - const pendingTransactions = getAllTransactionRecords(); - - // send pending transactions to prover node - try { - const proverURL = await getProverURL(); - logger.info(`Sending pending transactions to prover: ${proverURL}`); - - const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); - - logger.info("done sending to prover"); - logger.info(`Prover response (text): ${await response?.text()}`); - } catch (error) { - logger.error(`Error sending pending transactions to prover: ${error}`); - } - - // send pending transactions to streamr - try { - logger.info(`Sending pending transactions to streamr`); - await publishPendingTransactionsToStreamr(pendingTransactions); - logger.info(`Successfully sent pending transactions to streamr`); - } catch (error) { - logger.error(`Error sending pending transactions to streamr: ${error}`); - } - } - - try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); - } catch (error) { - logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); - } - const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; - - logger.info(`Enqueuing state: ${JSON.stringify(state)}`); - - enqueue( - message["deviceInfo"]["devEui"], - encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 
0) - ); - } catch (error) { - logger.error(`Error handling MQTT message: ${error}`); - } finally { - // Release lock for this specific device - if (devEui) { - deviceLocks.delete(devEui); - } - } -} diff --git a/src/types.ts b/src/types.ts index 8082331..f80f4fd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,45 @@ +import { MqttClient } from "mqtt/*"; + +// Application configuration type (console.config.json) +export type AppConfig = { + modules: string[]; +}; + +// Hooks type for lifecycle events +export type Hooks = { + onBeforeInit?: () => void | Promise; + onDatabaseSetup?: () => void | Promise; + onAfterInit?: () => void | Promise; + onInitError?: (error: any) => void | Promise; + + onMqttConnect?: (client: MqttClient) => void | Promise; + onMqttSubscribed?: (client: MqttClient, topic: string) => void | Promise; + onMqttError?: (error: any, client: MqttClient) => void | Promise; + onMqttReconnect?: (client: MqttClient) => void | Promise; + + onMessageReceived?: (blob: Buffer) => void | Promise; + onMessageDropped?: (reason: string, devEui: string) => void | Promise; + + onMeterCreated?: (newMeter: MeterRecord) => void | Promise; + + onSyncEpochReached?: () => void | Promise; + + onTransactionDistribution?: ( + tokenId: number, + decodedPayload: DecodedPayload, + pendingTransactions: TransactionRecord[] + ) => void | Promise; + + isOnStateCompute?: (m3terId: number) => boolean | Promise; + onIsOnStateComputed?: (m3terId: number, isOn: boolean) => void | Promise; + onIsOnStateComputeError?: (m3terId: number, error: any) => void | Promise; + onStateEnqueued?: (state: any, latitude: number, longitude: number) => void | Promise; + + onMessageError?: (error: any) => void | Promise; + onDeviceUnlocked?: (devEui: string) => void | Promise; + onMessageProcessingComplete?: () => void | Promise; +}; + // Meter interface for database operations export interface MeterRecord { publicKey: string; @@ -50,4 +92,4 @@ export interface VerifierInfo { ensName: string; targetAddress: string; verifierAddress: string; -} \ No newline at end of file +} diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 815b8a1..90b0598 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -8,18 +8,10 @@ export interface MeterLogger { export interface MeterContext { devEui?: string; - tokenId?: number; - publicKey?: string; } export function createMeterLogger(context: MeterContext): MeterLogger { - const prefix = context.tokenId - ? `[Meter-${context.tokenId}${context.devEui ? `|${context.devEui}` : ''}]` - : context.devEui - ? `[Device-${context.devEui}]` - : context.publicKey - ? `[PubKey-${context.publicKey.slice(-8)}]` - : '[Unknown-Meter]'; + const prefix = context.devEui ? 
`[Device-${context.devEui}]` : "[Unknown-Meter]"; return { info: (message: string) => console.log(`${prefix} [info] ${message}`), @@ -27,4 +19,4 @@ export function createMeterLogger(context: MeterContext): MeterLogger { error: (message: string) => console.error(`${prefix} [error] ${message}`), debug: (message: string) => console.log(`${prefix} [debug] ${message}`), }; -} \ No newline at end of file +} diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts index 4582f68..22d7db9 100644 --- a/tests/logic/sync.test.ts +++ b/tests/logic/sync.test.ts @@ -4,7 +4,7 @@ import { isVerifiersCacheInitialized, getCachedVerifiersCount, getCrossChainRevenue, -} from "../../src/logic/sync"; +} from "../../src/lib/sync"; // Mock the context module jest.mock("../../src/logic/context", () => ({ @@ -19,7 +19,7 @@ jest.mock("../../src/logic/context", () => ({ })); // Mock the retry utility -jest.mock("../../src/utils", () => ({ +jest.mock("../../src/lib/utils", () => ({ retry: jest.fn((fn) => fn()), })); From cb0e5c152d768dff7ac11461b51adb976acfa819 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:00:51 +0100 Subject: [PATCH 2/6] feat: remove deprecated logic files and move grpc and mqtt services --- src/logic/arweave.ts | 60 ------- src/logic/decode.ts | 49 ----- src/logic/encode.ts | 92 ---------- src/logic/gps.ts | 16 -- src/logic/prover.ts | 137 -------------- src/logic/streamr.ts | 45 ----- src/logic/sync.ts | 201 --------------------- src/{logic => services}/context.ts | 0 src/{logic => services}/grpc.ts | 0 src/services/mqtt.ts | 275 +++++++++++++++++++++++++++++ 10 files changed, 275 insertions(+), 600 deletions(-) delete mode 100644 src/logic/arweave.ts delete mode 100644 src/logic/decode.ts delete mode 100644 src/logic/encode.ts delete mode 100644 src/logic/gps.ts delete mode 100644 src/logic/prover.ts delete mode 100644 src/logic/streamr.ts delete mode 100644 src/logic/sync.ts rename src/{logic => services}/context.ts (100%) rename src/{logic => services}/grpc.ts (100%) create mode 100644 src/services/mqtt.ts diff --git a/src/logic/arweave.ts b/src/logic/arweave.ts deleted file mode 100644 index a75f0d8..0000000 --- a/src/logic/arweave.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; -import { Readable } from "stream"; -import Arweave from "arweave"; -import type { DecodedPayload } from "../types"; - -export async function interact(m3terId: number, decoded: DecodedPayload) { - // encode transaction into standard format (payload[0]) - // format: nonce | energy | signature | voltage | device_id | longitude | latitude - const transactionHex = decoded.buf; - - const arweave = Arweave.init({ - host: "arweave.net", - protocol: "https", - port: 443, - }); - - const key = await arweave.wallets.generate(); - const signer = new ArweaveSigner(key); - const turbo = TurboFactory.authenticated({ signer }); - - const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; - - const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); - - return await turbo.uploadFile({ - fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), - fileSizeFactory: () => byteLength, - dataItemOpts: { - paidBy: await arweave.wallets.jwkToAddress(key), - tags: [ - { name: "Contract-Label", value: contractLabel }, - { name: "Contract-Use", value: "M3tering Protocol Test" }, - { name: "Content-Type", value: "text/plain" }, - { name: "M3ter-ID", value: m3terId.toString() }, - { name: "Timestamp", value: 
Date.now().toString() }, - { name: "Nonce", value: decoded.nonce.toString() }, - { name: "Energy", value: decoded.energy.toString() }, - { name: "Signature", value: decoded.signature }, - { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, - { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, - { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, - { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, - ], - }, - events: { - onUploadProgress: (progress) => { - console.log("[arweave] Upload progress:", progress); - }, - onError: (error) => { - console.error("[arweave] Upload error:", error); - }, - onSuccess(event) { - console.log("[arweave] Upload successful! Transaction ID:", event); - }, - onUploadSuccess(event) { - console.log("[arweave] Upload completed! Transaction ID:", event); - }, - }, - }); -} diff --git a/src/logic/decode.ts b/src/logic/decode.ts deleted file mode 100644 index 49c9185..0000000 --- a/src/logic/decode.ts +++ /dev/null @@ -1,49 +0,0 @@ -import type { DecodedPayload } from "../types"; - -export function decodePayload(buf: Buffer) { - if (buf.length < 72) { - throw new Error("Payload too short. Must be at least 72 bytes"); - } - - // --- Core fields --- - const nonce = buf.readUInt32BE(0); - - const rawEnergy = buf.readUInt32BE(4); - const energyKWh = rawEnergy / 1e6; - - const signature = buf.subarray(8, 72).toString("hex"); - - // --- Optional extensions --- - const ext = {} as NonNullable; - let offset = 72; - - if (buf.length >= offset + 2) { - ext.voltage = buf.readUInt16BE(offset) / 10; - offset += 2; - } - - if (buf.length >= offset + 32) { - ext.deviceId = buf.subarray(offset, offset + 32).toString("hex"); - offset += 32; - } - - if (buf.length >= offset + 3) { - let lon = buf.readIntBE(offset, 3); // signed 3-byte int - ext.longitude = lon / 1e5; - offset += 3; - } - - if (buf.length >= offset + 3) { - let lat = buf.readIntBE(offset, 3); // signed 3-byte int - ext.latitude = lat / 1e5; - offset += 3; - } - - return { - nonce, - energy: energyKWh, - signature, - extensions: ext, - buf, - }; -} diff --git a/src/logic/encode.ts b/src/logic/encode.ts deleted file mode 100644 index df9234c..0000000 --- a/src/logic/encode.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { State, TransactionRecord } from "../types"; - -export function intToByteArray(num: number, byteLength: number = 4) { - const byteArray = new Uint8Array(byteLength); - for (let i = 0; i < byteLength; i++) { - byteArray[byteLength - 1 - i] = (num >> (i * 8)) & 0xff; - } - return Array.from(byteArray); -} - -export function stringToByteArray(str: string, length: number | null = null) { - const encoder = new TextEncoder(); // UTF-8 - const encoded = encoder.encode(str); - const byteArray = new Uint8Array(length === null ? encoded.length : length); - byteArray.set(encoded.slice(0, length == null ? 
encoded.length : length)); // truncate if longer - return Array.from(byteArray); -} - -function floatToByteArray(float: number) { - const buffer = new ArrayBuffer(4); // 4 bytes for a float (32-bit) - const view = new DataView(buffer); - view.setFloat32(0, float, false); // false for Big Endian - return Array.from(new Uint8Array(buffer)); -} - -/** - * - * @notice only needs `nonce` from the state - */ -export function encode(state: State, latitude: number, longitude: number) { - let responseBytes = floatToByteArray(latitude).concat( - floatToByteArray(longitude) - ); - - let nonce = state.nonce; - - if (nonce) { - responseBytes = intToByteArray(nonce).concat(responseBytes); - } - - responseBytes.unshift(state.is_on ? 1 : 0); - return responseBytes; -} - -/** - * Encode transaction data into a byte array. - * - * format: nonce(4 bytes) | energy (4 bytes) | signature(64 bytes) | voltage(2 bytes) | device_id(32 bytes) | longitude(3 bytes) | latitude(3 bytes) - * - * @param nonce - The transaction nonce. - * @param energy - The energy value. - * @param signature - The transaction signature. - * @param voltage - The voltage value. - * @param deviceId - The device ID. - * @param longitude - The longitude value. - * @returns The encoded transaction byte array. - */ -export function encodeTransaction({ - nonce, - energy, - signature, - voltage, - deviceId, - longitude, - latitude, -}: { - nonce: number; - energy: number; - signature: string; - voltage: number; - deviceId: string; - longitude: number; - latitude: number; -}) { - const encodedNonce = intToByteArray(nonce, 4); - const encodedEnergy = intToByteArray(energy * 10e6, 4); - const encodedSignature = stringToByteArray(signature, 64); - const encodedVoltage = intToByteArray(voltage * 10, 2); - const encodedDeviceId = stringToByteArray(deviceId, 32); - const encodedLongitude = intToByteArray(longitude * 10e5, 3); - const encodedLatitude = intToByteArray(latitude * 10e5, 3); - - return [ - ...encodedNonce, - ...encodedEnergy, - ...encodedSignature, - ...encodedVoltage, - ...encodedDeviceId, - ...encodedLongitude, - ...encodedLatitude, - ]; -} diff --git a/src/logic/gps.ts b/src/logic/gps.ts deleted file mode 100644 index f7d44c3..0000000 --- a/src/logic/gps.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { readFileSync } from "fs"; -import { join } from "path"; - -export function getGPS(): number[] { - try { - const data = readFileSync(join("gps_service", "data.json"), "utf-8"); - const gpsData = JSON.parse(data); - if (gpsData.onBoard.lat && gpsData.onBoard.lon) { - return [gpsData.onBoard.lat.toFixed(2), gpsData.onBoard.lon.toFixed(2)]; - } else { - return [gpsData.wifi.lat.toFixed(2), gpsData.wifi.lon.toFixed(2)]; - } - } catch (err) { - return [0.00, 0.00]; - } -} diff --git a/src/logic/prover.ts b/src/logic/prover.ts deleted file mode 100644 index c4f828a..0000000 --- a/src/logic/prover.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { BatchTransactionPayload, TransactionRecord } from "../types"; -import { rollup } from "./context"; -import { buildBatchPayload } from "../utils"; -import { getAllTransactionRecords } from "../store/sqlite"; - -const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; - -// Prover node structure -export interface ProverNode { - id: string; - url: string; - isActive: boolean; - lastSeen: number; -} - -/** - * Get prover node list from smart contract - * Makes a request to a smart contract to get an array of node structs - */ -export async function getProverNodeList(): Promise { - 
try { - // TODO: Implement smart contract call to get prover nodes - console.log("Fetching prover node list from smart contract..."); - return []; - } catch (err: any) { - console.error("Failed to get prover node list:", err.message); - return []; - } -} - -/** - * Choose a prover node from the fetched list - * Picks one prover node from the available nodes (e.g., random selection, load balancing, etc.) - */ -export function chooseProverNode(nodes: ProverNode[]): ProverNode | null { - try { - // TODO: Implement node selection logic - // Filter active nodes - const activeNodes = nodes.filter((node) => node.isActive); - - if (activeNodes.length === 0) { - console.warn("No active prover nodes available"); - return null; - } - const randomIndex = Math.floor(Math.random() * activeNodes.length); - const selectedNode = activeNodes[randomIndex]; - - console.log("Selected prover node:", { - id: selectedNode.id, - url: selectedNode.url, - }); - return selectedNode; - } catch (err: any) { - console.error("❌ Failed to choose prover node:", err.message); - return null; - } -} - -/** - * Send transactions to prover node for verification - */ -export async function sendTransactionsToProver( - proverURL: string, - transactionData: BatchTransactionPayload[] -): Promise { - try { - const response = await fetch(`${proverURL}/batch-payloads`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(transactionData), - }); - - console.log("[info] received", response.status, "from the prover"); - - if (!response.ok) { - throw new Error(`Prover responded with status: ${response.status}`); - } - return response; - } catch (err: any) { - console.error("Failed to send transactions to prover:", err.message); - return null; - } -} - -/** - * Check prover node status - */ -export async function checkProverNodeStatus(proverURL: string) { - try { - const response = await fetch(`${proverURL}/status`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - }); - - if (!response.ok) { - return false; - } - - return true; - } catch (err: any) { - console.error("❌ Failed to check prover node status:", err); - return false; - } -} - -/** - * Checks the nonce for a meter id onchain - * - * @returns {Promise} meter nonce onchain - */ -export async function checkNonceOnchain(meterId: string): Promise { - try { - const nonce = await rollup.nonce(meterId); - return nonce; - } catch (err: any) { - console.error("Failed to check nonce onchain:", err); - throw err; - } -} - -export async function getProverURL(): Promise { - return PREFERRED_PROVER_NODE; -} - -export async function sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { - console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); - - const requestPayload = buildBatchPayload(pendingTransactions); - - console.log("[info] Request payload:", requestPayload); - - return await sendTransactionsToProver(proverURL, requestPayload); -} diff --git a/src/logic/streamr.ts b/src/logic/streamr.ts deleted file mode 100644 index 45ba21c..0000000 --- a/src/logic/streamr.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { StreamrClient } from "@streamr/sdk"; -import { TransactionRecord } from "../types"; -import { buildBatchPayload } from "../utils"; - -const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; - -if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { - throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment 
variables"); -} - -export const streamrClient = new StreamrClient({ - auth: { - privateKey: ETHEREUM_PRIVATE_KEY, - }, -}); - -const stream = streamrClient.getStream(STREAMR_STREAM_ID); - -stream.then((stream) => { - console.log(`[Streamr] Connected to stream: ${stream.id}`); -}).catch((error) => { - console.error("[Streamr] Error connecting to stream:", error); -}); - -async function getStream() { - return await stream; -} - -export async function publishHeartbeatToStream() { - const stream = await getStream(); - const heartbeatPayload = { - timestamp: new Date().toISOString(), - }; - await stream.publish(heartbeatPayload); - console.log("[Streamr] Published heartbeat:", heartbeatPayload); -} - -async function publishToStream(data: any) { - const stream = await getStream(); - await stream.publish(data); -} - -export async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { - await publishToStream(buildBatchPayload(pendingTransactions)); -} \ No newline at end of file diff --git a/src/logic/sync.ts b/src/logic/sync.ts deleted file mode 100644 index 909e397..0000000 --- a/src/logic/sync.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { - getMeterByPublicKey, - getMeterByTokenId, - getTransactionByNonce, - pruneTransactionsAfter, - pruneTransactionsBefore, - updateMeterNonce, -} from "../store/sqlite"; -import { - provider, - rollup as rollupContract, - ccipRevenueReader as ccipRevenueReaderContract, - priceContext as priceContextContract, -} from "./context"; -import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; -import { retry } from "../utils"; -import type { VerifierInfo } from "../types"; - -// Cache for verifiers - populated once on startup -let verifiersCache: VerifierInfo[] | null = null; -let isCacheInitialized = false; - -/** - * Initialize verifiers cache on program startup - * Fetches all verifiers and resolves their ENS names once - * Throws error if any fetch/resolution fails - */ -export async function initializeVerifiersCache(): Promise { - try { - console.log("[info] Initializing verifiers cache..."); - - // Get the number of verifiers - const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); - console.log(`[info] Found ${verifierCount} verifiers to cache`); - - const verifiers: VerifierInfo[] = []; - - // Fetch all verifiers and resolve their ENS names - for (let i = 0; i < verifierCount; i++) { - try { - // Get verifier info (ensName, targetContractAddress) - const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); - - console.log(`[info] Fetching verifier ${i}: ENS: ${ensName}, target: ${targetAddress}`); - - // Resolve ENS name to get the verifier address - const verifierAddress = await retry(() => provider.resolveName(ensName)); - - if (!verifierAddress || verifierAddress === ZeroAddress) { - throw new Error(`Failed to resolve ENS name: ${ensName}`); - } - - console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); - - verifiers.push({ - ensName, - targetAddress, - verifierAddress, - }); - } catch (error) { - console.error(`[error] Failed to initialize verifier ${i}:`, error); - throw error; // Fail fast as requested - } - } - - // Cache the verifiers - verifiersCache = verifiers; - isCacheInitialized = true; - - console.log(`[info] Successfully cached ${verifiers.length} verifiers`); - } catch (error) { - console.error("[error] Failed to initialize verifiers cache:", error); - isCacheInitialized = false; - verifiersCache = null; - 
throw error; - } -} - -/** - * Get cached verifiers, throws error if cache is not initialized - */ -function getCachedVerifiers(): VerifierInfo[] { - if (!isCacheInitialized || !verifiersCache) { - throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); - } - return verifiersCache; -} - -/** - * Check if verifiers cache is initialized - */ -export function isVerifiersCacheInitialized(): boolean { - return isCacheInitialized && verifiersCache !== null; -} - -/** - * Get the number of cached verifiers - */ -export function getCachedVerifiersCount(): number { - return verifiersCache?.length ?? 0; -} - -export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise { - const meter = - typeof meterIdentifier === "number" - ? getMeterByTokenId(meterIdentifier) - : getMeterByPublicKey(meterIdentifier); - - if (!meter) { - throw new Error(`Meter with identifier ${meterIdentifier} not found`); - } - - // Check the latest nonce on the blockchain - const onchainNonce = Number(await rollupContract.nonce(meter.tokenId)); - const latestNonce = meter.latestNonce; - - if (onchainNonce > latestNonce) { - const publicKey = meter.publicKey; - // If the onchain nonce is greater, update the local record - updateMeterNonce(publicKey, onchainNonce); - } - // prune transactions with nonce less than or equal to onchainNonce - pruneTransactionsBefore(meter.tokenId, onchainNonce); - - return onchainNonce; -} - -export async function getLatestTransactionNonce(meterIdentifier: number): Promise { - // get latest nonce from chain - let latestNonce = Number(await rollupContract.nonce(meterIdentifier)); - - // check local state for the highest nonce we have - while (true) { - const existingTransaction = getTransactionByNonce(latestNonce + 1, meterIdentifier); - if (existingTransaction) { - latestNonce += 1; - } else { - pruneTransactionsAfter(latestNonce, meterIdentifier); - break; - } - } - - return latestNonce; -} - -// get revenue across suppored chains -export async function getCrossChainRevenue(tokenId: number): Promise { - try { - // Use cached verifiers instead of fetching them each time - const verifiers = getCachedVerifiers(); - - let totalRevenue = 0; - - // Iterate through all cached verifiers and get revenue from each chain - for (const verifier of verifiers) { - try { - console.log(`[info] Getting revenue from ENS: ${verifier.ensName}, target: ${verifier.targetAddress}, verifier: ${verifier.verifierAddress}`); - - // Get revenue from this specific chain using CCIP read - // Parameters: tokenId, target (L2 contract), verifier (resolved from ENS) - const revenue = await retry(() => - ccipRevenueReaderContract.read(tokenId, verifier.targetAddress, verifier.verifierAddress, { - enableCcipRead: true, - }) - ); - const revenueAmount = Number(revenue); - - console.log(`[info] Revenue from ${verifier.ensName} (${verifier.verifierAddress}): ${revenueAmount}`); - totalRevenue += revenueAmount; - } catch (error) { - console.error(`[error] Failed to get revenue from verifier ${verifier.ensName}:`, error); - // Continue with other verifiers even if one fails - } - } - - console.log(`[info] Total cross-chain revenue for token ${tokenId}: ${totalRevenue}`); - return totalRevenue; - } catch (error) { - console.error(`[error] Failed to get cross-chain revenue for token ${tokenId}:`, error); - throw error; - } -} - -// get owed from price context -export async function getOwedFromPriceContext(tokenId: number): Promise { - try { - return await retry(async () => { - 
console.log(`[info] Getting owed amount for token ${tokenId} from price context`); - - // Call the price context to get the amount the user owes with CCIP read enabled - const owedAmount = await priceContextContract.owed(tokenId); - const owed = Number(owedAmount); - - console.log(`[info] Owed amount for token ${tokenId}: ${owed}`); - return owed; - }); - } catch (error) { - console.error(`[error] Failed to get owed amount for token ${tokenId}:`, error); - throw error; - } -} diff --git a/src/logic/context.ts b/src/services/context.ts similarity index 100% rename from src/logic/context.ts rename to src/services/context.ts diff --git a/src/logic/grpc.ts b/src/services/grpc.ts similarity index 100% rename from src/logic/grpc.ts rename to src/services/grpc.ts diff --git a/src/services/mqtt.ts b/src/services/mqtt.ts new file mode 100644 index 0000000..9c22601 --- /dev/null +++ b/src/services/mqtt.ts @@ -0,0 +1,275 @@ +import { connect } from "mqtt"; +import { enqueue } from "./grpc"; +import { encode } from "../lib/encode"; +import { app, m3ter as m3terContract } from "./context"; +import { + deleteMeterByPublicKey, + getAllTransactionRecords, + getMeterByDevEui, + getMeterByPublicKey, + getMeterByTokenId, + insertTransaction, + saveMeter, + updateMeterDevEui, + updateMeterNonce, +} from "../store/sqlite"; +import type { State, TransactionRecord } from "../types"; +import { decodePayload } from "../lib/decode"; +import { runHook, verifyPayloadSignature } from "../lib/utils"; +import { getLatestTransactionNonce, pruneAndSyncOnchain, getCrossChainRevenue, getOwedFromPriceContext } from "../lib/sync"; +import { createMeterLogger } from "../utils/logger"; + +const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; +const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain +const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing + +export function handleUplinks(): Promise { + return new Promise(function (resolve, reject) { + const client = connect({ + host: CHIRPSTACK_HOST, + port: 1883, + clean: true, + connectTimeout: 9000, + reconnectPeriod: 1000, + }); + + client.on("reconnect", () => { + runHook("onMqttReconnect", client); + }); + + client.on("message", async (_, blob) => { + return await handleMessage(blob); + }); + + client.on("connect", () => { + const topic = `application/${process.env.APPLICATION_ID}/device/+/event/up`; + client.subscribe(topic, () => { + console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); + + runHook("onMqttSubscribed", client, topic); + resolve(true); + }); + + runHook("onMqttConnect", client); + }); + + client.on("error", (err) => { + console.error("Connection error: ", err); + + runHook("onMqttError", err, client); + + client.end(); + reject(err); + }); + }); +} + +export async function handleMessage(blob: Buffer) { + runHook("onMessageReceived", blob); + + const message = JSON.parse(blob.toString()); + const devEui = message["deviceInfo"]["devEui"] || null; + + // Create a logger with devEui context + const logger = createMeterLogger({ devEui }); + + try { + if (!devEui) { + console.log("[warn] Message dropped - no devEui found in message"); + return; + } + + // Check if this specific device is already being processed + if (deviceLocks.get(devEui)) { + logger.warn(`Message dropped - device is already being processed`); + runHook("onMessageDropped", "locked", devEui); + return; + } + + let is_on = true; + + // Set lock for this specific device + deviceLocks.set(devEui, true); + + 
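+    // The per-device lock set above is released in this handler's finally block, which also fires the onDeviceUnlocked hook.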
logger.info(`Received uplink from device: ${JSON.stringify(message)}`); + + // encoded transaction in standard format (payload is hex string) + // format: nonce | energy | signature | voltage | device_id | longitude | latitude + const transactionHex = Buffer.from(message["data"], "base64"); + const decoded = decodePayload(transactionHex); + let publicKey = decoded.extensions.deviceId; + let payloadHadPublicKey = !!publicKey; + + logger.info(`Decoded payload: ${JSON.stringify(decoded)}`); + + if (!publicKey) { + // try to find public key by DevEui + const meterByDevEui = getMeterByDevEui(devEui); + + if (!meterByDevEui) { + throw new Error("Device EUI not associated with any meter: " + devEui); + } + + publicKey = meterByDevEui.publicKey.replace("0x", ""); + } + + // verify transaction signature + const isValid = verifyPayloadSignature(transactionHex, Buffer.from(publicKey!, "hex")); + if (!isValid) { + throw new Error("Invalid transaction signature for meter with public key: " + publicKey); + } + + logger.info("Verified signature"); + + if (payloadHadPublicKey) { + logger.info(`Payload contained public key: ${publicKey}`); + // save public key with device EUI mapping if not already saved + const existingMeter = getMeterByPublicKey(`0x${publicKey}`); + + if (!existingMeter) { + const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`)); + + const latestNonce = await getLatestTransactionNonce(tokenId); + + logger.info(`Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}`); + + // save new meter with devEui + const newMeter = { + publicKey: `0x${publicKey}`, + devEui: message["deviceInfo"]["devEui"], + tokenId, + latestNonce, + }; + + const existingMeter = getMeterByTokenId(tokenId); + + // in-case of the public key being updated + if (existingMeter && existingMeter.publicKey !== `0x${publicKey}`) { + deleteMeterByPublicKey(`0x${publicKey}`); + } + + saveMeter(newMeter); + logger.info(`Saved new meter: ${JSON.stringify(newMeter)}`); + + runHook("onMeterCreated", newMeter); + } else { + // update existing meter with devEui if not already set + if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { + logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); + updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); + } + + // fetch and update latest nonce from chain + const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId); + + logger.info(`Fetched latestNonce from chain and local state: ${latestNonce}`); + + updateMeterNonce(`0x${publicKey}`, latestNonce); + } + } + + let m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; + + if (!m3ter) { + throw new Error("Meter not found for public key: " + publicKey); + } + + logger.info(`Found meter: ${JSON.stringify(m3ter)}`); + + // If both latest nonce and received nonce are 0, enqueue 0 immediately + if (m3ter.latestNonce === 0 && decoded.nonce === 0) { + logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); + + try { + is_on = true; // Always on + // (await getCrossChainRevenue(m3ter.tokenId)) >= + // (await getOwedFromPriceContext(m3ter.tokenId)); + } catch (error) { + logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); + } + + const state = { nonce: 0, is_on }; + + logger.info(`Enqueuing state: ${JSON.stringify(state)}`); + + enqueue( + message["deviceInfo"]["devEui"], + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 
0) + ); + + return; // Exit early without processing the transaction + } + + if (m3ter.latestNonce % SYNC_EPOCH === 0) { + // sync with blockchain every SYNC_EPOCH transactions + await pruneAndSyncOnchain(m3ter.tokenId); + + runHook("onSyncEpochReached"); + + logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); + + m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; + + if (!m3ter) { + throw new Error("Meter not found after sync for public key: " + publicKey); + } + } + + const expectedNonce = m3ter.latestNonce + 1; + + logger.info(`Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}`); + + if (decoded.nonce !== expectedNonce && decoded.nonce !== 0) { + throw new Error(`Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}`); + } + + // if device nonce is correct + if (decoded.nonce === expectedNonce) { + logger.info(`Nonce is valid: ${decoded.nonce}`); + + // save transaction to local store + const transactionRecord = { + nonce: decoded.nonce, + identifier: m3ter.tokenId, + receivedAt: Date.now(), + raw: transactionHex.toString("hex"), + } as TransactionRecord; + insertTransaction(transactionRecord); + updateMeterNonce(`0x${publicKey}`, expectedNonce); + + logger.info(`Updated meter nonce to: ${expectedNonce}`); + + const pendingTransactions = getAllTransactionRecords(); + runHook("onTransactionDistribution", m3ter.tokenId, decoded, pendingTransactions); + + try { + is_on = await runHook("isOnStateCompute", m3ter.tokenId); + } catch (error) { + runHook("onIsOnStateComputeError", m3ter.tokenId, error); + logger.error(`Error in isOnStateCompute hook: ${error}`); + } + + runHook("onIsOnStateComputed", m3ter.tokenId, is_on); + + const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; + + logger.info(`Enqueuing state: ${JSON.stringify(state)}`); + + enqueue( + message["deviceInfo"]["devEui"], + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) + ); + runHook("onStateEnqueued", state, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 
0); + } + } catch (error) { + logger.error(`Error handling MQTT message: ${error}`); + runHook("onMessageError", error); + } finally { + // Release lock for this specific device + if (devEui) { + deviceLocks.delete(devEui); + runHook("onDeviceUnlocked", devEui); + } + runHook("onMessageProcessingComplete"); + } +} From da21690aca41abe2909ee7ecdafc567608db38dc Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:01:42 +0100 Subject: [PATCH 3/6] feat: add modular decode and encode functions, initialize verifiers cache, and remove unused utils --- .vscode/settings.json | 10 +++ src/lib/decode.ts | 49 ++++++++++ src/lib/encode.ts | 92 +++++++++++++++++++ src/lib/sync.ts | 201 ++++++++++++++++++++++++++++++++++++++++++ src/utils.ts | 69 --------------- 5 files changed, 352 insertions(+), 69 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 src/lib/decode.ts create mode 100644 src/lib/encode.ts create mode 100644 src/lib/sync.ts delete mode 100644 src/utils.ts diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7d1e594 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,10 @@ +{ + "cSpell.words": [ + "ardrive", + "arweave", + "m3ters", + "Emmo00", + "ccip", + "Mauchly", + ] +} \ No newline at end of file diff --git a/src/lib/decode.ts b/src/lib/decode.ts new file mode 100644 index 0000000..49c9185 --- /dev/null +++ b/src/lib/decode.ts @@ -0,0 +1,49 @@ +import type { DecodedPayload } from "../types"; + +export function decodePayload(buf: Buffer) { + if (buf.length < 72) { + throw new Error("Payload too short. Must be at least 72 bytes"); + } + + // --- Core fields --- + const nonce = buf.readUInt32BE(0); + + const rawEnergy = buf.readUInt32BE(4); + const energyKWh = rawEnergy / 1e6; + + const signature = buf.subarray(8, 72).toString("hex"); + + // --- Optional extensions --- + const ext = {} as NonNullable; + let offset = 72; + + if (buf.length >= offset + 2) { + ext.voltage = buf.readUInt16BE(offset) / 10; + offset += 2; + } + + if (buf.length >= offset + 32) { + ext.deviceId = buf.subarray(offset, offset + 32).toString("hex"); + offset += 32; + } + + if (buf.length >= offset + 3) { + let lon = buf.readIntBE(offset, 3); // signed 3-byte int + ext.longitude = lon / 1e5; + offset += 3; + } + + if (buf.length >= offset + 3) { + let lat = buf.readIntBE(offset, 3); // signed 3-byte int + ext.latitude = lat / 1e5; + offset += 3; + } + + return { + nonce, + energy: energyKWh, + signature, + extensions: ext, + buf, + }; +} diff --git a/src/lib/encode.ts b/src/lib/encode.ts new file mode 100644 index 0000000..df9234c --- /dev/null +++ b/src/lib/encode.ts @@ -0,0 +1,92 @@ +import { State, TransactionRecord } from "../types"; + +export function intToByteArray(num: number, byteLength: number = 4) { + const byteArray = new Uint8Array(byteLength); + for (let i = 0; i < byteLength; i++) { + byteArray[byteLength - 1 - i] = (num >> (i * 8)) & 0xff; + } + return Array.from(byteArray); +} + +export function stringToByteArray(str: string, length: number | null = null) { + const encoder = new TextEncoder(); // UTF-8 + const encoded = encoder.encode(str); + const byteArray = new Uint8Array(length === null ? encoded.length : length); + byteArray.set(encoded.slice(0, length == null ? 
encoded.length : length)); // truncate if longer + return Array.from(byteArray); +} + +function floatToByteArray(float: number) { + const buffer = new ArrayBuffer(4); // 4 bytes for a float (32-bit) + const view = new DataView(buffer); + view.setFloat32(0, float, false); // false for Big Endian + return Array.from(new Uint8Array(buffer)); +} + +/** + * + * @notice only needs `nonce` from the state + */ +export function encode(state: State, latitude: number, longitude: number) { + let responseBytes = floatToByteArray(latitude).concat( + floatToByteArray(longitude) + ); + + let nonce = state.nonce; + + if (nonce) { + responseBytes = intToByteArray(nonce).concat(responseBytes); + } + + responseBytes.unshift(state.is_on ? 1 : 0); + return responseBytes; +} + +/** + * Encode transaction data into a byte array. + * + * format: nonce(4 bytes) | energy (4 bytes) | signature(64 bytes) | voltage(2 bytes) | device_id(32 bytes) | longitude(3 bytes) | latitude(3 bytes) + * + * @param nonce - The transaction nonce. + * @param energy - The energy value. + * @param signature - The transaction signature. + * @param voltage - The voltage value. + * @param deviceId - The device ID. + * @param longitude - The longitude value. + * @returns The encoded transaction byte array. + */ +export function encodeTransaction({ + nonce, + energy, + signature, + voltage, + deviceId, + longitude, + latitude, +}: { + nonce: number; + energy: number; + signature: string; + voltage: number; + deviceId: string; + longitude: number; + latitude: number; +}) { + const encodedNonce = intToByteArray(nonce, 4); + const encodedEnergy = intToByteArray(energy * 10e6, 4); + const encodedSignature = stringToByteArray(signature, 64); + const encodedVoltage = intToByteArray(voltage * 10, 2); + const encodedDeviceId = stringToByteArray(deviceId, 32); + const encodedLongitude = intToByteArray(longitude * 10e5, 3); + const encodedLatitude = intToByteArray(latitude * 10e5, 3); + + return [ + ...encodedNonce, + ...encodedEnergy, + ...encodedSignature, + ...encodedVoltage, + ...encodedDeviceId, + ...encodedLongitude, + ...encodedLatitude, + ]; +} diff --git a/src/lib/sync.ts b/src/lib/sync.ts new file mode 100644 index 0000000..1a241ea --- /dev/null +++ b/src/lib/sync.ts @@ -0,0 +1,201 @@ +import { + getMeterByPublicKey, + getMeterByTokenId, + getTransactionByNonce, + pruneTransactionsAfter, + pruneTransactionsBefore, + updateMeterNonce, +} from "../store/sqlite"; +import { + provider, + rollup as rollupContract, + ccipRevenueReader as ccipRevenueReaderContract, + priceContext as priceContextContract, +} from "../services/context"; +import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; +import { retry } from "./utils"; +import type { VerifierInfo } from "../types"; + +// Cache for verifiers - populated once on startup +let verifiersCache: VerifierInfo[] | null = null; +let isCacheInitialized = false; + +/** + * Initialize verifiers cache on program startup + * Fetches all verifiers and resolves their ENS names once + * Throws error if any fetch/resolution fails + */ +export async function initializeVerifiersCache(): Promise { + try { + console.log("[info] Initializing verifiers cache..."); + + // Get the number of verifiers + const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); + console.log(`[info] Found ${verifierCount} verifiers to cache`); + + const verifiers: VerifierInfo[] = []; + + // Fetch all verifiers and resolve their ENS names + for (let i = 0; i < verifierCount; i++) { + try { + // 
Get verifier info (ensName, targetContractAddress) + const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); + + console.log(`[info] Fetching verifier ${i}: ENS: ${ensName}, target: ${targetAddress}`); + + // Resolve ENS name to get the verifier address + const verifierAddress = await retry(() => provider.resolveName(ensName)); + + if (!verifierAddress || verifierAddress === ZeroAddress) { + throw new Error(`Failed to resolve ENS name: ${ensName}`); + } + + console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); + + verifiers.push({ + ensName, + targetAddress, + verifierAddress, + }); + } catch (error) { + console.error(`[error] Failed to initialize verifier ${i}:`, error); + throw error; // Fail fast as requested + } + } + + // Cache the verifiers + verifiersCache = verifiers; + isCacheInitialized = true; + + console.log(`[info] Successfully cached ${verifiers.length} verifiers`); + } catch (error) { + console.error("[error] Failed to initialize verifiers cache:", error); + isCacheInitialized = false; + verifiersCache = null; + throw error; + } +} + +/** + * Get cached verifiers, throws error if cache is not initialized + */ +function getCachedVerifiers(): VerifierInfo[] { + if (!isCacheInitialized || !verifiersCache) { + throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); + } + return verifiersCache; +} + +/** + * Check if verifiers cache is initialized + */ +export function isVerifiersCacheInitialized(): boolean { + return isCacheInitialized && verifiersCache !== null; +} + +/** + * Get the number of cached verifiers + */ +export function getCachedVerifiersCount(): number { + return verifiersCache?.length ?? 0; +} + +export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise { + const meter = + typeof meterIdentifier === "number" + ? 
getMeterByTokenId(meterIdentifier) + : getMeterByPublicKey(meterIdentifier); + + if (!meter) { + throw new Error(`Meter with identifier ${meterIdentifier} not found`); + } + + // Check the latest nonce on the blockchain + const onchainNonce = Number(await rollupContract.nonce(meter.tokenId)); + const latestNonce = meter.latestNonce; + + if (onchainNonce > latestNonce) { + const publicKey = meter.publicKey; + // If the onchain nonce is greater, update the local record + updateMeterNonce(publicKey, onchainNonce); + } + // prune transactions with nonce less than or equal to onchainNonce + pruneTransactionsBefore(meter.tokenId, onchainNonce); + + return onchainNonce; +} + +export async function getLatestTransactionNonce(meterIdentifier: number): Promise { + // get latest nonce from chain + let latestNonce = Number(await rollupContract.nonce(meterIdentifier)); + + // check local state for the highest nonce we have + while (true) { + const existingTransaction = getTransactionByNonce(latestNonce + 1, meterIdentifier); + if (existingTransaction) { + latestNonce += 1; + } else { + pruneTransactionsAfter(latestNonce, meterIdentifier); + break; + } + } + + return latestNonce; +} + +// get revenue across suppored chains +export async function getCrossChainRevenue(tokenId: number): Promise { + try { + // Use cached verifiers instead of fetching them each time + const verifiers = getCachedVerifiers(); + + let totalRevenue = 0; + + // Iterate through all cached verifiers and get revenue from each chain + for (const verifier of verifiers) { + try { + console.log(`[info] Getting revenue from ENS: ${verifier.ensName}, target: ${verifier.targetAddress}, verifier: ${verifier.verifierAddress}`); + + // Get revenue from this specific chain using CCIP read + // Parameters: tokenId, target (L2 contract), verifier (resolved from ENS) + const revenue = await retry(() => + ccipRevenueReaderContract.read(tokenId, verifier.targetAddress, verifier.verifierAddress, { + enableCcipRead: true, + }) + ); + const revenueAmount = Number(revenue); + + console.log(`[info] Revenue from ${verifier.ensName} (${verifier.verifierAddress}): ${revenueAmount}`); + totalRevenue += revenueAmount; + } catch (error) { + console.error(`[error] Failed to get revenue from verifier ${verifier.ensName}:`, error); + // Continue with other verifiers even if one fails + } + } + + console.log(`[info] Total cross-chain revenue for token ${tokenId}: ${totalRevenue}`); + return totalRevenue; + } catch (error) { + console.error(`[error] Failed to get cross-chain revenue for token ${tokenId}:`, error); + throw error; + } +} + +// get owed from price context +export async function getOwedFromPriceContext(tokenId: number): Promise { + try { + return await retry(async () => { + console.log(`[info] Getting owed amount for token ${tokenId} from price context`); + + // Call the price context to get the amount the user owes with CCIP read enabled + const owedAmount = await priceContextContract.owed(tokenId); + const owed = Number(owedAmount); + + console.log(`[info] Owed amount for token ${tokenId}: ${owed}`); + return owed; + }); + } catch (error) { + console.error(`[error] Failed to get owed amount for token ${tokenId}:`, error); + throw error; + } +} diff --git a/src/utils.ts b/src/utils.ts deleted file mode 100644 index 0fbacdd..0000000 --- a/src/utils.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { TransactionRecord, BatchTransactionPayload } from "./types"; -import { createPublicKey, verify } from "crypto"; -import os from "os"; - -/** - * Retries a function 
up to 5 times with exponential backoff
- * @param fn Function to retry
- * @param maxRetries Maximum number of retries (default: 5)
- * @param baseDelay Base delay in milliseconds (default: 1000)
- * @returns Promise that resolves with the function result or rejects with the last error
- */
-export async function retry<T>(
-  fn: () => Promise<T>,
-  maxRetries: number = 5,
-  baseDelay: number = 1000
-): Promise<T> {
-  let lastError: Error;
-
-  for (let attempt = 0; attempt <= maxRetries; attempt++) {
-    try {
-      return await fn();
-    } catch (error) {
-      lastError = error as Error;
-
-      if (attempt === maxRetries) {
-        throw lastError;
-      }
-
-      const delay = baseDelay * Math.pow(2, attempt);
-      console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error);
-
-      await new Promise((resolve) => setTimeout(resolve, delay));
-    }
-  }
-
-  throw lastError!;
-}
-
-export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] {
-  return transactions.map((transaction) => ({
-    m3ter_id: Number(transaction.identifier),
-    message: transaction.raw,
-  }));
-}
-
-export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean {
-  try {
-    const message = transaction.subarray(0, 8);
-    const signature = transaction.subarray(8, 72);
-
-    // Wrap raw key in SPKI DER
-    const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex");
-    const derKey = Buffer.concat([new Uint8Array(spkiPrefix), new Uint8Array(rawPubKey)]);
-
-    const publicKey = createPublicKey({
-      key: derKey,
-      format: "der",
-      type: "spki",
-    });
-
-    // Verify
-    const ok = verify(null, new Uint8Array(message), publicKey, new Uint8Array(signature));
-
-    return ok;
-  } catch (error) {
-    console.error("Error verifying signature:", error);
-    return false;
-  }
-}
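
For reference, a minimal usage sketch (not part of the patch series) of the new `decodePayload` from `src/lib/decode.ts` above, fed a hand-built 72-byte core payload (nonce | energy | signature). The values and the import path are illustrative only:

```ts
import { decodePayload } from "./src/lib/decode"; // path assumed for this sketch

// Bare 72-byte core payload: 4-byte nonce, 4-byte raw energy, 64-byte signature.
const buf = Buffer.alloc(72);
buf.writeUInt32BE(7, 0);         // nonce = 7
buf.writeUInt32BE(1_500_000, 4); // raw energy; decodePayload divides by 1e6 => 1.5 kWh
// bytes 8..71 carry the 64-byte signature; left as zeros in this sketch

const decoded = decodePayload(buf);
console.log(decoded.nonce);      // 7
console.log(decoded.energy);     // 1.5
console.log(decoded.extensions); // {} – optional fields require more than 72 bytes
```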
From 04047bc8b978f658ffb59d2163b0a6093094962a Mon Sep 17 00:00:00 2001
From: Emmo00
Date: Thu, 8 Jan 2026 19:35:28 +0100
Subject: [PATCH 4/6] docs: add complete hook lifecycle documentation for M3tering Console

---
 README.md | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/README.md b/README.md
index 4074dda..076c822 100644
--- a/README.md
+++ b/README.md
@@ -54,3 +54,68 @@
 npm install
 npm run dev
 ```
+
+
+# Complete Hook Lifecycle for M3tering Console
+
+## Initialization Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onBeforeInit` | Called before any initialization begins | None |
+| `onDatabaseSetup` | Called after SQLite tables/jobs are initialized | None |
+| `onAfterInit` | Called after all initialization completes successfully | None |
+| `onInitError` | Called when an error occurs during initialization | `error: any` |
+
+## MQTT Connection Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onMqttConnect` | Called when MQTT client successfully connects to ChirpStack | `client: MqttClient` |
+| `onMqttSubscribed` | Called after subscribing to the device uplink topic | `client: MqttClient`, `topic: string` |
+| `onMqttError` | Called when an MQTT connection error occurs | `error: any`, `client: MqttClient` |
+| `onMqttReconnect` | Called when attempting to reconnect to the MQTT broker | `client: MqttClient` |
+
+## Message Ingestion Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onMessageReceived` | Called when a raw MQTT message is received (before parsing) | `blob: Buffer` |
+| `onMessageDropped` | Called when a message is dropped (e.g., device already locked) | `reason: string`, `devEui: string` |
+
+## Meter Discovery & Registration Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onMeterCreated` | Called after a new meter is saved to the database | `newMeter: MeterRecord` |
+
+## Nonce Synchronization Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onSyncEpochReached` | Called when the sync interval is reached for blockchain synchronization | None |
+
+## Downstream Distribution Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onTransactionDistribution` | Called before sending transactions to Arweave, prover node, Streamr, or other stores/loggers | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` |
+
+## State Encoding & Device Response Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `isOnStateCompute` | Called to determine the device's on/off state (returns boolean) | `m3terId: number` |
+| `onIsOnStateComputed` | Called after the on/off state has been computed | `m3terId: number`, `isOn: boolean` |
+| `onIsOnStateComputeError` | Called when an error occurs during state computation | `m3terId: number`, `error: any` |
+| `onStateEnqueued` | Called after the state is enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` |
+
+## Error Handling & Cleanup Phase
+
+| Hook Name | Description | Parameters |
+|-----------|-------------|------------|
+| `onMessageError` | Called when any error occurs during message processing | `error: any` |
+| `onDeviceUnlocked` | Called when a device lock is released (regardless of outcome) | `devEui: string` |
+| `onMessageProcessingComplete` | Called when message processing finishes (success or error) | None |
+
+---
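
To make the hook tables above concrete, here is an illustrative extension module (not one of the patches in this series). It assumes extension modules follow the same pattern as the bundled ones — a default-exported class implementing the `Hooks` interface — and that `MeterRecord` is exported from `src/types`; the import path and the `tokenId` field name are assumptions:

```ts
import type { Hooks, MeterRecord } from "../../types"; // path assumed

// Example extension: audit-log meter registrations and dropped uplinks.
export default class implements Hooks {
  async onMeterCreated(newMeter: MeterRecord) {
    console.log(`[audit] new meter registered: ${newMeter.tokenId}`);
  }

  async onMessageDropped(reason: string, devEui: string) {
    console.warn(`[audit] uplink from ${devEui} dropped: ${reason}`);
  }
}
```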
From 70cdaab621b02e53b2ae2f34362e7be97cd0b6fe Mon Sep 17 00:00:00 2001
From: Emmo00
Date: Thu, 8 Jan 2026 19:45:33 +0100
Subject: [PATCH 5/6] refactor: modularize Streamr client initialization, destroy client instance after action and enhance transaction publishing with retry logic

---
 src/lib/core/streamr/index.ts | 44 +++++++++++++++++---------------------------
 1 file changed, 17 insertions(+), 27 deletions(-)

diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts
index bddec0c..a16d375 100644
--- a/src/lib/core/streamr/index.ts
+++ b/src/lib/core/streamr/index.ts
@@ -1,5 +1,5 @@
 import { StreamrClient } from "@streamr/sdk";
-import { buildBatchPayload } from "../../utils";
+import { buildBatchPayload, retry } from "../../utils";
 import type { Hooks, TransactionRecord } from "../../../types";
 
 const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env;
@@ -8,28 +8,14 @@ if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) {
   throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables");
 }
 
-export const streamrClient = new StreamrClient({
-  auth: {
-    privateKey: ETHEREUM_PRIVATE_KEY,
-  },
-});
-
-const stream = streamrClient.getStream(STREAMR_STREAM_ID);
-
-stream
-  .then((stream) => {
-    console.log(`[Streamr] Connected to stream: ${stream.id}`);
-  })
-  .catch((error) => {
-    console.error("[Streamr] Error connecting to stream:", error);
-  });
-
 export default class implements Hooks {
   async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) {
     // send pending transactions to streamr
     try {
       console.info(`Sending pending transactions to streamr`);
-      await publishPendingTransactionsToStreamr(pendingTransactions);
+
+      await retry(() => publishPendingTransactionsToStreamr(pendingTransactions), 3);
+
       console.info(`Successfully sent pending transactions to streamr`);
     } catch (error) {
       console.error(`Error sending pending transactions to streamr: ${error}`);
@@ -37,15 +23,19 @@ export default class implements Hooks {
   }
 }
 
-async function getStream() {
-  return await stream;
-}
+async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) {
+  const streamrClient = new StreamrClient({
+    auth: {
+      privateKey: ETHEREUM_PRIVATE_KEY!,
+    },
+  });
 
-async function publishToStream(data: any) {
-  const stream = await getStream();
-  await stream.publish(data);
-}
+  const stream = await streamrClient.getStream(STREAMR_STREAM_ID!);
 
-async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) {
-  await publishToStream(buildBatchPayload(pendingTransactions));
+  const batchPayload = buildBatchPayload(pendingTransactions);
+
+  await stream.publish(batchPayload);
+
+  // destroy the client to free resources
+  await streamrClient.destroy();
 }
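
A possible follow-up not included in this series: the helper above creates a fresh client per publish attempt but only destroys it on the success path, so the client is not destroyed when `getStream()` or `publish()` throws. A `try/finally` variant, using only the APIs already shown in the diff, would release it in both cases:

```ts
import { StreamrClient } from "@streamr/sdk";
import { buildBatchPayload } from "../../utils";
import type { TransactionRecord } from "../../../types";

const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env;

async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) {
  const streamrClient = new StreamrClient({
    auth: { privateKey: ETHEREUM_PRIVATE_KEY! },
  });

  try {
    const stream = await streamrClient.getStream(STREAMR_STREAM_ID!);
    await stream.publish(buildBatchPayload(pendingTransactions));
  } finally {
    // release the temporary client whether publishing succeeded or failed
    await streamrClient.destroy();
  }
}
```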
From 684a5aaacc82d45bfef9e44272b9d346b99bae55 Mon Sep 17 00:00:00 2001
From: Emmo00
Date: Thu, 8 Jan 2026 19:52:15 +0100
Subject: [PATCH 6/6] refactor: update getCachedVerifiers to return a promise and adjust related tests for context module

---
 src/lib/sync.ts          |  8 ++++----
 tests/logic/sync.test.ts | 19 +++++--------------
 2 files changed, 9 insertions(+), 18 deletions(-)

diff --git a/src/lib/sync.ts b/src/lib/sync.ts
index 1a241ea..3ca21e5 100644
--- a/src/lib/sync.ts
+++ b/src/lib/sync.ts
@@ -79,11 +79,11 @@ export async function initializeVerifiersCache(): Promise<void> {
 /**
  * Get cached verifiers, throws error if cache is not initialized
  */
-function getCachedVerifiers(): VerifierInfo[] {
+async function getCachedVerifiers(): Promise<VerifierInfo[]> {
   if (!isCacheInitialized || !verifiersCache) {
-    throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first.");
+    await initializeVerifiersCache();
   }
-  return verifiersCache;
+  return verifiersCache!;
 }
 
 /**
@@ -147,7 +147,7 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promise<number> {
 export async function getCrossChainRevenue(tokenId: number): Promise<number> {
   try {
     // Use cached verifiers instead of fetching them each time
-    const verifiers = getCachedVerifiers();
+    const verifiers = await getCachedVerifiers();
 
     let totalRevenue = 0;
 
diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts
index 22d7db9..de1cb09 100644
--- a/tests/logic/sync.test.ts
+++ b/tests/logic/sync.test.ts
@@ -7,7 +7,7 @@ import {
 } from "../../src/lib/sync";
 
 // Mock the context module
-jest.mock("../../src/logic/context", () => ({
+jest.mock("../../src/services/context", () => ({
   provider: {
     resolveName: jest.fn(),
   },
@@ -30,7 +30,7 @@ describe("Verifiers Cache", () => {
   });
 
   it("should initialize cache successfully", async () => {
-    const { provider, ccipRevenueReader } = require("../../src/logic/context");
+    const { provider, ccipRevenueReader } = require("../../src/services/context");
 
     // Mock successful responses
     ccipRevenueReader.verifierCount.mockResolvedValue(2n);
@@ -55,7 +55,7 @@ describe("Verifiers Cache", () => {
   });
 
   it("should throw error if ENS resolution fails (returns null)", async () => {
-    const { provider, ccipRevenueReader } = require("../../src/logic/context");
+    const { provider, ccipRevenueReader } = require("../../src/services/context");
 
     // Mock responses with ENS resolution failure
     ccipRevenueReader.verifierCount.mockResolvedValue(1n);
@@ -69,7 +69,7 @@ describe("Verifiers Cache", () => {
   });
 
   it("should throw error if ENS resolution fails (returns zero address)", async () => {
-    const { provider, ccipRevenueReader } = require("../../src/logic/context");
+    const { provider, ccipRevenueReader } = require("../../src/services/context");
 
     // Mock responses with ENS resolution failure
     ccipRevenueReader.verifierCount.mockResolvedValue(1n);
@@ -82,17 +82,8 @@ describe("Verifiers Cache", () => {
     expect(getCachedVerifiersCount()).toBe(0);
   });
 
-  it("should throw error if cache is not initialized when calling getCrossChainRevenue", async () => {
-    // Ensure cache is not initialized
-    expect(isVerifiersCacheInitialized()).toBe(false);
-
-    await expect(getCrossChainRevenue(123)).rejects.toThrow(
-      "Verifiers cache not initialized. Call initializeVerifiersCache() first."
-    );
-  });
-
   it("should use cached verifiers for getCrossChainRevenue", async () => {
-    const { provider, ccipRevenueReader } = require("../../src/logic/context");
+    const { provider, ccipRevenueReader } = require("../../src/services/context");
 
     // Mock successful initialization
     ccipRevenueReader.verifierCount.mockResolvedValue(1n);