InsertEvents closes pgx BatchResults without reading results, discarding insert errors #691

Closed
opened 2026-03-28 04:27:29 +00:00 by mfreeman451 · 1 comment
Owner

Imported from GitHub.

Original GitHub issue: #2153
Original author: @mfreeman451
Original URL: https://github.com/carverauto/serviceradar/issues/2153
Original created: 2025-12-16T05:19:02Z


Summary

  • Context: The InsertEvents function in pkg/db/events.go persists CloudEvent rows to the PostgreSQL events table using pgx batch operations.
  • Bug: The function calls br.Close() on BatchResults without first reading individual batch item results via br.Exec(), which can silently discard errors from INSERT operations.
  • Actual vs. expected: Errors from individual INSERT statements in the batch may not be detected and returned to the caller; the expected behavior is that all INSERT errors should be surfaced and cause the function to return an error.
  • Impact: Failed event insertions may go undetected, leading to silent data loss where events are not persisted to the database but the application proceeds as if they were successfully stored.

Code with bug

func (db *DB) InsertEvents(ctx context.Context, rows []*models.EventRow) error {
	// ... validation and batch preparation ...

	br := db.pgPool.SendBatch(ctx, batch)
	if err := br.Close(); err != nil {  // <-- BUG 🔴 Missing br.Exec() calls before Close()
		return fmt.Errorf("failed to insert events: %w", err)
	}

	return nil
}

Evidence

Example

Consider a batch with 3 event insertions where the second one has a constraint violation:

  1. Event 1: INSERT INTO events (...) VALUES (...) - valid
  2. Event 2: INSERT INTO events (...) VALUES (...) - violates constraint (e.g., invalid data type, foreign key violation not caught by ON CONFLICT)
  3. Event 3: INSERT INTO events (...) VALUES (...) - valid

With the current implementation calling only br.Close():

  • The batch is sent to PostgreSQL
  • PostgreSQL processes all three INSERTs
  • Event 2 fails with an error
  • br.Close() may or may not surface this error reliably
  • The function returns success even though Event 2 was not inserted

With the fixed implementation that calls br.Exec() for each item:

  • The batch is sent to PostgreSQL
  • The code explicitly reads the result of Event 1 - success
  • The code explicitly reads the result of Event 2 - error detected
  • The function immediately returns the error with clear context about which batch item failed
  • Events 2 and 3 are not inserted (transaction semantics)

Inconsistency within the codebase

Reference code

pkg/db/cnpg_metrics.go:320-336

func (db *DB) sendCNPG(ctx context.Context, batch *pgx.Batch, name string) (err error) {
	br := db.pgPool.SendBatch(ctx, batch)
	defer func() {
		if closeErr := br.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("cnpg %s batch close: %w", name, closeErr)
		}
	}()

	// Read results for each queued command to properly detect errors
	for i := 0; i < batch.Len(); i++ {
		if _, err = br.Exec(); err != nil {
			return fmt.Errorf("cnpg %s insert (command %d): %w", name, i, err)
		}
	}

	return nil
}

pkg/db/cnpg_device_updates_retry.go:168-183

func (db *DB) sendCNPGBatch(ctx context.Context, batch *pgx.Batch, name string) (err error) {
	br := db.conn().SendBatch(ctx, batch)
	defer func() {
		if closeErr := br.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("cnpg %s batch close: %w", name, closeErr)
		}
	}()

	for i := 0; i < batch.Len(); i++ {
		if _, err = br.Exec(); err != nil {
			return fmt.Errorf("cnpg %s insert (command %d): %w", name, i, err)
		}
	}

	return nil
}

Current code

pkg/db/events.go:83-88

br := db.pgPool.SendBatch(ctx, batch)
if err := br.Close(); err != nil {
	return fmt.Errorf("failed to insert events: %w", err)
}

return nil

Contradiction

The codebase has an established pattern for handling batch operations: explicitly calling br.Exec() for each batch item before br.Close() to properly detect errors. This pattern was introduced specifically to fix the error detection issue (see git commit 05968ef8). The InsertEvents function uses the older, buggy pattern that was replaced elsewhere in the codebase.

Additionally, pkg/db/auth.go also suffers from the same bug (lines 160-163), indicating this is a systemic issue that was partially fixed but not comprehensively addressed across all batch operations.

Full context

The InsertEvents function is called by the db-event-writer consumer (pkg/consumers/db-event-writer/processor.go:639) to persist CloudEvents from various sources into the PostgreSQL events table. CloudEvents are a standardized format for event data, and this function is part of the observability pipeline that stores events for later querying and analysis.

The events table uses a compound primary key (event_timestamp, id) and includes an ON CONFLICT ... DO UPDATE clause to handle duplicate events. While the ON CONFLICT handling prevents many error conditions, other errors can still occur:

  • Database constraints beyond the primary key
  • Data type mismatches not caught by the Go type system
  • PostgreSQL-specific errors (e.g., disk full, connection issues during batch processing)
  • Schema-related issues (e.g., if the search_path causes inserts to go to the wrong schema, similar to the bug fixed in commit 05968ef8)

When such errors occur during a batch insert but are not properly detected, the db-event-writer believes the events were successfully persisted and continues processing. This leads to gaps in the event log that are difficult to diagnose because there's no error logged or reported.

