-
Notifications
You must be signed in to change notification settings - Fork 229
Phase 2: Implementation of metrics collectors for Dashboard #464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
057634f
4b48184
8d1771b
4e8c916
564f405
a8d119b
4b6e89b
28807f4
d5e1e01
583773d
65f23a0
02e6a2b
9648ba7
8962a98
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| exports.up = async function (knex) { | ||
| await knex.schema.raw(` | ||
| CREATE OR REPLACE FUNCTION notify_dashboard_events_changed() RETURNS TRIGGER AS $$ | ||
| BEGIN | ||
| PERFORM pg_notify('dashboard_events_changed', TG_OP); | ||
| RETURN NULL; | ||
| END; | ||
| $$ LANGUAGE plpgsql; | ||
| `) | ||
|
|
||
| await knex.schema.raw(` | ||
| DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events; | ||
|
|
||
| CREATE TRIGGER dashboard_events_changed_trigger | ||
| AFTER INSERT OR UPDATE OR DELETE ON events | ||
| FOR EACH STATEMENT | ||
| EXECUTE FUNCTION notify_dashboard_events_changed(); | ||
| `) | ||
|
|
||
| await knex.schema.raw(` | ||
| CREATE OR REPLACE FUNCTION notify_dashboard_users_changed() RETURNS TRIGGER AS $$ | ||
| BEGIN | ||
| PERFORM pg_notify('dashboard_users_changed', TG_OP); | ||
| RETURN NULL; | ||
| END; | ||
| $$ LANGUAGE plpgsql; | ||
| `) | ||
|
|
||
| await knex.schema.raw(` | ||
| DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users; | ||
|
|
||
| CREATE TRIGGER dashboard_users_changed_trigger | ||
| AFTER INSERT OR UPDATE OR DELETE ON users | ||
| FOR EACH STATEMENT | ||
| EXECUTE FUNCTION notify_dashboard_users_changed(); | ||
| `) | ||
| } | ||
|
|
||
| exports.down = async function (knex) { | ||
| await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;') | ||
| await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;') | ||
| await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_events_changed();') | ||
| await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_users_changed();') | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| /** | ||
| * Migration: add dashboard KPI query indexes | ||
| * | ||
| * Without these the incremental collector degrades to sequential scans: | ||
| * - idx_events_cursor → covers the (first_seen, id) cursor predicate used in every | ||
| * incremental delta query and the bootstrap cursor select. | ||
| * - idx_events_pubkey → covers the GROUP BY event_pubkey in the all-time talker query. | ||
| * - idx_users_cursor → covers the (updated_at, pubkey) cursor predicate used in the | ||
| * user delta / cursor-select queries. | ||
| * | ||
| * All three are created CONCURRENTLY so they don't lock the table on a live relay. | ||
| * knex does not support CREATE INDEX CONCURRENTLY natively, so we use raw SQL and | ||
| * set `disableTransactions` to true (DDL inside a transaction would negate CONCURRENTLY). | ||
| */ | ||
|
|
||
| exports.up = async (knex) => { | ||
| await knex.raw(` | ||
| CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_cursor | ||
| ON events (first_seen, id); | ||
| `) | ||
|
|
||
| await knex.raw(` | ||
| CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_pubkey | ||
| ON events (event_pubkey); | ||
| `) | ||
|
Comment on lines
+22
to
+25
|
||
|
|
||
| await knex.raw(` | ||
| CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_cursor | ||
| ON users (updated_at, pubkey); | ||
| `) | ||
| } | ||
|
|
||
| exports.down = async (knex) => { | ||
| await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_cursor;') | ||
| await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_pubkey;') | ||
| await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_users_cursor;') | ||
| } | ||
|
|
||
| // Required so knex doesn't wrap the CONCURRENTLY statements in a transaction. | ||
| exports.config = { transaction: false } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| exports.up = async function (knex) { | ||
| await knex.schema.createTable('groups', (table) => { | ||
| table.string('group_id', 64).primary() | ||
| table.binary('owner_pubkey').notNullable() | ||
| table.string('leader_relay_url', 255).notNullable() | ||
| table.timestamp('created_at', { useTz: true }).defaultTo(knex.fn.now()) | ||
| }) | ||
|
|
||
| await knex.schema.createTable('group_control_log', (table) => { | ||
| table.bigIncrements('sequence_id').primary() | ||
| table.string('group_id', 64).notNullable().references('group_id').inTable('groups') | ||
| table.bigInteger('group_sequence').notNullable() | ||
| table.binary('event_id').notNullable().unique() | ||
| table.binary('pubkey').notNullable() | ||
| table.integer('kind').notNullable() | ||
| table.jsonb('raw_event').notNullable() | ||
| table.string('state_root', 64).notNullable() | ||
| table.timestamp('created_at', { useTz: true }).defaultTo(knex.fn.now()) | ||
|
|
||
| table.unique(['group_id', 'group_sequence']) | ||
| table.index(['group_id', 'group_sequence']) | ||
| table.index(['group_id', 'created_at']) | ||
| table.index(['kind']) | ||
| }) | ||
|
|
||
| await knex.raw( | ||
| 'ALTER TABLE group_control_log ADD CONSTRAINT group_control_log_kind_check CHECK (kind >= 9000 AND kind <= 9006);' | ||
| ) | ||
|
|
||
| await knex.schema.createTable('group_memberships', (table) => { | ||
| table.string('group_id', 64).notNullable().references('group_id').inTable('groups') | ||
| table.binary('pubkey').notNullable() | ||
| table.string('role', 20).notNullable() | ||
|
|
||
| table.primary(['group_id', 'pubkey']) | ||
| table.index(['group_id', 'role']) | ||
| table.index(['pubkey']) | ||
| }) | ||
| } | ||
|
|
||
| exports.down = async function (knex) { | ||
| await knex.schema.dropTableIfExists('group_memberships') | ||
| await knex.raw('ALTER TABLE IF EXISTS group_control_log DROP CONSTRAINT IF EXISTS group_control_log_kind_check;') | ||
| await knex.schema.dropTableIfExists('group_control_log') | ||
| await knex.schema.dropTableIfExists('groups') | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| exports.up = async function (knex) { | ||
| await knex.schema.createTable('dashboard_state', (table) => { | ||
| table.integer('id').primary() | ||
| table.bigInteger('revision').notNullable().defaultTo(0) | ||
| table.timestamp('updated_at', { useTz: false }).notNullable().defaultTo(knex.fn.now()) | ||
| }) | ||
|
|
||
| await knex('dashboard_state') | ||
| .insert({ | ||
| id: 1, | ||
| revision: 0, | ||
| updated_at: knex.fn.now(), | ||
| }) | ||
| .onConflict('id') | ||
| .ignore() | ||
|
|
||
| await knex.schema.raw(` | ||
| CREATE OR REPLACE FUNCTION dashboard_updated() RETURNS TRIGGER AS $$ | ||
| BEGIN | ||
| UPDATE dashboard_state | ||
| SET | ||
| revision = revision + 1, | ||
| updated_at = NOW() | ||
| WHERE id = 1; | ||
|
Comment on lines
+20
to
+24
|
||
|
|
||
| RETURN NULL; | ||
| END; | ||
| $$ LANGUAGE plpgsql; | ||
| `) | ||
|
|
||
| await knex.schema.raw(` | ||
| DROP TRIGGER IF EXISTS dashboard_revision_events_trigger ON events; | ||
|
|
||
| CREATE TRIGGER dashboard_revision_events_trigger | ||
| AFTER INSERT OR UPDATE OR DELETE ON events | ||
| FOR EACH STATEMENT | ||
| EXECUTE FUNCTION dashboard_updated(); | ||
| `) | ||
|
|
||
| await knex.schema.raw(` | ||
| DROP TRIGGER IF EXISTS dashboard_revision_users_trigger ON users; | ||
|
|
||
| CREATE TRIGGER dashboard_revision_users_trigger | ||
| AFTER INSERT OR UPDATE OR DELETE ON users | ||
| FOR EACH STATEMENT | ||
| EXECUTE FUNCTION dashboard_updated(); | ||
| `) | ||
| } | ||
|
|
||
| exports.down = async function (knex) { | ||
| await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_revision_events_trigger ON events;') | ||
| await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_revision_users_trigger ON users;') | ||
| await knex.schema.raw('DROP FUNCTION IF EXISTS dashboard_updated();') | ||
| await knex.schema.dropTableIfExists('dashboard_state') | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| import { Router } from 'express' | ||
|
|
||
| import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler' | ||
| import { SnapshotService } from '../services/snapshot-service' | ||
|
|
||
| export const createDashboardRouter = (snapshotService: SnapshotService): Router => { | ||
| const router = Router() | ||
|
|
||
| router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService)) | ||
|
|
||
| return router | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| import { IKPICollector, SnapshotService } from './services/snapshot-service' | ||
| import { createDashboardRouter } from './api/dashboard-router' | ||
| import { createLogger } from '../factories/logger-factory' | ||
| import { DashboardServiceConfig } from './config' | ||
| import { DashboardUpdateVersionService } from './services/dashboard-update-version-service' | ||
| import { DashboardWebSocketHub } from './ws/dashboard-ws-hub' | ||
| import express from 'express' | ||
| import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler' | ||
| import { getReadReplicaDbClient } from '../database/client' | ||
| import http from 'http' | ||
| import { KPICollectorService } from './services/kpi-collector-service' | ||
| import { PollingScheduler } from './polling/polling-scheduler' | ||
| import { WebSocketServer } from 'ws' | ||
| const debug = createLogger('dashboard-service:app') | ||
|
|
||
/**
 * Lifecycle handle for the standalone dashboard service.
 */
export interface DashboardService {
  /** Configuration the service was constructed with. */
  readonly config: DashboardServiceConfig
  /** Snapshot source shared by the HTTP router and WebSocket hub. */
  readonly snapshotService: SnapshotService
  /** Scheduler driving periodic KPI snapshot refreshes. */
  readonly pollingScheduler: PollingScheduler
  /** Starts the HTTP/WebSocket servers and the polling scheduler. */
  start(): Promise<void>
  /** Stops polling, closes the WebSocket hub, and shuts the HTTP server down. */
  stop(): Promise<void>
  /** Port the HTTP server is bound to (falls back to config.port when not listening). */
  getHttpPort(): number
}
|
|
||
| export const createDashboardService = (config: DashboardServiceConfig): DashboardService => { | ||
| console.info( | ||
| 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s)', | ||
| config.host, | ||
| config.port, | ||
| config.wsPath, | ||
| config.pollIntervalMs, | ||
| config.useDummyData, | ||
| ) | ||
|
|
||
| const dbClient = config.useDummyData ? undefined : getReadReplicaDbClient() | ||
| const collector: IKPICollector = config.useDummyData | ||
| ? { | ||
| collectMetrics: async () => ({ | ||
| eventsByKind: [], | ||
| admittedUsers: 0, | ||
| satsPaid: 0, | ||
| topTalkers: { allTime: [], recent: [] }, | ||
| }), | ||
| } | ||
| : new KPICollectorService(dbClient) | ||
|
|
||
| const updateVersionProvider = typeof dbClient === 'undefined' | ||
| ? undefined | ||
| : new DashboardUpdateVersionService(dbClient) | ||
|
|
||
| const snapshotService = new SnapshotService(collector, updateVersionProvider) | ||
|
|
||
| const app = express() | ||
| .disable('x-powered-by') | ||
| .get('/healthz', getHealthRequestHandler) | ||
| .use('/api/v1/kpis', createDashboardRouter(snapshotService)) | ||
|
|
||
| const webServer = http.createServer(app) | ||
| const webSocketServer = new WebSocketServer({ | ||
| server: webServer, | ||
| path: config.wsPath, | ||
| }) | ||
|
|
||
| const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot()) | ||
|
|
||
| const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => { | ||
| const { snapshot, changed } = await snapshotService.refresh() | ||
|
|
||
| if (!changed) { | ||
| debug('poll tick detected no KPI changes') | ||
| return | ||
| } | ||
|
|
||
| debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status) | ||
| webSocketHub.broadcastTick(snapshot.sequence) | ||
| webSocketHub.broadcastSnapshot(snapshot) | ||
| }) | ||
|
|
||
| const start = async () => { | ||
| if (webServer.listening) { | ||
| debug('start requested but service is already listening') | ||
| return | ||
| } | ||
|
|
||
| console.info('dashboard-service: starting http and websocket servers') | ||
|
|
||
| await new Promise<void>((resolve, reject) => { | ||
| webServer.listen(config.port, config.host, () => { | ||
| const address = webServer.address() | ||
| debug('listening on %o', address) | ||
| console.info('dashboard-service: listening on %o', address) | ||
| resolve() | ||
| }) | ||
| webServer.once('error', (error) => { | ||
| console.error('dashboard-service: failed to start server', error) | ||
| reject(error) | ||
| }) | ||
| }) | ||
|
|
||
| try { | ||
| const initialSnapshotRefresh = await snapshotService.refresh() | ||
| if (initialSnapshotRefresh.changed) { | ||
| debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status) | ||
| } | ||
| } catch (error) { | ||
| console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error) | ||
| } | ||
|
|
||
| pollingScheduler.start() | ||
| console.info('dashboard-service: polling scheduler started') | ||
| } | ||
|
|
||
| const stop = async () => { | ||
| console.info('dashboard-service: stopping service') | ||
| pollingScheduler.stop() | ||
|
|
||
| if (collector?.close) { | ||
| try { | ||
| await collector.close() | ||
| } catch (error) { | ||
| console.error('dashboard-service: failed to close collector resources', error) | ||
| } | ||
| } | ||
|
|
||
| webSocketHub.close() | ||
| await new Promise<void>((resolve, reject) => { | ||
| if (!webServer.listening) { | ||
| debug('stop requested while server was already stopped') | ||
| resolve() | ||
| return | ||
| } | ||
|
|
||
| webServer.close((error) => { | ||
| if (error) { | ||
| console.error('dashboard-service: failed to stop cleanly', error) | ||
| reject(error) | ||
| return | ||
| } | ||
|
|
||
| console.info('dashboard-service: http server closed') | ||
| resolve() | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| const getHttpPort = (): number => { | ||
| const address = webServer.address() | ||
| return typeof address === 'object' && address !== null ? address.port : config.port | ||
| } | ||
|
|
||
| return { | ||
| config, | ||
| snapshotService, | ||
| pollingScheduler, | ||
| start, | ||
| stop, | ||
| getHttpPort, | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This migration creates `idx_events_pubkey` on events(event_pubkey), but the events table already defines an index on event_pubkey (created in the initial events-table migration). Adding a second btree index on the same column increases write amplification and disk usage without improving plans. Consider dropping `idx_events_pubkey` and relying on the existing index (or change it to a composite/covering index if you need different ordering).