import {
  Injectable,
  Logger,
  NotFoundException,
  Inject,
  forwardRef,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, DeepPartial } from 'typeorm';
import { ConfigService } from '@nestjs/config';
import { ChatOpenAI } from '@langchain/openai';
import {
  HumanMessage,
  BaseMessage,
  AIMessage,
  SystemMessage,
} from '@langchain/core/messages';
import { Observable, from, map, mergeMap, concatMap } from 'rxjs';
import {
  AssessmentSession,
  AssessmentStatus,
} from './entities/assessment-session.entity';
import { AssessmentQuestion } from './entities/assessment-question.entity';
import { AssessmentAnswer } from './entities/assessment-answer.entity';
import { AssessmentTemplate } from './entities/assessment-template.entity';
import { KnowledgeBaseService } from '../knowledge-base/knowledge-base.service';
import { KnowledgeGroupService } from '../knowledge-group/knowledge-group.service';
import { ModelConfigService } from '../model-config/model-config.service';
import { ModelType } from '../types';
import { ElasticsearchService } from '../elasticsearch/elasticsearch.service';
import { RagService } from '../rag/rag.service';
import { ChatService } from '../chat/chat.service';
import { createEvaluationGraph } from './graph/builder';
import { EvaluationState } from './graph/state';
import { TemplateService } from './services/template.service';
import { ContentFilterService } from './services/content-filter.service';
import { I18nService } from '../i18n/i18n.service';
import { TenantService } from '../tenant/tenant.service';

/**
 * Minimal view of a session needed to resolve the knowledge content that
 * grounds question generation. Both a persisted `AssessmentSession` and the
 * not-yet-saved data built in `startSession` are passed in this shape.
 */
type SessionContentSource = {
  knowledgeBaseId?: string | null;
  knowledgeGroupId?: string | null;
  userId: string;
  tenantId: string;
  templateJson?: any;
  language?: string | null;
};

/** Narrows an unknown `catch` value into something safe to log. */
function describeError(err: unknown): { message: string; stack?: string } {
  if (err instanceof Error) {
    return { message: err.message, stack: err.stack };
  }
  return { message: String(err) };
}

/**
 * Drives assessment sessions: resolves the knowledge content for a session,
 * runs the evaluation graph (question generation, grading, report), and
 * mirrors the graph state into the `AssessmentSession` row.
 *
 * The graph is addressed by `thread_id`, which is always the session ID.
 */
@Injectable()
export class AssessmentService {
  private readonly logger = new Logger(AssessmentService.name);
  private readonly graph = createEvaluationGraph();

  constructor(
    @InjectRepository(AssessmentSession)
    private sessionRepository: Repository<AssessmentSession>,
    @InjectRepository(AssessmentQuestion)
    private questionRepository: Repository<AssessmentQuestion>,
    @InjectRepository(AssessmentAnswer)
    private answerRepository: Repository<AssessmentAnswer>,
    @Inject(forwardRef(() => KnowledgeBaseService))
    private kbService: KnowledgeBaseService,
    @Inject(forwardRef(() => KnowledgeGroupService))
    private groupService: KnowledgeGroupService,
    @Inject(forwardRef(() => ModelConfigService))
    private modelConfigService: ModelConfigService,
    private configService: ConfigService,
    private templateService: TemplateService,
    private contentFilterService: ContentFilterService,
    private ragService: RagService,
    @Inject(forwardRef(() => ChatService))
    private chatService: ChatService,
    private i18nService: I18nService,
    private tenantService: TenantService,
  ) {}

  /**
   * Builds a chat model from the tenant's default LLM configuration.
   * Falls back to a local Ollama-compatible endpoint when the configuration
   * has no API key / base URL.
   */
  private async getModel(tenantId: string): Promise<ChatOpenAI> {
    const config = await this.modelConfigService.findDefaultByType(
      tenantId,
      ModelType.LLM,
    );
    return new ChatOpenAI({
      apiKey: config.apiKey || 'ollama',
      modelName: config.modelId,
      temperature: 0.7,
      configuration: {
        baseURL: config.baseUrl || 'http://localhost:11434/v1',
      },
    });
  }

