nats retry connection fixes #2330

Merged
mfreeman451 merged 2 commits from refs/pull/2330/head into main 2025-10-16 15:05:04 +00:00
mfreeman451 commented 2025-10-16 14:20:44 +00:00 (Migrated from github.com)
Owner

Imported from GitHub pull request.

Original GitHub pull request: #1788
Original author: @mfreeman451
Original URL: https://github.com/carverauto/serviceradar/pull/1788
Original created: 2025-10-16T14:20:44Z
Original updated: 2025-10-16T15:05:08Z
Original head: carverauto/serviceradar:bug/event-writer-failing-nats
Original base: main
Original merged: 2025-10-16T15:05:04Z by @mfreeman451

PR Type

Bug fix, Enhancement


Description

  • Enhanced NATS connection retry logic with fatal error detection and automatic reconnection

  • Added comprehensive error handling for context cancellation and connection failures

  • Introduced test coverage for consumer and service reconnection scenarios

  • Refactored service lifecycle management with improved shutdown and configuration updates


Diagram Walkthrough

flowchart LR
  A["ProcessMessages"] -- "detects fatal error" --> B["isFatalFetchError"]
  B -- "returns error" --> C["run loop"]
  C -- "resets connection" --> D["ensureConsumer"]
  D -- "reconnects" --> E["establishConnection"]
  E -- "creates new consumer" --> A

File Walkthrough

Relevant files
Error handling
consumer.go
Add error handling and fatal error detection to consumer 

pkg/consumers/db-event-writer/consumer.go

  • Changed ProcessMessages to return error instead of void for proper
    error propagation
  • Added pullConsumer interface to enable testing and dependency
    injection
  • Implemented isFatalFetchError and isContextError helper functions for
    error classification
  • Enhanced error handling with context-aware retry delays and fatal
    error detection
+51/-4   
Tests
consumer_test.go
Add unit tests for consumer fatal error handling                 

pkg/consumers/db-event-writer/consumer_test.go

  • Added fakePullConsumer and fakeMessageBatch test doubles for consumer
    testing
  • Implemented TestConsumerProcessMessagesReturnsFatalError to verify
    fatal error propagation
+57/-0   
service_test.go
Add integration test for service reconnection logic           

pkg/consumers/db-event-writer/service_test.go

  • Added TestServiceRunReconnectsAfterFatalError to verify reconnection
    behavior
  • Tests service recovery from fatal NATS errors with multiple connection
    attempts
+73/-0   
Enhancement
service.go
Implement automatic reconnection and lifecycle management

pkg/consumers/db-event-writer/service.go

  • Refactored Start to use ensureConsumer and run loop for automatic
    reconnection
  • Added connectFactory field for dependency injection and testability
  • Implemented run method with retry logic that handles fatal errors and
    reconnects
  • Added ensureConsumer, establishConnection, setConnection, and
    resetConnection for connection lifecycle management
  • Enhanced Stop with proper shutdown timeout and connection cleanup
  • Improved UpdateConfig to properly restart service with new
    configuration
  • Added sleepWithContext helper for context-aware delays
  • Refactored createConnection (formerly Start logic) with enhanced NATS
    connection handlers
+252/-83

