Enhance SDKAgent response handling and message processing
- Updated response logging to process both empty and non-empty responses. - Added functionality to mark messages as processed even when the response is empty. - Refactored message processing logic to ensure all pending messages are marked as processed after successful observation/summary storage. - Introduced a new private method `markMessagesProcessed` to encapsulate the logic for marking messages as processed, preventing message loss and duplicate processing.
This commit is contained in:
File diff suppressed because one or more lines are too long
Binary file not shown.
@@ -113,7 +113,7 @@ export class SDKAgent {
|
|||||||
// Calculate discovery tokens (delta for this response only)
|
// Calculate discovery tokens (delta for this response only)
|
||||||
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
|
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
|
||||||
|
|
||||||
// Only log non-empty responses (filter out noise)
|
// Process response (empty or not) and mark messages as processed
|
||||||
if (responseSize > 0) {
|
if (responseSize > 0) {
|
||||||
const truncatedResponse = responseSize > 100
|
const truncatedResponse = responseSize > 100
|
||||||
? textContent.substring(0, 100) + '...'
|
? textContent.substring(0, 100) + '...'
|
||||||
@@ -125,6 +125,9 @@ export class SDKAgent {
|
|||||||
|
|
||||||
// Parse and process response with discovery token delta
|
// Parse and process response with discovery token delta
|
||||||
await this.processSDKResponse(session, textContent, worker, discoveryTokens);
|
await this.processSDKResponse(session, textContent, worker, discoveryTokens);
|
||||||
|
} else {
|
||||||
|
// Empty response - still need to mark pending messages as processed
|
||||||
|
await this.markMessagesProcessed(session, worker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -396,8 +399,15 @@ export class SDKAgent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CRITICAL: Mark ALL pending messages as successfully processed
|
// Mark messages as processed after successful observation/summary storage
|
||||||
// This prevents message loss if worker crashes before SDK finishes
|
await this.markMessagesProcessed(session, worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark all pending messages as successfully processed
|
||||||
|
* CRITICAL: Prevents message loss and duplicate processing
|
||||||
|
*/
|
||||||
|
private async markMessagesProcessed(session: ActiveSession, worker: any | undefined): Promise<void> {
|
||||||
const pendingMessageStore = this.sessionManager.getPendingMessageStore();
|
const pendingMessageStore = this.sessionManager.getPendingMessageStore();
|
||||||
if (session.pendingProcessingIds.size > 0) {
|
if (session.pendingProcessingIds.size > 0) {
|
||||||
for (const messageId of session.pendingProcessingIds) {
|
for (const messageId of session.pendingProcessingIds) {
|
||||||
|
|||||||
Reference in New Issue
Block a user