  /**
   * Resolves the text that grounds the assessment for a session.
   *
   * When the template carries keywords, a hybrid RAG search scoped to the
   * session's files is tried first. If that yields nothing (or fails), the
   * full content of the knowledge base / group is loaded instead and then
   * narrowed with the keyword content filter.
   *
   * @returns The grounding text, or an empty string when nothing is found.
   */
  private async getSessionContent(
    session: SessionContentSource,
  ): Promise<string> {
    const kbId = session.knowledgeBaseId || session.knowledgeGroupId;
    this.logger.log(`[getSessionContent] Starting for KB/Group ID: ${kbId}`);

    if (!kbId) {
      this.logger.warn(`[getSessionContent] No KB/Group ID provided`);
      return '';
    }

    const keywords: string[] = session.templateJson?.keywords || [];

    if (keywords.length > 0) {
      const grounded = await this.searchGroundedContent(session, keywords);
      if (grounded) {
        return grounded;
      }
    }

    // Fallback or no keywords: full content retrieval.
    let content = await this.loadFullContent(session, kbId);

    // Extra regex-based keyword layer when the full content is used.
    if (content && keywords.length > 0) {
      this.logger.debug(
        `[getSessionContent] Applying fallback keyword filters: ${keywords.join(', ')}`,
      );
      const prevLen = content.length;
      content = this.contentFilterService.filterContent(content, keywords);
      this.logger.debug(
        `[getSessionContent] After filtering, content length: ${content.length} (was ${prevLen})`,
      );
    }

    this.logger.log(
      `[getSessionContent] Final content for AI generation (Length: ${content.length})`,
    );
    this.logger.debug(
      `[getSessionContent] Content Preview: ${content.substring(0, 500)}...`,
    );
    return content;
  }

  /**
   * Keyword-driven hybrid search over the session's files.
   * Never throws: failures are logged and reported as an empty string so the
   * caller can fall back to full-content retrieval.
   */
  private async searchGroundedContent(
    session: SessionContentSource,
    keywords: string[],
  ): Promise<string> {
    this.logger.log(
      `[getSessionContent] Keywords detected, performing hybrid search via RagService: ${keywords.join(', ')}`,
    );

    try {
      // 1. Determine file IDs to include in search
      let fileIds: string[] = [];
      if (session.knowledgeBaseId) {
        fileIds = [session.knowledgeBaseId];
      } else if (session.knowledgeGroupId) {
        fileIds = await this.groupService.getFileIdsByGroups(
          [session.knowledgeGroupId],
          session.userId,
          session.tenantId,
        );
      }

      if (fileIds.length === 0) {
        this.logger.warn(
          `[getSessionContent] No files found for search scope (KB: ${session.knowledgeBaseId}, Group: ${session.knowledgeGroupId})`,
        );
      } else {
        const query = keywords.join(' ');
        this.logger.log(
          `[getSessionContent] Performing high-fidelity grounded search (streamChat-style). Keywords: "${query}"`,
        );

        // 2. Default embedding model for the tenant
        const embeddingModel = await this.modelConfigService.findDefaultByType(
          session.tenantId || 'default',
          ModelType.EMBEDDING,
        );

        // 3. Advanced RAG search
        const ragResults = await this.ragService.searchKnowledge(
          query,
          session.userId,
          20, // Increased topK to 20 for broader question coverage
          0.1, // Lenient similarityThreshold (Chat/Rag defaults are 0.3)
          embeddingModel?.id,
          true, // enableFullTextSearch
          true, // enableRerank
          undefined, // selectedRerankId
          undefined, // selectedGroups
          fileIds,
          0.3, // Lenient rerankSimilarityThreshold (Chat/Rag defaults are 0.5)
          session.tenantId,
        );

        // 4. Format context using localized labels.
        // templateJson built by startSession has no `language` key, so fall
        // back to the session's own language before defaulting to 'zh'.
        const language =
          session.templateJson?.language || session.language || 'zh';
        const fileLabel = this.i18nService.getMessage('file', language);
        const contentLabel = this.i18nService.getMessage('content', language);
        const searchContent = ragResults
          .map(
            (result, index) =>
              `[${index + 1}] ${fileLabel}:${result.fileName}\n${contentLabel}:${result.content}\n`,
          )
          .join('\n');

        if (searchContent && searchContent.trim().length > 0) {
          this.logger.log(
            `[getSessionContent] SUCCESS: Found ${ragResults.length} relevant chunks. Total length: ${searchContent.length}`,
          );
          return searchContent;
        }
        this.logger.warn(
          `[getSessionContent] High-fidelity search returned no results for query: "${query}".`,
        );
      }
    } catch (err: unknown) {
      const { message, stack } = describeError(err);
      this.logger.error(
        `[getSessionContent] Grounded search failed unexpectedly: ${message}`,
        stack,
      );
    }

    this.logger.warn(
      `[getSessionContent] Grounded search failed or returned nothing. One common reason is that the keywords are not present in the indexed documents.`,
    );
    return '';
  }

