Skip to content

Commit

Permalink
fix: sessions unable to start after cleaned up
Browse files Browse the repository at this point in the history
This fixes an issue where if concat sessions were cleaned up (their
processes) reconnecting from a client would always fail. This was
because the session manager was never informed that the underlying
session was terminated. The solution here is to actually let the session
manager do its job and be in charge of all session cleanup.
  • Loading branch information
chrisbenincasa committed Sep 12, 2024
1 parent 2b0a30b commit b84df2f
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 90 deletions.
8 changes: 2 additions & 6 deletions server/src/api/hlsApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ export const hlsApi: RouterPluginAsyncCallback = async (fastify) => {
ip: req.ip,
userAgent: req.headers['user-agent'],
},
{
sessionType: 'hls',
},
{},
);

if (session.isFailure()) {
Expand Down Expand Up @@ -158,9 +156,7 @@ export const hlsApi: RouterPluginAsyncCallback = async (fastify) => {
ip: req.ip,
userAgent: req.headers['user-agent'],
},
{
sessionType: 'hls',
},
{},
);

if (session.isFailure()) {
Expand Down
15 changes: 4 additions & 11 deletions server/src/api/videoApi.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import { ChannelSessionsResponseSchema } from '@tunarr/types/api';
import dayjs from 'dayjs';
import {
forEach,
isEmpty,
isNil,
isNull,
isNumber,
isUndefined,
map,
} from 'lodash-es';
import { isEmpty, isNil, isNull, isNumber, isUndefined, map } from 'lodash-es';
import * as fsSync from 'node:fs';
import { PassThrough } from 'node:stream';
import { Readable } from 'stream';
Expand Down Expand Up @@ -212,7 +204,9 @@ export const videoRouter: RouterPluginAsyncCallback = async (fastify) => {
return res.status(404).send('No session found for channel ID');
}

forEach(sessions, (session) => session.stop());
for (const session of sessions) {
await req.serverCtx.sessionManager.endSession(session);
}

return res.status(201).send();
},
Expand Down Expand Up @@ -242,7 +236,6 @@ export const videoRouter: RouterPluginAsyncCallback = async (fastify) => {
},
},
async (req, res) => {
// TODO Make this a settings opt-in for experimental behavior
const channel = await req.serverCtx.channelDB.getChannel(req.params.id);
if (isNil(channel)) {
return res.status(404).send();
Expand Down
2 changes: 1 addition & 1 deletion server/src/stream/ActiveChannelManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ActiveChannelManagerImpl {
details: ChannelConnectionDetails,
) {
if (!this.#channelTrackers[channelId]) {
this.#channelTrackers[channelId] = new ConnectionTracker();
this.#channelTrackers[channelId] = new ConnectionTracker(channelId);
}

this.#channelTrackers[channelId].addConnection(token, details);
Expand Down
5 changes: 5 additions & 0 deletions server/src/stream/ConcatSession.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isEmpty } from 'lodash-es';
import { Channel } from '../dao/direct/derivedTypes.js';
import { VideoStreamResult } from '../ffmpeg/FfmpegOutputStream.js';
import { ConcatOptions } from '../ffmpeg/ffmpeg.js';
Expand All @@ -20,6 +21,10 @@ export class ConcatSession extends StreamSession<ConcatSessionOptions> {
return new ConcatSession(channel, options);
}

isStale(): boolean {
return isEmpty(this.connections());
}

protected async initializeStream(): Promise<VideoStreamResult> {
this.#stream = await new ConcatStream(this.sessionOptions).startStream(
this.channel.uuid,
Expand Down
22 changes: 15 additions & 7 deletions server/src/stream/ConnectionTracker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isEmpty, isUndefined, keys } from 'lodash-es';
import { Logger, LoggerFactory } from '../util/logging/LoggerFactory';
import events from 'events';
import { isEmpty, isUndefined, keys } from 'lodash-es';
import { TypedEventEmitter } from '../types/eventEmitter';
import { Logger, LoggerFactory } from '../util/logging/LoggerFactory';

type ConnectionTrackerEvents = {
cleanup: () => void;
Expand All @@ -15,6 +15,10 @@ export class ConnectionTracker<
#connections: Record<string, ConnectionDetails> = {};
#heartbeats: Record<string, number> = {};

constructor(private id: string) {
super();
}

addConnection(token: string, connection: ConnectionDetails) {
this.#connections[token] = { ...connection };
this.#heartbeats[token] = new Date().getTime();
Expand Down Expand Up @@ -50,18 +54,22 @@ export class ConnectionTracker<

scheduleCleanup(delay: number) {
if (this.#cleanupFunc) {
this.#logger.debug('Cleanup already scheduled');
this.#logger.debug('Cleanup already scheduled (id=%s)', this.id);
// We already scheduled shutdown
return;
return false;
}
this.#logger.debug('Scheduling session shutdown');
this.#logger.debug('Scheduling session shutdown (id=%s)', this.id);
this.#cleanupFunc = setTimeout(() => {
this.#logger.debug('Shutting down session');
this.#logger.debug('Shutting down connection tracker (id=%s)', this.id);
if (isEmpty(this.#connections)) {
this.emit('cleanup');
} else {
this.#logger.debug(`Got new connections: %O`, this.#connections);
this.#logger.debug(
`Aborting shutdown. Got new connections in grace period: %O`,
this.#connections,
);
}
}, delay);
return true;
}
}
10 changes: 7 additions & 3 deletions server/src/stream/HlsSession.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import retry from 'async-retry';
import dayjs from 'dayjs';
import { filter, isError, isString, some } from 'lodash-es';
import { filter, isEmpty, isError, isString, some } from 'lodash-es';
import fs from 'node:fs/promises';
import { basename, extname, join, resolve } from 'node:path';
import { StrictOmit } from 'ts-essentials';
Expand Down Expand Up @@ -117,6 +117,11 @@ export class HlsSession extends StreamSession<HlsSessionOptions> {
});
}

