feat: Enhance timestamp handling for backlog messages in GeminiAgent
- Capture original timestamps for messages processed from the backlog to ensure accurate logging. - Update processGeminiResponse to accept an original timestamp parameter, allowing for correct observation storage times. - Modify observation and summary processing to utilize the original timestamp when available, improving data integrity.
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -168,12 +168,16 @@ export class GeminiAgent {
|
|||||||
session.cumulativeInputTokens += Math.floor(tokensUsed * 0.7); // Rough estimate
|
session.cumulativeInputTokens += Math.floor(tokensUsed * 0.7); // Rough estimate
|
||||||
session.cumulativeOutputTokens += Math.floor(tokensUsed * 0.3);
|
session.cumulativeOutputTokens += Math.floor(tokensUsed * 0.3);
|
||||||
|
|
||||||
// Process response
|
// Process response (no original timestamp for init - not from queue)
|
||||||
await this.processGeminiResponse(session, initResponse.content, worker, tokensUsed);
|
await this.processGeminiResponse(session, initResponse.content, worker, tokensUsed, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process pending messages
|
// Process pending messages
|
||||||
for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
|
for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
|
||||||
|
// Capture earliest timestamp BEFORE processing (will be cleared after)
|
||||||
|
// This ensures backlog messages get their original timestamps, not current time
|
||||||
|
const originalTimestamp = session.earliestPendingTimestamp;
|
||||||
|
|
||||||
if (message.type === 'observation') {
|
if (message.type === 'observation') {
|
||||||
// Update last prompt number
|
// Update last prompt number
|
||||||
if (message.prompt_number !== undefined) {
|
if (message.prompt_number !== undefined) {
|
||||||
@@ -186,7 +190,7 @@ export class GeminiAgent {
|
|||||||
tool_name: message.tool_name!,
|
tool_name: message.tool_name!,
|
||||||
tool_input: JSON.stringify(message.tool_input),
|
tool_input: JSON.stringify(message.tool_input),
|
||||||
tool_output: JSON.stringify(message.tool_response),
|
tool_output: JSON.stringify(message.tool_response),
|
||||||
created_at_epoch: Date.now(),
|
created_at_epoch: originalTimestamp ?? Date.now(),
|
||||||
cwd: message.cwd
|
cwd: message.cwd
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -201,7 +205,7 @@ export class GeminiAgent {
|
|||||||
const tokensUsed = obsResponse.tokensUsed || 0;
|
const tokensUsed = obsResponse.tokensUsed || 0;
|
||||||
session.cumulativeInputTokens += Math.floor(tokensUsed * 0.7);
|
session.cumulativeInputTokens += Math.floor(tokensUsed * 0.7);
|
||||||
session.cumulativeOutputTokens += Math.floor(tokensUsed * 0.3);
|
session.cumulativeOutputTokens += Math.floor(tokensUsed * 0.3);
|
||||||
await this.processGeminiResponse(session, obsResponse.content, worker, tokensUsed);
|
await this.processGeminiResponse(session, obsResponse.content, worker, tokensUsed, originalTimestamp);
|
||||||
} else {
|
} else {
|
||||||
// Empty response - still mark messages as processed to avoid stuck state
|
// Empty response - still mark messages as processed to avoid stuck state
|
||||||
logger.warn('SDK', 'Empty Gemini response for observation, marking as processed', {
|
logger.warn('SDK', 'Empty Gemini response for observation, marking as processed', {
|
||||||
@@ -233,7 +237,7 @@ export class GeminiAgent {
|
|||||||
const tokensUsed = summaryResponse.tokensUsed || 0;
|
const tokensUsed = summaryResponse.tokensUsed || 0;
|
||||||
session.cumulativeInputTokens += Math.floor(tokensUsed * 0.7);
|
session.cumulativeInputTokens += Math.floor(tokensUsed * 0.7);
|
||||||
session.cumulativeOutputTokens += Math.floor(tokensUsed * 0.3);
|
session.cumulativeOutputTokens += Math.floor(tokensUsed * 0.3);
|
||||||
await this.processGeminiResponse(session, summaryResponse.content, worker, tokensUsed);
|
await this.processGeminiResponse(session, summaryResponse.content, worker, tokensUsed, originalTimestamp);
|
||||||
} else {
|
} else {
|
||||||
// Empty response - still mark messages as processed to avoid stuck state
|
// Empty response - still mark messages as processed to avoid stuck state
|
||||||
logger.warn('SDK', 'Empty Gemini response for summary, marking as processed', {
|
logger.warn('SDK', 'Empty Gemini response for summary, marking as processed', {
|
||||||
@@ -355,24 +359,27 @@ export class GeminiAgent {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Process Gemini response (same format as Claude)
|
* Process Gemini response (same format as Claude)
|
||||||
|
* @param originalTimestamp - Original epoch when message was queued (for backlog processing accuracy)
|
||||||
*/
|
*/
|
||||||
private async processGeminiResponse(
|
private async processGeminiResponse(
|
||||||
session: ActiveSession,
|
session: ActiveSession,
|
||||||
text: string,
|
text: string,
|
||||||
worker: any | undefined,
|
worker: any | undefined,
|
||||||
discoveryTokens: number
|
discoveryTokens: number,
|
||||||
|
originalTimestamp: number | null
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
// Parse observations (same XML format)
|
// Parse observations (same XML format)
|
||||||
const observations = parseObservations(text, session.claudeSessionId);
|
const observations = parseObservations(text, session.claudeSessionId);
|
||||||
|
|
||||||
// Store observations
|
// Store observations with original timestamp (if processing backlog) or current time
|
||||||
for (const obs of observations) {
|
for (const obs of observations) {
|
||||||
const { id: obsId, createdAtEpoch } = this.dbManager.getSessionStore().storeObservation(
|
const { id: obsId, createdAtEpoch } = this.dbManager.getSessionStore().storeObservation(
|
||||||
session.claudeSessionId,
|
session.claudeSessionId,
|
||||||
session.project,
|
session.project,
|
||||||
obs,
|
obs,
|
||||||
session.lastPromptNumber,
|
session.lastPromptNumber,
|
||||||
discoveryTokens
|
discoveryTokens,
|
||||||
|
originalTimestamp ?? undefined
|
||||||
);
|
);
|
||||||
|
|
||||||
logger.info('SDK', 'Gemini observation saved', {
|
logger.info('SDK', 'Gemini observation saved', {
|
||||||
@@ -439,7 +446,8 @@ export class GeminiAgent {
|
|||||||
session.project,
|
session.project,
|
||||||
summaryForStore,
|
summaryForStore,
|
||||||
session.lastPromptNumber,
|
session.lastPromptNumber,
|
||||||
discoveryTokens
|
discoveryTokens,
|
||||||
|
originalTimestamp ?? undefined
|
||||||
);
|
);
|
||||||
|
|
||||||
logger.info('SDK', 'Gemini summary saved', {
|
logger.info('SDK', 'Gemini summary saved', {
|
||||||
|
|||||||
Reference in New Issue
Block a user