  /**
   * Loads the complete content of a knowledge base, or the concatenated
   * content of every file in a knowledge group.
   */
  private async loadFullContent(
    session: SessionContentSource,
    kbId: string,
  ): Promise<string> {
    if (session.knowledgeBaseId) {
      this.logger.debug(
        `[getSessionContent] Fetching content for KnowledgeBase: ${kbId}`,
      );
      // NOTE(review): reaches into KnowledgeBaseService's repository through
      // an `any` cast; a public accessor on the service would be safer.
      const kb = await (this.kbService as any).kbRepository.findOne({
        where: { id: kbId, tenantId: session.tenantId },
      });
      if (!kb) {
        this.logger.warn(
          `[getSessionContent] KnowledgeBase not found: ${kbId}`,
        );
        return '';
      }
      const kbContent: string = kb.content || '';
      this.logger.debug(
        `[getSessionContent] Found KB content, length: ${kbContent.length}`,
      );
      return kbContent;
    }

    try {
      this.logger.debug(
        `[getSessionContent] Fetching content for KnowledgeGroup: ${kbId}`,
      );
      const groupFiles = await this.groupService.getGroupFiles(
        kbId,
        session.userId,
        session.tenantId,
      );
      this.logger.debug(
        `[getSessionContent] Found ${groupFiles.length} files in group`,
      );

      const groupContent = groupFiles
        .filter((f) => f.content)
        .map((f) => {
          this.logger.debug(
            `[getSessionContent] Including file: ${f.title || f.originalName}, content length: ${f.content?.length || 0}`,
          );
          return `--- Document: ${f.title || f.originalName} ---\n${f.content}`;
        })
        .join('\n\n');
      this.logger.debug(
        `[getSessionContent] Total group content length: ${groupContent.length}`,
      );
      return groupContent;
    } catch (err: unknown) {
      this.logger.error(
        `[getSessionContent] Failed to get group files: ${describeError(err).message}`,
      );
      return '';
    }
  }

  /**
   * Starts a new assessment session.
   * kbId can be a KnowledgeBase ID or a KnowledgeGroup ID; when omitted, the
   * template's linked knowledge group is used.
   *
   * @throws NotFoundException when the source is neither a KB nor a group.
   * @throws Error when no source can be resolved or it has too little content.
   */
  async startSession(
    userId: string,
    kbId: string | undefined,
    tenantId: string,
    language: string = 'en',
    templateId?: string,
  ): Promise<AssessmentSession> {
    this.logger.log(
      `[startSession] Starting session for user ${userId}, templateId: ${templateId}, kbId: ${kbId}`,
    );

    let template: AssessmentTemplate | null = null;
    if (templateId) {
      template = await this.templateService.findOne(
        templateId,
        userId,
        tenantId,
      );
      this.logger.debug(
        `[startSession] Found template: ${template?.name}, linked group: ${template?.knowledgeGroupId}`,
      );
    }

    // Use kbId if provided, otherwise fall back to template's group ID
    const activeKbId = kbId || template?.knowledgeGroupId;
    this.logger.log(`[startSession] activeKbId resolved to: ${activeKbId}`);

    if (!activeKbId) {
      this.logger.error(`[startSession] No knowledge source resolved`);
      throw new Error('Knowledge source (ID or Template) must be provided.');
    }

    // Determine whether the ID is a KB or a Group; both lookups also enforce
    // the caller's permissions.
    let isKb = false;
    try {
      await this.kbService.findOne(activeKbId, userId, tenantId);
      isKb = true;
    } catch (kbError) {
      if (!(kbError instanceof NotFoundException)) {
        throw kbError; // e.g. ForbiddenException
      }
      try {
        await this.groupService.findOne(activeKbId, userId, tenantId);
      } catch (groupError) {
        this.logger.error(
          `[startSession] Knowledge source ${activeKbId} not found as KB or Group`,
        );
        throw new NotFoundException(
          this.i18nService.getMessage('knowledgeSourceNotFound') ||
            'Knowledge source not found',
        );
      }
    }
    this.logger.debug(`[startSession] isKb: ${isKb}`);

    const templateData = template
      ? {
          name: template.name,
          keywords: template.keywords,
          questionCount: template.questionCount,
          difficultyDistribution: template.difficultyDistribution,
          style: template.style,
        }
      : undefined;

    // Kept loosely typed: the entity's column types are declared elsewhere.
    const sessionData: any = {
      userId,
      tenantId,
      knowledgeBaseId: isKb ? activeKbId : undefined,
      knowledgeGroupId: isKb ? undefined : activeKbId,
      templateId,
      templateJson: templateData,
      status: AssessmentStatus.IN_PROGRESS,
      language,
    };

    // Refuse to create a session that has nothing to be assessed on.
    const content = await this.getSessionContent(sessionData);
    if (!content || content.trim().length < 10) {
      this.logger.error(
        `[startSession] Insufficient content length: ${content?.length || 0}`,
      );
      throw new Error(
        'Selected knowledge source has no sufficient content for evaluation.',
      );
    }

    const session = this.sessionRepository.create(
      sessionData as DeepPartial<AssessmentSession>,
    );
    const savedSession = (await this.sessionRepository.save(
      session as any,
    )) as AssessmentSession;

    // Thread ID for LangGraph is the session ID, known only after the insert.
    savedSession.threadId = savedSession.id;
    await this.sessionRepository.save(savedSession);

    this.logger.log(
      `[startSession] Session ${savedSession.id} created and saved`,
    );
    return savedSession;
  }