The batch operation is used for performance: multiple event rows are sent to PostgreSQL in a single network round trip. The issue is not with using batches, but with the error detection mechanism after the batch is sent.

External documentation

From the openspec proposal that fixed this issue in cnpg_metrics.go:

openspec/changes/fix-sysmon-vm-metrics-availability/proposal.md

Additionally, the sendCNPG batch function was not properly reading batch results
before closing, which could silently discard insert errors.

### 3. Proper Batch Result Handling
Modified sendCNPG() in pkg/db/cnpg_metrics.go to properly read batch results before closing:

    // Read results for each queued command to properly detect errors
    for i := 0; i < batch.Len(); i++ {
        if _, err = br.Exec(); err != nil {
            return fmt.Errorf("cnpg %s insert (command %d): %w", name, i, err)
        }
    }

From the git commit message (05968ef8):

fix(db): properly read batch results to detect insert errors

The sendCNPG function was calling br.Close() without reading individual
query results, which silently discarded any errors from INSERT statements.
Now properly calls br.Exec() for each queued command to surface errors.

Why has this bug gone undetected?

This bug has gone undetected for several reasons:

  1. ON CONFLICT handling: The INSERT statement includes ON CONFLICT (id, event_timestamp) DO UPDATE, which handles duplicate key violations gracefully. This prevents the most common type of INSERT error (duplicates) from occurring, masking the error detection problem.

  2. Permissive schema: The events table schema is quite permissive - most columns are TEXT without NOT NULL constraints, and the Level column is an INTEGER that accepts any int32 value. This makes it rare for data type violations or constraint errors to occur.

  3. Successful common path: In normal operation, events are well-formed and inserts succeed. The bug only manifests when there's an actual error during insertion, which is uncommon.

  4. Silent failure: When the bug does occur, it fails silently - no error is logged, no alert is raised, and the application continues normally. Only careful inspection of missing data would reveal the problem.

  5. Recent discovery: The development team only recently discovered this pattern was problematic (December 2025, commit 05968ef8) when debugging why sysmon metrics weren't appearing. They fixed it in cnpg_metrics.go and cnpg_device_updates_retry.go but did not audit other batch operations in the codebase for the same issue.

  6. No comprehensive test coverage: There are no tests that verify error handling for batch operations with constraint violations or other error conditions. The test suite focuses on the happy path where all inserts succeed.

Recommended fix

Apply the same fix that was used in commit 05968ef8 for cnpg_metrics.go:

func (db *DB) InsertEvents(ctx context.Context, rows []*models.EventRow) error {
	if len(rows) == 0 {
		return nil
	}

	if !db.cnpgConfigured() {
		return ErrDatabaseNotInitialized
	}

	batch := &pgx.Batch{}

	for _, row := range rows {
		if row == nil {
			continue
		}

		ts := row.EventTimestamp
		if ts.IsZero() {
			ts = time.Now().UTC()
		}

		batch.Queue(
			`INSERT INTO events (
				event_timestamp,
				specversion,
				id,
				source,
				type,
				datacontenttype,
				subject,
				remote_addr,
				host,
				level,
				severity,
				short_message,
				version,
				raw_data
			) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)
			ON CONFLICT (id, event_timestamp) DO UPDATE SET
				event_timestamp = EXCLUDED.event_timestamp,
				specversion     = EXCLUDED.specversion,
				source          = EXCLUDED.source,
				type            = EXCLUDED.type,
				datacontenttype = EXCLUDED.datacontenttype,
				subject         = EXCLUDED.subject,
				remote_addr     = EXCLUDED.remote_addr,
				host            = EXCLUDED.host,
				level           = EXCLUDED.level,
				severity        = EXCLUDED.severity,
				short_message   = EXCLUDED.short_message,
				version         = EXCLUDED.version,
				raw_data        = EXCLUDED.raw_data`,
			ts,
			row.SpecVersion,
			row.ID,
			row.Source,
			row.Type,
			row.DataContentType,
			row.Subject,
			row.RemoteAddr,
			row.Host,
			row.Level,
			row.Severity,
			row.ShortMessage,
			row.Version,
			row.RawData,
		)
	}

	br := db.pgPool.SendBatch(ctx, batch)

	// Read results for each queued command to properly detect errors  // <-- FIX 🟢
	for i := 0; i < batch.Len(); i++ {  // <-- FIX 🟢
		if _, err := br.Exec(); err != nil {
			_ = br.Close() // release the connection; the Exec error takes precedence
			return fmt.Errorf("failed to insert event %d: %w", i, err)
		}
	}

	if err := br.Close(); err != nil {
		return fmt.Errorf("failed to close event batch: %w", err)
	}

	return nil
}

Related bugs

pkg/db/auth.go lines 160-163 contain the same bug in the StoreUsers function:

br := db.pgPool.SendBatch(ctx, batch)
if err := br.Close(); err != nil {
	return fmt.Errorf("failed to store batch users: %w", err)
}

This should also be fixed using the same pattern.

Author
Owner

Imported GitHub comment.

Original author: @mfreeman451
Original URL: https://github.com/carverauto/serviceradar/issues/2153#issuecomment-3662853121
Original created: 2025-12-16T23:20:39Z


closing as completed
