From 63b2acb498647a53f414275595ba5aabcdbfa951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 14 Aug 2024 10:23:27 +0200 Subject: [PATCH 1/3] refactor(core): Simplify Redis client types (no-changelog) --- .../cli/src/services/cache/cache.service.ts | 2 +- .../services/redis/RedisServiceBaseClasses.ts | 19 +++---------------- .../redis/RedisServicePubSubPublisher.ts | 2 +- .../redis/RedisServicePubSubSubscriber.ts | 2 +- .../services/redis/redis-client.service.ts | 7 ++++--- .../cli/src/services/redis/redis.types.ts | 19 +++++++++++++++++++ 6 files changed, 29 insertions(+), 22 deletions(-) create mode 100644 packages/cli/src/services/redis/redis.types.ts diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index daf51911ff362..eaa58c32f0887 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -45,7 +45,7 @@ export class CacheService extends TypedEmitter { ); const redisClient = redisClientService.createClient({ - type: 'client(cache)', + type: 'cache(n8n)', extraOptions: { keyPrefix: prefix }, }); diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index ba2cc41a91dce..032497a82022b 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -4,20 +4,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/Logger'; import { RedisClientService } from './redis-client.service'; - -export type RedisClientType = - | 'subscriber' - | 'client' - | 'bclient' - | 'subscriber(bull)' - | 'client(bull)' - | 'bclient(bull)' - | 'client(cache)' - | 'publisher' - | 'consumer' - | 'producer' - | 'list-sender' - | 'list-receiver'; +import type { RedisClient } from './redis.types'; export type RedisServiceMessageHandler = | ((channel: string, message: string) => void) @@ -34,7 +21,7 @@ class RedisServiceBase { private readonly redisClientService: RedisClientService, ) {} - async init(type: RedisClientType = 'client'): Promise { + async init(type: RedisClient): Promise { if (this.redisClient && this.isInitialized) { return; } @@ -60,7 +47,7 @@ class RedisServiceBase { export abstract class RedisServiceBaseSender extends RedisServiceBase { senderId: string; - async init(type: RedisClientType = 'client'): Promise { + async init(type: RedisClient): Promise { await super.init(type); this.senderId = config.get('redis.queueModeId'); } diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts index ff810ba7c6523..23ca9a5b6935e 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -9,7 +9,7 @@ import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; @Service() export class RedisServicePubSubPublisher extends RedisServiceBaseSender { async init(): Promise { - await super.init('publisher'); + await super.init('publisher(n8n)'); } async publish(channel: string, message: string): Promise { diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index 87518264283b2..144647009f16b 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -5,7 +5,7 @@ import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; @Service() export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { async init(): Promise { - await super.init('subscriber'); + await super.init('subscriber(n8n)'); this.redisClient?.on('message', (channel: string, message: string) => { this.messageHandlers.forEach((handler: (channel: string, message: string) => void) => diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis/redis-client.service.ts index b5c86523e006a..06e102acac92a 100644 --- a/packages/cli/src/services/redis/redis-client.service.ts +++ b/packages/cli/src/services/redis/redis-client.service.ts @@ -2,7 +2,8 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import ioRedis from 'ioredis'; import type { Cluster, RedisOptions } from 'ioredis'; -import type { RedisClientType } from './RedisServiceBaseClasses'; +import type { RedisClient } from './redis.types'; + import { OnShutdown } from '@/decorators/OnShutdown'; import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants'; import { GlobalConfig } from '@n8n/config'; @@ -16,7 +17,7 @@ export class RedisClientService { private readonly globalConfig: GlobalConfig, ) {} - createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) { + createClient(arg: { type: RedisClient; extraOptions?: RedisOptions }) { const client = this.clusterNodes().length > 0 ? this.createClusterClient(arg) @@ -55,7 +56,7 @@ export class RedisClientService { type, extraOptions, }: { - type: RedisClientType; + type: RedisClient; extraOptions?: RedisOptions; }) { const options = this.getOptions({ extraOptions }); diff --git a/packages/cli/src/services/redis/redis.types.ts b/packages/cli/src/services/redis/redis.types.ts new file mode 100644 index 0000000000000..9db6664f9f455 --- /dev/null +++ b/packages/cli/src/services/redis/redis.types.ts @@ -0,0 +1,19 @@ +export type RedisClient = n8nRedisClient | BullRedisClient; + +/** + * Redis client used by n8n. + * + * - `subscriber(n8n)` to listen for messages from scaling mode communication channels + * - `publisher(n8n)` to send messages into scaling mode communication channels + * - `cache(n8n)` for caching operations (variables, resource ownership, etc.) + */ +type n8nRedisClient = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; + +/** + * Redis client used internally by Bull. Suffixed with `(bull)` at `ScalingService.setupQueue`. + * + * - `subscriber(bull)` for event listening + * - `client(bull)` for general queue operations + * - `bclient(bull)` for blocking operations when processing jobs + */ +type BullRedisClient = 'subscriber(bull)' | 'client(bull)' | 'bclient(bull)'; From 66e48e99d5b47b6809f91b519ccbebb72d56c03d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 15 Aug 2024 10:04:55 +0200 Subject: [PATCH 2/3] Suffix with `-Type` --- packages/cli/src/services/redis/RedisServiceBaseClasses.ts | 6 +++--- packages/cli/src/services/redis/redis-client.service.ts | 6 +++--- packages/cli/src/services/redis/redis.types.ts | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index ec1774c5023a7..b9db6125b043c 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -4,7 +4,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/Logger'; import { RedisClientService } from './redis-client.service'; -import type { RedisClient } from './redis.types'; +import type { RedisClientType } from './redis.types'; export type RedisServiceMessageHandler = | ((channel: string, message: string) => void) @@ -21,7 +21,7 @@ class RedisServiceBase { private readonly redisClientService: RedisClientService, ) {} - async init(type: RedisClient): Promise { + async init(type: RedisClientType): Promise { if (this.redisClient && this.isInitialized) { return; } @@ -49,7 +49,7 @@ class RedisServiceBase { export abstract class RedisServiceBaseSender extends RedisServiceBase { senderId: string; - async init(type: RedisClient): Promise { + async init(type: RedisClientType): Promise { await super.init(type); this.senderId = config.get('redis.queueModeId'); } diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis/redis-client.service.ts index 06e102acac92a..21f74bf074d33 100644 --- a/packages/cli/src/services/redis/redis-client.service.ts +++ b/packages/cli/src/services/redis/redis-client.service.ts @@ -2,7 +2,7 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import ioRedis from 'ioredis'; import type { Cluster, RedisOptions } from 'ioredis'; -import type { RedisClient } from './redis.types'; +import type { RedisClientType } from './redis.types'; import { OnShutdown } from '@/decorators/OnShutdown'; import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants'; @@ -17,7 +17,7 @@ export class RedisClientService { private readonly globalConfig: GlobalConfig, ) {} - createClient(arg: { type: RedisClient; extraOptions?: RedisOptions }) { + createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) { const client = this.clusterNodes().length > 0 ? this.createClusterClient(arg) @@ -56,7 +56,7 @@ export class RedisClientService { type, extraOptions, }: { - type: RedisClient; + type: RedisClientType; extraOptions?: RedisOptions; }) { const options = this.getOptions({ extraOptions }); diff --git a/packages/cli/src/services/redis/redis.types.ts b/packages/cli/src/services/redis/redis.types.ts index 9db6664f9f455..4e7aeee08c988 100644 --- a/packages/cli/src/services/redis/redis.types.ts +++ b/packages/cli/src/services/redis/redis.types.ts @@ -1,4 +1,4 @@ -export type RedisClient = n8nRedisClient | BullRedisClient; +export type RedisClientType = n8nRedisClientType | BullRedisClientType; /** * Redis client used by n8n. @@ -7,7 +7,7 @@ export type RedisClient = n8nRedisClient | BullRedisClient; * - `publisher(n8n)` to send messages into scaling mode communication channels * - `cache(n8n)` for caching operations (variables, resource ownership, etc.) */ -type n8nRedisClient = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; +type n8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; /** * Redis client used internally by Bull. Suffixed with `(bull)` at `ScalingService.setupQueue`. @@ -16,4 +16,4 @@ type n8nRedisClient = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; * - `client(bull)` for general queue operations * - `bclient(bull)` for blocking operations when processing jobs */ -type BullRedisClient = 'subscriber(bull)' | 'client(bull)' | 'bclient(bull)'; +type BullRedisClientType = 'subscriber(bull)' | 'client(bull)' | 'bclient(bull)'; From 092f52e82b92fdb5c141da1133841fa0678ece61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 15 Aug 2024 10:06:25 +0200 Subject: [PATCH 3/3] Capitalize `N8nRedisClientType` --- packages/cli/src/services/redis/redis.types.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/services/redis/redis.types.ts b/packages/cli/src/services/redis/redis.types.ts index 4e7aeee08c988..ed694904d7b11 100644 --- a/packages/cli/src/services/redis/redis.types.ts +++ b/packages/cli/src/services/redis/redis.types.ts @@ -1,4 +1,4 @@ -export type RedisClientType = n8nRedisClientType | BullRedisClientType; +export type RedisClientType = N8nRedisClientType | BullRedisClientType; /** * Redis client used by n8n. @@ -7,7 +7,7 @@ export type RedisClientType = n8nRedisClientType | BullRedisClientType; * - `publisher(n8n)` to send messages into scaling mode communication channels * - `cache(n8n)` for caching operations (variables, resource ownership, etc.) */ -type n8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; +type N8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; /** * Redis client used internally by Bull. Suffixed with `(bull)` at `ScalingService.setupQueue`.