Imported from GitHub pull request. Original GitHub pull request: #1788 Original author: @mfreeman451 Original URL: https://github.com/carverauto/serviceradar/pull/1788 Original created: 2025-10-16T14:20:44Z Original updated: 2025-10-16T15:05:08Z Original head: carverauto/serviceradar:bug/event-writer-failing-nats Original base: main Original merged: 2025-10-16T15:05:04Z by @mfreeman451 --- ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Enhanced NATS connection retry logic with fatal error detection and automatic reconnection - Added comprehensive error handling for context cancellation and connection failures - Introduced test coverage for consumer and service reconnection scenarios - Refactored service lifecycle management with improved shutdown and configuration updates ___ ### Diagram Walkthrough ```mermaid flowchart LR A["ProcessMessages"] -- "detects fatal error" --> B["isFatalFetchError"] B -- "returns error" --> C["run loop"] C -- "resets connection" --> D["ensureConsumer"] D -- "reconnects" --> E["establishConnection"] E -- "creates new consumer" --> A ``` <details> <summary><h3> File Walkthrough</h3></summary> <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Error handling</strong></td><td><table> <tr> <td> <details> <summary><strong>consumer.go</strong><dd><code>Add error handling and fatal error detection to consumer</code>&nbsp; </dd></summary> <hr> pkg/consumers/db-event-writer/consumer.go <ul><li>Changed <code>ProcessMessages</code> to return error instead of void for proper <br>error propagation<br> <li> Added <code>pullConsumer</code> interface to enable testing and dependency <br>injection<br> <li> Implemented <code>isFatalFetchError</code> and <code>isContextError</code> helper functions for <br>error classification<br> <li> Enhanced error handling with context-aware retry delays and fatal <br>error detection</ul> </details> </td> <td><a href="https://github.com/carverauto/serviceradar/pull/1788/files#diff-0a2b2a9691c5d0f1ccdc9dd3d92b881cae9348c9d46c27dbcc3f069ed8c8891b">+51/-4</a>&nbsp; &nbsp; </td> </tr> </table></td></tr><tr><td><strong>Tests</strong></td><td><table> <tr> <td> <details> <summary><strong>consumer_test.go</strong><dd><code>Add unit tests for consumer fatal error handling</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> pkg/consumers/db-event-writer/consumer_test.go <ul><li>Added <code>fakePullConsumer</code> and <code>fakeMessageBatch</code> test doubles for consumer <br>testing<br> <li> Implemented <code>TestConsumerProcessMessagesReturnsFatalError</code> to verify <br>fatal error propagation</ul> </details> </td> <td><a href="https://github.com/carverauto/serviceradar/pull/1788/files#diff-1bad6bec56192d5c4688c9578b89ca9bcfd456bf9fb04a4bd14b295fba254975">+57/-0</a>&nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>service_test.go</strong><dd><code>Add integration test for service reconnection logic</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> pkg/consumers/db-event-writer/service_test.go <ul><li>Added <code>TestServiceRunReconnectsAfterFatalError</code> to verify reconnection <br>behavior<br> <li> Tests service recovery from fatal NATS errors with multiple connection <br>attempts</ul> </details> </td> <td><a href="https://github.com/carverauto/serviceradar/pull/1788/files#diff-88562fdd6796ec87c30e47778e0a9e515667d64cffd7d95c29c713be03eb04b3">+73/-0</a>&nbsp; &nbsp; </td> </tr> </table></td></tr><tr><td><strong>Enhancement</strong></td><td><table> <tr> <td> <details> <summary><strong>service.go</strong><dd><code>Implement automatic reconnection and lifecycle management</code></dd></summary> <hr> pkg/consumers/db-event-writer/service.go <ul><li>Refactored <code>Start</code> to use <code>ensureConsumer</code> and <code>run</code> loop for automatic <br>reconnection<br> <li> Added <code>connectFactory</code> field for dependency injection and testability<br> <li> Implemented <code>run</code> method with retry logic that handles fatal errors and <br>reconnects<br> <li> Added <code>ensureConsumer</code>, <code>establishConnection</code>, <code>setConnection</code>, and <br><code>resetConnection</code> for connection lifecycle management<br> <li> Enhanced <code>Stop</code> with proper shutdown timeout and connection cleanup<br> <li> Improved <code>UpdateConfig</code> to properly restart service with new <br>configuration<br> <li> Added <code>sleepWithContext</code> helper for context-aware delays<br> <li> Refactored <code>createConnection</code> (formerly <code>Start</code> logic) with enhanced NATS <br>connection handlers</ul> </details> </td> <td><a href="https://github.com/carverauto/serviceradar/pull/1788/files#diff-9f9f48b11e7670c7ae374abc41327adf4617972b214f1c168e9da53d3cd7b609">+252/-83</a></td> </tr> </table></td></tr></tr></tbody></table> </details> ___
qodo-code-review[bot] commented 2025-10-16 14:21:24 +00:00 (Migrated from github.com)
Author
Owner

Imported GitHub PR comment.