  /**
   * Specialized streaming start for initial generation.
   *
   * Emits `{ type: 'node', node, data }` for every graph update and a single
   * `{ type: 'final', data }` with the authoritative state. If the thread
   * already holds generated questions, only the `final` event is emitted and
   * no model or content lookup is performed.
   */
  startSessionStream(sessionId: string, userId: string): Observable<any> {
    return new Observable((observer) => {
      void (async () => {
        try {
          const session = await this.sessionRepository.findOne({
            where: { id: sessionId, userId },
          });
          if (!session) {
            observer.error(new NotFoundException('Session not found'));
            return;
          }

          // Check for existing state first so a re-subscribe does not pay for
          // a model lookup and a RAG search it will not use.
          const existingState = await this.graph.getState({
            configurable: { thread_id: sessionId },
          });
          if (
            existingState &&
            existingState.values &&
            existingState.values.questions?.length > 0
          ) {
            this.logger.log(
              `Session ${sessionId} already has state, skipping generation.`,
            );
            const mappedData = { ...existingState.values };
            mappedData.messages = this.mapMessages(mappedData.messages || []);
            mappedData.feedbackHistory = this.mapMessages(
              mappedData.feedbackHistory || [],
            );
            observer.next({ type: 'final', data: mappedData });
            observer.complete();
            return;
          }

          const model = await this.getModel(session.tenantId);
          const content = await this.getSessionContent(session);

          const initialState: Partial<EvaluationState> = {
            assessmentSessionId: sessionId,
            knowledgeBaseId:
              session.knowledgeBaseId || session.knowledgeGroupId || '',
            messages: [],
            questionCount: session.templateJson?.questionCount,
            difficultyDistribution:
              session.templateJson?.difficultyDistribution,
            style: session.templateJson?.style,
            keywords: session.templateJson?.keywords,
          };

          const sessionLanguage = session.language || 'en';
          let initialMsg =
            'Generate the assessment questions now. Please strictly respond in English.';
          if (sessionLanguage === 'zh') {
            initialMsg = '现在生成评估问题。请务必使用中文。';
          } else if (sessionLanguage === 'ja') {
            initialMsg =
              '今すぐアセスメント問題を生成してください。必ず日本語で回答してください。';
          }

          this.logger.log(
            `[startSessionStream] Starting stream for session ${sessionId}`,
          );
          const stream = await this.graph.stream(
            {
              ...initialState,
              language: sessionLanguage, // Ensure language is in initial state
              messages: [new HumanMessage(initialMsg)],
            },
            {
              configurable: this.buildConfigurable(
                sessionId,
                session,
                model,
                content,
                'en',
              ),
              streamMode: ['values', 'updates'],
            },
          );
          this.logger.debug(`[startSessionStream] Graph stream started`);

          await this.forwardNodeUpdates(stream, observer);
          await this.emitFinalState(
            sessionId,
            session,
            observer,
            'startSessionStream',
          );
          observer.complete();
        } catch (err) {
          observer.error(err);
        }
      })();
    });
  }

