fixing age graph bugs #2506

Merged
mfreeman451 merged 2 commits from refs/pull/2506/head into main 2025-12-04 16:21:18 +00:00
mfreeman451 commented 2025-12-04 16:03:43 +00:00 (Migrated from github.com)
Owner

Imported from GitHub pull request.

Original GitHub pull request: #2056
Original author: @mfreeman451
Original URL: https://github.com/carverauto/serviceradar/pull/2056
Original created: 2025-12-04T16:03:43Z
Original updated: 2025-12-04T16:21:30Z
Original head: carverauto/serviceradar:bug/age_merge_failed
Original base: main
Original merged: 2025-12-04T16:21:18Z by @mfreeman451

User description

IMPORTANT: Please sign the Developer Certificate of Origin

Thank you for your contribution to ServiceRadar. Please note, when contributing, the developer must include
a DCO sign-off statement indicating the DCO acceptance in one commit message. Here
is an example DCO Signed-off-by line in a commit message:

Signed-off-by: J. Doe <j.doe@domain.com>

Describe your changes

Code checklist before requesting a review

  • I have signed the DCO?
  • The build completes without errors?
  • All tests are passing when running make test?

PR Type

Bug fix, Enhancement


Description

  • Implement bounded worker queue for AGE graph writes with retry logic for transient errors

  • Add queue depth/capacity metrics and structured logging for contention diagnostics

  • Chunk large batches and serialize writes to prevent concurrent MERGE conflicts

  • Support environment-based configuration for queue size, chunk size, workers, and timeout

  • Update TimescaleDB to 2.24.0 and CNPG image hash for stability

  • Document AGE contention troubleshooting in runbook


Diagram Walkthrough

flowchart LR
  A["AGE Write Requests<br/>Graph/Interfaces/Topology"] -->|chunk| B["Chunking<br/>by size"]
  B -->|enqueue| C["Bounded Work Queue<br/>512 default"]
  C -->|worker pool| D["Process Request<br/>with Retries"]
  D -->|transient error| E["Backoff & Retry<br/>XX000/57014"]
  E -->|success| F["Record Success<br/>& Metrics"]
  D -->|persistent error| G["Record Failure<br/>& Log"]
  F -->|decrement| H["Queue Depth Metric"]
  G -->|decrement| H
  H -->|export| I["OTel Metrics<br/>queue_depth/capacity"]

File Walkthrough

Relevant files
Enhancement
2 files
age_graph_metrics.go
Add queue depth and capacity metrics                                         
+48/-3   
age_graph_writer.go
Implement bounded queue with retry logic                                 
+355/-57
Dependencies
2 files
MODULE.bazel
Upgrade TimescaleDB to 2.24.0                                                       
+3/-3     
BUILD.bazel
Add pgconn dependency for error classification                     
+1/-0     
Configuration changes
5 files
BUILD.bazel
Set TimescaleDB version config to 2.24.0-dev                         
+5/-0     
core.yaml
Add imagePullPolicy Always for core container                       
+1/-0     
values.yaml
Update app tag and CNPG image hash                                             
+2/-2     
cnpg-cluster.yaml
Update CNPG image hash for consistency                                     
+1/-1     
cnpg-cluster.yaml
Update CNPG image hash for consistency                                     
+1/-1     
Documentation
4 files
age-graph-readiness.md
Document queue contention and backpressure checks               
+4/-0     
proposal.md
Add change proposal for AGE stabilization                               
+13/-0   
spec.md
Define AGE contention tolerance requirements                         
+18/-0   
tasks.md
Outline implementation tasks for AGE stabilization             
+13/-0   