Original author: @qodo-code-review[bot]
Original URL: https://github.com/carverauto/serviceradar/pull/1788#issuecomment-3411154603
Original created: 2025-10-16T14:21:24Z

You are nearing your monthly Qodo Merge usage quota. For more information, please visit here.

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
- [ ] Create ticket/issue <!-- /create_ticket --create_ticket=true -->

</details></td></tr>
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
No custom compliance provided

Follow the guide to enable custom compliance check.

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
- Requires Further Human Verification
🏷️ - Compliance label
Imported GitHub PR comment. Original author: @qodo-code-review[bot] Original URL: https://github.com/carverauto/serviceradar/pull/1788#issuecomment-3411154603 Original created: 2025-10-16T14:21:24Z --- _You are nearing your monthly Qodo Merge usage quota. For more information, please visit [here](https://qodo-merge-docs.qodo.ai/installation/qodo_merge/#cloud-users)._ ## PR Compliance Guide 🔍 <!-- https://github.com/carverauto/serviceradar/commit/b50384b81589445bb40a2bd042e922f3364077c0 --> Below is a summary of compliance checks for this PR:<br> <table><tbody><tr><td colspan='2'><strong>Security Compliance</strong></td></tr> <tr><td>🟢</td><td><details><summary><strong>No security concerns identified</strong></summary> No security vulnerabilities detected by AI analysis. Human verification advised for critical code. </details></td></tr> <tr><td colspan='2'><strong>Ticket Compliance</strong></td></tr> <tr><td>⚪</td><td><details><summary>🎫 <strong>No ticket provided </summary></strong> - [ ] Create ticket/issue <!-- /create_ticket --create_ticket=true --> </details></td></tr> <tr><td colspan='2'><strong>Codebase Duplication Compliance</strong></td></tr> <tr><td>⚪</td><td><details><summary><strong>Codebase context is not defined </strong></summary> Follow the <a href='https://qodo-merge-docs.qodo.ai/core-abilities/rag_context_enrichment/'>guide</a> to enable codebase context checks. </details></td></tr> <tr><td colspan='2'><strong>Custom Compliance</strong></td></tr> <tr><td>⚪</td><td><details><summary><strong>No custom compliance provided</strong></summary> Follow the <a href='https://qodo-merge-docs.qodo.ai/tools/compliance/'>guide</a> to enable custom compliance check. </details></td></tr> <tr><td align="center" colspan="2"> - [ ] Update <!-- /compliance --update_compliance=true --> </td></tr></tbody></table> <details><summary>Compliance status legend</summary> 🟢 - Fully Compliant<br> 🟡 - Partial Compliant<br> 🔴 - Not Compliant<br> ⚪ - Requires Further Human Verification<br> 🏷️ - Compliance label<br> </details>
qodo-code-review[bot] commented 2025-10-16 14:22:53 +00:00 (Migrated from github.com)
Author
Owner

Imported GitHub PR comment.

Original author: @qodo-code-review[bot]
Original URL: https://github.com/carverauto/serviceradar/pull/1788#issuecomment-3411162054
Original created: 2025-10-16T14:22:53Z

You are nearing your monthly Qodo Merge usage quota. For more information, please visit here.

PR Code Suggestions

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Use context cancellation for test shutdown

In TestServiceRunReconnectsAfterFatalError, use context cancellation to
gracefully stop the service's run loop instead of returning context.Canceled
from the mock connectFactory.

pkg/consumers/db-event-writer/service_test.go [33-50]

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	svc.connectFactory = func(context.Context) (*nats.Conn, jetstream.JetStream, *Consumer, error) {
 		mu.Lock()
 		defer mu.Unlock()
 
 		connectCalls++
 
-		err := nats.ErrConnectionClosed
 		if connectCalls > 1 {
-			err = context.Canceled
+			// Cancel context to stop the run loop gracefully.
+			cancel()
 		}
 
 		return nil, nil, &Consumer{
 			streamName:   "events",
 			consumerName: "writer",
-			consumer:     &fakePullConsumer{err: err},
+			consumer:     &fakePullConsumer{err: nats.ErrConnectionClosed},
 			logger:       logger.NewTestLogger(),
 		}, nil
 	}
 