  /**
   * Submits a user's answer and continues the assessment.
   *
   * @param language Used only when the session itself has no language.
   * @returns The graph state with messages mapped to the frontend format,
   *   plus `status`, `report` and `finalScore` from the saved session; or the
   *   raw (possibly empty) state when the graph produced nothing usable.
   * @throws NotFoundException when the session does not belong to the user.
   */
  async submitAnswer(
    sessionId: string,
    userId: string,
    answer: string,
    language: string = 'en',
  ): Promise<any> {
    const session = await this.sessionRepository.findOne({
      where: { id: sessionId, userId },
      relations: ['template'],
    });
    if (!session) throw new NotFoundException('Session not found');

    const model = await this.getModel(session.tenantId);
    await this.ensureGraphState(sessionId, session);
    const content = await this.getSessionContent(session);

    // Update state with human message first to ensure it's in history before
    // resumption
    await this.graph.updateState(
      { configurable: { thread_id: sessionId } },
      { messages: [new HumanMessage(answer)] },
    );

    this.logger.debug(`[submitAnswer] Resuming graph for session ${sessionId}`);

    // Resume from the last interrupt (typically after interviewer)
    const stream = await this.graph.stream(null, {
      configurable: this.buildConfigurable(
        sessionId,
        session,
        model,
        content,
        language,
      ),
      streamMode: ['values', 'updates'],
    });

    // The stream is drained only to run the graph; the result is read back
    // from the checkpointer below.
    for await (const [mode, data] of stream) {
      if (mode === 'updates') {
        const nodeName = Object.keys(data)[0];
        this.logger.debug(`[submitAnswer] Node completed: ${nodeName}`);
      }
    }

    const fullState = await this.graph.getState({
      configurable: { thread_id: sessionId },
    });
    const state = fullState.values as EvaluationState | undefined;

    this.logger.log(
      `[submitAnswer] Stream finished. State Index: ${state?.currentQuestionIndex}, Questions: ${state?.questions?.length}, HasReport: ${!!state?.report}`,
    );

    if (!state || !(state.messages || state.questions)) {
      this.logger.warn(
        `[submitAnswer] finalResult has no usable data! Keys: ${Object.keys(state || {}).join(', ')}`,
      );
      return state;
    }

    await this.persistState(session, state);

    // Return a copy so the checkpointer's state object is not mutated.
    const finalResult = this.toClientState(state, session);

    this.logger.log(
      `[submitAnswer] session saved. DB Status: ${session.status}, Index: ${session.currentQuestionIndex}`,
    );
    this.logger.log(
      `[submitAnswer] finalResult check: hasQuestions=${!!state.questions}, questionsLen=${state.questions?.length}, hasReport=${!!finalResult.report}`,
    );
    this.logger.debug(
      `[submitAnswer] finalResult keys: ${Object.keys(finalResult).join(', ')}`,
    );
    this.logger.log(
      `[submitAnswer] session updated: status=${session.status}, index=${session.currentQuestionIndex}`,
    );

    return finalResult;
  }

  /**
   * Streaming version of submitAnswer.
   *
   * Emits `{ type: 'node', node, data }` per graph update, then one
   * `{ type: 'final', data }` once the state has been saved.
   */
  submitAnswerStream(
    sessionId: string,
    userId: string,
    answer: string,
    language: string = 'en',
  ): Observable<any> {
    return new Observable((observer) => {
      void (async () => {
        try {
          const session = await this.sessionRepository.findOne({
            where: { id: sessionId, userId },
          });
          if (!session) {
            observer.error(new NotFoundException('Session not found'));
            return;
          }

          const model = await this.getModel(session.tenantId);
          const content = await this.getSessionContent(session);
          await this.ensureGraphState(sessionId, session);

          const graphState = await this.graph.getState({
            configurable: { thread_id: sessionId },
          });
          const hasState =
            graphState &&
            graphState.values &&
            Object.keys(graphState.values).length > 0;
          this.logger.debug(
            `[submitAnswerStream] sessionId=${sessionId}, hasState=${hasState}, nextNodes=[${graphState.next || ''}]`,
          );

          // Update state with human message first to ensure it's in history
          await this.graph.updateState(
            { configurable: { thread_id: sessionId } },
            { messages: [new HumanMessage(answer)] },
          );

          // Resume from the last interrupt, with the same run configuration
          // the non-streaming submitAnswer uses.
          const stream = await this.graph.stream(null, {
            configurable: this.buildConfigurable(
              sessionId,
              session,
              model,
              content,
              language,
            ),
            streamMode: ['values', 'updates'],
          });

          await this.forwardNodeUpdates(stream, observer);
          await this.emitFinalState(
            sessionId,
            session,
            observer,
            'submitAnswerStream',
          );
          observer.complete();
        } catch (err) {
          observer.error(err);
        }
      })();
    });
  }