Imported from GitHub pull request. Original GitHub pull request: #2056 Original author: @mfreeman451 Original URL: https://github.com/carverauto/serviceradar/pull/2056 Original created: 2025-12-04T16:03:43Z Original updated: 2025-12-04T16:21:30Z Original head: carverauto/serviceradar:bug/age_merge_failed Original base: main Original merged: 2025-12-04T16:21:18Z by @mfreeman451 --- ### **User description** ## IMPORTANT: Please sign the Developer Certificate of Origin Thank you for your contribution to ServiceRadar. Please note, when contributing, the developer must include a [DCO sign-off statement]( https://developercertificate.org/) indicating the DCO acceptance in one commit message. Here is an example DCO Signed-off-by line in a commit message: ``` Signed-off-by: J. Doe <j.doe@domain.com> ``` ## Describe your changes ## Issue ticket number and link ## Code checklist before requesting a review - [ ] I have signed the DCO? - [ ] The build completes without errors? - [ ] All tests are passing when running make test? ___ ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Implement bounded worker queue for AGE graph writes with retry logic for transient errors - Add queue depth/capacity metrics and structured logging for contention diagnostics - Chunk large batches and serialize writes to prevent concurrent MERGE conflicts - Support environment-based configuration for queue size, chunk size, workers, and timeout - Update TimescaleDB to 2.24.0 and CNPG image hash for stability - Document AGE contention troubleshooting in runbook ___ ### Diagram Walkthrough ```mermaid flowchart LR A["AGE Write Requests<br/>Graph/Interfaces/Topology"] -->|chunk| B["Chunking<br/>by size"] B -->|enqueue| C["Bounded Work Queue<br/>512 default"] C -->|worker pool| D["Process Request<br/>with Retries"] D -->|transient error| E["Backoff & Retry<br/>XX000/57014"] E -->|success| F["Record Success<br/>& Metrics"] D -->|persistent error| G["Record Failure<br/>& Log"] F -->|decrement| H["Queue Depth Metric"] G -->|decrement| H H -->|export| I["OTel Metrics<br/>queue_depth/capacity"] ``` <details> <summary><h3> File Walkthrough</h3></summary> <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><details><summary>2 files</summary><table> <tr> <td><strong>age_graph_metrics.go</strong><dd><code>Add queue depth and capacity metrics</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-2d400ecd96ff77c1379e4a8681763f9ba2dc22790e31a445b7e3ce716e8fbb15">+48/-3</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>age_graph_writer.go</strong><dd><code>Implement bounded queue with retry logic</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-2e6c0a5c048582201ffb983889ab9fab6fb1f8d64b91ee3fb65479b2fb2c6195">+355/-57</a></td> </tr> </table></details></td></tr><tr><td><strong>Dependencies</strong></td><td><details><summary>2 files</summary><table> <tr> <td><strong>MODULE.bazel</strong><dd><code>Upgrade TimescaleDB to 2.24.0</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-6136fc12446089c3db7360e923203dd114b6a1466252e71667c6791c20fe6bdc">+3/-3</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>BUILD.bazel</strong><dd><code>Add pgconn dependency for error classification</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-3e838afce9a84935e04b7ff8fd3e48d5452c21538a3ea1d36e3fd00aa3c30cd0">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></details></td></tr><tr><td><strong>Configuration changes</strong></td><td><details><summary>5 files</summary><table> <tr> <td><strong>BUILD.bazel</strong><dd><code>Set TimescaleDB version config to 2.24.0-dev</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-0e4db31c224a8f72ae8e870a849e38a59d74a2c7f7b04347b0b3eb07e20c5a80">+5/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>core.yaml</strong><dd><code>Add imagePullPolicy Always for core container</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-06ab387d2c169d82a1de28b5e66c86f0417bd81b82a96246d0a2da8bfaa8d224">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>values.yaml</strong><dd><code>Update app tag and CNPG image hash</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-d4449c7cb70362554b274f81eae5a4b81a8e81df494282e383d1b7ea3871c452">+2/-2</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>cnpg-cluster.yaml</strong><dd><code>Update CNPG image hash for consistency</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-7295774b8f05fee8f0f2b054f94381aa4c2581344117e9386f62c50baf64de53">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>cnpg-cluster.yaml</strong><dd><code>Update CNPG image hash for consistency</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-760bce255f2a46e053d34785cde7ee863372ea085f28f42b45858e2ca066079f">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></details></td></tr><tr><td><strong>Documentation</strong></td><td><details><summary>4 files</summary><table> <tr> <td><strong>age-graph-readiness.md</strong><dd><code>Document queue contention and backpressure checks</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-64fec6b9d2d878e0b9ffdf75868a063dd73d0980551622f567781240d41afbc2">+4/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>proposal.md</strong><dd><code>Add change proposal for AGE stabilization</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-968a8f1179b06f94ed06222b9573504ea35231fbf4d375382f5fb9ff1077da4b">+13/-0</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>spec.md</strong><dd><code>Define AGE contention tolerance requirements</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-ba711c0d2510f04d571eb1563b2b74503ab7650858ec59a783d30f63cbaca5c5">+18/-0</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>tasks.md</strong><dd><code>Outline implementation tasks for AGE stabilization</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-d14503fc2592505c92c5238901099d8c04725b3bb46594cb8e890b3b2a5fc591">+13/-0</a>&nbsp; &nbsp; </td> </tr> </table></details></td></tr></tr></tbody></table> </details> ___
qodo-code-review[bot] commented 2025-12-04 16:04:21 +00:00 (Migrated from github.com)
Author
Owner

