fixing datetimes for devices #2324

Merged
mfreeman451 merged 3 commits from refs/pull/2324/head into main 2025-10-16 03:36:27 +00:00
mfreeman451 commented 2025-10-15 18:16:54 +00:00 (Migrated from github.com)
Owner

Imported from GitHub pull request.

Original GitHub pull request: #1780
Original author: @mfreeman451
Original URL: https://github.com/carverauto/serviceradar/pull/1780
Original created: 2025-10-15T18:16:54Z
Original updated: 2025-10-16T03:36:31Z
Original head: carverauto/serviceradar:1765-bugcoreui-missing-date-in-device-view-from-devices-discovered-by-sync-service
Original base: main
Original merged: 2025-10-16T03:36:27Z by @mfreeman451

PR Type

Bug fix, Enhancement


Description

  • Fixed missing first_seen timestamp for devices discovered by sync service

  • Added annotateFirstSeen function to preserve earliest device discovery time across updates

  • Updated database migration to use _first_seen metadata field in aggregation queries

  • Added comprehensive test coverage for first_seen timestamp preservation logic


Diagram Walkthrough

flowchart LR
  A["Device Update"] --> B["annotateFirstSeen()"]
  B --> C["Check existing DB records"]
  B --> D["Parse metadata timestamps"]
  C --> E["Select earliest timestamp"]
  D --> E
  E --> F["Set _first_seen in metadata"]
  F --> G["Publish to unified_devices"]

File Walkthrough

Relevant files
Bug fix
registry.go
Add first_seen timestamp annotation logic for device updates

pkg/registry/registry.go

  • Added annotateFirstSeen() function to compute and set _first_seen
    metadata field
  • Implemented parseFirstSeenTimestamp() helper supporting multiple date
    formats
  • Integrated first_seen annotation into ProcessBatchDeviceUpdates()
    workflow
  • Logic preserves earliest timestamp from existing records, metadata, or
    update timestamp
+95/-0   
Tests
registry_test.go
Add test coverage for first_seen timestamp preservation   

pkg/registry/registry_test.go

  • Added mock expectations for GetUnifiedDevicesByIPsOrIDs() in test
    helpers
  • Updated existing tests to validate _first_seen metadata presence and
    format
  • Added TestDeviceRegistry_FirstSeenPreservedFromExistingRecord() test
    case
+66/-0   
Enhancement
00000000000001_consolidated_serviceradar_schema.up.sql
Update database schema to support _first_seen metadata field

pkg/db/migrations/00000000000001_consolidated_serviceradar_schema.up.sql

  • Added SYNC modifier to all DROP STREAM statements for proper cleanup
  • Dropped dependent materialized views before recreating OCSF streams
  • Updated unified_devices materialized view to use _first_seen from
    metadata
  • Simplified aggregation logic by removing is_active and has_identity
    filters
  • Changed first_seen calculation to use
    parse_datetime64_best_effort_or_null() on _first_seen metadata
+69/-61 
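
A hedged sketch of the migration pattern the bullets above describe, in Timeplus/Proton-style SQL. Stream, view, and column names are assumptions for illustration, not copied from the actual migration file.

```sql
-- Illustrative only: dependent views are dropped first, then the stream
-- with SYNC so the drop completes before recreation.
DROP VIEW IF EXISTS unified_devices;
DROP STREAM IF EXISTS device_updates SYNC;

-- Recreate the view so first_seen prefers the _first_seen metadata value
-- written by the registry; map-access syntax and columns are assumed.
CREATE MATERIALIZED VIEW unified_devices AS
SELECT
    device_id,
    min(parse_datetime64_best_effort_or_null(metadata['_first_seen'])) AS first_seen,
    max(timestamp) AS last_seen
FROM device_updates
GROUP BY device_id;
```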

qodo-code-review[bot] commented 2025-10-15 18:17:36 +00:00 (Migrated from github.com)
Author
Owner

Imported GitHub PR comment.

Original author: @qodo-code-review[bot]
Original URL: https://github.com/carverauto/serviceradar/pull/1780#issuecomment-3407713683
Original created: 2025-10-15T18:17:36Z


PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified: no security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🟢
🎫 #1765
  • Ensure devices discovered by the sync service have a valid, non-regressing first_seen date recorded initially.
  • Preserve the earliest first_seen across subsequent sync updates so it is not overwritten by later bad data.
  • Reflect the correct first_seen in the unified_devices view used by the UI/device view.
  • Add adequate automated tests validating first_seen preservation behavior.
Codebase Duplication Compliance
⚪ Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
⚪ No custom compliance provided

Follow the guide to enable custom compliance checks.

Compliance status legend:
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label
qodo-code-review[bot] commented 2025-10-15 18:18:51 +00:00 (Migrated from github.com)
Author
Owner

Imported GitHub PR comment.

Original author: @qodo-code-review[bot]
Original URL: https://github.com/carverauto/serviceradar/pull/1780#issuecomment-3407717536
Original created: 2025-10-15T18:18:51Z


PR Code Suggestions

Explore these optional code suggestions:

Possible issue
Ensure consistent timestamps within batches
Suggestion impact: the commit implemented a two-pass strategy by extracting the collection of device IDs into a helper, fetching existing first_seen timestamps, computing the batch-wide earliest per device, and then applying that consistent timestamp to all updates. It also added chunked fetching and helper functions, in line with the suggestion's intent to ensure consistency within batches.

code diff:

@@ -536,6 +549,26 @@
 		return nil
 	}
 
+	deviceIDs := collectDeviceIDs(updates)
+	if len(deviceIDs) == 0 {
+		return nil
+	}
+
+	existing, err := r.fetchExistingFirstSeen(ctx, deviceIDs)
+	if err != nil {
+		return err
+	}
+
+	firstSeen := computeBatchFirstSeen(updates, existing)
+	applyFirstSeenMetadata(updates, firstSeen)
+	return nil
+}
+
+func collectDeviceIDs(updates []*models.DeviceUpdate) []string {
+	if len(updates) == 0 {
+		return nil
+	}
+
 	idSet := make(map[string]struct{}, len(updates))
 	for _, update := range updates {
 		if update == nil || update.DeviceID == "" {
@@ -552,22 +585,52 @@
 	for id := range idSet {
 		deviceIDs = append(deviceIDs, id)
 	}
-
-	existingFirstSeen := make(map[string]time.Time, len(deviceIDs))
-	devices, err := r.db.GetUnifiedDevicesByIPsOrIDs(ctx, nil, deviceIDs)
-	if err != nil {
-		return fmt.Errorf("lookup existing devices: %w", err)
-	}
-
-	for _, device := range devices {
-		if device == nil || device.DeviceID == "" || device.FirstSeen.IsZero() {
+	return deviceIDs
+}
+
+func (r *DeviceRegistry) fetchExistingFirstSeen(ctx context.Context, deviceIDs []string) (map[string]time.Time, error) {
+	result := make(map[string]time.Time, len(deviceIDs))
+	if len(deviceIDs) == 0 {
+		return result, nil
+	}
+
+	chunkSize := r.firstSeenLookupChunkSize
+	if chunkSize <= 0 {
+		chunkSize = len(deviceIDs)
+	}
+
+	for start := 0; start < len(deviceIDs); start += chunkSize {
+		end := start + chunkSize
+		if end > len(deviceIDs) {
+			end = len(deviceIDs)
+		}
+
+		devices, err := r.db.GetUnifiedDevicesByIPsOrIDs(ctx, nil, deviceIDs[start:end])
+		if err != nil {
+			return nil, fmt.Errorf("lookup existing devices: %w", err)
+		}
+
+		for _, device := range devices {
+			if device != nil && device.DeviceID != "" && !device.FirstSeen.IsZero() {
+				result[device.DeviceID] = device.FirstSeen.UTC()
+			}
+		}
+	}
+
+	return result, nil
+}
+
+func computeBatchFirstSeen(updates []*models.DeviceUpdate, seed map[string]time.Time) map[string]time.Time {
+	result := make(map[string]time.Time, len(seed)+len(updates))
+	for id, ts := range seed {
+		if ts.IsZero() {
 			continue
 		}
-		existingFirstSeen[device.DeviceID] = device.FirstSeen.UTC()
+		result[id] = ts.UTC()
 	}
 
 	for _, update := range updates {
-		if update == nil {
+		if update == nil || update.DeviceID == "" {
 			continue
 		}
 
@@ -587,17 +650,39 @@
 			}
 		}
 
-		if existing, ok := existingFirstSeen[update.DeviceID]; ok && !existing.IsZero() && existing.Before(earliest) {
+		if existing, ok := result[update.DeviceID]; ok && !existing.IsZero() && existing.Before(earliest) {
 			earliest = existing
+		}
+
+		if current, ok := result[update.DeviceID]; !ok || earliest.Before(current) {
+			result[update.DeviceID] = earliest.UTC()
+		}
+	}
+
+	return result
+}
+
+func applyFirstSeenMetadata(updates []*models.DeviceUpdate, firstSeen map[string]time.Time) {
+	if len(firstSeen) == 0 {
+		return
+	}
+
+	for _, update := range updates {
+		if update == nil || update.DeviceID == "" {
+			continue
+		}
+
+		earliest, ok := firstSeen[update.DeviceID]
+		if !ok || earliest.IsZero() {
+			continue
 		}
 
 		if update.Metadata == nil {
 			update.Metadata = make(map[string]string)
 		}
+
 		update.Metadata["_first_seen"] = earliest.UTC().Format(time.RFC3339Nano)
 	}
-
-	return nil
 }

Refactor annotateFirstSeen to use a two-pass approach. First, find the earliest
timestamp for each device ID across the entire batch and existing records.
Second, apply this consistent timestamp to all updates for that device to ensure
data consistency.

pkg/registry/registry.go [534-601]

 func (r *DeviceRegistry) annotateFirstSeen(ctx context.Context, updates []*models.DeviceUpdate) error {
 	if len(updates) == 0 {
 		return nil
 	}
 
 	idSet := make(map[string]struct{}, len(updates))
 	for _, update := range updates {
 		if update == nil || update.DeviceID == "" {
 			continue
 		}
 		idSet[update.DeviceID] = struct{}{}
 	}
 
 	if len(idSet) == 0 {
 		return nil
 	}
 
 	deviceIDs := make([]string, 0, len(idSet))
 	for id := range idSet {
 		deviceIDs = append(deviceIDs, id)
 	}
 
-	existingFirstSeen := make(map[string]time.Time, len(deviceIDs))
+	batchFirstSeen := make(map[string]time.Time, len(deviceIDs))
 	devices, err := r.db.GetUnifiedDevicesByIPsOrIDs(ctx, nil, deviceIDs)
 	if err != nil {
 		return fmt.Errorf("lookup existing devices: %w", err)
 	}
 
 	for _, device := range devices {
-		if device == nil || device.DeviceID == "" || device.FirstSeen.IsZero() {
-			continue
+		if device != nil && device.DeviceID != "" && !device.FirstSeen.IsZero() {
+			batchFirstSeen[device.DeviceID] = device.FirstSeen.UTC()
 		}
-		existingFirstSeen[device.DeviceID] = device.FirstSeen.UTC()
 	}
 
+	// First pass: determine the earliest time for each device ID in the batch
 	for _, update := range updates {
-		if update == nil {
+		if update == nil || update.DeviceID == "" {
 			continue
 		}
 
 		earliest := update.Timestamp
 		if earliest.IsZero() {
 			earliest = time.Now()
 		}
 
 		if update.Metadata != nil {
 			if ts, ok := parseFirstSeenTimestamp(update.Metadata["_first_seen"]); ok && ts.Before(earliest) {
 				earliest = ts
 			}
 			for _, key := range []string{"first_seen", "integration_first_seen", "armis_first_seen"} {
 				if ts, ok := parseFirstSeenTimestamp(update.Metadata[key]); ok && ts.Before(earliest) {
 					earliest = ts
 				}
 			}
 		}
 
-		if existing, ok := existingFirstSeen[update.DeviceID]; ok && !existing.IsZero() && existing.Before(earliest) {
-			earliest = existing
+		if existing, ok := batchFirstSeen[update.DeviceID]; !ok || earliest.Before(existing) {
+			batchFirstSeen[update.DeviceID] = earliest
 		}
+	}
 
+	// Second pass: apply the consistent earliest time to all updates
+	for _, update := range updates {
+		if update == nil || update.DeviceID == "" {
+			continue
+		}
 		if update.Metadata == nil {
 			update.Metadata = make(map[string]string)
 		}
-		update.Metadata["_first_seen"] = earliest.UTC().Format(time.RFC3339Nano)
+		if earliest, ok := batchFirstSeen[update.DeviceID]; ok {
+			update.Metadata["_first_seen"] = earliest.UTC().Format(time.RFC3339Nano)
+		}
 	}
 
 	return nil
 }

[Suggestion processed]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a logic flaw where multiple updates for the same device within a single batch could receive inconsistent _first_seen timestamps, leading to incorrect data. The proposed two-pass approach effectively fixes this data consistency bug.

Medium
High-level
The solution introduces a potential performance bottleneck

The current implementation queries the unified_devices view for each update
batch to preserve the first_seen timestamp, creating a potential performance
bottleneck. A more scalable solution would perform this logic entirely within
the database.

Examples:

pkg/registry/registry.go [184-186]
	if err := r.annotateFirstSeen(ctx, canonicalized); err != nil {
		r.logger.Warn().Err(err).Msg("Failed to annotate _first_seen metadata")
	}
pkg/registry/registry.go [534-601]
func (r *DeviceRegistry) annotateFirstSeen(ctx context.Context, updates []*models.DeviceUpdate) error {
	if len(updates) == 0 {
		return nil
	}

	idSet := make(map[string]struct{}, len(updates))
	for _, update := range updates {
		if update == nil || update.DeviceID == "" {
			continue
		}

 ... (clipped 58 lines)

Solution Walkthrough:

Before:

func (r *DeviceRegistry) ProcessBatchDeviceUpdates(ctx, updates) {
  // ...
  canonicalized := canonicalize(updates)
  // ...
  // Directly publish updates without preserving historical first_seen
  r.db.PublishBatchDeviceUpdates(ctx, canonicalized)
}

-- In Database
CREATE MATERIALIZED VIEW unified_device_pipeline_mv AS
SELECT
  device_id,
  min_if(timestamp, ...) AS first_seen, -- Relies only on data in the current update window
  ...
FROM device_updates
GROUP BY device_id;

After:

func (r *DeviceRegistry) ProcessBatchDeviceUpdates(ctx, updates) {
  // ...
  canonicalized := canonicalize(updates)

  // Read from aggregated view to get historical first_seen
  r.annotateFirstSeen(ctx, canonicalized)

  // Publish enriched updates
  r.db.PublishBatchDeviceUpdates(ctx, canonicalized)
}

func (r *DeviceRegistry) annotateFirstSeen(ctx, updates) {
  // Get existing 'first_seen' from unified_devices view
  existingDevices, err := r.db.GetUnifiedDevicesByIPsOrIDs(...)
  // ...
  // For each update, find the earliest timestamp
  // and set it in update.Metadata["_first_seen"]
}

Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies a potential performance bottleneck in the read-modify-write pattern introduced, which is a significant architectural concern for a high-throughput system.

Medium