  /**
   * Retrieves the current state of a session, initializing or recovering the
   * graph state first when the checkpointer has none.
   *
   * @throws NotFoundException when the session does not belong to the user.
   */
  async getSessionState(sessionId: string, userId: string): Promise<any> {
    this.logger.log(
      `Retrieving state for session ${sessionId} for user ${userId}`,
    );
    const session = await this.sessionRepository.findOne({
      where: { id: sessionId, userId },
      relations: ['template'],
    });
    if (!session) throw new NotFoundException('Session not found');

    // Ensure graph has state (lazy init or recovery)
    await this.ensureGraphState(sessionId, session);

    const state = await this.graph.getState({
      configurable: { thread_id: sessionId },
    });
    const values = { ...state.values };
    if (values.messages) {
      values.messages = this.mapMessages(values.messages);
    }
    if (values.feedbackHistory) {
      values.feedbackHistory = this.mapMessages(values.feedbackHistory);
    }
    return values;
  }

  /**
   * Retrieves assessment session history for a user, newest first.
   * Each entry additionally exposes `questions_json` as `questions` for
   * frontend compatibility.
   */
  async getHistory(
    userId: string,
    tenantId: string,
  ): Promise<AssessmentSession[]> {
    const history = await this.sessionRepository.find({
      where: { userId, tenantId },
      order: { createdAt: 'DESC' },
      relations: ['knowledgeBase', 'knowledgeGroup'],
    });
    this.logger.log(`Found ${history.length} historical sessions`);

    return history.map((session) => ({
      ...session,
      questions: session.questions_json || [],
    })) as any;
  }

  /**
   * Deletes an assessment session. Admins (`admin` / `super_admin`) may
   * delete any session; everyone else only their own.
   *
   * @throws NotFoundException when nothing was deleted.
   */
  async deleteSession(sessionId: string, user: any): Promise<void> {
    this.logger.log(
      `Deleting session ${sessionId} for user ${user.id} (role: ${user.role})`,
    );
    const isAdmin = user.role === 'super_admin' || user.role === 'admin';

    const deleteCondition: any = { id: sessionId };
    if (!isAdmin) {
      deleteCondition.userId = user.id;
    }

    const result = await this.sessionRepository.delete(deleteCondition);
    if (result.affected === 0) {
      throw new NotFoundException(
        'Session not found or you do not have permission to delete it',
      );
    }
  }

  /**
   * Ensures the graph checkpointer has the state for the given session.
   * Useful for lazy initialization and recovery after server restarts.
   *
   * If the session row has message history, the graph state is rebuilt from
   * it; otherwise the graph is run from scratch and its result is saved.
   */
  private async ensureGraphState(
    sessionId: string,
    session: AssessmentSession,
  ): Promise<void> {
    const threadConfig = { configurable: { thread_id: sessionId } };
    const state = await this.graph.getState(threadConfig);

    const graphHasMessages =
      state.values &&
      Object.keys(state.values).length > 0 &&
      state.values.messages &&
      state.values.messages.length > 0;
    if (graphHasMessages) {
      return;
    }

    const template = session.templateJson;
    const hasHistory = session.messages && session.messages.length > 0;

    if (hasHistory) {
      this.logger.log(
        `[ensureGraphState] Recovering state from DB for session ${sessionId}`,
      );
      await this.graph.updateState(
        threadConfig,
        {
          assessmentSessionId: sessionId,
          knowledgeBaseId:
            session.knowledgeBaseId || session.knowledgeGroupId || '',
          messages: this.hydrateMessages(session.messages),
          feedbackHistory: this.hydrateMessages(session.feedbackHistory || []),
          questions: session.questions_json || [],
          currentQuestionIndex: session.currentQuestionIndex || 0,
          followUpCount: session.followUpCount || 0,
          questionCount: template?.questionCount || 5,
          difficultyDistribution: template?.difficultyDistribution,
          style: template?.style,
          keywords: template?.keywords,
        },
        'grader', // Recovering a session with messages should prep for grading the next input
      );
      return;
    }

    this.logger.log(`Initializing new state for session ${sessionId}`);
    const content = await this.getSessionContent(session);
    const model = await this.getModel(session.tenantId);

    const initialState: Partial<EvaluationState> = {
      assessmentSessionId: sessionId,
      knowledgeBaseId:
        session.knowledgeBaseId || session.knowledgeGroupId || '',
      messages: [],
      questionCount: template?.questionCount,
      difficultyDistribution: template?.difficultyDistribution,
      style: template?.style,
      keywords: template?.keywords,
      language: session.language || 'en',
    };

    this.logger.log(
      `[ensureGraphState] Initializing with questionCount=${initialState.questionCount}, keywords=${initialState.keywords?.join(',')}, style=${initialState.style}`,
    );

    const resultStream = await this.graph.stream(initialState, {
      configurable: this.buildConfigurable(
        sessionId,
        session,
        model,
        content,
        'en',
      ),
      streamMode: ['values', 'updates'],
    });

    // Keep the last full-state snapshot; per-node updates are not needed.
    let lastValues: any = null;
    for await (const [mode, data] of resultStream) {
      if (mode === 'values') lastValues = data;
    }

    // The stream may end without a single 'values' chunk.
    if (lastValues?.messages) {
      this.applyStateToSession(session, lastValues);
      await this.sessionRepository.save(session);
    }
  }

