Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions migrations/20260411_210100_add_dashboard_live_notify_triggers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
exports.up = async function (knex) {
await knex.schema.raw(`
CREATE OR REPLACE FUNCTION notify_dashboard_events_changed() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('dashboard_events_changed', TG_OP);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`)

await knex.schema.raw(`
DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;

CREATE TRIGGER dashboard_events_changed_trigger
AFTER INSERT OR UPDATE OR DELETE ON events
FOR EACH STATEMENT
EXECUTE FUNCTION notify_dashboard_events_changed();
`)

await knex.schema.raw(`
CREATE OR REPLACE FUNCTION notify_dashboard_users_changed() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('dashboard_users_changed', TG_OP);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`)

await knex.schema.raw(`
DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;

CREATE TRIGGER dashboard_users_changed_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH STATEMENT
EXECUTE FUNCTION notify_dashboard_users_changed();
`)
}

exports.down = async function (knex) {
await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;')
await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;')
await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_events_changed();')
await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_users_changed();')
}
40 changes: 40 additions & 0 deletions migrations/20260412_020000_add_dashboard_kpi_indexes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Migration: add dashboard KPI query indexes
*
* Without these the incremental collector degrades to sequential scans:
* - idx_events_cursor → covers the (first_seen, id) cursor predicate used in every
* incremental delta query and the bootstrap cursor select.
* - idx_events_pubkey → covers the GROUP BY event_pubkey in the all-time talker query.
* - idx_users_cursor → covers the (updated_at, pubkey) cursor predicate used in the
* user delta / cursor-select queries.
*
* All three are created CONCURRENTLY so they don't lock the table on a live relay.
* knex does not support CREATE INDEX CONCURRENTLY natively, so we use raw SQL and
 * export `config = { transaction: false }` (DDL inside a transaction would negate CONCURRENTLY).
*/

exports.up = async (knex) => {
await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_cursor
ON events (first_seen, id);
`)

await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_pubkey
ON events (event_pubkey);
`)
Comment on lines +22 to +25
Copy link

Copilot AI Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration creates idx_events_pubkey on events(event_pubkey), but the events table already defines an index on event_pubkey (created in the initial events table migration). Adding a second btree index on the same column increases write amplification and disk usage without improving plans. Consider dropping idx_events_pubkey and relying on the existing index (or change it to a composite/covering index if you need different ordering).

Copilot uses AI. Check for mistakes.
Comment on lines +22 to +25
Copy link

Copilot AI Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration creates idx_events_pubkey on events(event_pubkey), but the events table already has a btree index on event_pubkey from its original migration. A duplicate index increases insert/update cost and disk usage without improving query plans. Consider removing idx_events_pubkey (or changing it to a genuinely different composite/covering index if needed).

Copilot uses AI. Check for mistakes.

await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_cursor
ON users (updated_at, pubkey);
`)
}

exports.down = async (knex) => {
await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_cursor;')
await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_pubkey;')
await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_users_cursor;')
}

// Required so knex doesn't wrap the CONCURRENTLY statements in a transaction.
exports.config = { transaction: false }
46 changes: 46 additions & 0 deletions migrations/20260415_010000_add_nip29_group_control_tables.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
exports.up = async function (knex) {
await knex.schema.createTable('groups', (table) => {
table.string('group_id', 64).primary()
table.binary('owner_pubkey').notNullable()
table.string('leader_relay_url', 255).notNullable()
table.timestamp('created_at', { useTz: true }).defaultTo(knex.fn.now())
})

await knex.schema.createTable('group_control_log', (table) => {
table.bigIncrements('sequence_id').primary()
table.string('group_id', 64).notNullable().references('group_id').inTable('groups')
table.bigInteger('group_sequence').notNullable()
table.binary('event_id').notNullable().unique()
table.binary('pubkey').notNullable()
table.integer('kind').notNullable()
table.jsonb('raw_event').notNullable()
table.string('state_root', 64).notNullable()
table.timestamp('created_at', { useTz: true }).defaultTo(knex.fn.now())

table.unique(['group_id', 'group_sequence'])
table.index(['group_id', 'group_sequence'])
table.index(['group_id', 'created_at'])
table.index(['kind'])
})

await knex.raw(
'ALTER TABLE group_control_log ADD CONSTRAINT group_control_log_kind_check CHECK (kind >= 9000 AND kind <= 9006);'
)

await knex.schema.createTable('group_memberships', (table) => {
table.string('group_id', 64).notNullable().references('group_id').inTable('groups')
table.binary('pubkey').notNullable()
table.string('role', 20).notNullable()

table.primary(['group_id', 'pubkey'])
table.index(['group_id', 'role'])
table.index(['pubkey'])
})
}

exports.down = async function (knex) {
await knex.schema.dropTableIfExists('group_memberships')
await knex.raw('ALTER TABLE IF EXISTS group_control_log DROP CONSTRAINT IF EXISTS group_control_log_kind_check;')
await knex.schema.dropTableIfExists('group_control_log')
await knex.schema.dropTableIfExists('groups')
}
55 changes: 55 additions & 0 deletions migrations/20260419_120000_add_dashboard_updated_revision.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
exports.up = async function (knex) {
await knex.schema.createTable('dashboard_state', (table) => {
table.integer('id').primary()
table.bigInteger('revision').notNullable().defaultTo(0)
table.timestamp('updated_at', { useTz: false }).notNullable().defaultTo(knex.fn.now())
})

await knex('dashboard_state')
.insert({
id: 1,
revision: 0,
updated_at: knex.fn.now(),
})
.onConflict('id')
.ignore()

await knex.schema.raw(`
CREATE OR REPLACE FUNCTION dashboard_updated() RETURNS TRIGGER AS $$
BEGIN
UPDATE dashboard_state
SET
revision = revision + 1,
updated_at = NOW()
WHERE id = 1;
Comment on lines +20 to +24
Copy link