+	require.NoError(t, svc.Start(ctx))
+
  • Apply / Chat
Suggestion importance[1-10]: 7

__

Why: The suggestion proposes a more robust and idiomatic way to control the test's lifecycle by using context cancellation, which better reflects real-world shutdown scenarios.

Medium
Refactor duplicated error handling logic

Refactor the duplicated error handling for c.consumer.Fetch and msgs.Error()
into a new helper method to improve maintainability.

pkg/consumers/db-event-writer/consumer.go [120-169]

 			msgs, err := c.consumer.Fetch(defaultMaxPullMessages, jetstream.FetchMaxWait(defaultPullExpiry))
 			if err != nil {
-				if isContextError(err) {
-					return err
+				if c.handleFetchError(ctx, err, "Failed to fetch messages") {
+					continue
 				}
-
-				if isFatalFetchError(err) {
-					return err
-				}
-
-				c.logger.Error().Err(err).Msg("Failed to fetch messages")
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				case <-time.After(reconnectDelay):
-				}
-
-				continue
+				return err
 			}
 
 ... (clipped 14 lines)
 
 			if fetchErr := msgs.Error(); fetchErr != nil {
-				if isContextError(fetchErr) {
-					return fetchErr
+				if c.handleFetchError(ctx, fetchErr, "Fetch error") {
+					continue
 				}
+				return fetchErr
+			}
+		}
+	}
+}
 
-				if isFatalFetchError(fetchErr) {
-					return fetchErr
-				}
+func (c *Consumer) handleFetchError(ctx context.Context, err error, logMsg string) bool {
+	if isContextError(err) || isFatalFetchError(err) {
+		return false
+	}
 
-				c.logger.Error().Err(fetchErr).Msg("Fetch error")
-			}
+	c.logger.Error().Err(err).Msg(logMsg)
+	select {
+	case <-ctx.Done():
+		return false
+	case <-time.After(reconnectDelay):
+		return true
+	}
+}

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies duplicated error handling logic and proposes a valid refactoring into a helper function, which would improve code maintainability and reduce redundancy.

Low
Remove redundant connection closing logic

Remove the redundant connection closing block from setConnection as callers are
already responsible for closing the connection via resetConnection.

pkg/consumers/db-event-writer/service.go [214-225]

 func (s *Service) setConnection(nc *nats.Conn, js jetstream.JetStream, consumer *Consumer) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-
-	if s.nc != nil {
-		s.nc.Close()
-	}
 
 	s.nc = nc
 	s.js = js
 	s.consumer = consumer
 }
  • Apply / Chat
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that the connection closing logic in setConnection is redundant because all call paths already handle closing the connection via resetConnection.

Low
  • Update
