Handle ErrNoRows when sending read updates

This commit is contained in:
Neil Alexander 2022-03-03 12:09:16 +00:00
parent bcc27e9e18
commit 6ed8cf0e07
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
2 changed files with 5 additions and 3 deletions

View File

@ -16,6 +16,7 @@ package consumers
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -138,12 +139,12 @@ func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID st
var readPos types.StreamPosition var readPos types.StreamPosition
var fullyReadPos types.StreamPosition var fullyReadPos types.StreamPosition
if output.ReadMarker.Read != "" { if output.ReadMarker.Read != "" {
if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil { if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
} }
} }
if output.ReadMarker.FullyRead != "" { if output.ReadMarker.FullyRead != "" {
if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil { if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err) return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
} }
} }

View File

@ -16,6 +16,7 @@ package consumers
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -129,7 +130,7 @@ func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output
} }
var readPos types.StreamPosition var readPos types.StreamPosition
if output.EventID != "" { if output.EventID != "" {
if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil { if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
} }
} }