| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026 |
- import {
- Injectable,
- Logger,
- NotFoundException,
- Inject,
- forwardRef,
- } from '@nestjs/common';
- import { InjectRepository } from '@nestjs/typeorm';
- import { Repository, DeepPartial } from 'typeorm';
- import { ConfigService } from '@nestjs/config';
- import { ChatOpenAI } from '@langchain/openai';
- import {
- HumanMessage,
- BaseMessage,
- AIMessage,
- SystemMessage,
- } from '@langchain/core/messages';
- import { Observable, from, map, mergeMap, concatMap } from 'rxjs';
- import {
- AssessmentSession,
- AssessmentStatus,
- } from './entities/assessment-session.entity';
- import { AssessmentQuestion } from './entities/assessment-question.entity';
- import { AssessmentAnswer } from './entities/assessment-answer.entity';
- import { AssessmentTemplate } from './entities/assessment-template.entity';
- import { KnowledgeBaseService } from '../knowledge-base/knowledge-base.service';
- import { KnowledgeGroupService } from '../knowledge-group/knowledge-group.service';
- import { ModelConfigService } from '../model-config/model-config.service';
- import { ModelType } from '../types';
- import { ElasticsearchService } from '../elasticsearch/elasticsearch.service';
- import { RagService } from '../rag/rag.service';
- import { ChatService } from '../chat/chat.service';
- import { createEvaluationGraph } from './graph/builder';
- import { EvaluationState } from './graph/state';
- import { TemplateService } from './services/template.service';
- import { ContentFilterService } from './services/content-filter.service';
- import { I18nService } from '../i18n/i18n.service';
- import { TenantService } from '../tenant/tenant.service';
- @Injectable()
- export class AssessmentService {
/** Logger scoped to this service. */
private readonly logger = new Logger(AssessmentService.name);

/**
 * Evaluation graph built once per service instance. Every call addresses a
 * session through `configurable.thread_id`, which is the session id.
 */
private readonly graph = createEvaluationGraph();

/**
 * All collaborators are injected by NestJS and never reassigned, hence `readonly`.
 *
 * NOTE(review): the `forwardRef` injections presumably break circular module
 * dependencies — confirm against the module definitions.
 * NOTE(review): `questionRepository`, `answerRepository` and `tenantService` are
 * not referenced by the methods of this class as shown; they are kept so the DI
 * signature stays unchanged.
 */
constructor(
  @InjectRepository(AssessmentSession)
  private readonly sessionRepository: Repository<AssessmentSession>,
  @InjectRepository(AssessmentQuestion)
  private readonly questionRepository: Repository<AssessmentQuestion>,
  @InjectRepository(AssessmentAnswer)
  private readonly answerRepository: Repository<AssessmentAnswer>,
  @Inject(forwardRef(() => KnowledgeBaseService))
  private readonly kbService: KnowledgeBaseService,
  @Inject(forwardRef(() => KnowledgeGroupService))
  private readonly groupService: KnowledgeGroupService,
  @Inject(forwardRef(() => ModelConfigService))
  private readonly modelConfigService: ModelConfigService,
  private readonly configService: ConfigService,
  private readonly templateService: TemplateService,
  private readonly contentFilterService: ContentFilterService,
  private readonly ragService: RagService,
  @Inject(forwardRef(() => ChatService))
  private readonly chatService: ChatService,
  private readonly i18nService: I18nService,
  private readonly tenantService: TenantService,
) {}
/**
 * Builds the chat model for a tenant from that tenant's default LLM configuration.
 *
 * When the configuration has no API key or base URL, the placeholder key
 * `'ollama'` and the local endpoint `http://localhost:11434/v1` are used.
 *
 * @param tenantId Tenant whose default LLM configuration is looked up.
 * @returns A `ChatOpenAI` client with temperature 0.7.
 * @throws Error when no default LLM configuration is returned for the tenant.
 */
private async getModel(tenantId: string): Promise<ChatOpenAI> {
  const config = await this.modelConfigService.findDefaultByType(
    tenantId,
    ModelType.LLM,
  );
  // Fail with a readable message instead of a TypeError on `config.apiKey`.
  if (!config) {
    throw new Error(
      `No default LLM model is configured for tenant ${tenantId}`,
    );
  }

  return new ChatOpenAI({
    apiKey: config.apiKey || 'ollama',
    modelName: config.modelId,
    temperature: 0.7,
    configuration: {
      baseURL: config.baseUrl || 'http://localhost:11434/v1',
    },
  });
}
/**
 * Collects the knowledge content used to ground question generation and grading.
 *
 * 1. When the template carries keywords, a hybrid search is run through
 *    `RagService` over the session's files and the formatted hits are returned.
 * 2. Without keywords — or when that search fails or finds nothing — the full
 *    text of the knowledge base (or of every file in the group) is loaded and,
 *    if keywords exist, narrowed by `ContentFilterService`.
 *
 * @param session Session (or session draft) naming the knowledge source, owner,
 *   tenant, template snapshot and, optionally, the session language.
 * @returns The content string; `''` when no source id is set or nothing was found.
 */
private async getSessionContent(session: {
  knowledgeBaseId?: string | null;
  knowledgeGroupId?: string | null;
  userId: string;
  tenantId: string;
  templateJson?: any;
  language?: string | null;
}): Promise<string> {
  const sourceId = session.knowledgeBaseId || session.knowledgeGroupId;
  this.logger.log(`[getSessionContent] Starting for KB/Group ID: ${sourceId}`);
  if (!sourceId) {
    this.logger.warn(`[getSessionContent] No KB/Group ID provided`);
    return '';
  }

  const keywords: string[] = session.templateJson?.keywords || [];

  if (keywords.length > 0) {
    this.logger.log(
      `[getSessionContent] Keywords detected, performing hybrid search via RagService: ${keywords.join(', ')}`,
    );
    const grounded = await this.searchGroundedContent(session, keywords);
    if (grounded) {
      return grounded;
    }
    this.logger.warn(
      `[getSessionContent] Grounded search failed or returned nothing. One common reason is that the keywords are not present in the indexed documents.`,
    );
  }

  // Fallback (or no keywords): full content retrieval.
  let content = await this.loadFullContent(session, sourceId);

  // Extra keyword filter, only relevant when the grounded search did not deliver.
  if (content && keywords.length > 0) {
    this.logger.debug(
      `[getSessionContent] Applying fallback keyword filters: ${keywords.join(', ')}`,
    );
    const lengthBefore = content.length;
    content = this.contentFilterService.filterContent(content, keywords);
    this.logger.debug(
      `[getSessionContent] After filtering, content length: ${content.length} (was ${lengthBefore})`,
    );
  }

  this.logger.log(
    `[getSessionContent] Final content for AI generation (Length: ${content.length})`,
  );
  this.logger.debug(
    `[getSessionContent] Content Preview: ${content.substring(0, 500)}...`,
  );
  return content;
}

/**
 * Runs the keyword-driven hybrid search and formats the hits as numbered,
 * localized "file / content" entries.
 *
 * Errors are logged and swallowed on purpose so the caller can fall back to
 * full-content retrieval.
 *
 * @returns The formatted context, or `null` when there are no files in scope,
 *   no hits, or the search threw.
 */
private async searchGroundedContent(
  session: {
    knowledgeBaseId?: string | null;
    knowledgeGroupId?: string | null;
    userId: string;
    tenantId: string;
    templateJson?: any;
    language?: string | null;
  },
  keywords: string[],
): Promise<string | null> {
  try {
    // Search scope: the single knowledge base, or every file of the group.
    let fileIds: string[] = [];
    if (session.knowledgeBaseId) {
      fileIds = [session.knowledgeBaseId];
    } else if (session.knowledgeGroupId) {
      fileIds = await this.groupService.getFileIdsByGroups(
        [session.knowledgeGroupId],
        session.userId,
        session.tenantId,
      );
    }
    if (fileIds.length === 0) {
      this.logger.warn(
        `[getSessionContent] No files found for search scope (KB: ${session.knowledgeBaseId}, Group: ${session.knowledgeGroupId})`,
      );
      return null;
    }

    const query = keywords.join(' ');
    this.logger.log(
      `[getSessionContent] Performing high-fidelity grounded search (streamChat-style). Keywords: "${query}"`,
    );

    const embeddingModel = await this.modelConfigService.findDefaultByType(
      session.tenantId || 'default',
      ModelType.EMBEDDING,
    );

    const ragResults = await this.ragService.searchKnowledge(
      query,
      session.userId,
      20, // topK: wide net for broader question coverage
      0.1, // similarityThreshold: deliberately lenient
      embeddingModel?.id,
      true, // enableFullTextSearch
      true, // enableRerank
      undefined, // selectedRerankId
      undefined, // selectedGroups
      fileIds,
      0.3, // rerankSimilarityThreshold: deliberately lenient
      session.tenantId,
    );

    // The template snapshot written by startSession carries no language, so
    // fall back to the session's own language before defaulting to 'zh'.
    const language =
      session.templateJson?.language || session.language || 'zh';
    const fileLabel = this.i18nService.getMessage('file', language);
    const contentLabel = this.i18nService.getMessage('content', language);
    const searchContent = ragResults
      .map(
        (result, index) =>
          `[${index + 1}] ${fileLabel}:${result.fileName}\n${contentLabel}:${result.content}\n`,
      )
      .join('\n');

    if (searchContent.trim().length > 0) {
      this.logger.log(
        `[getSessionContent] SUCCESS: Found ${ragResults.length} relevant chunks. Total length: ${searchContent.length}`,
      );
      return searchContent;
    }

    this.logger.warn(
      `[getSessionContent] High-fidelity search returned no results for query: "${query}".`,
    );
    return null;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    this.logger.error(
      `[getSessionContent] Grounded search failed unexpectedly: ${message}`,
      err instanceof Error ? err.stack : undefined,
    );
    return null;
  }
}

/**
 * Loads the unfiltered text of the session's knowledge source: the `content`
 * of the knowledge base, or all group files that have content, each prefixed
 * with a `--- Document: … ---` header.
 *
 * @returns The text, or `''` when the source is missing or the group lookup fails.
 */
private async loadFullContent(
  session: {
    knowledgeBaseId?: string | null;
    userId: string;
    tenantId: string;
  },
  sourceId: string,
): Promise<string> {
  if (session.knowledgeBaseId) {
    this.logger.debug(
      `[getSessionContent] Fetching content for KnowledgeBase: ${sourceId}`,
    );
    // NOTE(review): this reaches into KnowledgeBaseService's repository through
    // an `any` cast; a public accessor on the service would be safer.
    const kb = await (this.kbService as any).kbRepository.findOne({
      where: { id: sourceId, tenantId: session.tenantId },
    });
    if (!kb) {
      this.logger.warn(
        `[getSessionContent] KnowledgeBase not found: ${sourceId}`,
      );
      return '';
    }
    const kbContent: string = kb.content || '';
    this.logger.debug(
      `[getSessionContent] Found KB content, length: ${kbContent.length}`,
    );
    return kbContent;
  }

  try {
    this.logger.debug(
      `[getSessionContent] Fetching content for KnowledgeGroup: ${sourceId}`,
    );
    const groupFiles = await this.groupService.getGroupFiles(
      sourceId,
      session.userId,
      session.tenantId,
    );
    this.logger.debug(
      `[getSessionContent] Found ${groupFiles.length} files in group`,
    );
    const groupContent = groupFiles
      .filter((f) => f.content)
      .map((f) => {
        this.logger.debug(
          `[getSessionContent] Including file: ${f.title || f.originalName}, content length: ${f.content?.length || 0}`,
        );
        return `--- Document: ${f.title || f.originalName} ---\n${f.content}`;
      })
      .join('\n\n');
    this.logger.debug(
      `[getSessionContent] Total group content length: ${groupContent.length}`,
    );
    return groupContent;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    this.logger.error(
      `[getSessionContent] Failed to get group files: ${message}`,
    );
    return '';
  }
}
/**
 * Creates and persists a new assessment session.
 *
 * The knowledge source is `kbId` when given, otherwise the template's linked
 * knowledge group. The id is first looked up as a knowledge base and, only if
 * that lookup reports "not found", as a knowledge group.
 *
 * @param userId Owner of the new session.
 * @param kbId KnowledgeBase id or KnowledgeGroup id; may be omitted when a template is given.
 * @param tenantId Tenant the session belongs to.
 * @param language Session language, `'en'` by default.
 * @param templateId Optional template whose settings are copied into the session.
 * @returns The saved session, with `threadId` set to its own id.
 * @throws NotFoundException when the source is neither a visible KB nor a visible group.
 * @throws Error when no source can be resolved or the source has too little content.
 */
async startSession(
  userId: string,
  kbId: string | undefined,
  tenantId: string,
  language: string = 'en',
  templateId?: string,
): Promise<AssessmentSession> {
  this.logger.log(
    `[startSession] Starting session for user ${userId}, templateId: ${templateId}, kbId: ${kbId}`,
  );

  let template: AssessmentTemplate | null = null;
  if (templateId) {
    template = await this.templateService.findOne(templateId, userId, tenantId);
    this.logger.debug(
      `[startSession] Found template: ${template?.name}, linked group: ${template?.knowledgeGroupId}`,
    );
  }

  // An explicit id wins over the template's linked group.
  const sourceId = kbId || template?.knowledgeGroupId;
  this.logger.log(`[startSession] activeKbId resolved to: ${sourceId}`);
  if (!sourceId) {
    this.logger.error(`[startSession] No knowledge source resolved`);
    throw new Error('Knowledge source (ID or Template) must be provided.');
  }

  // Classify the source and check permissions at the same time.
  let sourceIsKb = true;
  try {
    await this.kbService.findOne(sourceId, userId, tenantId);
  } catch (kbError) {
    if (!(kbError instanceof NotFoundException)) {
      throw kbError; // e.g. ForbiddenException
    }
    sourceIsKb = false;
    try {
      await this.groupService.findOne(sourceId, userId, tenantId);
    } catch {
      this.logger.error(
        `[startSession] Knowledge source ${sourceId} not found as KB or Group`,
      );
      throw new NotFoundException(
        this.i18nService.getMessage('knowledgeSourceNotFound') ||
          'Knowledge source not found',
      );
    }
  }
  this.logger.debug(`[startSession] isKb: ${sourceIsKb}`);

  // Snapshot of the template settings stored on the session itself.
  let templateSnapshot: Record<string, unknown> | undefined;
  if (template) {
    templateSnapshot = {
      name: template.name,
      keywords: template.keywords,
      questionCount: template.questionCount,
      difficultyDistribution: template.difficultyDistribution,
      style: template.style,
    };
  }

  const draft: any = {
    userId,
    tenantId,
    knowledgeBaseId: sourceIsKb ? sourceId : undefined,
    knowledgeGroupId: sourceIsKb ? undefined : sourceId,
    templateId,
    templateJson: templateSnapshot,
    status: AssessmentStatus.IN_PROGRESS,
    language,
  };

  // Refuse to create a session that has nothing to be assessed on.
  const content = await this.getSessionContent(draft);
  const usableLength = content ? content.trim().length : 0;
  if (usableLength < 10) {
    this.logger.error(
      `[startSession] Insufficient content length: ${content?.length || 0}`,
    );
    throw new Error(
      'Selected knowledge source has no sufficient content for evaluation.',
    );
  }

  const entity = this.sessionRepository.create(
    draft as DeepPartial<AssessmentSession>,
  );
  const saved = (await this.sessionRepository.save(
    entity as any,
  )) as AssessmentSession;

  // The LangGraph thread id is the session id, which only exists after the first save.
  saved.threadId = saved.id;
  await this.sessionRepository.save(saved);

  this.logger.log(`[startSession] Session ${saved.id} created and saved`);
  return saved;
}
/**
 * Streams the initial question generation for an existing session.
 *
 * Emits `{ type: 'node', node, data }` for every graph node update and ends
 * with `{ type: 'final', data }` built from the state read back from the
 * checkpointer. If the graph already holds questions for this thread, that
 * state is emitted as `final` right away and nothing is regenerated.
 *
 * @param sessionId Session id, also used as the graph `thread_id`.
 * @param userId Owner of the session; other users' sessions are reported as not found.
 * @returns Observable whose work starts on subscription; failures (including
 *   `NotFoundException`) are delivered through `error`.
 */
startSessionStream(sessionId: string, userId: string): Observable<any> {
  return new Observable((observer) => {
    void (async () => {
      try {
        const session = await this.sessionRepository.findOne({
          where: { id: sessionId, userId },
        });
        if (!session) {
          observer.error(new NotFoundException('Session not found'));
          return;
        }

        // Check the checkpointer first: an already generated session needs
        // neither the model nor a fresh content search.
        const existingState = await this.graph.getState({
          configurable: { thread_id: sessionId },
        });
        if (existingState?.values?.questions?.length > 0) {
          this.logger.log(
            `Session ${sessionId} already has state, skipping generation.`,
          );
          const existing = { ...existingState.values };
          existing.messages = this.mapMessages(existing.messages || []);
          existing.feedbackHistory = this.mapMessages(
            existing.feedbackHistory || [],
          );
          observer.next({ type: 'final', data: existing });
          observer.complete();
          return;
        }

        const model = await this.getModel(session.tenantId);
        const content = await this.getSessionContent(session);

        const language = session.language || 'en';
        const initialMsg =
          language === 'zh'
            ? '现在生成评估问题。请务必使用中文。'
            : language === 'ja'
              ? '今すぐアセスメント問題を生成してください。必ず日本語で回答してください。'
              : 'Generate the assessment questions now. Please strictly respond in English.';

        const initialState: Partial<EvaluationState> = {
          assessmentSessionId: sessionId,
          knowledgeBaseId:
            session.knowledgeBaseId || session.knowledgeGroupId || '',
          messages: [new HumanMessage(initialMsg)],
          questionCount: session.templateJson?.questionCount,
          difficultyDistribution:
            session.templateJson?.difficultyDistribution,
          style: session.templateJson?.style,
          keywords: session.templateJson?.keywords,
          language,
        };

        this.logger.log(
          `[startSessionStream] Starting stream for session ${sessionId}`,
        );
        const stream = await this.graph.stream(initialState, {
          configurable: this.buildGraphConfigurable(
            sessionId,
            session,
            model,
            content,
            'en',
          ),
          streamMode: ['values', 'updates'],
        });
        this.logger.debug(`[startSessionStream] Graph stream started`);
        await this.forwardNodeUpdates(stream, observer);

        // The checkpointer, not the last streamed chunk, is authoritative.
        const fullState = await this.graph.getState({
          configurable: { thread_id: sessionId },
        });
        const finalData = fullState.values as EvaluationState;
        if (finalData && finalData.messages) {
          const payload = await this.persistGraphState(session, finalData);
          observer.next({ type: 'final', data: payload });
        }
        observer.complete();
      } catch (err) {
        observer.error(err);
      }
    })();
  });
}

/**
 * Submits a user's answer, resumes the graph and returns the resulting state.
 *
 * @param sessionId Session id, also used as the graph `thread_id`.
 * @param userId Owner of the session.
 * @param answer The user's answer, appended to the graph as a human message.
 * @param language Used only when the session itself has no language.
 * @returns A copy of the graph state with client-friendly `messages` and
 *   `feedbackHistory`, plus `status`, `report` and `finalScore` taken from the
 *   saved session. When the state has neither messages nor questions it is
 *   returned unchanged and nothing is saved.
 * @throws NotFoundException when the session does not exist for this user.
 */
async submitAnswer(
  sessionId: string,
  userId: string,
  answer: string,
  language: string = 'en',
): Promise<any> {
  const session = await this.sessionRepository.findOne({
    where: { id: sessionId, userId },
    relations: ['template'],
  });
  if (!session) throw new NotFoundException('Session not found');

  const model = await this.getModel(session.tenantId);
  await this.ensureGraphState(sessionId, session);
  const content = await this.getSessionContent(session);

  // The answer must be in the history before the graph is resumed.
  await this.graph.updateState(
    { configurable: { thread_id: sessionId } },
    { messages: [new HumanMessage(answer)] },
  );

  this.logger.debug(`[submitAnswer] Resuming graph for session ${sessionId}`);
  // A `null` input resumes from the last interrupt.
  const stream = await this.graph.stream(null, {
    configurable: this.buildGraphConfigurable(
      sessionId,
      session,
      model,
      content,
      language,
    ),
    streamMode: ['values', 'updates'],
  });
  for await (const [mode, data] of stream as AsyncIterable<any>) {
    if (mode === 'updates') {
      this.logger.debug(
        `[submitAnswer] Node completed: ${Object.keys(data)[0]}`,
      );
    }
  }

  // The checkpointer, not the last streamed chunk, is authoritative.
  const fullState = await this.graph.getState({
    configurable: { thread_id: sessionId },
  });
  const finalState = fullState.values as EvaluationState;
  this.logger.log(
    `[submitAnswer] Stream finished. State Index: ${finalState?.currentQuestionIndex}, Questions: ${finalState?.questions?.length}, HasReport: ${!!finalState?.report}`,
  );

  if (!finalState || !(finalState.messages || finalState.questions)) {
    this.logger.warn(
      `[submitAnswer] finalResult has no usable data! Keys: ${Object.keys(finalState || {}).join(', ')}`,
    );
    return finalState;
  }

  const result = await this.persistGraphState(session, finalState);
  this.logger.log(
    `[submitAnswer] session saved. DB Status: ${session.status}, Index: ${session.currentQuestionIndex}`,
  );
  return result;
}

/**
 * Streaming version of `submitAnswer`.
 *
 * Emits `{ type: 'node', node, data }` per graph node update and ends with
 * `{ type: 'final', data }` once the state has been saved.
 *
 * @param sessionId Session id, also used as the graph `thread_id`.
 * @param userId Owner of the session.
 * @param answer The user's answer, appended to the graph as a human message.
 * @param language Used only when the session itself has no language.
 * @returns Observable whose work starts on subscription; failures (including
 *   `NotFoundException`) are delivered through `error`.
 */
submitAnswerStream(
  sessionId: string,
  userId: string,
  answer: string,
  language: string = 'en',
): Observable<any> {
  return new Observable((observer) => {
    void (async () => {
      try {
        const session = await this.sessionRepository.findOne({
          where: { id: sessionId, userId },
        });
        if (!session) {
          observer.error(new NotFoundException('Session not found'));
          return;
        }

        const model = await this.getModel(session.tenantId);
        const content = await this.getSessionContent(session);
        await this.ensureGraphState(sessionId, session);

        const graphState = await this.graph.getState({
          configurable: { thread_id: sessionId },
        });
        const hasState =
          !!graphState?.values && Object.keys(graphState.values).length > 0;
        this.logger.debug(
          `[submitAnswerStream] sessionId=${sessionId}, hasState=${hasState}, nextNodes=[${graphState?.next || ''}]`,
        );

        // The answer must be in the history before the graph is resumed.
        await this.graph.updateState(
          { configurable: { thread_id: sessionId } },
          { messages: [new HumanMessage(answer)] },
        );

        // A `null` input resumes from the last interrupt.
        const stream = await this.graph.stream(null, {
          configurable: this.buildGraphConfigurable(
            sessionId,
            session,
            model,
            content,
            language,
          ),
          streamMode: ['values', 'updates'],
        });
        await this.forwardNodeUpdates(stream, observer);

        const fullState = await this.graph.getState({
          configurable: { thread_id: sessionId },
        });
        const finalData = fullState.values as EvaluationState;
        if (finalData && finalData.messages) {
          const payload = await this.persistGraphState(session, finalData);
          observer.next({ type: 'final', data: payload });
        }
        observer.complete();
      } catch (err) {
        observer.error(err);
      }
    })();
  });
}

/**
 * Builds the `configurable` object passed to every graph run of a session, so
 * all entry points hand the graph the same model, content and template settings.
 *
 * @param fallbackLanguage Language used when the session has none.
 */
private buildGraphConfigurable(
  sessionId: string,
  session: AssessmentSession,
  model: ChatOpenAI,
  content: string,
  fallbackLanguage: string,
): Record<string, any> {
  const template = session.templateJson;
  return {
    thread_id: sessionId,
    model,
    knowledgeBaseContent: content,
    language: session.language || fallbackLanguage,
    questionCount: template?.questionCount,
    difficultyDistribution: template?.difficultyDistribution,
    style: template?.style,
    keywords: template?.keywords,
  };
}

/**
 * Drains a graph stream and forwards each `updates` chunk to the observer as a
 * `node` event. Only the first key of a chunk is used as the node name, and
 * `values` chunks are ignored.
 */
private async forwardNodeUpdates(
  stream: AsyncIterable<any>,
  observer: { next: (value: any) => void },
): Promise<void> {
  for await (const [mode, data] of stream) {
    if (mode !== 'updates') continue;

    const node = Object.keys(data)[0];
    const updateData = { ...data[node] };
    if (updateData.messages) {
      updateData.messages = this.mapMessages(updateData.messages);
    }
    if (updateData.feedbackHistory) {
      updateData.feedbackHistory = this.mapMessages(
        updateData.feedbackHistory,
      );
    }
    observer.next({ type: 'node', node, data: updateData });
  }
}

/**
 * Copies the graph state onto the session, marks the session completed when a
 * report exists, saves it, and builds the payload returned to clients.
 *
 * The graph state object itself is not mutated; the payload is a shallow copy.
 *
 * @returns The state copy with mapped `messages`/`feedbackHistory` and the
 *   session's `status`, `finalReport` (as `report`) and `finalScore`.
 */
private async persistGraphState(
  session: AssessmentSession,
  state: EvaluationState,
): Promise<any> {
  // NOTE(review): messages are stored as they come from the graph (LangChain
  // message instances), not in the mapped role/content format — confirm how the
  // entity column serializes them.
  session.messages = state.messages;
  session.feedbackHistory = state.feedbackHistory || [];
  session.questions_json = state.questions;
  session.currentQuestionIndex = state.currentQuestionIndex;
  session.followUpCount = state.followUpCount || 0;

  if (state.report) {
    session.status = AssessmentStatus.COMPLETED;
    session.finalReport = state.report;
    const weightedScore = this.computeWeightedScore(
      state.questions,
      state.scores as Record<string, number> | undefined,
    );
    // Without questions or scores the previous finalScore is left untouched.
    if (weightedScore !== undefined) {
      session.finalScore = weightedScore;
      this.logger.log(
        `[WeightedScoring] Session ${session.id} Final Score: ${session.finalScore} (Weighted Avg)`,
      );
    }
  }

  await this.sessionRepository.save(session);

  const payload: any = { ...state };
  payload.messages = this.mapMessages(state.messages);
  payload.feedbackHistory = this.mapMessages(state.feedbackHistory || []);
  payload.status = session.status;
  payload.report = session.finalReport;
  payload.finalScore = session.finalScore;
  return payload;
}

/**
 * Difficulty-weighted average of the per-question scores.
 * Weights: Specialist = 2.0, Advanced = 1.5, anything else = 1.0.
 * A question's score is looked up by its `id`, or by its index when it has none;
 * questions without a score are skipped.
 *
 * @returns The weighted average (0 when no question has a score), or
 *   `undefined` when there are no questions or no scores at all.
 */
private computeWeightedScore(
  questions: readonly any[] | null | undefined,
  scores: Record<string, number> | null | undefined,
): number | undefined {
  if (!questions || questions.length === 0) return undefined;
  if (!scores || Object.keys(scores).length === 0) return undefined;

  let weightedSum = 0;
  let totalWeight = 0;
  questions.forEach((q: any, idx: number) => {
    const score = scores[q.id || idx.toString()];
    if (score === undefined) return;

    const weight =
      q.difficulty === 'Specialist'
        ? 2.0
        : q.difficulty === 'Advanced'
          ? 1.5
          : 1.0;
    weightedSum += score * weight;
    totalWeight += weight;
    this.logger.debug(
      `[WeightedScoring] Q${idx}: Score=${score}, Difficulty=${q.difficulty}, Weight=${weight}`,
    );
  });

  return totalWeight > 0 ? weightedSum / totalWeight : 0;
}
/**
 * Returns the current graph state of a session in client-friendly form.
 *
 * The graph state is initialized or recovered first if the checkpointer has
 * none; `messages` and `feedbackHistory` are then mapped when present.
 *
 * @throws NotFoundException when the session does not exist for this user.
 */
async getSessionState(sessionId: string, userId: string): Promise<any> {
  this.logger.log(
    `Retrieving state for session ${sessionId} for user ${userId}`,
  );

  const session = await this.sessionRepository.findOne({
    relations: ['template'],
    where: { id: sessionId, userId },
  });
  if (!session) {
    throw new NotFoundException('Session not found');
  }

  // Lazy init, or recovery of the graph state from the stored session.
  await this.ensureGraphState(sessionId, session);

  const snapshot = await this.graph.getState({
    configurable: { thread_id: sessionId },
  });

  // Work on a shallow copy so the snapshot itself is left alone.
  const view = Object.assign({}, snapshot.values);
  for (const key of ['messages', 'feedbackHistory']) {
    if (view[key]) {
      view[key] = this.mapMessages(view[key]);
    }
  }
  return view;
}
/**
 * Lists a user's assessment sessions within a tenant, newest first, with the
 * `knowledgeBase` and `knowledgeGroup` relations loaded.
 *
 * Each returned item additionally carries `questions` (a copy of
 * `questions_json`, or `[]`) for frontend compatibility.
 */
async getHistory(
  userId: string,
  tenantId: string,
): Promise<AssessmentSession[]> {
  const sessions = await this.sessionRepository.find({
    relations: ['knowledgeBase', 'knowledgeGroup'],
    where: { userId, tenantId },
    order: { createdAt: 'DESC' },
  });
  this.logger.log(`Found ${sessions.length} historical sessions`);

  const withQuestions: any = [];
  for (const item of sessions) {
    withQuestions.push({ ...item, questions: item.questions_json || [] });
  }
  return withQuestions;
}
/**
 * Deletes an assessment session.
 *
 * Users with role `admin` or `super_admin` may delete any session by id;
 * everyone else can only delete sessions they own.
 *
 * @param user Caller; `id` and `role` are read.
 * @throws NotFoundException when no row was deleted.
 */
async deleteSession(sessionId: string, user: any): Promise<void> {
  this.logger.log(
    `Deleting session ${sessionId} for user ${user.id} (role: ${user.role})`,
  );

  const privileged = ['super_admin', 'admin'].includes(user.role);
  const criteria: any = privileged
    ? { id: sessionId }
    : { id: sessionId, userId: user.id };

  const { affected } = await this.sessionRepository.delete(criteria);
  if (affected === 0) {
    throw new NotFoundException(
      'Session not found or you do not have permission to delete it',
    );
  }
}
/**
 * Makes sure the graph checkpointer holds state for the session.
 *
 * Nothing happens when the checkpointer already has messages for the thread.
 * Otherwise:
 * - if the stored session has messages, the graph state is rebuilt from the
 *   session (recovery, e.g. after a server restart) and written as if produced
 *   by the `grader` node;
 * - if not, the graph is run from an empty state and the resulting values are
 *   saved back onto the session.
 *
 * @param sessionId Session id, also used as the graph `thread_id`.
 * @param session Stored session; updated and saved in the initialization path.
 */
private async ensureGraphState(
  sessionId: string,
  session: AssessmentSession,
): Promise<void> {
  const threadConfig = { configurable: { thread_id: sessionId } };
  const state = await this.graph.getState(threadConfig);

  const graphMessages = state.values?.messages;
  if (graphMessages && graphMessages.length !== 0) {
    return; // checkpointer already has this thread
  }

  const template = session.templateJson;
  const knowledgeBaseId =
    session.knowledgeBaseId || session.knowledgeGroupId || '';

  if (session.messages && session.messages.length > 0) {
    this.logger.log(
      `[ensureGraphState] Recovering state from DB for session ${sessionId}`,
    );
    await this.graph.updateState(
      threadConfig,
      {
        assessmentSessionId: sessionId,
        knowledgeBaseId,
        messages: this.hydrateMessages(session.messages),
        feedbackHistory: this.hydrateMessages(session.feedbackHistory || []),
        questions: session.questions_json || [],
        currentQuestionIndex: session.currentQuestionIndex || 0,
        followUpCount: session.followUpCount || 0,
        questionCount: template?.questionCount || 5,
        difficultyDistribution: template?.difficultyDistribution,
        style: template?.style,
        keywords: template?.keywords,
      },
      'grader', // a recovered session should be ready to grade the next input
    );
    return;
  }

  this.logger.log(`Initializing new state for session ${sessionId}`);
  const content = await this.getSessionContent(session);
  const model = await this.getModel(session.tenantId);

  const initialState: Partial<EvaluationState> = {
    assessmentSessionId: sessionId,
    knowledgeBaseId,
    messages: [],
    questionCount: template?.questionCount,
    difficultyDistribution: template?.difficultyDistribution,
    style: template?.style,
    keywords: template?.keywords,
    language: session.language || 'en',
  };
  this.logger.log(
    `[ensureGraphState] Initializing with questionCount=${initialState.questionCount}, keywords=${initialState.keywords?.join(',')}, style=${initialState.style}`,
  );

  const resultStream = await this.graph.stream(initialState, {
    configurable: {
      thread_id: sessionId,
      model,
      knowledgeBaseContent: content,
      language: session.language || 'en',
      keywords: template?.keywords,
      questionCount: template?.questionCount,
      difficultyDistribution: template?.difficultyDistribution,
      style: template?.style,
    },
    streamMode: ['values', 'updates'],
  });

  // Only the last `values` chunk matters; `updates` chunks are drained and ignored.
  let lastValues: any = null;
  for await (const [mode, data] of resultStream) {
    if (mode === 'values') lastValues = data;
  }

  // The stream may end without a `values` chunk; do not dereference null then.
  if (!lastValues?.messages) {
    this.logger.warn(
      `[ensureGraphState] Initialization of session ${sessionId} produced no messages`,
    );
    return;
  }

  session.messages = lastValues.messages;
  session.feedbackHistory = lastValues.feedbackHistory || [];
  session.questions_json = lastValues.questions;
  session.currentQuestionIndex = lastValues.currentQuestionIndex;
  session.followUpCount = lastValues.followUpCount || 0;
  await this.sessionRepository.save(session);
}
/**
 * Re-hydrates stored messages into LangChain message instances.
 *
 * Accepted shapes per entry:
 * - an existing `BaseMessage` (returned as is);
 * - a plain object with `content`/`text` and `role`/`type`
 *   (`user`/`human`, `assistant`/`ai`, `system`);
 * - a LangChain-serialized message (`{ type: 'constructor', id: [...], kwargs }`),
 *   whose content comes from `kwargs.content` and whose kind comes from the
 *   class name at the end of `id`;
 * - a bare string, treated as AI message content.
 *
 * Anything with an unrecognized type becomes an `AIMessage`.
 *
 * @param messages Stored messages; a falsy value yields `[]`.
 */
private hydrateMessages(messages: any[]): BaseMessage[] {
  if (!messages) return [];

  return messages.map((m) => {
    if (m instanceof BaseMessage) return m;

    const kwargs =
      m.kwargs && typeof m.kwargs === 'object' ? m.kwargs : undefined;
    const content =
      m.content ||
      m.text ||
      kwargs?.content ||
      (typeof m === 'string' ? m : '');

    let type = m.role || m.type || m._getType?.() || 'ai';
    // Serialized LangChain messages carry type 'constructor'; the real kind is
    // the class name, e.g. ['langchain_core', 'messages', 'HumanMessage'].
    if (type === 'constructor' && Array.isArray(m.id) && m.id.length > 0) {
      const className = String(m.id[m.id.length - 1]);
      if (className.startsWith('Human')) type = 'human';
      else if (className.startsWith('System')) type = 'system';
      else type = 'ai';
    }

    if (type === 'human' || type === 'user') {
      return new HumanMessage(content);
    }
    if (type === 'system') {
      return new SystemMessage(content);
    }
    // 'ai', 'assistant' and every unknown type.
    return new AIMessage(content);
  });
}
/**
 * Converts LangChain messages into plain `{ role, content, type, timestamp }`
 * objects for the frontend and storage.
 *
 * `human` maps to role `user`, `ai` to `assistant`; every other type is
 * reported as `system`. The original LangChain type is kept in `type` so the
 * message can be hydrated again. `timestamp` falls back to the current time.
 *
 * @param messages Messages to convert; a falsy value yields `[]`.
 */
private mapMessages(messages: BaseMessage[]): any[] {
  if (!messages) return [];

  const mapped: any[] = [];
  for (const message of messages) {
    const type = message._getType();

    let role: 'user' | 'assistant' | 'system';
    switch (type) {
      case 'human':
        role = 'user';
        break;
      case 'ai':
        role = 'assistant';
        break;
      default:
        role = 'system';
    }

    mapped.push({
      role,
      content: message.content,
      type, // LangChain type, kept for easier hydration
      timestamp: (message as any).timestamp || Date.now(),
    });
  }
  return mapped;
}
- }
|