Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Changesets

Hello and welcome! This folder has been automatically generated by `@changesets/cli`, a build tool that works
with multi-package repos, or single-package repos to help you version and publish your code. You can
find the full documentation for it [in our repository](https://github.com/changesets/changesets).

We have a quick list of common questions to get you started engaging with this project in
[our documentation](https://github.com/changesets/changesets/blob/main/docs/common-questions.md).
2 changes: 1 addition & 1 deletion .changeset/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"$schema": "https://unpkg.com/@changesets/config@3.1.1/schema.json",
"$schema": "https://unpkg.com/@changesets/config@3.1.4/schema.json",
"changelog": "@changesets/cli/changelog",
"commit": false,
"fixed": [],
Expand Down
5 changes: 5 additions & 0 deletions .changeset/light-lilies-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"nostream": minor
---

Add EWMA rate limiter with configurable strategy support
5 changes: 5 additions & 0 deletions .changeset/slimy-bars-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"nostream": minor
---

Add EWMA rate limiter with strategy support
40 changes: 9 additions & 31 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,40 +109,18 @@ The settings below are listed in alphabetical order by name. Please keep this ta
| | Defaults to zero. Disabled when set to zero. |
| limits.event.pubkey.whitelist | List of public keys to always allow. Only public keys in this list will be able to post to this relay. Use for private relays. |
| limits.event.rateLimits[].kinds | List of event kinds rate limited. Use `[min, max]` for ranges. Optional. |
| limits.event.rateLimits[].period | Rate limiting period in milliseconds. |
| limits.event.rateLimits[].period | Rate limiting period in milliseconds. For `sliding_window`: the time window during which requests are counted. For `ewma`: the half-life of the exponential decay — shorter values forget bursts faster, longer values are stricter on bursty clients. |
| limits.event.rateLimits[].rate | Maximum number of events during period. |
| limits.event.retention.kind.whitelist | Event kinds excluded from retention purge. NIP-62 `REQUEST_TO_VANISH` is always excluded from retention purge, even if not listed here. |
| limits.event.retention.maxDays | Maximum number of days to retain events. Purge deletes events that are expired (`expires_at`), soft-deleted (`deleted_at`), or older than this window (`created_at`). Any non-positive value disables retention purge. |
| limits.event.retention.pubkey.whitelist | Public keys excluded from retention purge. |
| limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. |
| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. |
| limits.message.ipWhitelist | List of IPs (IPv4 or IPv6) to ignore rate limits. |
| limits.message.rateLimits[].period | Rate limit period in milliseconds. |
| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. |
| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. |
| limits.message.rateLimits[].period | Rate limiting period in milliseconds. For `sliding_window`: the time window. For `ewma`: the half-life of the decay function. |
| limits.message.rateLimits[].rate | Maximum number of messages during period. |
| mirroring.static[].address | Address of mirrored relay. (e.g. ws://100.100.100.100:8008) |
| mirroring.static[].filters | Subscription filters used to mirror. |
| mirroring.static[].limits.event | Event limit overrides for this mirror. See configurations under limits.event. |
| mirroring.static[].secret | Secret to pass to relays. Nostream relays only. Optional. |
| mirroring.static[].skipAdmissionCheck | Disable the admission fee check for events coming from this mirror. |
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
| nip05.domainBlacklist | List of domains blocked from NIP-05 verification. Authors with NIP-05 at these domains will be rejected. |
| nip05.domainWhitelist | List of domains allowed for NIP-05 verification. If set, only authors verified at these domains can publish. |
| nip05.maxConsecutiveFailures | Number of consecutive verification failures before giving up on an author. Defaults to 20. |
| nip05.mode | NIP-05 verification mode: `enabled` requires verification, `passive` verifies without blocking, `disabled` does nothing. Defaults to `disabled`. |
| nip05.verifyExpiration | Time in milliseconds before a successful NIP-05 verification expires and needs re-checking. Defaults to 604800000 (1 week). |
| nip05.verifyUpdateFrequency | Minimum interval in milliseconds between re-verification attempts for a given author. Defaults to 86400000 (24 hours). |
| paymentProcessors.lnbits.baseURL | Base URL of your Lnbits instance. |
| paymentProcessors.lnbits.callbackBaseURL | Public-facing Nostream's Lnbits Callback URL. (e.g. https://relay.your-domain.com/callbacks/lnbits) |
| paymentProcessors.lnurl.invoiceURL | [LUD-06 Pay Request](https://github.com/lnurl/luds/blob/luds/06.md) provider URL. (e.g. https://getalby.com/lnurlp/your-username) |
| paymentProcessors.zebedee.baseURL | Zebedee's API base URL. |
| paymentProcessors.zebedee.callbackBaseURL | Public-facing Nostream's Zebedee Callback URL (e.g. https://relay.your-domain.com/callbacks/zebedee) |
| paymentProcessors.zebedee.ipWhitelist | List with Zebedee's API Production IPs. See [ZBD API Documentation](https://api-reference.zebedee.io/#c7e18276-6935-4cca-89ae-ad949efe9a6a) for more info. |
| payments.enabled | Enabled payments. Defaults to false. |
| payments.feeSchedules.admission[].amount | Admission fee amount in msats. |
| payments.feeSchedules.admission[].enabled | Enables admission fee. Defaults to false. |
| payments.feeSchedules.admission[].whitelists.event_kinds | List of event kinds to waive admission fee. Use `[min, max]` for ranges. |
| payments.feeSchedules.admission[].whitelists.pubkeys | List of pubkeys to waive admission fee. |
| payments.processor | Either `zebedee`, `lnbits`, `lnurl`. |
| workers.count | Number of workers to spin up to handle incoming connections. |
| | Spin workers as many CPUs are available when set to zero. Defaults to zero. |
| limits.message.ipWhitelist | List of IPs (IPv4 or IPv6) to ignore rate limits. |
| limits.admissionCheck.rateLimits[].period | Rate limiting period in milliseconds. For `sliding_window`: the time window. For `ewma`: the half-life of the decay function. |
| limits.admissionCheck.rateLimits[].rate | Maximum number of admission checks during period. |
| limits.admissionCheck.ipWhitelist | List of IPs (IPv4 or IPv6) to ignore rate limits. |
| limits.rateLimiter.strategy | Rate limiting strategy. Either `ewma` or `sliding_window`. Defaults to `ewma`. When using `ewma`, the `period` field in each rate limit serves as the half-life for the exponential decay function. Note: when switching from `sliding_window` to `ewma`, consider increasing `rate` values slightly as EWMA penalizes bursty behavior more aggressively. |
3 changes: 3 additions & 0 deletions resources/default-settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ workers:
mirroring:
static: []
limits:
# strategy selection configuration for rate limiting:
rateLimiter:
strategy: ewma
invoice:
rateLimits:
- period: 60000
Expand Down
4 changes: 4 additions & 0 deletions src/@types/adapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ export interface ICacheAdapter {
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>
setKeyExpiry(key: string, expiry: number): Promise<void>
deleteKey(key: string): Promise<number>
getHKey(key: string, field: string): Promise<string>
setHKey(key: string, fields: Record<string, string>): Promise<boolean>
eval(script: string, keys: string[], args: string[]): Promise<unknown>
}
5 changes: 5 additions & 0 deletions src/@types/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ export interface RateLimit {
rate: number
}

export interface RateLimiterSettings {
strategy: 'ewma' | 'sliding_window'
}

export interface EventIdLimits {
minLeadingZeroBits?: number
}
Expand Down Expand Up @@ -133,6 +137,7 @@ export interface AdmissionCheckLimits {
}

export interface Limits {
rateLimiter?: RateLimiterSettings
invoice?: InvoiceLimits
admissionCheck?: AdmissionCheckLimits
connection?: ConnectionLimits
Expand Down
33 changes: 32 additions & 1 deletion src/adapters/redis-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import { ICacheAdapter } from '../@types/adapters'
const debug = createLogger('redis-adapter')

export class RedisAdapter implements ICacheAdapter {

private connection: Promise<void>

private scriptShas: Map<string, string> = new Map()

public constructor(private readonly client: CacheClient) {
this.connection = client.connect()
this.connection = client.isOpen ? Promise.resolve() : client.connect()

this.connection.catch((error) => this.onClientError(error))

Expand Down Expand Up @@ -92,4 +95,32 @@ export class RedisAdapter implements ICacheAdapter {

return this.client.zAdd(key, members)
}

public async deleteKey(key: string): Promise<number> {
await this.connection
debug('delete %s key', key)
return this.client.del(key)
}

public async getHKey(key: string, field: string): Promise<string> {
await this.connection
debug('get %s field for key %s', field, key)
return await this.client.hGet(key, field) ?? ''
}

public async setHKey(key: string, fields: Record<string, string>): Promise<boolean> {
await this.connection
debug('set %s key', key)
return await this.client.hSet(key, fields) >= 0
}

public async eval(script: string, keys: string[], args: string[]): Promise<unknown> {
await this.connection
if (!this.scriptShas.has(script)) {
const sha = await this.client.scriptLoad(script)
this.scriptShas.set(script, sha)
}
return await this.client.evalSha(this.scriptShas.get(script)!, { keys, arguments: args })
}

}
4 changes: 2 additions & 2 deletions src/adapters/web-socket-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private readonly request: IncomingHttpMessage,
private readonly webSocketServer: IWebSocketServerAdapter,
private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
private readonly rateLimiter: Factory<IRateLimiter>,
private readonly settings: Factory<Settings>,
) {
super()
Expand Down Expand Up @@ -211,7 +211,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
return false
}

const rateLimiter = this.slidingWindowRateLimiter()
const rateLimiter = this.rateLimiter()

const hit = (period: number, rate: number) => rateLimiter.hit(`${client}:message:${period}`, 1, { period, rate })

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { getMasterDbClient, getReadReplicaDbClient } from '../../database/client
import { createSettings } from '../settings-factory'
import { EventRepository } from '../../repositories/event-repository'
import { GetSubmissionCheckController } from '../../controllers/admission/get-admission-check-controller'
import { slidingWindowRateLimiterFactory } from '../rate-limiter-factory'
import { rateLimiterFactory } from '../rate-limiter-factory'
import { UserRepository } from '../../repositories/user-repository'

export const createGetAdmissionCheckController = () => {
Expand All @@ -11,5 +11,5 @@ export const createGetAdmissionCheckController = () => {
const eventRepository = new EventRepository(dbClient, readReplicaDbClient)
const userRepository = new UserRepository(dbClient, eventRepository)

return new GetSubmissionCheckController(userRepository, createSettings, slidingWindowRateLimiterFactory)
return new GetSubmissionCheckController(userRepository, createSettings, rateLimiterFactory)
}
4 changes: 2 additions & 2 deletions src/factories/controllers/post-invoice-controller-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { createSettings } from '../settings-factory'
import { EventRepository } from '../../repositories/event-repository'
import { IController } from '../../@types/controllers'
import { PostInvoiceController } from '../../controllers/invoices/post-invoice-controller'
import { slidingWindowRateLimiterFactory } from '../rate-limiter-factory'
import { rateLimiterFactory } from '../rate-limiter-factory'
import { UserRepository } from '../../repositories/user-repository'

export const createPostInvoiceController = (): IController => {
Expand All @@ -14,5 +14,5 @@ export const createPostInvoiceController = (): IController => {
const userRepository = new UserRepository(dbClient, eventRepository)
const paymentsService = createPaymentsService()

return new PostInvoiceController(userRepository, paymentsService, createSettings, slidingWindowRateLimiterFactory)
return new PostInvoiceController(userRepository, paymentsService, createSettings, rateLimiterFactory)
}
4 changes: 2 additions & 2 deletions src/factories/message-handler-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { EventMessageHandler } from '../handlers/event-message-handler'
import { eventStrategyFactory } from './event-strategy-factory'
import { getCacheClient } from '../cache/client'
import { RedisAdapter } from '../adapters/redis-adapter'
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
import { rateLimiterFactory } from './rate-limiter-factory'
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'

Expand All @@ -33,9 +33,9 @@ export const messageHandlerFactory =
eventRepository,
userRepository,
createSettings,
slidingWindowRateLimiterFactory,
nip05VerificationRepository,
getCache(),
rateLimiterFactory,
)
}
case MessageType.REQ:
Expand Down
16 changes: 13 additions & 3 deletions src/factories/rate-limiter-factory.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { createSettings } from './settings-factory'
import { EWMARateLimiter } from '../utils/ewma-rate-limiter'
import { getCacheClient } from '../cache/client'
import { ICacheAdapter } from '../@types/adapters'
import { IRateLimiter } from '../@types/utils'
Expand All @@ -6,11 +8,19 @@ import { SlidingWindowRateLimiter } from '../utils/sliding-window-rate-limiter'

let instance: IRateLimiter = undefined

export const slidingWindowRateLimiterFactory = () => {
export const rateLimiterFactory = () => {
if (!instance) {
const cache: ICacheAdapter = new RedisAdapter(getCacheClient())
instance = new SlidingWindowRateLimiter(cache)
const settings = createSettings()
const strategy = settings.limits?.rateLimiter?.strategy ?? 'ewma'

if (strategy === 'sliding_window') {
instance = new SlidingWindowRateLimiter(cache)
} else {
instance = new EWMARateLimiter(cache)
}
}


return instance
}
}
4 changes: 2 additions & 2 deletions src/factories/websocket-adapter-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { IEventRepository, INip05VerificationRepository, IUserRepository } from
import { createSettings } from './settings-factory'
import { IWebSocketServerAdapter } from '../@types/adapters'
import { messageHandlerFactory } from './message-handler-factory'
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
import { rateLimiterFactory } from './rate-limiter-factory'
import { WebSocketAdapter } from '../adapters/web-socket-adapter'

export const webSocketAdapterFactory =
Expand All @@ -20,6 +20,6 @@ export const webSocketAdapterFactory =
request,
webSocketServerAdapter,
messageHandlerFactory(eventRepository, userRepository, nip05VerificationRepository),
slidingWindowRateLimiterFactory,
rateLimiterFactory,
createSettings,
)
4 changes: 2 additions & 2 deletions src/handlers/event-message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ export class EventMessageHandler implements IMessageHandler {
protected readonly eventRepository: IEventRepository,
protected readonly userRepository: IUserRepository,
private readonly settings: () => Settings,
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
private readonly nip05VerificationRepository: INip05VerificationRepository,
private readonly cache: ICacheAdapter,
private readonly rateLimiter: Factory<IRateLimiter>,
) {}

public async handleMessage(message: IncomingEventMessage): Promise<void> {
Expand Down Expand Up @@ -287,7 +287,7 @@ export class EventMessageHandler implements IMessageHandler {
return false
}

const rateLimiter = this.slidingWindowRateLimiter()
const rateLimiter = this.rateLimiter()

const toString = (input: any | any[]): string => {
return Array.isArray(input) ? `[${input.map(toString)}]` : input.toString()
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/request-handlers/rate-limiter-middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { NextFunction, Request, Response } from 'express'
import { createLogger } from '../../factories/logger-factory'
import { createSettings } from '../../factories/settings-factory'
import { getRemoteAddress } from '../../utils/http'
import { rateLimiterFactory } from '../../factories/rate-limiter-factory'
import { Settings } from '../../@types/settings'
import { slidingWindowRateLimiterFactory } from '../../factories/rate-limiter-factory'

const debug = createLogger('rate-limiter-middleware')

Expand Down Expand Up @@ -34,7 +34,7 @@ export async function isRateLimited(remoteAddress: string, settings: Settings):
return false
}

const rateLimiter = slidingWindowRateLimiterFactory()
const rateLimiter = rateLimiterFactory()

const hit = (period: number, rate: number) =>
rateLimiter.hit(`${remoteAddress}:connection:${period}`, 1, { period: period, rate: rate })
Expand Down
64 changes: 64 additions & 0 deletions src/utils/ewma-rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { IRateLimiter, IRateLimiterOptions } from '../@types/utils'
import { createLogger } from '../factories/logger-factory'
import { ICacheAdapter } from '../@types/adapters'

const debug = createLogger('ewma-rate-limiter')

const rateLimitScript = {
NUMBER_OF_KEYS: 1,
SCRIPT: `
local key = KEYS[1]
local timestamp = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local period = tonumber(ARGV[3])
local R_old = tonumber(redis.call('HGET', key, 'rate')) or 0
local T_old = tonumber(redis.call('HGET', key, 'timestamp')) or timestamp

local deltaT = timestamp - T_old
local lambda = math.log(2) / period
local R_new = R_old * math.exp(-lambda * deltaT) + tonumber(ARGV[4])

redis.call('HSET', key, 'rate', R_new, 'timestamp', timestamp)
redis.call('EXPIRE', key, math.ceil(period / 1000))

if R_new > rate then
return 1
else
return 0
end
`,
}

export const calculateEWMA = (
rOld: number,
deltaT: number,
period: number,
step: number
): number => {
const lambda = Math.log(2) / period
return rOld * Math.exp(-lambda * deltaT) + step
}

export class EWMARateLimiter implements IRateLimiter {
public constructor(
private readonly cache: ICacheAdapter,
) {}

public async hit(
key: string,
step: number,
options: IRateLimiterOptions,
): Promise<boolean> {
const { rate, period } = options

const result = await this.cache.eval(rateLimitScript.SCRIPT,
[key],
[Date.now().toString(), rate.toString(), period.toString(), step.toString()]
)

debug('ewma rate limited on %s bucket: %s', key, result ? 'yes' : 'no')

return result === 1
}

}
7 changes: 7 additions & 0 deletions test/integration/features/rate-limiter/rate-limiter.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Feature: Rate Limiter
@rate-limiter
Scenario: Alice is rate limited when message rate exceeds the limit
Given someone called Alice
And Alice's message rate is already at the limit
When Alice sends a text_note event expecting to be rate limited
Then Alice receives a notice with rate limited
Loading
Loading