isStale(): boolean {
const remainingConnections = this.removeStaleConnections();
return isEmpty(remainingConnections);
}

protected async initializeStream() {
this.logger.debug(`Creating stream directory: ${this.#outPath}`);

Expand Down Expand Up @@ -212,9 +217,8 @@ export class HlsSession extends StreamSession<HlsSessionOptions> {

private async cleanupDirectory() {
try {
return await fs.rm(this.#outPath, {
return await fs.rmdir(this.#outPath, {
recursive: true,
force: true,
});
} catch (err) {
return this.logger.error(
Expand Down
40 changes: 38 additions & 2 deletions server/src/stream/SessionManager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { compact, isError, isNil, isString } from 'lodash-es';
import { compact, isError, isNil, isString, isUndefined } from 'lodash-es';
import { ChannelDB } from '../dao/channelDb.js';
import { Channel } from '../dao/direct/derivedTypes.js';
import { Result } from '../types/result.js';
import { Maybe } from '../types/util.js';
import { LoggerFactory } from '../util/logging/LoggerFactory.js';
import { MutexMap } from '../util/mutexMap.js';
import { ConcatSession, ConcatSessionOptions } from './ConcatSession.js';
import { HlsSession, HlsSessionOptions } from './HlsSession.js';
Expand Down Expand Up @@ -55,6 +56,7 @@ class ChannelNotFoundError extends TypedError {
}

export class SessionManager {
#logger = LoggerFactory.child({ className: this.constructor.name });
#sessionLocker = new MutexMap();
#sessions: Record<SessionKey, StreamSession> = {};

Expand Down Expand Up @@ -88,17 +90,48 @@ export class SessionManager {
return this.#sessions[sessionCacheKey(id, sessionType)];
}

async endSession(id: string, sessionType: SessionType) {
async endSession(session: StreamSession): Promise<void>;
async endSession(
idOrSession: string | StreamSession,
maybeSessionType?: SessionType,
): Promise<void> {
let id: string;
let sessionType: SessionType;
if (idOrSession instanceof StreamSession) {
id = idOrSession.keyObj.id;
sessionType = idOrSession.keyObj.sessionType;
} else {
if (isUndefined(maybeSessionType)) {
throw new Error('Must pass session type if ending stream by ID');
} else {
id = idOrSession;
sessionType = maybeSessionType;
}
}

const lock = await this.#sessionLocker.getOrCreateLock(id);
return await lock.runExclusive(() => {
const session = this.getSession(id, sessionType);
if (isNil(session)) {
return;
}
session.stop();
delete this.#sessions[sessionCacheKey(id, sessionType)];
});
}

cleanupStaleSessions() {
for (const session of Object.values(this.#sessions)) {
if (session.isStale() && session.scheduleCleanup()) {
this.#logger.debug(
'Scheduled cleanup on session (type=%s, id=%s)',
session.sessionType,
session.id,
);
}
}
}

async getOrCreateConcatSession(
channelId: string,
token: string,
Expand Down Expand Up @@ -156,6 +189,9 @@ export class SessionManager {
let session = this.getSession(channelId, sessionType) as Maybe<Session>;
if (isNil(session)) {
session = sessionFactory(channel);
session.on('cleanup', () => {
delete this.#sessions[sessionCacheKey(channelId, sessionType)];
});
this.addSession(channel.uuid, session.sessionType, session);
}

Expand Down
Loading

0 comments on commit b84df2f

Please sign in to comment.