Imported GitHub PR comment.

Original author: @qodo-code-review[bot]
Original URL: https://github.com/carverauto/serviceradar/pull/2056#issuecomment-3612976345
Original created: 2025-12-04T16:04:21Z

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Action Logging: New critical actions (queued AGE writes, retries, failures) add structured logs but do not
clearly include a user ID or actor context, so auditability of who initiated changes is
uncertain.

Referred Code
for _, chunk := range chunkTopology(events, w.chunkSize) {
	payload := buildTopologyParams(chunk)
	if len(payload) == 0 {
		continue
	}

	data, err := json.Marshal(map[string]any{"links": payload})
	if err != nil {
		if w.log != nil {
			w.log.Warn().Err(err).Msg("age graph: failed to marshal topology payload")
		}
		w.recordFailure()
		continue
	}

	if err := w.enqueue(ctx, "topology", len(chunk), ageTopologyMergeQuery, string(data)); err != nil {
		w.recordFailure()
		if w.log != nil {
			w.log.Warn().Err(err).
				Int("batch_size", len(chunk)).
				Msg("age graph: failed to queue topology links")


 ... (clipped 17 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status:
Error Detail Exposure: Logs include database SQLSTATE codes and raw error messages which are useful internally
but may expose internal details if any are surfaced to users; confirm these logs are not
user-facing.

Referred Code
			Int("max_attempts", attempts).
			Float64("queue_wait_secs", time.Since(req.enqueuedAt).Seconds()).
			Dur("backoff", delay).
			Msg("age graph: transient merge failure, will retry")
	}
	time.Sleep(delay)
	continue
}

if w.log != nil {
	w.log.Warn().
		Err(err).
		Str("age_sqlstate", code).
		Str("kind", req.kind).
		Int("batch_size", req.size).
		Int("attempt", attempt).
		Int("max_attempts", attempts).
		Float64("queue_wait_secs", time.Since(req.enqueuedAt).Seconds()).
		Float64("exec_secs", time.Since(start).Seconds()).
		Int64("queue_depth", currentAgeQueueDepth()).
		Int64("queue_capacity", currentAgeQueueCapacity()).


 ... (clipped 2 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Input Validation: The writer forwards marshaled JSON payloads and classifies errors but does not show
explicit validation/sanitization of external inputs within this diff; ensure upstream
models and executor parameterization prevent injection.

Referred Code
func (w *ageGraphWriter) enqueue(ctx context.Context, kind string, size int, query, payload string) error {
	if w == nil || w.executor == nil {
		return errors.New("age graph writer unavailable")
	}

	baseCtx := context.WithoutCancel(ctx)
	reqCtx, cancel := context.WithTimeout(baseCtx, w.requestTimeout)
	defer cancel()

	req := &ageGraphRequest{
		ctx:        reqCtx,
		kind:       kind,
		size:       size,
		payload:    payload,
		query:      query,
		result:     make(chan error, 1),
		enqueuedAt: time.Now(),
	}

	select {
	case w.workQueue <- req:


 ... (clipped 35 lines)

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
- Requires Further Human Verification
🏷️ - Compliance label
Imported GitHub PR comment. Original author: @qodo-code-review[bot] Original URL: https://github.com/carverauto/serviceradar/pull/2056#issuecomment-3612976345 Original created: 2025-12-04T16:04:21Z --- ## PR Compliance Guide 🔍 <!-- https://github.com/carverauto/serviceradar/commit/3fa10c7a2355eb7a0d996c57beefd8ed9a1d82b4 --> Below is a summary of compliance checks for this PR:<br> <table><tbody><tr><td colspan='2'><strong>Security Compliance</strong></td></tr> <tr><td>🟢</td><td><details><summary><strong>No security concerns identified</strong></summary> No security vulnerabilities detected by AI analysis. Human verification advised for critical code. </details></td></tr> <tr><td colspan='2'><strong>Ticket Compliance</strong></td></tr> <tr><td>⚪</td><td><details><summary>🎫 <strong>No ticket provided </strong></summary> - [ ] Create ticket/issue <!-- /create_ticket --create_ticket=true --> </details></td></tr> <tr><td colspan='2'><strong>Codebase Duplication Compliance</strong></td></tr> <tr><td>⚪</td><td><details><summary><strong>Codebase context is not defined </strong></summary> Follow the <a href='https://qodo-merge-docs.qodo.ai/core-abilities/rag_context_enrichment/'>guide</a> to enable codebase context checks. </details></td></tr> <tr><td colspan='2'><strong>Custom Compliance</strong></td></tr> <tr><td rowspan=3>🟢</td><td> <details><summary><strong>Generic: Meaningful Naming and Self-Documenting Code</strong></summary><br> **Objective:** Ensure all identifiers clearly express their purpose and intent, making code <br>self-documenting<br> **Status:** Passed<br> > Learn more about managing compliance <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#configuration-options'>generic rules</a> or creating your own <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#custom-compliance'>custom rules</a> </details></td></tr> <tr><td> <details><summary><strong>Generic: Robust Error Handling and Edge Case Management</strong></summary><br> **Objective:** Ensure comprehensive error handling that provides meaningful context and graceful <br>degradation<br> **Status:** Passed<br> > Learn more about managing compliance <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#configuration-options'>generic rules</a> or creating your own <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#custom-compliance'>custom rules</a> </details></td></tr> <tr><td> <details><summary><strong>Generic: Secure Logging Practices</strong></summary><br> **Objective:** To ensure logs are useful for debugging and auditing without exposing sensitive <br>information like PII, PHI, or cardholder data.<br> **Status:** Passed<br> > Learn more about managing compliance <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#configuration-options'>generic rules</a> or creating your own <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#custom-compliance'>custom rules</a> </details></td></tr> <tr><td rowspan=3>⚪</td> <td><details> <summary><strong>Generic: Comprehensive Audit Trails</strong></summary><br> **Objective:** To create a detailed and reliable record of critical system actions for security analysis <br>and compliance.<br> **Status:** <br><a href='https://github.com/carverauto/serviceradar/pull/2056/files#diff-2e6c0a5c048582201ffb983889ab9fab6fb1f8d64b91ee3fb65479b2fb2c6195R321-R358'><strong>Action Logging</strong></a>: New critical actions (queued AGE writes, retries, failures) add structured logs but do not <br>clearly include a user ID or actor context, so auditability of who initiated changes is <br>uncertain.<br> <details open><summary>Referred Code</summary> ```go for _, chunk := range chunkTopology(events, w.chunkSize) { payload := buildTopologyParams(chunk) if len(payload) == 0 { continue } data, err := json.Marshal(map[string]any{"links": payload}) if err != nil { if w.log != nil { w.log.Warn().Err(err).Msg("age graph: failed to marshal topology payload") } w.recordFailure() continue } if err := w.enqueue(ctx, "topology", len(chunk), ageTopologyMergeQuery, string(data)); err != nil { w.recordFailure() if w.log != nil { w.log.Warn().Err(err). Int("batch_size", len(chunk)). Msg("age graph: failed to queue topology links") ... (clipped 17 lines) ``` </details> > Learn more about managing compliance <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#configuration-options'>generic rules</a> or creating your own <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#custom-compliance'>custom rules</a> </details></td></tr> <tr><td><details> <summary><strong>Generic: Secure Error Handling</strong></summary><br> **Objective:** To prevent the leakage of sensitive system information through error messages while <br>providing sufficient detail for internal debugging.<br> **Status:** <br><a href='https://github.com/carverauto/serviceradar/pull/2056/files#diff-2e6c0a5c048582201ffb983889ab9fab6fb1f8d64b91ee3fb65479b2fb2c6195R836-R858'><strong>Error Detail Exposure</strong></a>: Logs include database SQLSTATE codes and raw error messages which are useful internally <br>but may expose internal details if any are surfaced to users; confirm these logs are not <br>user-facing.<br> <details open><summary>Referred Code</summary> ```go Int("max_attempts", attempts). Float64("queue_wait_secs", time.Since(req.enqueuedAt).Seconds()). Dur("backoff", delay). Msg("age graph: transient merge failure, will retry") } time.Sleep(delay) continue } if w.log != nil { w.log.Warn(). Err(err). Str("age_sqlstate", code). Str("kind", req.kind). Int("batch_size", req.size). Int("attempt", attempt). Int("max_attempts", attempts). Float64("queue_wait_secs", time.Since(req.enqueuedAt).Seconds()). Float64("exec_secs", time.Since(start).Seconds()). Int64("queue_depth", currentAgeQueueDepth()). Int64("queue_capacity", currentAgeQueueCapacity()). ... (clipped 2 lines) ``` </details> > Learn more about managing compliance <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#configuration-options'>generic rules</a> or creating your own <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#custom-compliance'>custom rules</a> </details></td></tr> <tr><td><details> <summary><strong>Generic: Security-First Input Validation and Data Handling</strong></summary><br> **Objective:** Ensure all data inputs are validated, sanitized, and handled securely to prevent <br>vulnerabilities<br> **Status:** <br><a href='https://github.com/carverauto/serviceradar/pull/2056/files#diff-2e6c0a5c048582201ffb983889ab9fab6fb1f8d64b91ee3fb65479b2fb2c6195R768-R823'><strong>Input Validation</strong></a>: The writer forwards marshaled JSON payloads and classifies errors but does not show <br>explicit validation/sanitization of external inputs within this diff; ensure upstream <br>models and executor parameterization prevent injection.<br> <details open><summary>Referred Code</summary> ```go func (w *ageGraphWriter) enqueue(ctx context.Context, kind string, size int, query, payload string) error { if w == nil || w.executor == nil { return errors.New("age graph writer unavailable") } baseCtx := context.WithoutCancel(ctx) reqCtx, cancel := context.WithTimeout(baseCtx, w.requestTimeout) defer cancel() req := &ageGraphRequest{ ctx: reqCtx, kind: kind, size: size, payload: payload, query: query, result: make(chan error, 1), enqueuedAt: time.Now(), } select { case w.workQueue <- req: ... (clipped 35 lines) ``` </details> > Learn more about managing compliance <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#configuration-options'>generic rules</a> or creating your own <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/#custom-compliance'>custom rules</a> </details></td></tr> <tr><td align="center" colspan="2"> - [ ] Update <!-- /compliance --update_compliance=true --> </td></tr></tbody></table> <details><summary>Compliance status legend</summary> 🟢 - Fully Compliant<br> 🟡 - Partial Compliant<br> 🔴 - Not Compliant<br> ⚪ - Requires Further Human Verification<br> 🏷️ - Compliance label<br> </details>
qodo-code-review[bot] commented 2025-12-04 16:05:38 +00:00 (Migrated from github.com)
Author
Owner

Imported GitHub PR comment.

Original author: @qodo-code-review[bot]
Original URL: https://github.com/carverauto/serviceradar/pull/2056#issuecomment-3612981418
Original created: 2025-12-04T16:05:38Z

PR Code Suggestions

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
High-level
Refactor backfill tool to use queue

The age-backfill tool should be updated to use the new queuing system for AGE
graph writes. This change is necessary to fulfill the PR's design requirement of
coordinating backfill operations with live data ingestion to prevent database
contention.

Examples:

openspec/changes/stabilize-age-graph-ingestion/tasks.md [11]
- [ ] 3.1 Route age-backfill through the same queue or add coordination (mutex/flag) so rebuilds cannot run concurrent MERGEs against live ingestion.
pkg/registry/age_graph_writer.go [152-196]
func (w *ageGraphWriter) startWorker() {
	w.queueStarted.Do(func() {
		setAgeQueueCapacity(int64(w.queueCapacity))
		for i := 0; i < w.workerCount; i++ {
			go func() {
				for req := range w.workQueue {
					processErr := w.processRequest(req)
					req.result <- processErr
				}
			}()

 ... (clipped 35 lines)

Solution Walkthrough:

Before:

// In cmd/tools/age-backfill/main.go (current state in PR)

func backfill(...) {
  // ... setup DB connection pool ...
  dbExecutor := &poolExecutor{pool: pool}

  // The backfill tool uses a GraphWriter that writes directly to the DB,
  // or worse, writes directly without using the GraphWriter interface.
  // This bypasses the new queuing mechanism.
  graphWriter := registry.NewAGEGraphWriter(dbExecutor, log) // This now returns a queued writer, but the tool might not be using it correctly or at all for its writes.

  // ... fetch data to backfill ...

  // Writes are sent directly, not through the queue.
  graphWriter.WriteGraph(ctx, deviceUpdates)
  graphWriter.WriteInterfaces(ctx, interfaces)
  graphWriter.WriteTopology(ctx, topologyEvents)
}

After:

// In cmd/tools/age-backfill/main.go (suggested change)

func backfill(...) {
  // ... setup DB connection pool ...
  dbExecutor := &poolExecutor{pool: pool}

  // The backfill tool MUST use the new queuing GraphWriter.
  // The NewAGEGraphWriter constructor already sets up the queue.
  graphWriter := registry.NewAGEGraphWriter(dbExecutor, log)

  // ... fetch data to backfill ...

  // Now, when these methods are called, the work is correctly
  // chunked and enqueued, coordinating with other live writes.
  graphWriter.WriteGraph(ctx, deviceUpdates)
  graphWriter.WriteInterfaces(ctx, interfaces)
  graphWriter.WriteTopology(ctx, topologyEvents)

  // The tool might need to wait for the queue to drain before exiting.
}

Suggestion importance[1-10]: 9

__

Why: This suggestion correctly identifies a critical omission where the age-backfill tool is not integrated with the new queuing system, directly contradicting a key requirement in the PR's design documents and undermining the primary goal of preventing write contention.

High
General
Make retry backoff context-aware

Replace time.Sleep(delay) with a context-aware timer in processRequest to allow
the retry backoff to be interrupted by context cancellation.

pkg/registry/age_graph_writer.go [804-867]

 func (w *ageGraphWriter) processRequest(req *ageGraphRequest) error {
 	defer incrementAgeQueueDepth(-1)
 
 	if req == nil {
 		return nil
 	}
 
 	attempts := defaultAgeGraphMaxAttempts
 	var lastErr error
 	for attempt := 1; attempt <= attempts; attempt++ {
 		if req.ctx.Err() != nil {
 			return req.ctx.Err()
 		}
 
 		start := time.Now()
 		_, err := w.executor.ExecuteQuery(req.ctx, req.query, req.payload)
 		if err == nil {
 			w.recordSuccess()
 			return nil
 		}
 
 		lastErr = err
 		code, transient := classifyAGEError(err)
 		if transient && attempt < attempts {
 			delay := backoffDelay(attempt)
 			if w.log != nil {
 ...
 			}
-			time.Sleep(delay)
+			timer := time.NewTimer(delay)
+			select {
+			case <-timer.C:
+				// continue to next attempt
+			case <-req.ctx.Done():
+				if !timer.Stop() {
+					<-timer.C
+				}
+				return req.ctx.Err()
+			}
 			continue
 		}
 
 		if w.log != nil {
 ...
 		}
 		w.recordFailure()
 		return err
 	}
 
 	if lastErr != nil {
 		w.recordFailure()
 	}
 	return lastErr
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that time.Sleep is not context-aware and proposes a valid improvement to make the retry backoff responsive to context cancellation, improving shutdown and timeout handling.

Medium
  • Update
Imported GitHub PR comment. Original author: @qodo-code-review[bot] Original URL: https://github.com/carverauto/serviceradar/pull/2056#issuecomment-3612981418 Original created: 2025-12-04T16:05:38Z --- ## PR Code Suggestions ✨ <!-- 3fa10c7 --> Explore these optional code suggestions: <table><thead><tr><td><strong>Category</strong></td><td align=left><strong>Suggestion&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </strong></td><td align=center><strong>Impact</strong></td></tr><tbody><tr><td rowspan=1>High-level</td> <td> <details><summary>Refactor backfill tool to use queue<!-- not_implemented --></summary> ___ **The <code>age-backfill</code> tool should be updated to use the new queuing system for AGE <br>graph writes. This change is necessary to fulfill the PR's design requirement of <br>coordinating backfill operations with live data ingestion to prevent database <br>contention.** ### Examples: <details> <summary> <a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-d14503fc2592505c92c5238901099d8c04725b3bb46594cb8e890b3b2a5fc591R11-R11">openspec/changes/stabilize-age-graph-ingestion/tasks.md [11]</a> </summary> ```markdown - [ ] 3.1 Route age-backfill through the same queue or add coordination (mutex/flag) so rebuilds cannot run concurrent MERGEs against live ingestion. ``` </details> <details> <summary> <a href="https://github.com/carverauto/serviceradar/pull/2056/files#diff-2e6c0a5c048582201ffb983889ab9fab6fb1f8d64b91ee3fb65479b2fb2c6195R152-R196">pkg/registry/age_graph_writer.go [152-196]</a> </summary> ```go func (w *ageGraphWriter) startWorker() { w.queueStarted.Do(func() { setAgeQueueCapacity(int64(w.queueCapacity)) for i := 0; i < w.workerCount; i++ { go func() { for req := range w.workQueue { processErr := w.processRequest(req) req.result <- processErr } }() ... (clipped 35 lines) ``` </details> ### Solution Walkthrough: #### Before: ```go // In cmd/tools/age-backfill/main.go (current state in PR) func backfill(...) { // ... setup DB connection pool ... dbExecutor := &poolExecutor{pool: pool} // The backfill tool uses a GraphWriter that writes directly to the DB, // or worse, writes directly without using the GraphWriter interface. // This bypasses the new queuing mechanism. graphWriter := registry.NewAGEGraphWriter(dbExecutor, log) // This now returns a queued writer, but the tool might not be using it correctly or at all for its writes. // ... fetch data to backfill ... // Writes are sent directly, not through the queue. graphWriter.WriteGraph(ctx, deviceUpdates) graphWriter.WriteInterfaces(ctx, interfaces) graphWriter.WriteTopology(ctx, topologyEvents) } ``` #### After: ```go // In cmd/tools/age-backfill/main.go (suggested change) func backfill(...) { // ... setup DB connection pool ... dbExecutor := &poolExecutor{pool: pool} // The backfill tool MUST use the new queuing GraphWriter. // The NewAGEGraphWriter constructor already sets up the queue. graphWriter := registry.NewAGEGraphWriter(dbExecutor, log) // ... fetch data to backfill ... // Now, when these methods are called, the work is correctly // chunked and enqueued, coordinating with other live writes. graphWriter.WriteGraph(ctx, deviceUpdates) graphWriter.WriteInterfaces(ctx, interfaces) graphWriter.WriteTopology(ctx, topologyEvents) // The tool might need to wait for the queue to drain before exiting. } ``` <details><summary>Suggestion importance[1-10]: 9</summary> __ Why: This suggestion correctly identifies a critical omission where the `age-backfill` tool is not integrated with the new queuing system, directly contradicting a key requirement in the PR's design documents and undermining the primary goal of preventing write contention. </details></details></td><td align=center>High </td></tr><tr><td rowspan=1>General</td> <td> <details><summary>Make retry backoff context-aware<!-- not_implemented --></summary> ___ **Replace <code>time.Sleep(delay)</code> with a context-aware timer in <code>processRequest</code> to allow <br>the retry backoff to be interrupted by context cancellation.** [pkg/registry/age_graph_writer.go [804-867]](https://github.com/carverauto/serviceradar/pull/2056/files#diff-2e6c0a5c048582201ffb983889ab9fab6fb1f8d64b91ee3fb65479b2fb2c6195R804-R867) ```diff func (w *ageGraphWriter) processRequest(req *ageGraphRequest) error { defer incrementAgeQueueDepth(-1) if req == nil { return nil } attempts := defaultAgeGraphMaxAttempts var lastErr error for attempt := 1; attempt <= attempts; attempt++ { if req.ctx.Err() != nil { return req.ctx.Err() } start := time.Now() _, err := w.executor.ExecuteQuery(req.ctx, req.query, req.payload) if err == nil { w.recordSuccess() return nil } lastErr = err code, transient := classifyAGEError(err) if transient && attempt < attempts { delay := backoffDelay(attempt) if w.log != nil { ... } - time.Sleep(delay) + timer := time.NewTimer(delay) + select { + case <-timer.C: + // continue to next attempt + case <-req.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return req.ctx.Err() + } continue } if w.log != nil { ... } w.recordFailure() return err } if lastErr != nil { w.recordFailure() } return lastErr } ``` `[To ensure code accuracy, apply this suggestion manually]` <details><summary>Suggestion importance[1-10]: 7</summary> __ Why: The suggestion correctly identifies that `time.Sleep` is not context-aware and proposes a valid improvement to make the retry backoff responsive to context cancellation, improving shutdown and timeout handling. </details></details></td><td align=center>Medium </td></tr> <tr><td align="center" colspan="2"> - [ ] Update <!-- /improve_multi --more_suggestions=true --> </td><td></td></tr></tbody></table>
Sign in to join this conversation.
No reviewers
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
carverauto/serviceradar!2506
No description provided.