  /**
   * Builds the `configurable` part of a graph run: thread, model, grounding
   * content, language and the template's generation settings.
   */
  private buildConfigurable(
    sessionId: string,
    session: AssessmentSession,
    model: ChatOpenAI,
    content: string,
    fallbackLanguage: string,
  ) {
    const template = session.templateJson;
    return {
      thread_id: sessionId,
      model,
      knowledgeBaseContent: content,
      language: session.language || fallbackLanguage,
      questionCount: template?.questionCount,
      difficultyDistribution: template?.difficultyDistribution,
      style: template?.style,
      keywords: template?.keywords,
    };
  }

  /**
   * Relays every 'updates' chunk of a graph stream to the observer as a
   * `node` event, with messages mapped to the frontend format. Only the
   * first node key of each chunk is reported.
   */
  private async forwardNodeUpdates(
    stream: AsyncIterable<any>,
    observer: { next: (value: any) => void },
  ): Promise<void> {
    for await (const [mode, data] of stream) {
      if (mode !== 'updates') continue;

      const node = Object.keys(data)[0];
      const updateData = { ...data[node] };
      if (updateData.messages) {
        updateData.messages = this.mapMessages(updateData.messages);
      }
      if (updateData.feedbackHistory) {
        updateData.feedbackHistory = this.mapMessages(
          updateData.feedbackHistory,
        );
      }
      observer.next({ type: 'node', node, data: updateData });
    }
  }

  /**
   * Reads the authoritative state from the checkpointer after a stream has
   * finished, saves it on the session and emits the `final` event. Emits
   * nothing when the state has no messages.
   */
  private async emitFinalState(
    sessionId: string,
    session: AssessmentSession,
    observer: { next: (value: any) => void },
    caller: string,
  ): Promise<void> {
    const fullState = await this.graph.getState({
      configurable: { thread_id: sessionId },
    });
    const finalData = fullState.values as EvaluationState;
    if (!finalData || !finalData.messages) {
      return;
    }

    this.logger.debug(
      `[${caller}] Final authoritative state messages: ${finalData.messages.length}`,
    );
    await this.persistState(session, finalData);
    observer.next({
      type: 'final',
      data: this.toClientState(finalData, session),
    });
  }

  /** Copies graph state (and a report, if present) onto the session and saves it. */
  private async persistState(
    session: AssessmentSession,
    state: EvaluationState,
  ): Promise<void> {
    this.applyStateToSession(session, state);

    if (state.report) {
      session.status = AssessmentStatus.COMPLETED;
      session.finalReport = state.report;
      const finalScore = this.computeWeightedScore(
        state.questions,
        state.scores as Record<string, number> | undefined,
      );
      // Leave any existing score untouched when none can be computed.
      if (finalScore !== undefined) {
        session.finalScore = finalScore;
        this.logger.log(
          `[WeightedScoring] Session ${session.id} Final Score: ${session.finalScore} (Weighted Avg)`,
        );
      }
    }

    await this.sessionRepository.save(session);
  }

