diff --git a/migrations/20260411_210100_add_dashboard_live_notify_triggers.js b/migrations/20260411_210100_add_dashboard_live_notify_triggers.js new file mode 100644 index 00000000..6cc63c3c --- /dev/null +++ b/migrations/20260411_210100_add_dashboard_live_notify_triggers.js @@ -0,0 +1,44 @@ +exports.up = async function (knex) { + await knex.schema.raw(` + CREATE OR REPLACE FUNCTION notify_dashboard_events_changed() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('dashboard_events_changed', TG_OP); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `) + + await knex.schema.raw(` + DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events; + + CREATE TRIGGER dashboard_events_changed_trigger + AFTER INSERT OR UPDATE OR DELETE ON events + FOR EACH STATEMENT + EXECUTE FUNCTION notify_dashboard_events_changed(); + `) + + await knex.schema.raw(` + CREATE OR REPLACE FUNCTION notify_dashboard_users_changed() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('dashboard_users_changed', TG_OP); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `) + + await knex.schema.raw(` + DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users; + + CREATE TRIGGER dashboard_users_changed_trigger + AFTER INSERT OR UPDATE OR DELETE ON users + FOR EACH STATEMENT + EXECUTE FUNCTION notify_dashboard_users_changed(); + `) +} + +exports.down = async function (knex) { + await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;') + await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;') + await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_events_changed();') + await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_users_changed();') +} diff --git a/migrations/20260412_020000_add_dashboard_kpi_indexes.js b/migrations/20260412_020000_add_dashboard_kpi_indexes.js new file mode 100644 index 00000000..63af1af2 --- /dev/null +++ b/migrations/20260412_020000_add_dashboard_kpi_indexes.js @@ -0,0 +1,40 @@ +/** + * Migration: add dashboard KPI query indexes + * + * Without these the incremental collector degrades to sequential scans: + * - idx_events_cursor → covers the (first_seen, id) cursor predicate used in every + * incremental delta query and the bootstrap cursor select. + * - idx_events_pubkey → covers the GROUP BY event_pubkey in the all-time talker query. + * - idx_users_cursor → covers the (updated_at, pubkey) cursor predicate used in the + * user delta / cursor-select queries. + * + * All three are created CONCURRENTLY so they don't lock the table on a live relay. + * knex does not support CREATE INDEX CONCURRENTLY natively, so we use raw SQL and + * set `disableTransactions` to true (DDL inside a transaction would negate CONCURRENTLY). + */ + +exports.up = async (knex) => { + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_cursor + ON events (first_seen, id); + `) + + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_pubkey + ON events (event_pubkey); + `) + + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_cursor + ON users (updated_at, pubkey); + `) +} + +exports.down = async (knex) => { + await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_cursor;') + await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_pubkey;') + await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_users_cursor;') +} + +// Required so knex doesn't wrap the CONCURRENTLY statements in a transaction. +exports.config = { transaction: false } diff --git a/migrations/20260415_010000_add_nip29_group_control_tables.js b/migrations/20260415_010000_add_nip29_group_control_tables.js new file mode 100644 index 00000000..c70fbc3e --- /dev/null +++ b/migrations/20260415_010000_add_nip29_group_control_tables.js @@ -0,0 +1,46 @@ +exports.up = async function (knex) { + await knex.schema.createTable('groups', (table) => { + table.string('group_id', 64).primary() + table.binary('owner_pubkey').notNullable() + table.string('leader_relay_url', 255).notNullable() + table.timestamp('created_at', { useTz: true }).defaultTo(knex.fn.now()) + }) + + await knex.schema.createTable('group_control_log', (table) => { + table.bigIncrements('sequence_id').primary() + table.string('group_id', 64).notNullable().references('group_id').inTable('groups') + table.bigInteger('group_sequence').notNullable() + table.binary('event_id').notNullable().unique() + table.binary('pubkey').notNullable() + table.integer('kind').notNullable() + table.jsonb('raw_event').notNullable() + table.string('state_root', 64).notNullable() + table.timestamp('created_at', { useTz: true }).defaultTo(knex.fn.now()) + + table.unique(['group_id', 'group_sequence']) + table.index(['group_id', 'group_sequence']) + table.index(['group_id', 'created_at']) + table.index(['kind']) + }) + + await knex.raw( + 'ALTER TABLE group_control_log ADD CONSTRAINT group_control_log_kind_check CHECK (kind >= 9000 AND kind <= 9006);' + ) + + await knex.schema.createTable('group_memberships', (table) => { + table.string('group_id', 64).notNullable().references('group_id').inTable('groups') + table.binary('pubkey').notNullable() + table.string('role', 20).notNullable() + + table.primary(['group_id', 'pubkey']) + table.index(['group_id', 'role']) + table.index(['pubkey']) + }) +} + +exports.down = async function (knex) { + await knex.schema.dropTableIfExists('group_memberships') + await knex.raw('ALTER TABLE IF EXISTS group_control_log DROP CONSTRAINT IF EXISTS group_control_log_kind_check;') + await knex.schema.dropTableIfExists('group_control_log') + await knex.schema.dropTableIfExists('groups') +} \ No newline at end of file diff --git a/migrations/20260419_120000_add_dashboard_updated_revision.js b/migrations/20260419_120000_add_dashboard_updated_revision.js new file mode 100644 index 00000000..baae40e7 --- /dev/null +++ b/migrations/20260419_120000_add_dashboard_updated_revision.js @@ -0,0 +1,55 @@ +exports.up = async function (knex) { + await knex.schema.createTable('dashboard_state', (table) => { + table.integer('id').primary() + table.bigInteger('revision').notNullable().defaultTo(0) + table.timestamp('updated_at', { useTz: false }).notNullable().defaultTo(knex.fn.now()) + }) + + await knex('dashboard_state') + .insert({ + id: 1, + revision: 0, + updated_at: knex.fn.now(), + }) + .onConflict('id') + .ignore() + + await knex.schema.raw(` + CREATE OR REPLACE FUNCTION dashboard_updated() RETURNS TRIGGER AS $$ + BEGIN + UPDATE dashboard_state + SET + revision = revision + 1, + updated_at = NOW() + WHERE id = 1; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `) + + await knex.schema.raw(` + DROP TRIGGER IF EXISTS dashboard_revision_events_trigger ON events; + + CREATE TRIGGER dashboard_revision_events_trigger + AFTER INSERT OR UPDATE OR DELETE ON events + FOR EACH STATEMENT + EXECUTE FUNCTION dashboard_updated(); + `) + + await knex.schema.raw(` + DROP TRIGGER IF EXISTS dashboard_revision_users_trigger ON users; + + CREATE TRIGGER dashboard_revision_users_trigger + AFTER INSERT OR UPDATE OR DELETE ON users + FOR EACH STATEMENT + EXECUTE FUNCTION dashboard_updated(); + `) +} + +exports.down = async function (knex) { + await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_revision_events_trigger ON events;') + await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_revision_users_trigger ON users;') + await knex.schema.raw('DROP FUNCTION IF EXISTS dashboard_updated();') + await knex.schema.dropTableIfExists('dashboard_state') +} diff --git a/package.json b/package.json index 9d9f902f..e78a3b12 100644 --- a/package.json +++ b/package.json @@ -23,10 +23,12 @@ "main": "src/index.ts", "scripts": { "dev": "node -r ts-node/register src/index.ts", + "dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts", "clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}", "build": "tsc --project tsconfig.build.json", "prestart": "npm run build", "start": "cd dist && node src/index.js", + "start:dashboard": "node dist/src/dashboard-service/index.js", "build:check": "npm run build -- --noEmit", "lint": "eslint --ext .ts ./src ./test", "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test", @@ -37,6 +39,7 @@ "db:seed": "knex seed:run", "pretest:unit": "mkdir -p .test-reports/unit", "test:unit": "mocha 'test/**/*.spec.ts'", + "test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'", "test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*", "cover:unit": "nyc --report-dir .coverage/unit npm run test:unit", "docker:build": "docker build -t nostream .", diff --git a/src/dashboard-service/api/dashboard-router.ts b/src/dashboard-service/api/dashboard-router.ts new file mode 100644 index 00000000..dc977c52 --- /dev/null +++ b/src/dashboard-service/api/dashboard-router.ts @@ -0,0 +1,12 @@ +import { Router } from 'express' + +import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler' +import { SnapshotService } from '../services/snapshot-service' + +export const createDashboardRouter = (snapshotService: SnapshotService): Router => { + const router = Router() + + router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService)) + + return router +} diff --git a/src/dashboard-service/app.ts b/src/dashboard-service/app.ts new file mode 100644 index 00000000..b2ed319d --- /dev/null +++ b/src/dashboard-service/app.ts @@ -0,0 +1,159 @@ +import { IKPICollector, SnapshotService } from './services/snapshot-service' +import { createDashboardRouter } from './api/dashboard-router' +import { createLogger } from '../factories/logger-factory' +import { DashboardServiceConfig } from './config' +import { DashboardUpdateVersionService } from './services/dashboard-update-version-service' +import { DashboardWebSocketHub } from './ws/dashboard-ws-hub' +import express from 'express' +import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler' +import { getReadReplicaDbClient } from '../database/client' +import http from 'http' +import { KPICollectorService } from './services/kpi-collector-service' +import { PollingScheduler } from './polling/polling-scheduler' +import { WebSocketServer } from 'ws' +const debug = createLogger('dashboard-service:app') + +export interface DashboardService { + readonly config: DashboardServiceConfig + readonly snapshotService: SnapshotService + readonly pollingScheduler: PollingScheduler + start(): Promise + stop(): Promise + getHttpPort(): number +} + +export const createDashboardService = (config: DashboardServiceConfig): DashboardService => { + console.info( + 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s)', + config.host, + config.port, + config.wsPath, + config.pollIntervalMs, + config.useDummyData, + ) + + const dbClient = config.useDummyData ? undefined : getReadReplicaDbClient() + const collector: IKPICollector = config.useDummyData + ? { + collectMetrics: async () => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { allTime: [], recent: [] }, + }), + } + : new KPICollectorService(dbClient) + + const updateVersionProvider = typeof dbClient === 'undefined' + ? undefined + : new DashboardUpdateVersionService(dbClient) + + const snapshotService = new SnapshotService(collector, updateVersionProvider) + + const app = express() + .disable('x-powered-by') + .get('/healthz', getHealthRequestHandler) + .use('/api/v1/kpis', createDashboardRouter(snapshotService)) + + const webServer = http.createServer(app) + const webSocketServer = new WebSocketServer({ + server: webServer, + path: config.wsPath, + }) + + const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot()) + + const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => { + const { snapshot, changed } = await snapshotService.refresh() + + if (!changed) { + debug('poll tick detected no KPI changes') + return + } + + debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status) + webSocketHub.broadcastTick(snapshot.sequence) + webSocketHub.broadcastSnapshot(snapshot) + }) + + const start = async () => { + if (webServer.listening) { + debug('start requested but service is already listening') + return + } + + console.info('dashboard-service: starting http and websocket servers') + + await new Promise((resolve, reject) => { + webServer.listen(config.port, config.host, () => { + const address = webServer.address() + debug('listening on %o', address) + console.info('dashboard-service: listening on %o', address) + resolve() + }) + webServer.once('error', (error) => { + console.error('dashboard-service: failed to start server', error) + reject(error) + }) + }) + + try { + const initialSnapshotRefresh = await snapshotService.refresh() + if (initialSnapshotRefresh.changed) { + debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status) + } + } catch (error) { + console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error) + } + + pollingScheduler.start() + console.info('dashboard-service: polling scheduler started') + } + + const stop = async () => { + console.info('dashboard-service: stopping service') + pollingScheduler.stop() + + if (collector?.close) { + try { + await collector.close() + } catch (error) { + console.error('dashboard-service: failed to close collector resources', error) + } + } + + webSocketHub.close() + await new Promise((resolve, reject) => { + if (!webServer.listening) { + debug('stop requested while server was already stopped') + resolve() + return + } + + webServer.close((error) => { + if (error) { + console.error('dashboard-service: failed to stop cleanly', error) + reject(error) + return + } + + console.info('dashboard-service: http server closed') + resolve() + }) + }) + } + + const getHttpPort = (): number => { + const address = webServer.address() + return typeof address === 'object' && address !== null ? address.port : config.port + } + + return { + config, + snapshotService, + pollingScheduler, + start, + stop, + getHttpPort, + } +} diff --git a/src/dashboard-service/config.ts b/src/dashboard-service/config.ts new file mode 100644 index 00000000..17da8a42 --- /dev/null +++ b/src/dashboard-service/config.ts @@ -0,0 +1,49 @@ +import { parseArgs } from 'util' + +export interface DashboardServiceConfig { + host: string + port: number + wsPath: string + pollIntervalMs: number + useDummyData: boolean +} + +const parseBoolean = (value: string | undefined, fallback = false): boolean => { + if (typeof value === 'undefined') { + return fallback + } + + return value === '1' || value.toLowerCase() === 'true' +} + +const parseInteger = (value: string | undefined, fallback: number): number => { + if (typeof value === 'undefined' || value === '') { + return fallback + } + + const parsed = Number(value) + if (!Number.isInteger(parsed) || parsed < 0) { + return fallback + } + + return parsed +} + +export const getDashboardServiceConfig = (): DashboardServiceConfig => { + const { values } = parseArgs({ + args: process.argv.slice(2), + options: { + port: { type: 'string', short: 'p' }, + host: { type: 'string', short: 'h' }, + }, + strict: false, + }) + + return { + host: (values.host as string) ?? process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1', + port: parseInteger(values.port as string, parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011)), + wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream', + pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000), + useDummyData: parseBoolean(process.env.DASHBOARD_USE_DUMMY_DATA, false), + } +} diff --git a/src/dashboard-service/controllers/get-health-controller.ts b/src/dashboard-service/controllers/get-health-controller.ts new file mode 100644 index 00000000..0c7037a3 --- /dev/null +++ b/src/dashboard-service/controllers/get-health-controller.ts @@ -0,0 +1,12 @@ +import { Request, Response } from 'express' + +import { IController } from '../../@types/controllers' + +export class GetHealthController implements IController { + public async handleRequest(_request: Request, response: Response): Promise { + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send({ status: 'ok' }) + } +} \ No newline at end of file diff --git a/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts new file mode 100644 index 00000000..741d9902 --- /dev/null +++ b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts @@ -0,0 +1,20 @@ +import { Request, Response } from 'express' + +import { DashboardSnapshotResponse } from '../types' +import { IController } from '../../@types/controllers' +import { SnapshotService } from '../services/snapshot-service' + +export class GetKPISnapshotController implements IController { + public constructor(private readonly snapshotService: SnapshotService) { } + + public async handleRequest(_request: Request, response: Response): Promise { + const payload: DashboardSnapshotResponse = { + data: this.snapshotService.getSnapshot(), + } + + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send(payload) + } +} \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts new file mode 100644 index 00000000..e8f2cea4 --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts @@ -0,0 +1,5 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetHealthController } from '../../controllers/get-health-controller' + +export const getHealthRequestHandler = withController(() => new GetHealthController()) \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts new file mode 100644 index 00000000..507e181b --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts @@ -0,0 +1,8 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetKPISnapshotController } from '../../controllers/get-kpi-snapshot-controller' +import { SnapshotService } from '../../services/snapshot-service' + +export const createGetKPISnapshotRequestHandler = (snapshotService: SnapshotService) => { + return withController(() => new GetKPISnapshotController(snapshotService)) +} \ No newline at end of file diff --git a/src/dashboard-service/index.ts b/src/dashboard-service/index.ts new file mode 100644 index 00000000..4f64161c --- /dev/null +++ b/src/dashboard-service/index.ts @@ -0,0 +1,42 @@ +import { createLogger } from '../factories/logger-factory' + +import { createDashboardService } from './app' +import { getDashboardServiceConfig } from './config' + +const debug = createLogger('dashboard-service:index') + +const run = async () => { + const config = getDashboardServiceConfig() + console.info('dashboard-service: bootstrapping with config %o', config) + const service = createDashboardService(config) + + const shutdown = async () => { + console.info('dashboard-service: received shutdown signal') + debug('received shutdown signal') + await service.stop() + process.exit(0) + } + + process + .on('SIGINT', shutdown) + .on('SIGTERM', shutdown) + + process.on('uncaughtException', (error) => { + console.error('dashboard-service: uncaught exception', error) + }) + + process.on('unhandledRejection', (error) => { + console.error('dashboard-service: unhandled rejection', error) + }) + + await service.start() +} + +if (require.main === module) { + run().catch((error) => { + console.error('dashboard-service: unable to start', error) + process.exit(1) + }) +} + +export { run } diff --git a/src/dashboard-service/polling/polling-scheduler.ts b/src/dashboard-service/polling/polling-scheduler.ts new file mode 100644 index 00000000..58663adb --- /dev/null +++ b/src/dashboard-service/polling/polling-scheduler.ts @@ -0,0 +1,69 @@ +import { createLogger } from '../../factories/logger-factory' + +type Tick = () => Promise | void + +const debug = createLogger('dashboard-service:polling') + +/** + * Runs a tick callback on a fixed cadence, but — unlike setInterval — never + * overlaps: the next tick is only scheduled *after* the current one resolves + * or rejects. This prevents DB query storms when a poll takes longer than the + * configured interval. + */ +export class PollingScheduler { + private timer: NodeJS.Timeout | undefined + private running = false + + public constructor( + private readonly intervalMs: number, + private readonly tick: Tick, + ) { } + + public start(): void { + if (this.running) { + return + } + + this.running = true + debug('starting scheduler with interval %d ms', this.intervalMs) + this.scheduleNext() + } + + public stop(): void { + if (!this.running) { + return + } + + debug('stopping scheduler') + this.running = false + + if (this.timer) { + clearTimeout(this.timer) + this.timer = undefined + } + } + + public isRunning(): boolean { + return this.running + } + + private scheduleNext(): void { + if (!this.running) { + return + } + + this.timer = setTimeout(() => { + this.timer = undefined + + Promise.resolve(this.tick()) + .catch((error) => { + console.error('dashboard-service: polling tick failed', error) + }) + .finally(() => { + // Schedule the next tick only after the current one completes, + // regardless of success or failure. + this.scheduleNext() + }) + }, this.intervalMs) + } +} diff --git a/src/dashboard-service/services/dashboard-update-version-service.ts b/src/dashboard-service/services/dashboard-update-version-service.ts new file mode 100644 index 00000000..5bba9349 --- /dev/null +++ b/src/dashboard-service/services/dashboard-update-version-service.ts @@ -0,0 +1,34 @@ +import { createLogger } from '../../factories/logger-factory' +import { DatabaseClient } from '../../@types/base' + +const debug = createLogger('dashboard-service:update-version') + +interface IDashboardStateRow { + revision: string | number +} + +export class DashboardUpdateVersionService { + private disabled = false + + public constructor(private readonly dbClient: DatabaseClient) { } + + public async getCurrentVersion(): Promise { + if (this.disabled) { + return + } + + try { + const row = await this.dbClient('dashboard_state') + .select('revision') + .where('id', 1) + .first() + + return typeof row === 'undefined' ? '0' : String(row.revision) + } catch (error) { + this.disabled = true + console.error('dashboard-service: dashboard revision lookup unavailable, falling back to full polling', error) + debug('dashboard revision lookup disabled after query failure') + return + } + } +} diff --git a/src/dashboard-service/services/kpi-collector-service.ts b/src/dashboard-service/services/kpi-collector-service.ts new file mode 100644 index 00000000..51139733 --- /dev/null +++ b/src/dashboard-service/services/kpi-collector-service.ts @@ -0,0 +1,134 @@ +import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types' +import { createLogger } from '../../factories/logger-factory' +import { DatabaseClient } from '../../@types/base' + +const debug = createLogger('dashboard-service:kpi-collector') + +const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735] + +const toNumber = (value: unknown): number => { + if (typeof value === 'number') { + return value + } + + if (typeof value === 'string' && value !== '') { + return Number(value) + } + + return 0 +} + +export class KPICollectorService { + public constructor( + private readonly dbClient: DatabaseClient, + private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS, + private readonly topTalkersLimit = 10, + private readonly recentDays = 3, + ) { } + + public async collectMetrics(): Promise { + debug('collecting dashboard metrics') + + const [ + eventsByKind, + admittedUsers, + satsPaid, + allTimeTopTalkers, + recentTopTalkers, + ] = await Promise.all([ + this.getEventsByKind(), + this.getAdmittedUsersCount(), + this.getSatsPaidCount(), + this.getTopTalkersAllTime(), + this.getTopTalkersRecent(), + ]) + + return { + eventsByKind, + admittedUsers, + satsPaid, + topTalkers: { + allTime: allTimeTopTalkers, + recent: recentTopTalkers, + }, + } + } + + private async getEventsByKind(): Promise { + const rows = await this.dbClient('events') + .select('event_kind') + .count('id as count') + .whereIn('event_kind', this.trackedKinds) + .groupBy('event_kind') + .orderBy('count', 'desc') as Array<{ event_kind: number, count: string }> + + const other = await this.dbClient('events') + .whereNotIn('event_kind', this.trackedKinds) + .count<{ count: string }>('id as count') + .first() + + const eventsByKind = rows.map((row) => { + return { + kind: String(row.event_kind), + count: toNumber(row.count), + } + }) + + eventsByKind.push({ + kind: 'other', + count: toNumber(other?.count), + }) + + return eventsByKind + } + + private async getAdmittedUsersCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .count<{ count: string }>('pubkey as count') + .first() + + return toNumber(result?.count) + } + + private async getSatsPaidCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .sum<{ total: string | null }>('balance as total') + .first() + + const millisats = toNumber(result?.total) + return millisats / 1000 + } + + private async getTopTalkersAllTime(): Promise { + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } + + private async getTopTalkersRecent(): Promise { + const since = new Date(Date.now() - this.recentDays * 24 * 60 * 60 * 1000) + + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .where('first_seen', '>=', since) + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } +} diff --git a/src/dashboard-service/services/snapshot-service.ts b/src/dashboard-service/services/snapshot-service.ts new file mode 100644 index 00000000..ce47ce92 --- /dev/null +++ b/src/dashboard-service/services/snapshot-service.ts @@ -0,0 +1,109 @@ +import { DashboardMetrics, KPISnapshot } from '../types' +import { createLogger } from '../../factories/logger-factory' + +const debug = createLogger('dashboard-service:snapshot-service') + +const defaultMetrics = (): DashboardMetrics => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, +}) + +export interface ISnapshotRefreshResult { + snapshot: KPISnapshot + changed: boolean +} + +export interface IKPICollector { + collectMetrics(): Promise + close?(): Promise | void +} + +export interface IKPIUpdateVersionProvider { + getCurrentVersion(): Promise +} + +export class SnapshotService { + private lastCollectedVersion: string | undefined + + private metricsFingerprint = JSON.stringify(defaultMetrics()) + + private sequence = 0 + + private snapshot: KPISnapshot = { + sequence: this.sequence, + generatedAt: new Date(0).toISOString(), + status: 'live', + metrics: defaultMetrics(), + } + + public constructor( + private readonly collector: IKPICollector, + private readonly updateVersionProvider?: IKPIUpdateVersionProvider, + ) { } + + public getSnapshot(): KPISnapshot { + return this.snapshot + } + + /** + * Fetches fresh metrics from the collector and updates the snapshot if the + * metrics have changed. Throws if the collector is unavailable — callers + * are responsible for catching and deciding how to surface errors. + */ + public async refresh(): Promise { + const currentVersion = await this.updateVersionProvider?.getCurrentVersion() + + if ( + typeof currentVersion !== 'undefined' + && typeof this.lastCollectedVersion !== 'undefined' + && currentVersion === this.lastCollectedVersion + && this.snapshot.status === 'live' + ) { + debug('dashboard revision unchanged, skipping KPI collection') + return { + snapshot: this.snapshot, + changed: false, + } + } + + const metrics = await this.collector.collectMetrics() + const nextFingerprint = JSON.stringify(metrics) + + if (typeof currentVersion !== 'undefined') { + this.lastCollectedVersion = currentVersion + } + + if (nextFingerprint === this.metricsFingerprint && this.snapshot.status === 'live') { + debug('metrics unchanged, skipping snapshot sequence update') + return { + snapshot: this.snapshot, + changed: false, + } + } + + this.metricsFingerprint = nextFingerprint + + return this.updateSnapshot(metrics, 'live') + } + + private updateSnapshot(metrics: DashboardMetrics, status: 'live' | 'stale'): ISnapshotRefreshResult { + this.sequence += 1 + + this.snapshot = { + sequence: this.sequence, + generatedAt: new Date().toISOString(), + status, + metrics, + } + + return { + snapshot: this.snapshot, + changed: true, + } + } +} diff --git a/src/dashboard-service/types.ts b/src/dashboard-service/types.ts new file mode 100644 index 00000000..15165b30 --- /dev/null +++ b/src/dashboard-service/types.ts @@ -0,0 +1,40 @@ +export interface TopTalker { + pubkey: string + count: number +} + +export interface EventsByKindCount { + kind: string + count: number +} + +export interface DashboardMetrics { + eventsByKind: EventsByKindCount[] + admittedUsers: number + satsPaid: number + topTalkers: { + allTime: TopTalker[] + recent: TopTalker[] + } +} + +export interface KPISnapshot { + sequence: number + generatedAt: string + status: 'live' | 'stale' + metrics: DashboardMetrics +} + +export interface DashboardSnapshotResponse { + data: KPISnapshot +} + +export interface DashboardWebSocketEnvelope { + type: TType + payload: TPayload +} + +export type DashboardServerMessage = + | DashboardWebSocketEnvelope<'dashboard.connected', { at: string }> + | DashboardWebSocketEnvelope<'kpi.snapshot', KPISnapshot> + | DashboardWebSocketEnvelope<'kpi.tick', { at: string, sequence: number }> diff --git a/src/dashboard-service/ws/dashboard-ws-hub.ts b/src/dashboard-service/ws/dashboard-ws-hub.ts new file mode 100644 index 00000000..1b3b330d --- /dev/null +++ b/src/dashboard-service/ws/dashboard-ws-hub.ts @@ -0,0 +1,141 @@ +import { DashboardServerMessage, KPISnapshot } from '../types' +import { RawData, WebSocketServer } from 'ws' +import { createLogger } from '../../factories/logger-factory' +import WebSocket from 'ws' + +const debug = createLogger('dashboard-service:ws') + +export class DashboardWebSocketHub { + public constructor( + private readonly webSocketServer: WebSocketServer, + private readonly getSnapshot: () => KPISnapshot, + ) { + console.info('dashboard-service: websocket hub initialized') + + this.webSocketServer + .on('connection', this.onConnection.bind(this)) + .on('close', () => { + console.info('dashboard-service: websocket server closed') + }) + .on('error', (error) => { + console.error('dashboard-service: websocket server error', error) + }) + } + + public broadcastSnapshot(snapshot: KPISnapshot): void { + this.broadcast({ + type: 'kpi.snapshot', + payload: snapshot, + }) + } + + public broadcastTick(sequence: number): void { + this.broadcast({ + type: 'kpi.tick', + payload: { + at: new Date().toISOString(), + sequence, + }, + }) + } + + public close(): void { + console.info('dashboard-service: closing websocket hub') + this.webSocketServer.clients.forEach((client) => { + client.close() + }) + this.webSocketServer.removeAllListeners() + } + + private onConnection(client: WebSocket): void { + const connectedClients = this.getConnectedClientsCount() + console.info('dashboard-service: websocket client connected (clients=%d)', connectedClients) + + client + .on('close', (code, reason) => { + console.info( + 'dashboard-service: websocket client disconnected (code=%d, reason=%s, clients=%d)', + code, + reason.toString(), + this.getConnectedClientsCount(), + ) + }) + .on('error', (error) => { + console.error('dashboard-service: websocket client error', error) + }) + .on('message', (raw) => { + this.onClientMessage(raw) + }) + + this.send(client, { + type: 'dashboard.connected', + payload: { + at: new Date().toISOString(), + }, + }) + + this.send(client, { + type: 'kpi.snapshot', + payload: this.getSnapshot(), + }) + + debug('dashboard websocket bootstrap snapshot sent') + } + + private onClientMessage(raw: RawData): void { + try { + const rawMessage = this.toUtf8(raw) + const message = JSON.parse(rawMessage) + debug('dashboard websocket client message received: %o', message) + } catch (error) { + console.error('dashboard-service: websocket message parsing failed', error) + } + } + + private broadcast(message: DashboardServerMessage): void { + this.webSocketServer.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return + } + this.send(client, message) + }) + } + + private send(client: WebSocket, message: DashboardServerMessage): void { + if (client.readyState !== WebSocket.OPEN) { + return + } + + try { + client.send(JSON.stringify(message)) + } catch (error) { + console.error('dashboard-service: websocket send failed', error) + } + } + + private toUtf8(raw: RawData): string { + if (typeof raw === 'string') { + return raw + } + + if (Buffer.isBuffer(raw)) { + return raw.toString('utf8') + } + + if (Array.isArray(raw)) { + return raw.map((chunk) => { + if (Buffer.isBuffer(chunk)) { + return chunk.toString('utf8') + } + + return Buffer.from(chunk as ArrayBuffer).toString('utf8') + }).join('') + } + + return Buffer.from(raw).toString('utf8') + } + + private getConnectedClientsCount(): number { + return Array.from(this.webSocketServer.clients).filter((client) => client.readyState === WebSocket.OPEN).length + } +} diff --git a/test/unit/dashboard-service/app.spec.ts b/test/unit/dashboard-service/app.spec.ts new file mode 100644 index 00000000..f8e81331 --- /dev/null +++ b/test/unit/dashboard-service/app.spec.ts @@ -0,0 +1,45 @@ +import axios from 'axios' +import { createDashboardService } from '../../../src/dashboard-service/app' +import { expect } from 'chai' +import WebSocket from 'ws' + +describe('dashboard-service app', () => { + it('serves health, snapshot, and websocket endpoints', async () => { + const service = createDashboardService({ + host: '127.0.0.1', + port: 0, + wsPath: '/api/v1/kpis/stream', + pollIntervalMs: 1000, + useDummyData: true, + collectorMode: 'full', + }) + + await service.start() + + const port = service.getHttpPort() + + const healthResponse = await axios.get(`http://127.0.0.1:${port}/healthz`) + expect(healthResponse.status).to.equal(200) + + const snapshotResponse = await axios.get(`http://127.0.0.1:${port}/api/v1/kpis/snapshot`) + expect(snapshotResponse.status).to.equal(200) + + const snapshotJson = snapshotResponse.data as any + expect(snapshotJson.data).to.have.property('sequence') + + const ws = new WebSocket(`ws://127.0.0.1:${port}/api/v1/kpis/stream`) + + const connectedEvent = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('timeout waiting for ws message')), 2000) + ws.once('message', (raw) => { + clearTimeout(timeout) + resolve(JSON.parse(raw.toString())) + }) + }) + + expect(connectedEvent).to.have.property('type', 'dashboard.connected') + + ws.close() + await service.stop() + }) +}) diff --git a/test/unit/dashboard-service/polling-scheduler.spec.ts b/test/unit/dashboard-service/polling-scheduler.spec.ts new file mode 100644 index 00000000..8855fcb9 --- /dev/null +++ b/test/unit/dashboard-service/polling-scheduler.spec.ts @@ -0,0 +1,97 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { PollingScheduler } from '../../../src/dashboard-service/polling/polling-scheduler' + +describe('PollingScheduler', () => { + let clock: Sinon.SinonFakeTimers + + beforeEach(() => { + clock = Sinon.useFakeTimers() + }) + + afterEach(() => { + clock.restore() + }) + + /** + * The scheduler uses recursive setTimeout (not setInterval), so each tick + * is only enqueued after the previous one resolves. With instant-resolving + * stubs the sequence is: + * T=0 start() → schedules tick at T=1000 + * T=1000 tick #1 resolves → schedules tick at T=2000 + * T=2000 tick #2 resolves → schedules tick at T=3000 + * T=3000 tick #3 resolves → schedules tick at T=4000 + * tickAsync drives the microtask queue between timer firings, so all three + * ticks complete inside tickAsync(3000). + */ + it('runs tick callback on interval while running', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + scheduler.start() + await clock.tickAsync(3000) + + expect(tick.callCount).to.equal(3) + scheduler.stop() + }) + + it('stops running when stop is called', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(500, tick) + + scheduler.start() + await clock.tickAsync(1000) // ticks at 500ms, 1000ms → 2 calls + scheduler.stop() + await clock.tickAsync(1000) // no more ticks after stop + + expect(tick.callCount).to.equal(2) + }) + + it('does not overlap ticks when callback is slow', async () => { + // Tick takes 1500ms — longer than the 1000ms interval. + // With setInterval this would cause overlap; with recursive setTimeout it must not. + let running = 0 + let maxConcurrent = 0 + + const tick = Sinon.stub().callsFake(async () => { + running++ + maxConcurrent = Math.max(maxConcurrent, running) + await clock.tickAsync(1500) + running-- + }) + + const scheduler = new PollingScheduler(1000, tick) + scheduler.start() + // Drive enough time for two potential overlapping cycles + await clock.tickAsync(4000) + scheduler.stop() + + expect(maxConcurrent).to.equal(1, 'ticks must not run concurrently') + }) + + it('continues scheduling after a failed tick', async () => { + const tick = Sinon.stub() + .onFirstCall().rejects(new Error('transient error')) + .resolves(undefined) + + const scheduler = new PollingScheduler(1000, tick) + scheduler.start() + await clock.tickAsync(3000) + scheduler.stop() + + // First tick rejects, but the scheduler must recover and run two more. + expect(tick.callCount).to.be.greaterThanOrEqual(2) + }) + + it('isRunning reflects scheduler state', () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + expect(scheduler.isRunning()).to.equal(false) + scheduler.start() + expect(scheduler.isRunning()).to.equal(true) + scheduler.stop() + expect(scheduler.isRunning()).to.equal(false) + }) +}) diff --git a/test/unit/dashboard-service/snapshot-service.spec.ts b/test/unit/dashboard-service/snapshot-service.spec.ts new file mode 100644 index 00000000..a1acba8a --- /dev/null +++ b/test/unit/dashboard-service/snapshot-service.spec.ts @@ -0,0 +1,107 @@ +import chai, { expect } from 'chai' +import chaiAsPromised from 'chai-as-promised' +import Sinon from 'sinon' + +import { IKPICollector, SnapshotService } from '../../../src/dashboard-service/services/snapshot-service' +import { DashboardMetrics } from '../../../src/dashboard-service/types' + +chai.use(chaiAsPromised) + +const createMetrics = (overrides: Partial = {}): DashboardMetrics => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, + ...overrides, +}) + +const makeCollector = (stub: Sinon.SinonStub): IKPICollector => ({ + collectMetrics: stub, +}) + +describe('SnapshotService', () => { + let sandbox: Sinon.SinonSandbox + + beforeEach(() => { + sandbox = Sinon.createSandbox() + }) + + afterEach(() => { + sandbox.restore() + }) + + it('updates snapshot when collected metrics change', async () => { + const firstMetrics = createMetrics({ admittedUsers: 1 }) + const nextMetrics = createMetrics({ admittedUsers: 2 }) + + const stub = sandbox.stub() + .onFirstCall().resolves(firstMetrics) + .onSecondCall().resolves(firstMetrics) + .onThirdCall().resolves(nextMetrics) + + const service = new SnapshotService(makeCollector(stub)) + + const first = await service.refresh() + expect(first.changed).to.equal(true, 'first refresh should report changed') + expect(first.snapshot.sequence).to.equal(1) + expect(first.snapshot.status).to.equal('live') + + const second = await service.refresh() + expect(second.changed).to.equal(false, 'second refresh with same metrics should not change') + expect(second.snapshot.sequence).to.equal(1, 'sequence must not advance when metrics are unchanged') + + const third = await service.refresh() + expect(third.changed).to.equal(true, 'third refresh with new metrics should report changed') + expect(third.snapshot.sequence).to.equal(2) + expect(third.snapshot.metrics.admittedUsers).to.equal(2) + }) + + it('does not advance sequence when metrics are identical across refreshes', async () => { + const metrics = createMetrics({ satsPaid: 100 }) + const stub = sandbox.stub().resolves(metrics) + + const service = new SnapshotService(makeCollector(stub)) + + const first = await service.refresh() + expect(first.changed).to.equal(true) + expect(first.snapshot.sequence).to.equal(1) + + const second = await service.refresh() + expect(second.changed).to.equal(false) + expect(second.snapshot.sequence).to.equal(1) + }) + + it('propagates collector errors to the caller', async () => { + const stub = sandbox.stub().rejects(new Error('db down')) + + const service = new SnapshotService(makeCollector(stub)) + + await expect(service.refresh()).to.be.rejectedWith('db down') + }) + + it('returns the last known snapshot via getSnapshot()', async () => { + const metrics = createMetrics({ admittedUsers: 5 }) + const stub = sandbox.stub().resolves(metrics) + + const service = new SnapshotService(makeCollector(stub)) + + await service.refresh() + + const snap = service.getSnapshot() + expect(snap.sequence).to.equal(1) + expect(snap.status).to.equal('live') + expect(snap.metrics.admittedUsers).to.equal(5) + }) + + it('sets status to live after a successful refresh', async () => { + const stub = sandbox.stub().resolves(createMetrics()) + + const service = new SnapshotService(makeCollector(stub)) + + const { snapshot } = await service.refresh() + expect(snapshot.status).to.equal('live') + }) +})