Imported GitHub PR comment. Original author: @qodo-code-review[bot] Original URL: https://github.com/carverauto/serviceradar/pull/1788#issuecomment-3411162054 Original created: 2025-10-16T14:22:53Z --- _You are nearing your monthly Qodo Merge usage quota. For more information, please visit [here](https://qodo-merge-docs.qodo.ai/installation/qodo_merge/#cloud-users)._ ## PR Code Suggestions ✨ <!-- b50384b --> Explore these optional code suggestions: <table><thead><tr><td><strong>Category</strong></td><td align=left><strong>Suggestion&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </strong></td><td align=center><strong>Impact</strong></td></tr><tbody><tr><td rowspan=3>General</td> <td> <details><summary>Use context cancellation for test shutdown</summary> ___ **In <code>TestServiceRunReconnectsAfterFatalError</code>, use context cancellation to <br>gracefully stop the service's <code>run</code> loop instead of returning <code>context.Canceled</code> <br>from the mock <code>connectFactory</code>.** [pkg/consumers/db-event-writer/service_test.go [33-50]](https://github.com/carverauto/serviceradar/pull/1788/files#diff-88562fdd6796ec87c30e47778e0a9e515667d64cffd7d95c29c713be03eb04b3R33-R50) ```diff + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + svc.connectFactory = func(context.Context) (*nats.Conn, jetstream.JetStream, *Consumer, error) { mu.Lock() defer mu.Unlock() connectCalls++ - err := nats.ErrConnectionClosed if connectCalls > 1 { - err = context.Canceled + // Cancel context to stop the run loop gracefully. + cancel() } return nil, nil, &Consumer{ streamName: "events", consumerName: "writer", - consumer: &fakePullConsumer{err: err}, + consumer: &fakePullConsumer{err: nats.ErrConnectionClosed}, logger: logger.NewTestLogger(), }, nil } + require.NoError(t, svc.Start(ctx)) + ``` - [ ] **Apply / Chat** <!-- /improve --apply_suggestion=0 --> <details><summary>Suggestion importance[1-10]: 7</summary> __ Why: The suggestion proposes a more robust and idiomatic way to control the test's lifecycle by using context cancellation, which better reflects real-world shutdown scenarios. </details></details></td><td align=center>Medium </td></tr><tr><td> <details><summary>Refactor duplicated error handling logic</summary> ___ **Refactor the duplicated error handling for <code>c.consumer.Fetch</code> and <code>msgs.Error()</code> <br>into a new helper method to improve maintainability.** [pkg/consumers/db-event-writer/consumer.go [120-169]](https://github.com/carverauto/serviceradar/pull/1788/files#diff-0a2b2a9691c5d0f1ccdc9dd3d92b881cae9348c9d46c27dbcc3f069ed8c8891bR120-R169) ```diff msgs, err := c.consumer.Fetch(defaultMaxPullMessages, jetstream.FetchMaxWait(defaultPullExpiry)) if err != nil { - if isContextError(err) { - return err + if c.handleFetchError(ctx, err, "Failed to fetch messages") { + continue } - - if isFatalFetchError(err) { - return err - } - - c.logger.Error().Err(err).Msg("Failed to fetch messages") - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(reconnectDelay): - } - - continue + return err } ... (clipped 14 lines) if fetchErr := msgs.Error(); fetchErr != nil { - if isContextError(fetchErr) { - return fetchErr + if c.handleFetchError(ctx, fetchErr, "Fetch error") { + continue } + return fetchErr + } + } + } +} - if isFatalFetchError(fetchErr) { - return fetchErr - } +func (c *Consumer) handleFetchError(ctx context.Context, err error, logMsg string) bool { + if isContextError(err) || isFatalFetchError(err) { + return false + } - c.logger.Error().Err(fetchErr).Msg("Fetch error") - } + c.logger.Error().Err(err).Msg(logMsg) + select { + case <-ctx.Done(): + return false + case <-time.After(reconnectDelay): + return true + } +} ``` `[To ensure code accuracy, apply this suggestion manually]` <details><summary>Suggestion importance[1-10]: 6</summary> __ Why: The suggestion correctly identifies duplicated error handling logic and proposes a valid refactoring into a helper function, which would improve code maintainability and reduce redundancy. </details></details></td><td align=center>Low </td></tr><tr><td> <details><summary>Remove redundant connection closing logic</summary> ___ **Remove the redundant connection closing block from <code>setConnection</code> as callers are <br>already responsible for closing the connection via <code>resetConnection</code>.** [pkg/consumers/db-event-writer/service.go [214-225]](https://github.com/carverauto/serviceradar/pull/1788/files#diff-9f9f48b11e7670c7ae374abc41327adf4617972b214f1c168e9da53d3cd7b609R214-R225) ```diff func (s *Service) setConnection(nc *nats.Conn, js jetstream.JetStream, consumer *Consumer) { s.mu.Lock() defer s.mu.Unlock() - - if s.nc != nil { - s.nc.Close() - } s.nc = nc s.js = js s.consumer = consumer } ``` - [ ] **Apply / Chat** <!-- /improve --apply_suggestion=2 --> <details><summary>Suggestion importance[1-10]: 4</summary> __ Why: The suggestion correctly identifies that the connection closing logic in `setConnection` is redundant because all call paths already handle closing the connection via `resetConnection`. </details></details></td><td align=center>Low </td></tr> <tr><td align="center" colspan="2"> - [ ] Update <!-- /improve_multi --more_suggestions=true --> </td><td></td></tr></tbody></table>
Sign in to join this conversation.
No reviewers
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
carverauto/serviceradar!2330
No description provided.