  /** Mirrors the conversational part of the graph state onto the session row. */
  private applyStateToSession(
    session: AssessmentSession,
    state: EvaluationState,
  ): void {
    session.messages = state.messages;
    session.feedbackHistory = state.feedbackHistory || [];
    session.questions_json = state.questions;
    session.currentQuestionIndex = state.currentQuestionIndex;
    session.followUpCount = state.followUpCount || 0;
  }

  /**
   * Difficulty-weighted average of the per-question scores
   * (Standard=1.0, Advanced=1.5, Specialist=2.0). Scores are looked up by
   * question id, or by the question's index when it has no id.
   *
   * @returns `undefined` when there are no questions or no scores at all;
   *   `0` when none of the questions has a score.
   */
  private computeWeightedScore(
    questions: any[] | undefined,
    scores: Record<string, number> | undefined,
  ): number | undefined {
    const questionList = questions || [];
    const scoreTable = scores || {};
    if (questionList.length === 0 || Object.keys(scoreTable).length === 0) {
      return undefined;
    }

    let totalWeightedScore = 0;
    let totalWeight = 0;
    questionList.forEach((q: any, idx: number) => {
      const score = scoreTable[q.id || idx.toString()];
      if (score === undefined) return;

      let weight = 1.0;
      if (q.difficulty === 'Specialist') weight = 2.0;
      else if (q.difficulty === 'Advanced') weight = 1.5;

      totalWeightedScore += score * weight;
      totalWeight += weight;
      this.logger.debug(
        `[WeightedScoring] Q${idx}: Score=${score}, Difficulty=${q.difficulty}, Weight=${weight}`,
      );
    });

    return totalWeight > 0 ? totalWeightedScore / totalWeight : 0;
  }

  /**
   * Builds the payload sent to the client: a copy of the graph state with
   * messages in the frontend format and the session's status, report and
   * final score.
   */
  private toClientState(
    state: EvaluationState,
    session: AssessmentSession,
  ): Record<string, any> {
    return {
      ...state,
      messages: this.mapMessages(state.messages),
      feedbackHistory: this.mapMessages(state.feedbackHistory || []),
      status: session.status,
      report: session.finalReport,
      finalScore: session.finalScore,
    };
  }

  /**
   * Re-hydrates plain objects from DB into LangChain message instances.
   *
   * Accepts the `{ role | type, content }` shape produced by `mapMessages`,
   * bare strings, and LangChain's own JSON form
   * (`{ type: 'constructor', id: [..., 'HumanMessage'], kwargs: { content } }`),
   * which is what a message instance becomes if it is JSON-serialized.
   * NOTE(review): whether the session columns are JSON-serialized is not
   * visible here — confirm against the entity definition.
   * Unknown types become AI messages.
   */
  private hydrateMessages(messages: any[]): BaseMessage[] {
    if (!messages) return [];

    return messages.map((m) => {
      if (m instanceof BaseMessage) return m;

      const isSerialized = m?.type === 'constructor' && m?.kwargs != null;
      const source = isSerialized ? m.kwargs : m;
      const content =
        source?.content || source?.text || (typeof m === 'string' ? m : '');

      let type: string;
      if (isSerialized) {
        // 'HumanMessage' -> 'human', 'AIMessageChunk' -> 'ai', ...
        const className = Array.isArray(m.id)
          ? String(m.id[m.id.length - 1])
          : '';
        type = className.replace(/Message(Chunk)?$/, '').toLowerCase();
      } else {
        type = m?.role || m?.type || m?._getType?.() || 'ai';
      }

      if (type === 'human' || type === 'user') {
        return new HumanMessage(content);
      }
      if (type === 'system') {
        return new SystemMessage(content);
      }
      return new AIMessage(content);
    });
  }

  /**
   * Maps LangChain messages to a simple format for the frontend and storage.
   * Human messages get role `user`, AI messages `assistant`, everything else
   * `system`. A timestamp is taken from the message when it carries one,
   * otherwise the current time is used.
   */
  private mapMessages(messages: BaseMessage[]): any[] {
    if (!messages) return [];

    return messages.map((msg) => {
      const type = msg._getType();
      let role: 'user' | 'assistant' | 'system' = 'system';
      if (type === 'human') role = 'user';
      else if (type === 'ai') role = 'assistant';

      return {
        role,
        content: msg.content,
        type, // Also store the LangChain type for easier hydration
        timestamp: (msg as any).timestamp || Date.now(),
      };
    });
  }
}