Copilot AI Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dashboard_updated() performs UPDATE dashboard_state ... WHERE id = 1 on every INSERT/UPDATE/DELETE statement for both events and users. Because event writes are typically single-row statements, this introduces a hot-row contention point and extra WAL writes on high-throughput relays. Consider using a sequence-based revision (nextval) or relying on LISTEN/NOTIFY (added separately) instead of updating a single shared row for every write.

Copilot uses AI. Check for mistakes.

RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`)

await knex.schema.raw(`
DROP TRIGGER IF EXISTS dashboard_revision_events_trigger ON events;

CREATE TRIGGER dashboard_revision_events_trigger
AFTER INSERT OR UPDATE OR DELETE ON events
FOR EACH STATEMENT
EXECUTE FUNCTION dashboard_updated();
`)

await knex.schema.raw(`
DROP TRIGGER IF EXISTS dashboard_revision_users_trigger ON users;

CREATE TRIGGER dashboard_revision_users_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH STATEMENT
EXECUTE FUNCTION dashboard_updated();
`)
}

exports.down = async function (knex) {
await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_revision_events_trigger ON events;')
await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_revision_users_trigger ON users;')
await knex.schema.raw('DROP FUNCTION IF EXISTS dashboard_updated();')
await knex.schema.dropTableIfExists('dashboard_state')
}
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
"main": "src/index.ts",
"scripts": {
"dev": "node -r ts-node/register src/index.ts",
"dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts",
"clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}",
"build": "tsc --project tsconfig.build.json",
"prestart": "npm run build",
"start": "cd dist && node src/index.js",
"start:dashboard": "node dist/src/dashboard-service/index.js",
"build:check": "npm run build -- --noEmit",
"lint": "eslint --ext .ts ./src ./test",
"lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test",
Expand All @@ -37,6 +39,7 @@
"db:seed": "knex seed:run",
"pretest:unit": "mkdir -p .test-reports/unit",
"test:unit": "mocha 'test/**/*.spec.ts'",
"test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'",
"test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*",
"cover:unit": "nyc --report-dir .coverage/unit npm run test:unit",
"docker:build": "docker build -t nostream .",
Expand Down
12 changes: 12 additions & 0 deletions src/dashboard-service/api/dashboard-router.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Router } from 'express'

import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler'
import { SnapshotService } from '../services/snapshot-service'

/**
 * Builds the Express router exposing dashboard KPI endpoints.
 *
 * Routes:
 *   GET /snapshot — serves the latest KPI snapshot from the given service.
 */
export const createDashboardRouter = (snapshotService: SnapshotService): Router => {
  const kpiRouter = Router()

  kpiRouter.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService))

  return kpiRouter
}
159 changes: 159 additions & 0 deletions src/dashboard-service/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { IKPICollector, SnapshotService } from './services/snapshot-service'
import { createDashboardRouter } from './api/dashboard-router'
import { createLogger } from '../factories/logger-factory'
import { DashboardServiceConfig } from './config'
import { DashboardUpdateVersionService } from './services/dashboard-update-version-service'
import { DashboardWebSocketHub } from './ws/dashboard-ws-hub'
import express from 'express'
import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler'
import { getReadReplicaDbClient } from '../database/client'
import http from 'http'
import { KPICollectorService } from './services/kpi-collector-service'
import { PollingScheduler } from './polling/polling-scheduler'
import { WebSocketServer } from 'ws'
const debug = createLogger('dashboard-service:app')

/**
 * Public surface of the dashboard service: its effective configuration plus
 * lifecycle controls for the HTTP/WebSocket servers and the KPI polling loop.
 */
export interface DashboardService {
  /** Configuration the service was created with. */
  readonly config: DashboardServiceConfig
  /** Produces and caches the KPI snapshots served over HTTP and WebSocket. */
  readonly snapshotService: SnapshotService
  /** Drives periodic snapshot refreshes; started/stopped with the service. */
  readonly pollingScheduler: PollingScheduler
  /** Starts the HTTP/WebSocket servers and the polling scheduler. */
  start(): Promise<void>
  /** Stops polling, releases collector resources, and shuts the servers down. */
  stop(): Promise<void>
  /** Port the HTTP server is bound to (configured port when not listening). */
  getHttpPort(): number
}

/**
 * Wires up the dashboard service: an Express HTTP API (health + KPI routes),
 * a WebSocket hub that pushes KPI snapshots to connected clients, and a
 * polling scheduler that refreshes the snapshot on a fixed interval.
 *
 * With `config.useDummyData` set, no database client is created and the
 * collector returns empty metrics — useful for local/demo runs.
 *
 * @param config host/port/wsPath/pollIntervalMs/useDummyData settings.
 * @returns a handle exposing start/stop lifecycle and the bound HTTP port.
 */
export const createDashboardService = (config: DashboardServiceConfig): DashboardService => {
  console.info(
    'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s)',
    config.host,
    config.port,
    config.wsPath,
    config.pollIntervalMs,
    config.useDummyData,
  )

  // Dummy mode skips the database entirely; the stub collector returns an
  // empty-but-well-formed metrics object so downstream code needs no branching.
  const dbClient = config.useDummyData ? undefined : getReadReplicaDbClient()
  const collector: IKPICollector = config.useDummyData
    ? {
        collectMetrics: async () => ({
          eventsByKind: [],
          admittedUsers: 0,
          satsPaid: 0,
          topTalkers: { allTime: [], recent: [] },
        }),
      }
    : new KPICollectorService(dbClient)

  // Without a DB client there is no update-version tracking; SnapshotService
  // receives `undefined` in that case and must tolerate it.
  const updateVersionProvider = typeof dbClient === 'undefined'
    ? undefined
    : new DashboardUpdateVersionService(dbClient)

  const snapshotService = new SnapshotService(collector, updateVersionProvider)

  const app = express()
    .disable('x-powered-by')
    .get('/healthz', getHealthRequestHandler)
    .use('/api/v1/kpis', createDashboardRouter(snapshotService))

  // One HTTP server is shared between Express and the WebSocket upgrade path.
  const webServer = http.createServer(app)
  const webSocketServer = new WebSocketServer({
    server: webServer,
    path: config.wsPath,
  })

  const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot())

  // Each tick refreshes the snapshot; broadcasts happen only when the refresh
  // reports a change, keeping idle connections quiet.
  const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => {
    const { snapshot, changed } = await snapshotService.refresh()

    if (!changed) {
      debug('poll tick detected no KPI changes')
      return
    }

    debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status)
    webSocketHub.broadcastTick(snapshot.sequence)
    webSocketHub.broadcastSnapshot(snapshot)
  })

  // Starts the servers, primes the first snapshot (best effort), and kicks
  // off the polling loop. Calling while already listening is a no-op.
  const start = async () => {
    if (webServer.listening) {
      debug('start requested but service is already listening')
      return
    }

    console.info('dashboard-service: starting http and websocket servers')

    await new Promise<void>((resolve, reject) => {
      webServer.listen(config.port, config.host, () => {
        const address = webServer.address()
        debug('listening on %o', address)
        console.info('dashboard-service: listening on %o', address)
        resolve()
      })
      // NOTE(review): this once-listener also stays armed after a successful
      // start, so the first post-start server 'error' only logs here (the
      // settled promise ignores reject) — confirm later errors are handled
      // elsewhere.
      webServer.once('error', (error) => {
        console.error('dashboard-service: failed to start server', error)
        reject(error)
      })
    })

    // A failed initial refresh is deliberately non-fatal: the scheduler
    // retries on its next tick.
    try {
      const initialSnapshotRefresh = await snapshotService.refresh()
      if (initialSnapshotRefresh.changed) {
        debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status)
      }
    } catch (error) {
      console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error)
    }

    pollingScheduler.start()
    console.info('dashboard-service: polling scheduler started')
  }

  // Stops polling, closes collector resources (best effort), closes the
  // WebSocket hub, then shuts the HTTP server down.
  const stop = async () => {
    console.info('dashboard-service: stopping service')
    pollingScheduler.stop()

    // close() is optional on the collector — the dummy collector has none.
    if (collector?.close) {
      try {
        await collector.close()
      } catch (error) {
        console.error('dashboard-service: failed to close collector resources', error)
      }
    }

    webSocketHub.close()
    await new Promise<void>((resolve, reject) => {
      if (!webServer.listening) {
        debug('stop requested while server was already stopped')
        resolve()
        return
      }

      webServer.close((error) => {
        if (error) {
          console.error('dashboard-service: failed to stop cleanly', error)
          reject(error)
          return
        }

        console.info('dashboard-service: http server closed')
        resolve()
      })
    })
  }

  // Reports the actual bound port while listening (useful when config.port
  // is 0); falls back to the configured port otherwise (address() may also
  // return a string for pipe/socket paths, which falls through here too).
  const getHttpPort = (): number => {
    const address = webServer.address()
    return typeof address === 'object' && address !== null ? address.port : config.port
  }

  return {
    config,
    snapshotService,
    pollingScheduler,
    start,
    stop,
    getHttpPort,
  }
}
Loading
Loading