Mirror of https://github.com/1f349/dendrite.git, synced 2024-11-25 21:21:35 +00:00

Factor out Consumer from the roomserver (#50)

Commit a974b90ee9, parent e3f3eb8f3d.

This commit factors the Kafka polling loop out of the roomserver's input.Consumer into a reusable common.ContinualConsumer, and moves the partition-offset bookkeeping (the PartitionOffset type, the PartitionStorer interface, and the partition_offsets SQL statements) into the common package.
@@ -2,16 +2,18 @@ package main
 
 import (
 	"fmt"
-	"github.com/matrix-org/dendrite/roomserver/input"
-	"github.com/matrix-org/dendrite/roomserver/query"
-	"github.com/matrix-org/dendrite/roomserver/storage"
-	"github.com/prometheus/client_golang/prometheus"
-	sarama "gopkg.in/Shopify/sarama.v1"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
 	"strconv"
 	"strings"
+
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/roomserver/input"
+	"github.com/matrix-org/dendrite/roomserver/query"
+	"github.com/matrix-org/dendrite/roomserver/storage"
+	"github.com/prometheus/client_golang/prometheus"
+	sarama "gopkg.in/Shopify/sarama.v1"
 )
 
 var (
@@ -43,10 +45,13 @@ func main() {
 	}
 
 	consumer := input.Consumer{
-		Consumer:             kafkaConsumer,
+		ContinualConsumer: common.ContinualConsumer{
+			Topic:          inputRoomEventTopic,
+			Consumer:       kafkaConsumer,
+			PartitionStore: db,
+		},
 		DB:                   db,
 		Producer:             kafkaProducer,
-		InputRoomEventTopic:  inputRoomEventTopic,
 		OutputRoomEventTopic: outputRoomEventTopic,
 	}
src/github.com/matrix-org/dendrite/common/consumers.go (new file, 108 lines)
@@ -0,0 +1,108 @@
+package common
+
+import (
+	"fmt"
+
+	sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// A PartitionOffset is the offset into a partition of the input log.
+type PartitionOffset struct {
+	// The ID of the partition.
+	Partition int32
+	// The offset into the partition.
+	Offset int64
+}
+
+// A PartitionStorer has the storage APIs needed by the consumer.
+type PartitionStorer interface {
+	// PartitionOffsets returns the offsets the consumer has reached for each partition.
+	PartitionOffsets(topic string) ([]PartitionOffset, error)
+	// SetPartitionOffset records where the consumer has reached for a partition.
+	SetPartitionOffset(topic string, partition int32, offset int64) error
+}
+
+// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
+// remember the offset it reached.
+type ContinualConsumer struct {
+	// The kafkaesque topic to consume events from.
+	// This is the name used in kafka to identify the stream to consume events from.
+	Topic string
+	// A kafkaesque stream consumer providing the APIs for talking to the event source.
+	// The interface is taken from a client library for Apache Kafka.
+	// But any equivalent event streaming protocol could be made to implement the same interface.
+	Consumer sarama.Consumer
+	// A thing which can load and save partition offsets for a topic.
+	PartitionStore PartitionStorer
+	// ProcessMessage is a function which will be called for each message in the log. Return an error to
+	// stop processing messages. See ErrShutdown for specific control signals.
+	ProcessMessage func(msg *sarama.ConsumerMessage) error
+	// ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
+	// It is optional.
+	ShutdownCallback func()
+}
+
+// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
+var ErrShutdown = fmt.Errorf("shutdown")
+
+// Start starts the consumer consuming.
+// Starts up a goroutine for each partition in the kafka stream.
+// Returns nil once all the goroutines are started.
+// Returns an error if it can't start consuming for any of the partitions.
+func (c *ContinualConsumer) Start() error {
+	offsets := map[int32]int64{}
+
+	partitions, err := c.Consumer.Partitions(c.Topic)
+	if err != nil {
+		return err
+	}
+	for _, partition := range partitions {
+		// Default all the offsets to the beginning of the stream.
+		offsets[partition] = sarama.OffsetOldest
+	}
+
+	storedOffsets, err := c.PartitionStore.PartitionOffsets(c.Topic)
+	if err != nil {
+		return err
+	}
+	for _, offset := range storedOffsets {
+		// We've already processed events from this partition so advance the offset to where we got to.
+		offsets[offset.Partition] = offset.Offset
+	}
+
+	var partitionConsumers []sarama.PartitionConsumer
+	for partition, offset := range offsets {
+		pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
+		if err != nil {
+			for _, p := range partitionConsumers {
+				p.Close()
+			}
+			return err
+		}
+		partitionConsumers = append(partitionConsumers, pc)
+	}
+	for _, pc := range partitionConsumers {
+		go c.consumePartition(pc)
+	}
+
+	return nil
+}
+
+// consumePartition consumes the room events for a single partition of the kafkaesque stream.
+func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
+	defer pc.Close()
+	for message := range pc.Messages() {
+		msgErr := c.ProcessMessage(message)
+		// Advance our position in the stream so that we will start at the right position after a restart.
+		if err := c.PartitionStore.SetPartitionOffset(c.Topic, message.Partition, message.Offset); err != nil {
+			panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
+		}
+		// Shutdown if we were told to do so.
+		if msgErr == ErrShutdown {
+			if c.ShutdownCallback != nil {
+				c.ShutdownCallback()
+			}
+			return
+		}
+	}
+}
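For orientation, the sketch below shows how a caller might wire up the new common.ContinualConsumer by hand. It is a minimal, hypothetical example: the broker address, the topic name and the in-memory partition store are placeholders invented here, not part of the commit (the real roomserver uses the postgres-backed store changed further down). It assumes only the API introduced in consumers.go above plus the sarama client.

package main

import (
	"fmt"
	"sync"

	"github.com/matrix-org/dendrite/common"
	sarama "gopkg.in/Shopify/sarama.v1"
)

// memoryPartitionStore is a hypothetical in-memory PartitionStorer used only
// for this sketch. It forgets its offsets on restart, which is exactly why
// the real code keeps them in postgres instead.
type memoryPartitionStore struct {
	mu      sync.Mutex
	offsets map[int32]int64
}

func (s *memoryPartitionStore) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var result []common.PartitionOffset
	for partition, offset := range s.offsets {
		result = append(result, common.PartitionOffset{Partition: partition, Offset: offset})
	}
	return result, nil
}

func (s *memoryPartitionStore) SetPartitionOffset(topic string, partition int32, offset int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[partition] = offset
	return nil
}

func main() {
	// Placeholder broker address for the sketch.
	kafkaConsumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	consumer := common.ContinualConsumer{
		Topic:          "exampleInputTopic", // placeholder topic name
		Consumer:       kafkaConsumer,
		PartitionStore: &memoryPartitionStore{offsets: map[int32]int64{}},
		// ProcessMessage is called once per message; return common.ErrShutdown
		// to stop the consumer after the current offset has been saved.
		ProcessMessage: func(msg *sarama.ConsumerMessage) error {
			fmt.Printf("partition=%d offset=%d value=%q\n", msg.Partition, msg.Offset, msg.Value)
			return nil
		},
	}
	if err := consumer.Start(); err != nil {
		panic(err)
	}
	select {} // consumption happens in per-partition goroutines started by Start
}

Note the ordering consumePartition enforces: SetPartitionOffset runs only after ProcessMessage returns, so a crash between the two replays the message on restart. ProcessMessage should therefore be written to tolerate at-least-once delivery.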
@@ -1,9 +1,6 @@
-package storage
+package common
 
-import (
-	"database/sql"
-	"github.com/matrix-org/dendrite/roomserver/types"
-)
+import "database/sql"
 
 const partitionOffsetsSchema = `
 -- The offsets that the server has processed up to.

@@ -26,32 +23,37 @@ const upsertPartitionOffsetsSQL = "" +
 	" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
 	" DO UPDATE SET partition_offset = $3"
 
-type partitionOffsetStatements struct {
+// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
+type PartitionOffsetStatements struct {
 	selectPartitionOffsetsStmt *sql.Stmt
 	upsertPartitionOffsetStmt  *sql.Stmt
 }
 
-func (s *partitionOffsetStatements) prepare(db *sql.DB) (err error) {
+// Prepare converts the raw SQL statements into prepared statements.
+func (s *PartitionOffsetStatements) Prepare(db *sql.DB) (err error) {
 	_, err = db.Exec(partitionOffsetsSchema)
 	if err != nil {
 		return
 	}
-	return statementList{
-		{&s.selectPartitionOffsetsStmt, selectPartitionOffsetsSQL},
-		{&s.upsertPartitionOffsetStmt, upsertPartitionOffsetsSQL},
-	}.prepare(db)
+	if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
+		return
+	}
+	if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
+		return
+	}
+	return
 }
 
-func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
+// SelectPartitionOffsets returns all the partition offsets for the given topic.
+func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]PartitionOffset, error) {
 	rows, err := s.selectPartitionOffsetsStmt.Query(topic)
 	if err != nil {
 		return nil, err
 	}
 	defer rows.Close()
-	var results []types.PartitionOffset
+	var results []PartitionOffset
 	for rows.Next() {
-		var offset types.PartitionOffset
+		var offset PartitionOffset
 		if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
 			return nil, err
 		}

@@ -59,7 +61,8 @@ func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
 	return results, nil
 }
 
-func (s *partitionOffsetStatements) upsertPartitionOffset(topic string, partition int32, offset int64) error {
+// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
+func (s *PartitionOffsetStatements) UpsertPartitionOffset(topic string, partition int32, offset int64) error {
 	_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
 	return err
 }
@@ -4,19 +4,17 @@ package input
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/matrix-org/dendrite/roomserver/api"
-	"github.com/matrix-org/dendrite/roomserver/types"
-	sarama "gopkg.in/Shopify/sarama.v1"
 	"sync/atomic"
+
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/roomserver/api"
+	sarama "gopkg.in/Shopify/sarama.v1"
 )
 
 // A ConsumerDatabase has the storage APIs needed by the consumer.
 type ConsumerDatabase interface {
 	RoomEventDatabase
-	// PartitionOffsets returns the offsets the consumer has reached for each partition.
-	PartitionOffsets(topic string) ([]types.PartitionOffset, error)
-	// SetPartitionOffset records where the consumer has reached for a partition.
-	SetPartitionOffset(topic string, partition int32, offset int64) error
+	common.PartitionStorer
 }
 
 // An ErrorLogger handles the errors encountered by the consumer.
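The interface now embeds common.PartitionStorer rather than restating the two offset methods, so its method set is unchanged and existing implementations keep compiling. A hypothetical compile-check sketch of what the embedded part requires:

package main

import (
	"fmt"

	"github.com/matrix-org/dendrite/common"
)

// fakeStore is a hypothetical type for this sketch only; it has the two
// methods needed to satisfy common.PartitionStorer.
type fakeStore struct{}

func (fakeStore) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
	return nil, nil // pretend nothing has been stored yet
}

func (fakeStore) SetPartitionOffset(topic string, partition int32, offset int64) error {
	return nil // pretend the offset was recorded
}

func main() {
	// Compile-time assertion that fakeStore implements the interface.
	var _ common.PartitionStorer = fakeStore{}
	fmt.Println("fakeStore satisfies common.PartitionStorer")
}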
@@ -31,16 +29,10 @@ type ErrorLogger interface {
 // The events needed to construct the state at the event should already be stored on the roomserver.
 // If the event is not valid then it will be discarded and an error will be logged.
 type Consumer struct {
-	// A kafkaesque stream consumer providing the APIs for talking to the event source.
-	// The interface is taken from a client library for Apache Kafka.
-	// But any equivalent event streaming protocol could be made to implement the same interface.
-	Consumer sarama.Consumer
+	ContinualConsumer common.ContinualConsumer
 	// The database used to store the room events.
 	DB       ConsumerDatabase
 	Producer sarama.SyncProducer
-	// The kafkaesque topic to consume room events from.
-	// This is the name used in kafka to identify the stream to consume events from.
-	InputRoomEventTopic string
 	// The kafkaesque topic to output new room events to.
 	// This is the name used in kafka to identify the stream to write events to.
 	OutputRoomEventTopic string
@@ -75,79 +67,42 @@ func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error {
 // Returns nil once all the goroutines are started.
 // Returns an error if it can't start consuming for any of the partitions.
 func (c *Consumer) Start() error {
-	offsets := map[int32]int64{}
-
-	partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic)
-	if err != nil {
-		return err
-	}
-	for _, partition := range partitions {
-		// Default all the offsets to the beginning of the stream.
-		offsets[partition] = sarama.OffsetOldest
-	}
-
-	storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic)
-	if err != nil {
-		return err
-	}
-	for _, offset := range storedOffsets {
-		// We've already processed events from this partition so advance the offset to where we got to.
-		offsets[offset.Partition] = offset.Offset
-	}
-
-	var partitionConsumers []sarama.PartitionConsumer
-	for partition, offset := range offsets {
-		pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset)
-		if err != nil {
-			for _, p := range partitionConsumers {
-				p.Close()
-			}
-			return err
-		}
-		partitionConsumers = append(partitionConsumers, pc)
-	}
-	for _, pc := range partitionConsumers {
-		go c.consumePartition(pc)
-	}
+	c.ContinualConsumer.ProcessMessage = c.processMessage
+	c.ContinualConsumer.ShutdownCallback = c.shutdown
+	return c.ContinualConsumer.Start()
+}
+
+func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error {
+	var input api.InputRoomEvent
+	if err := json.Unmarshal(message.Value, &input); err != nil {
+		// If the message is invalid then log it and move onto the next message in the stream.
+		c.logError(message, err)
+	} else {
+		if err := processRoomEvent(c.DB, c, input); err != nil {
+			// If there was an error processing the message then log it and
+			// move onto the next message in the stream.
+			// TODO: If the error was due to a problem talking to the database
+			// then we shouldn't move onto the next message and we should either
+			// retry processing the message, or panic and kill ourselves.
+			c.logError(message, err)
+		}
+	}
+	// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
+	processed := atomic.AddInt64(&c.processed, 1)
+	// Check if we should stop processing.
+	// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
+	// If we try to stop processing after M message and we have N goroutines then we will process somewhere
+	// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
+	// last message. We could be more careful here but this is good enough for getting rough benchmarks.
+	if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
+		return common.ErrShutdown
+	}
 
 	return nil
 }
 
-// consumePartition consumes the room events for a single partition of the kafkaesque stream.
-func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
-	defer pc.Close()
-	for message := range pc.Messages() {
-		var input api.InputRoomEvent
-		if err := json.Unmarshal(message.Value, &input); err != nil {
-			// If the message is invalid then log it and move onto the next message in the stream.
-			c.logError(message, err)
-		} else {
-			if err := processRoomEvent(c.DB, c, input); err != nil {
-				// If there was an error processing the message then log it and
-				// move onto the next message in the stream.
-				// TODO: If the error was due to a problem talking to the database
-				// then we shouldn't move onto the next message and we should either
-				// retry processing the message, or panic and kill ourselves.
-				c.logError(message, err)
-			}
-		}
-		// Advance our position in the stream so that we will start at the right position after a restart.
-		if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil {
-			c.logError(message, err)
-		}
-		// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
-		processed := atomic.AddInt64(&c.processed, 1)
-		// Check if we should stop processing.
-		// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
-		// If we try to stop processing after M message and we have N goroutines then we will process somewhere
-		// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
-		// last message. We could be more careful here but this is good enough for getting rough benchmarks.
-		if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
-			if c.ShutdownCallback != nil {
-				c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
-			}
-			return
-		}
-	}
-}
+func (c *Consumer) shutdown() {
+	if c.ShutdownCallback != nil {
+		c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
+	}
+}
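After this refactor, Start no longer owns the partition loop; it injects processMessage and shutdown into the embedded ContinualConsumer and delegates. The sketch below isolates the ProcessMessage contract the new code relies on: return nil to keep consuming, return common.ErrShutdown to stop once the offset is saved. The driver loop is a stand-in for the per-partition goroutine in consumers.go, so it runs without a broker; the cap of three messages is arbitrary.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/matrix-org/dendrite/common"
	sarama "gopkg.in/Shopify/sarama.v1"
)

func main() {
	var processed int64
	stopAfter := int64(3) // arbitrary cap, mirroring StopProcessingAfter

	// processMessage mirrors the contract of Consumer.processMessage: do the
	// work, then return common.ErrShutdown once enough messages are handled.
	processMessage := func(msg *sarama.ConsumerMessage) error {
		n := atomic.AddInt64(&processed, 1)
		fmt.Printf("processed message at offset %d\n", msg.Offset)
		if n >= stopAfter {
			return common.ErrShutdown
		}
		return nil
	}

	// Stand-in for the loop in ContinualConsumer.consumePartition. The real
	// loop saves the partition offset after each call, then honours ErrShutdown.
	for offset := int64(0); ; offset++ {
		msg := &sarama.ConsumerMessage{Partition: 0, Offset: offset}
		if processMessage(msg) == common.ErrShutdown {
			fmt.Printf("stopping after %d messages\n", processed)
			return
		}
	}
}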
@@ -2,10 +2,12 @@ package storage
 
 import (
 	"database/sql"
+
+	"github.com/matrix-org/dendrite/common"
 )
 
 type statements struct {
-	partitionOffsetStatements
+	common.PartitionOffsetStatements
 	eventTypeStatements
 	eventStateKeyStatements
 	roomStatements
@@ -19,7 +21,7 @@ type statements struct {
 func (s *statements) prepare(db *sql.DB) error {
 	var err error
 
-	if err = s.partitionOffsetStatements.prepare(db); err != nil {
+	if err = s.PartitionOffsetStatements.Prepare(db); err != nil {
 		return err
 	}
@@ -4,6 +4,7 @@ import (
 	"database/sql"
 	// Import the postgres database driver.
 	_ "github.com/lib/pq"
+	"github.com/matrix-org/dendrite/common"
 	"github.com/matrix-org/dendrite/roomserver/types"
 	"github.com/matrix-org/gomatrixserverlib"
 )
@@ -28,13 +29,13 @@ func Open(dataSourceName string) (*Database, error) {
 }
 
 // PartitionOffsets implements input.ConsumerDatabase
-func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
-	return d.statements.selectPartitionOffsets(topic)
+func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
+	return d.statements.SelectPartitionOffsets(topic)
 }
 
 // SetPartitionOffset implements input.ConsumerDatabase
 func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
-	return d.statements.upsertPartitionOffset(topic, partition, offset)
+	return d.statements.UpsertPartitionOffset(topic, partition, offset)
 }
 
 // StoreEvent implements input.EventDatabase
@@ -5,14 +5,6 @@ import (
 	"github.com/matrix-org/gomatrixserverlib"
 )
 
-// A PartitionOffset is the offset into a partition of the input log.
-type PartitionOffset struct {
-	// The ID of the partition.
-	Partition int32
-	// The offset into the partition.
-	Offset int64
-}
-
 // EventTypeNID is a numeric ID for an event type.
 type EventTypeNID int64
|
Loading…
Reference in New Issue
Block a user