Add opt-in anonymous stats reporting (#2249)

* Initial phone home stats queries

* Add userAgent to UpdateDeviceLastSeen
Add new Table for tracking daily user vists

* Add user_daily_visits table

* Fix queries

* userapi stats tables & queries

* userapi interface and internal api

* sycnapi stats queries

* testing phone home stats

* Add complete config to syncapi

* add missing files

* Fix queries

* Send empty request

* Add version & monolith stats

* Add configuration for phone home stats

* Move WASM to its own file, add config and comments

* Add tracing methods

* Add total rooms

* Add more fields, actually send data somewhere

* Move stats to the userapi

* Move phone home stats to util package

* Cleanup

* Linter & parts of GH comments

* More GH comments changes
- Move comments to SQL statements
- Shrink interface, add struct for stats
- No fatal errors, use defaults

* Be more explicit when querying

* Fix wrong calculation & wrong query params
Add tests

* Add Windows stats

* ADd build constraint

* Use new testing structure
Fix issues with getting values when using SQLite
Fix wrong AddDate value
Export UpdateUserDailyVisits

* Fix query params

* Fix test

* Add comment about countR30UsersSQL and countR30UsersV2SQL; fix test

* Update config

* Also update example config file

* Use OS level proxy, update logging

Co-authored-by: kegsay <kegan@matrix.org>
This commit is contained in:
Till 2022-05-04 19:04:28 +02:00 committed by GitHub
parent b0a9e85c4a
commit 3c940c428d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1582 additions and 14 deletions

View File

@ -31,7 +31,7 @@ import (
type entrypoint func(base *base.BaseDendrite, cfg *config.Dendrite) type entrypoint func(base *base.BaseDendrite, cfg *config.Dendrite)
func main() { func main() {
cfg := setup.ParseFlags(true) cfg := setup.ParseFlags(false)
component := "" component := ""
if flag.NFlag() > 0 { if flag.NFlag() > 0 {

View File

@ -85,6 +85,15 @@ global:
# Whether outbound presence events are allowed, e.g. sending presence events to other servers # Whether outbound presence events are allowed, e.g. sending presence events to other servers
enable_outbound: false enable_outbound: false
# Configures opt-in anonymous stats reporting.
report_stats:
# Whether this instance sends anonymous usage stats
enabled: false
# The endpoint to report the anonymized homeserver usage statistics to.
# Defaults to https://matrix.org/report-usage-stats/push
endpoint: https://matrix.org/report-usage-stats/push
# Server notices allows server admins to send messages to all users. # Server notices allows server admins to send messages to all users.
server_notices: server_notices:
enabled: false enabled: false

View File

@ -78,6 +78,8 @@ type Dendrite struct {
// Any information derived from the configuration options for later use. // Any information derived from the configuration options for later use.
Derived Derived `yaml:"-"` Derived Derived `yaml:"-"`
IsMonolith bool `yaml:"-"`
} }
// TODO: Kill Derived // TODO: Kill Derived
@ -210,6 +212,7 @@ func loadConfig(
) (*Dendrite, error) { ) (*Dendrite, error) {
var c Dendrite var c Dendrite
c.Defaults(false) c.Defaults(false)
c.IsMonolith = monolithic
var err error var err error
if err = yaml.Unmarshal(configData, &c); err != nil { if err = yaml.Unmarshal(configData, &c); err != nil {

View File

@ -70,6 +70,9 @@ type Global struct {
// ServerNotices configuration used for sending server notices // ServerNotices configuration used for sending server notices
ServerNotices ServerNotices `yaml:"server_notices"` ServerNotices ServerNotices `yaml:"server_notices"`
// ReportStats configures opt-in anonymous stats reporting.
ReportStats ReportStats `yaml:"report_stats"`
} }
func (c *Global) Defaults(generate bool) { func (c *Global) Defaults(generate bool) {
@ -86,6 +89,7 @@ func (c *Global) Defaults(generate bool) {
c.DNSCache.Defaults() c.DNSCache.Defaults()
c.Sentry.Defaults() c.Sentry.Defaults()
c.ServerNotices.Defaults(generate) c.ServerNotices.Defaults(generate)
c.ReportStats.Defaults()
} }
func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
@ -97,6 +101,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
c.Sentry.Verify(configErrs, isMonolith) c.Sentry.Verify(configErrs, isMonolith)
c.DNSCache.Verify(configErrs, isMonolith) c.DNSCache.Verify(configErrs, isMonolith)
c.ServerNotices.Verify(configErrs, isMonolith) c.ServerNotices.Verify(configErrs, isMonolith)
c.ReportStats.Verify(configErrs, isMonolith)
} }
type OldVerifyKeys struct { type OldVerifyKeys struct {
@ -163,6 +168,26 @@ func (c *ServerNotices) Defaults(generate bool) {
func (c *ServerNotices) Verify(errors *ConfigErrors, isMonolith bool) {} func (c *ServerNotices) Verify(errors *ConfigErrors, isMonolith bool) {}
// ReportStats configures opt-in anonymous stats reporting.
type ReportStats struct {
// Enabled configures anonymous usage stats of the server
Enabled bool `yaml:"enabled"`
// Endpoint the endpoint to report stats to
Endpoint string `yaml:"endpoint"`
}
func (c *ReportStats) Defaults() {
c.Enabled = false
c.Endpoint = "https://matrix.org/report-usage-stats/push"
}
func (c *ReportStats) Verify(configErrs *ConfigErrors, isMonolith bool) {
if c.Enabled {
checkNotEmpty(configErrs, "global.report_stats.endpoint", c.Endpoint)
}
}
// The configuration to use for Sentry error reporting // The configuration to use for Sentry error reporting
type Sentry struct { type Sentry struct {
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`

View File

@ -182,6 +182,7 @@ func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device)
UserID: device.UserID, UserID: device.UserID,
DeviceID: device.ID, DeviceID: device.ID,
RemoteAddr: remoteAddr, RemoteAddr: remoteAddr,
UserAgent: req.UserAgent(),
} }
lsres := &userapi.PerformLastSeenUpdateResponse{} lsres := &userapi.PerformLastSeenUpdateResponse{}
go rp.userAPI.PerformLastSeenUpdate(req.Context(), lsreq, lsres) // nolint:errcheck go rp.userAPI.PerformLastSeenUpdate(req.Context(), lsreq, lsres) // nolint:errcheck

View File

@ -320,6 +320,7 @@ type PerformLastSeenUpdateRequest struct {
UserID string UserID string
DeviceID string DeviceID string
RemoteAddr string RemoteAddr string
UserAgent string
} }
// PerformLastSeenUpdateResponse is the response for PerformLastSeenUpdate. // PerformLastSeenUpdateResponse is the response for PerformLastSeenUpdate.

View File

@ -210,7 +210,7 @@ func (a *UserInternalAPI) PerformLastSeenUpdate(
if err != nil { if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
} }
if err := a.DB.UpdateDeviceLastSeen(ctx, localpart, req.DeviceID, req.RemoteAddr); err != nil { if err := a.DB.UpdateDeviceLastSeen(ctx, localpart, req.DeviceID, req.RemoteAddr, req.UserAgent); err != nil {
return fmt.Errorf("a.DeviceDB.UpdateDeviceLastSeen: %w", err) return fmt.Errorf("a.DeviceDB.UpdateDeviceLastSeen: %w", err)
} }
return nil return nil

View File

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
) )
type Profile interface { type Profile interface {
@ -67,7 +68,7 @@ type Device interface {
// Returns the device on success. // Returns the device on success.
CreateDevice(ctx context.Context, localpart string, deviceID *string, accessToken string, displayName *string, ipAddr, userAgent string) (dev *api.Device, returnErr error) CreateDevice(ctx context.Context, localpart string, deviceID *string, accessToken string, displayName *string, ipAddr, userAgent string) (dev *api.Device, returnErr error)
UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error
UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr string) error UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr, userAgent string) error
RemoveDevices(ctx context.Context, localpart string, devices []string) error RemoveDevices(ctx context.Context, localpart string, devices []string) error
// RemoveAllDevices deleted all devices for this user. Returns the devices deleted. // RemoveAllDevices deleted all devices for this user. Returns the devices deleted.
RemoveAllDevices(ctx context.Context, localpart, exceptDeviceID string) (devices []api.Device, err error) RemoveAllDevices(ctx context.Context, localpart, exceptDeviceID string) (devices []api.Device, err error)
@ -135,9 +136,14 @@ type Database interface {
OpenID OpenID
Profile Profile
Pusher Pusher
Statistics
ThreePID ThreePID
} }
type Statistics interface {
UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error)
}
// Err3PIDInUse is the error returned when trying to save an association involving // Err3PIDInUse is the error returned when trying to save an association involving
// a third-party identifier which is already associated to a local user. // a third-party identifier which is already associated to a local user.
var Err3PIDInUse = errors.New("this third-party identifier is already in use") var Err3PIDInUse = errors.New("this third-party identifier is already in use")

View File

@ -96,7 +96,7 @@ const selectDevicesByIDSQL = "" +
"SELECT device_id, localpart, display_name, last_seen_ts FROM device_devices WHERE device_id = ANY($1) ORDER BY last_seen_ts DESC" "SELECT device_id, localpart, display_name, last_seen_ts FROM device_devices WHERE device_id = ANY($1) ORDER BY last_seen_ts DESC"
const updateDeviceLastSeen = "" + const updateDeviceLastSeen = "" +
"UPDATE device_devices SET last_seen_ts = $1, ip = $2 WHERE localpart = $3 AND device_id = $4" "UPDATE device_devices SET last_seen_ts = $1, ip = $2, user_agent = $3 WHERE localpart = $4 AND device_id = $5"
type devicesStatements struct { type devicesStatements struct {
insertDeviceStmt *sql.Stmt insertDeviceStmt *sql.Stmt
@ -304,9 +304,9 @@ func (s *devicesStatements) SelectDevicesByLocalpart(
return devices, rows.Err() return devices, rows.Err()
} }
func (s *devicesStatements) UpdateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr string) error { func (s *devicesStatements) UpdateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr, userAgent string) error {
lastSeenTs := time.Now().UnixNano() / 1000000 lastSeenTs := time.Now().UnixNano() / 1000000
stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt) stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt)
_, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, localpart, deviceID) _, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, userAgent, localpart, deviceID)
return err return err
} }

View File

@ -0,0 +1,437 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"time"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
const userDailyVisitsSchema = `
CREATE TABLE IF NOT EXISTS user_daily_visits (
localpart TEXT NOT NULL,
device_id TEXT NOT NULL,
timestamp BIGINT NOT NULL,
user_agent TEXT
);
-- Device IDs and timestamp must be unique for a given user per day
CREATE UNIQUE INDEX IF NOT EXISTS localpart_device_timestamp_idx ON user_daily_visits(localpart, device_id, timestamp);
CREATE INDEX IF NOT EXISTS timestamp_idx ON user_daily_visits(timestamp);
CREATE INDEX IF NOT EXISTS localpart_timestamp_idx ON user_daily_visits(localpart, timestamp);
`
const countUsersLastSeenAfterSQL = "" +
"SELECT COUNT(*) FROM (" +
" SELECT localpart FROM device_devices WHERE last_seen_ts > $1 " +
" GROUP BY localpart" +
" ) u"
// Note on the following countR30UsersSQL and countR30UsersV2SQL: The different checks are intentional.
// This is to ensure the values reported by Dendrite are the same as by Synapse.
// Queries are taken from: https://github.com/matrix-org/synapse/blob/9ce51a47f6e37abd0a1275281806399d874eb026/synapse/storage/databases/main/stats.py
/*
R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart
*/
const countR30UsersSQL = `
SELECT platform, COUNT(*) FROM (
SELECT users.localpart, platform, users.created_ts, MAX(uip.last_seen_ts)
FROM account_accounts users
INNER JOIN
(SELECT
localpart, last_seen_ts,
CASE
WHEN user_agent LIKE '%%Android%%' THEN 'android'
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
ELSE 'unknown'
END
AS platform
FROM device_devices
) uip
ON users.localpart = uip.localpart
AND users.account_type <> 4
AND users.created_ts < $1
AND uip.last_seen_ts > $1
AND (uip.last_seen_ts) - users.created_ts > $2
GROUP BY users.localpart, platform, users.created_ts
) u GROUP BY PLATFORM
`
/*
R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
*/
const countR30UsersV2SQL = `
SELECT
client_type,
count(client_type)
FROM
(
SELECT
localpart,
CASE
WHEN
LOWER(user_agent) LIKE '%%riot%%' OR
LOWER(user_agent) LIKE '%%element%%'
THEN CASE
WHEN LOWER(user_agent) LIKE '%%electron%%' THEN 'electron'
WHEN LOWER(user_agent) LIKE '%%android%%' THEN 'android'
WHEN LOWER(user_agent) LIKE '%%ios%%' THEN 'ios'
ELSE 'unknown'
END
WHEN LOWER(user_agent) LIKE '%%mozilla%%' OR LOWER(user_agent) LIKE '%%gecko%%' THEN 'web'
ELSE 'unknown'
END as client_type
FROM user_daily_visits
WHERE timestamp > $1 AND timestamp < $2
GROUP BY localpart, client_type
HAVING max(timestamp) - min(timestamp) > $3
) AS temp
GROUP BY client_type
`
const countUserByAccountTypeSQL = `
SELECT COUNT(*) FROM account_accounts WHERE account_type = ANY($1)
`
// $1 = All non guest AccountType IDs
// $2 = Guest AccountType
const countRegisteredUserByTypeStmt = `
SELECT user_type, COUNT(*) AS count FROM (
SELECT
CASE
WHEN account_type = ANY($1) AND appservice_id IS NULL THEN 'native'
WHEN account_type = $2 AND appservice_id IS NULL THEN 'guest'
WHEN account_type = ANY($1) AND appservice_id IS NOT NULL THEN 'bridged'
END AS user_type
FROM account_accounts
WHERE created_ts > $3
) AS t GROUP BY user_type
`
// account_type 1 = users; 3 = admins
const updateUserDailyVisitsSQL = `
INSERT INTO user_daily_visits(localpart, device_id, timestamp, user_agent)
SELECT u.localpart, u.device_id, $1, MAX(u.user_agent)
FROM device_devices AS u
LEFT JOIN (
SELECT localpart, device_id, timestamp FROM user_daily_visits
WHERE timestamp = $1
) udv
ON u.localpart = udv.localpart AND u.device_id = udv.device_id
INNER JOIN device_devices d ON d.localpart = u.localpart
INNER JOIN account_accounts a ON a.localpart = u.localpart
WHERE $2 <= d.last_seen_ts AND d.last_seen_ts < $3
AND a.account_type in (1, 3)
GROUP BY u.localpart, u.device_id
ON CONFLICT (localpart, device_id, timestamp) DO NOTHING
;
`
const queryDBEngineVersion = "SHOW server_version;"
type statsStatements struct {
serverName gomatrixserverlib.ServerName
lastUpdate time.Time
countUsersLastSeenAfterStmt *sql.Stmt
countR30UsersStmt *sql.Stmt
countR30UsersV2Stmt *sql.Stmt
updateUserDailyVisitsStmt *sql.Stmt
countUserByAccountTypeStmt *sql.Stmt
countRegisteredUserByTypeStmt *sql.Stmt
dbEngineVersionStmt *sql.Stmt
}
func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) {
s := &statsStatements{
serverName: serverName,
lastUpdate: time.Now(),
}
_, err := db.Exec(userDailyVisitsSchema)
if err != nil {
return nil, err
}
go s.startTimers()
return s, sqlutil.StatementList{
{&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL},
{&s.countR30UsersStmt, countR30UsersSQL},
{&s.countR30UsersV2Stmt, countR30UsersV2SQL},
{&s.updateUserDailyVisitsStmt, updateUserDailyVisitsSQL},
{&s.countUserByAccountTypeStmt, countUserByAccountTypeSQL},
{&s.countRegisteredUserByTypeStmt, countRegisteredUserByTypeStmt},
{&s.dbEngineVersionStmt, queryDBEngineVersion},
}.Prepare(db)
}
func (s *statsStatements) startTimers() {
var updateStatsFunc func()
updateStatsFunc = func() {
logrus.Infof("Executing UpdateUserDailyVisits")
if err := s.UpdateUserDailyVisits(context.Background(), nil, time.Now(), s.lastUpdate); err != nil {
logrus.WithError(err).Error("failed to update daily user visits")
}
time.AfterFunc(time.Hour*3, updateStatsFunc)
}
time.AfterFunc(time.Minute*5, updateStatsFunc)
}
func (s *statsStatements) allUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUserByAccountTypeStmt)
err = stmt.QueryRowContext(ctx,
pq.Int64Array{
int64(api.AccountTypeUser),
int64(api.AccountTypeGuest),
int64(api.AccountTypeAdmin),
int64(api.AccountTypeAppService),
},
).Scan(&result)
return
}
func (s *statsStatements) nonBridgedUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUserByAccountTypeStmt)
err = stmt.QueryRowContext(ctx,
pq.Int64Array{
int64(api.AccountTypeUser),
int64(api.AccountTypeGuest),
int64(api.AccountTypeAdmin),
},
).Scan(&result)
return
}
func (s *statsStatements) registeredUserByType(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countRegisteredUserByTypeStmt)
registeredAfter := time.Now().AddDate(0, 0, -30)
rows, err := stmt.QueryContext(ctx,
pq.Int64Array{
int64(api.AccountTypeUser),
int64(api.AccountTypeAdmin),
int64(api.AccountTypeAppService),
},
api.AccountTypeGuest,
gomatrixserverlib.AsTimestamp(registeredAfter),
)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "RegisteredUserByType: failed to close rows")
var userType string
var count int64
var result = make(map[string]int64)
for rows.Next() {
if err = rows.Scan(&userType, &count); err != nil {
return nil, err
}
result[userType] = count
}
return result, rows.Err()
}
func (s *statsStatements) dailyUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUsersLastSeenAfterStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -1)
err = stmt.QueryRowContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter),
).Scan(&result)
return
}
func (s *statsStatements) monthlyUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUsersLastSeenAfterStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30)
err = stmt.QueryRowContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter),
).Scan(&result)
return
}
/*
R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart
*/
func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30)
diff := time.Hour * 24 * 30
rows, err := stmt.QueryContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter),
diff.Milliseconds(),
)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "R30Users: failed to close rows")
var platform string
var count int64
var result = make(map[string]int64)
for rows.Next() {
if err = rows.Scan(&platform, &count); err != nil {
return nil, err
}
if platform == "unknown" {
continue
}
result["all"] += count
result[platform] = count
}
return result, rows.Err()
}
/*
R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
*/
func (s *statsStatements) r30UsersV2(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersV2Stmt)
sixtyDaysAgo := time.Now().AddDate(0, 0, -60)
diff := time.Hour * 24 * 30
tomorrow := time.Now().Add(time.Hour * 24)
rows, err := stmt.QueryContext(ctx,
gomatrixserverlib.AsTimestamp(sixtyDaysAgo),
gomatrixserverlib.AsTimestamp(tomorrow),
diff.Milliseconds(),
)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "R30UsersV2: failed to close rows")
var platform string
var count int64
var result = map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
}
for rows.Next() {
if err = rows.Scan(&platform, &count); err != nil {
return nil, err
}
if _, ok := result[platform]; !ok {
continue
}
result["all"] += count
result[platform] = count
}
return result, rows.Err()
}
// UserStatistics collects some information about users on this instance.
// Returns the stats itself as well as the database engine version and type.
// On error, returns the stats collected up to the error.
func (s *statsStatements) UserStatistics(ctx context.Context, txn *sql.Tx) (*types.UserStatistics, *types.DatabaseEngine, error) {
var (
stats = &types.UserStatistics{
R30UsersV2: map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
},
R30Users: map[string]int64{},
RegisteredUsersByType: map[string]int64{},
}
dbEngine = &types.DatabaseEngine{Engine: "Postgres", Version: "unknown"}
err error
)
stats.AllUsers, err = s.allUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.DailyUsers, err = s.dailyUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.MonthlyUsers, err = s.monthlyUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.R30Users, err = s.r30Users(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.R30UsersV2, err = s.r30UsersV2(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.NonBridgedUsers, err = s.nonBridgedUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.RegisteredUsersByType, err = s.registeredUserByType(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stmt := sqlutil.TxStmt(txn, s.dbEngineVersionStmt)
err = stmt.QueryRowContext(ctx).Scan(&dbEngine.Version)
return stats, dbEngine, err
}
func (s *statsStatements) UpdateUserDailyVisits(
ctx context.Context, txn *sql.Tx,
startTime, lastUpdate time.Time,
) error {
stmt := sqlutil.TxStmt(txn, s.updateUserDailyVisitsStmt)
startTime = startTime.Truncate(time.Hour * 24)
// edge case
if startTime.After(s.lastUpdate) {
startTime = startTime.AddDate(0, 0, -1)
}
_, err := stmt.ExecContext(ctx,
gomatrixserverlib.AsTimestamp(startTime),
gomatrixserverlib.AsTimestamp(lastUpdate),
gomatrixserverlib.AsTimestamp(time.Now()),
)
if err == nil {
s.lastUpdate = time.Now()
}
return err
}

View File

@ -94,6 +94,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil { if err != nil {
return nil, fmt.Errorf("NewPostgresNotificationTable: %w", err) return nil, fmt.Errorf("NewPostgresNotificationTable: %w", err)
} }
statsTable, err := NewPostgresStatsTable(db, serverName)
if err != nil {
return nil, fmt.Errorf("NewPostgresStatsTable: %w", err)
}
return &shared.Database{ return &shared.Database{
AccountDatas: accountDataTable, AccountDatas: accountDataTable,
Accounts: accountsTable, Accounts: accountsTable,
@ -106,6 +110,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
ThreePIDs: threePIDTable, ThreePIDs: threePIDTable,
Pushers: pusherTable, Pushers: pusherTable,
Notifications: notificationsTable, Notifications: notificationsTable,
Stats: statsTable,
ServerName: serverName, ServerName: serverName,
DB: db, DB: db,
Writer: writer, Writer: writer,

View File

@ -26,6 +26,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
@ -51,6 +52,7 @@ type Database struct {
LoginTokens tables.LoginTokenTable LoginTokens tables.LoginTokenTable
Notifications tables.NotificationTable Notifications tables.NotificationTable
Pushers tables.PusherTable Pushers tables.PusherTable
Stats tables.StatsTable
LoginTokenLifetime time.Duration LoginTokenLifetime time.Duration
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
BcryptCost int BcryptCost int
@ -611,10 +613,10 @@ func (d *Database) RemoveAllDevices(
return return
} }
// UpdateDeviceLastSeen updates a the last seen timestamp and the ip address // UpdateDeviceLastSeen updates a last seen timestamp and the ip address.
func (d *Database) UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr string) error { func (d *Database) UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr, userAgent string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Devices.UpdateDeviceLastSeen(ctx, txn, localpart, deviceID, ipAddr) return d.Devices.UpdateDeviceLastSeen(ctx, txn, localpart, deviceID, ipAddr, userAgent)
}) })
} }
@ -756,3 +758,8 @@ func (d *Database) RemovePushers(
return d.Pushers.DeletePushers(ctx, txn, appid, pushkey) return d.Pushers.DeletePushers(ctx, txn, appid, pushkey)
}) })
} }
// UserStatistics populates types.UserStatistics, used in reports.
func (d *Database) UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error) {
return d.Stats.UserStatistics(ctx, nil)
}

View File

@ -81,7 +81,7 @@ const selectDevicesByIDSQL = "" +
"SELECT device_id, localpart, display_name, last_seen_ts FROM device_devices WHERE device_id IN ($1) ORDER BY last_seen_ts DESC" "SELECT device_id, localpart, display_name, last_seen_ts FROM device_devices WHERE device_id IN ($1) ORDER BY last_seen_ts DESC"
const updateDeviceLastSeen = "" + const updateDeviceLastSeen = "" +
"UPDATE device_devices SET last_seen_ts = $1, ip = $2 WHERE localpart = $3 AND device_id = $4" "UPDATE device_devices SET last_seen_ts = $1, ip = $2, user_agent = $3 WHERE localpart = $4 AND device_id = $5"
type devicesStatements struct { type devicesStatements struct {
db *sql.DB db *sql.DB
@ -306,9 +306,9 @@ func (s *devicesStatements) SelectDevicesByID(ctx context.Context, deviceIDs []s
return devices, rows.Err() return devices, rows.Err()
} }
func (s *devicesStatements) UpdateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr string) error { func (s *devicesStatements) UpdateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr, userAgent string) error {
lastSeenTs := time.Now().UnixNano() / 1000000 lastSeenTs := time.Now().UnixNano() / 1000000
stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt) stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt)
_, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, localpart, deviceID) _, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, userAgent, localpart, deviceID)
return err return err
} }

View File

@ -0,0 +1,452 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"strings"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
const userDailyVisitsSchema = `
CREATE TABLE IF NOT EXISTS user_daily_visits (
localpart TEXT NOT NULL,
device_id TEXT NOT NULL,
timestamp BIGINT NOT NULL,
user_agent TEXT
);
-- Device IDs and timestamp must be unique for a given user per day
CREATE UNIQUE INDEX IF NOT EXISTS localpart_device_timestamp_idx ON user_daily_visits(localpart, device_id, timestamp);
CREATE INDEX IF NOT EXISTS timestamp_idx ON user_daily_visits(timestamp);
CREATE INDEX IF NOT EXISTS localpart_timestamp_idx ON user_daily_visits(localpart, timestamp);
`
const countUsersLastSeenAfterSQL = "" +
"SELECT COUNT(*) FROM (" +
" SELECT localpart FROM device_devices WHERE last_seen_ts > $1 " +
" GROUP BY localpart" +
" ) u"
// Note on the following countR30UsersSQL and countR30UsersV2SQL: The different checks are intentional.
// This is to ensure the values reported by Dendrite are the same as by Synapse.
// Queries are taken from: https://github.com/matrix-org/synapse/blob/9ce51a47f6e37abd0a1275281806399d874eb026/synapse/storage/databases/main/stats.py
/*
R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart
*/
const countR30UsersSQL = `
SELECT platform, COUNT(*) FROM (
SELECT users.localpart, platform, users.created_ts, MAX(uip.last_seen_ts)
FROM account_accounts users
INNER JOIN
(SELECT
localpart, last_seen_ts,
CASE
WHEN user_agent LIKE '%%Android%%' THEN 'android'
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
ELSE 'unknown'
END
AS platform
FROM device_devices
) uip
ON users.localpart = uip.localpart
AND users.account_type <> 4
AND users.created_ts < $1
AND uip.last_seen_ts > $2
AND (uip.last_seen_ts) - users.created_ts > $3
GROUP BY users.localpart, platform, users.created_ts
) u GROUP BY PLATFORM
`
// Note on the following countR30UsersSQL and countR30UsersV2SQL: The different checks are intentional.
// This is to ensure the values reported are the same as Synapse reports.
// Queries are taken from: https://github.com/matrix-org/synapse/blob/9ce51a47f6e37abd0a1275281806399d874eb026/synapse/storage/databases/main/stats.py
/*
R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
*/
const countR30UsersV2SQL = `
SELECT
client_type,
count(client_type)
FROM
(
SELECT
localpart,
CASE
WHEN
LOWER(user_agent) LIKE '%%riot%%' OR
LOWER(user_agent) LIKE '%%element%%'
THEN CASE
WHEN LOWER(user_agent) LIKE '%%electron%%' THEN 'electron'
WHEN LOWER(user_agent) LIKE '%%android%%' THEN 'android'
WHEN LOWER(user_agent) LIKE '%%ios%%' THEN 'ios'
ELSE 'unknown'
END
WHEN LOWER(user_agent) LIKE '%%mozilla%%' OR LOWER(user_agent) LIKE '%%gecko%%' THEN 'web'
ELSE 'unknown'
END as client_type
FROM user_daily_visits
WHERE timestamp > $1 AND timestamp < $2
GROUP BY localpart, client_type
HAVING max(timestamp) - min(timestamp) > $3
) AS temp
GROUP BY client_type
`
const countUserByAccountTypeSQL = `
SELECT COUNT(*) FROM account_accounts WHERE account_type IN ($1)
`
// $1 = Guest AccountType
// $3 & $4 = All non guest AccountType IDs
const countRegisteredUserByTypeSQL = `
SELECT user_type, COUNT(*) AS count FROM (
SELECT
CASE
WHEN account_type IN ($1) AND appservice_id IS NULL THEN 'native'
WHEN account_type = $4 AND appservice_id IS NULL THEN 'guest'
WHEN account_type IN ($5) AND appservice_id IS NOT NULL THEN 'bridged'
END AS user_type
FROM account_accounts
WHERE created_ts > $8
) AS t GROUP BY user_type
`
// account_type 1 = users; 3 = admins
const updateUserDailyVisitsSQL = `
INSERT INTO user_daily_visits(localpart, device_id, timestamp, user_agent)
SELECT u.localpart, u.device_id, $1, MAX(u.user_agent)
FROM device_devices AS u
LEFT JOIN (
SELECT localpart, device_id, timestamp FROM user_daily_visits
WHERE timestamp = $1
) udv
ON u.localpart = udv.localpart AND u.device_id = udv.device_id
INNER JOIN device_devices d ON d.localpart = u.localpart
INNER JOIN account_accounts a ON a.localpart = u.localpart
WHERE $2 <= d.last_seen_ts AND d.last_seen_ts < $3
AND a.account_type in (1, 3)
GROUP BY u.localpart, u.device_id
ON CONFLICT (localpart, device_id, timestamp) DO NOTHING
;
`
const queryDBEngineVersion = "select sqlite_version();"
type statsStatements struct {
serverName gomatrixserverlib.ServerName
db *sql.DB
lastUpdate time.Time
countUsersLastSeenAfterStmt *sql.Stmt
countR30UsersStmt *sql.Stmt
countR30UsersV2Stmt *sql.Stmt
updateUserDailyVisitsStmt *sql.Stmt
countUserByAccountTypeStmt *sql.Stmt
countRegisteredUserByTypeStmt *sql.Stmt
dbEngineVersionStmt *sql.Stmt
}
func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) {
s := &statsStatements{
serverName: serverName,
lastUpdate: time.Now(),
db: db,
}
_, err := db.Exec(userDailyVisitsSchema)
if err != nil {
return nil, err
}
go s.startTimers()
return s, sqlutil.StatementList{
{&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL},
{&s.countR30UsersStmt, countR30UsersSQL},
{&s.countR30UsersV2Stmt, countR30UsersV2SQL},
{&s.updateUserDailyVisitsStmt, updateUserDailyVisitsSQL},
{&s.countUserByAccountTypeStmt, countUserByAccountTypeSQL},
{&s.countRegisteredUserByTypeStmt, countRegisteredUserByTypeSQL},
{&s.dbEngineVersionStmt, queryDBEngineVersion},
}.Prepare(db)
}
func (s *statsStatements) startTimers() {
var updateStatsFunc func()
updateStatsFunc = func() {
logrus.Infof("Executing UpdateUserDailyVisits")
if err := s.UpdateUserDailyVisits(context.Background(), nil, time.Now(), s.lastUpdate); err != nil {
logrus.WithError(err).Error("failed to update daily user visits")
}
time.AfterFunc(time.Hour*3, updateStatsFunc)
}
time.AfterFunc(time.Minute*5, updateStatsFunc)
}
func (s *statsStatements) allUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
query := strings.Replace(countUserByAccountTypeSQL, "($1)", sqlutil.QueryVariadic(4), 1)
queryStmt, err := s.db.Prepare(query)
if err != nil {
return 0, err
}
stmt := sqlutil.TxStmt(txn, queryStmt)
err = stmt.QueryRowContext(ctx,
1, 2, 3, 4,
).Scan(&result)
return
}
func (s *statsStatements) nonBridgedUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
query := strings.Replace(countUserByAccountTypeSQL, "($1)", sqlutil.QueryVariadic(3), 1)
queryStmt, err := s.db.Prepare(query)
if err != nil {
return 0, err
}
stmt := sqlutil.TxStmt(txn, queryStmt)
err = stmt.QueryRowContext(ctx,
1, 2, 3,
).Scan(&result)
return
}
func (s *statsStatements) registeredUserByType(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
// $1 = Guest AccountType; $2 = timestamp
// $3 & $4 = All non guest AccountType IDs
nonGuests := []api.AccountType{api.AccountTypeUser, api.AccountTypeAdmin, api.AccountTypeAppService}
countSQL := strings.Replace(countRegisteredUserByTypeSQL, "($1)", sqlutil.QueryVariadicOffset(len(nonGuests), 0), 1)
countSQL = strings.Replace(countSQL, "($5)", sqlutil.QueryVariadicOffset(len(nonGuests), 1+len(nonGuests)), 1)
queryStmt, err := s.db.Prepare(countSQL)
if err != nil {
return nil, err
}
stmt := sqlutil.TxStmt(txn, queryStmt)
registeredAfter := time.Now().AddDate(0, 0, -30)
params := make([]interface{}, len(nonGuests)*2+2)
// nonGuests is used twice
for i, v := range nonGuests {
params[i] = v // i: 0 1 2 => ($1, $2, $3)
params[i+1+len(nonGuests)] = v // i: 4 5 6 => ($5, $6, $7)
}
params[3] = api.AccountTypeGuest // $4
params[7] = gomatrixserverlib.AsTimestamp(registeredAfter) // $8
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "RegisteredUserByType: failed to close rows")
var userType string
var count int64
var result = make(map[string]int64)
for rows.Next() {
if err = rows.Scan(&userType, &count); err != nil {
return nil, err
}
result[userType] = count
}
return result, rows.Err()
}
func (s *statsStatements) dailyUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUsersLastSeenAfterStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -1)
err = stmt.QueryRowContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter),
).Scan(&result)
return
}
func (s *statsStatements) monthlyUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUsersLastSeenAfterStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30)
err = stmt.QueryRowContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter),
).Scan(&result)
return
}
/* R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart
*/
func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30)
diff := time.Hour * 24 * 30
rows, err := stmt.QueryContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter),
gomatrixserverlib.AsTimestamp(lastSeenAfter),
diff.Milliseconds(),
)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "R30Users: failed to close rows")
var platform string
var count int64
var result = make(map[string]int64)
for rows.Next() {
if err = rows.Scan(&platform, &count); err != nil {
return nil, err
}
if platform == "unknown" {
continue
}
result["all"] += count
result[platform] = count
}
return result, rows.Err()
}
/* R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
*/
func (s *statsStatements) r30UsersV2(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersV2Stmt)
sixtyDaysAgo := time.Now().AddDate(0, 0, -60)
diff := time.Hour * 24 * 30
tomorrow := time.Now().Add(time.Hour * 24)
rows, err := stmt.QueryContext(ctx,
gomatrixserverlib.AsTimestamp(sixtyDaysAgo),
gomatrixserverlib.AsTimestamp(tomorrow),
diff.Milliseconds(),
)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "R30UsersV2: failed to close rows")
var platform string
var count int64
var result = map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
}
for rows.Next() {
if err = rows.Scan(&platform, &count); err != nil {
return nil, err
}
if _, ok := result[platform]; !ok {
continue
}
result["all"] += count
result[platform] = count
}
return result, rows.Err()
}
// UserStatistics collects some information about users on this instance.
// Returns the stats itself as well as the database engine version and type.
// On error, returns the stats collected up to the error.
func (s *statsStatements) UserStatistics(ctx context.Context, txn *sql.Tx) (*types.UserStatistics, *types.DatabaseEngine, error) {
var (
stats = &types.UserStatistics{
R30UsersV2: map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
},
R30Users: map[string]int64{},
RegisteredUsersByType: map[string]int64{},
}
dbEngine = &types.DatabaseEngine{Engine: "SQLite", Version: "unknown"}
err error
)
stats.AllUsers, err = s.allUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.DailyUsers, err = s.dailyUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.MonthlyUsers, err = s.monthlyUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.R30Users, err = s.r30Users(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.R30UsersV2, err = s.r30UsersV2(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.NonBridgedUsers, err = s.nonBridgedUsers(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stats.RegisteredUsersByType, err = s.registeredUserByType(ctx, txn)
if err != nil {
return stats, dbEngine, err
}
stmt := sqlutil.TxStmt(txn, s.dbEngineVersionStmt)
err = stmt.QueryRowContext(ctx).Scan(&dbEngine.Version)
return stats, dbEngine, err
}
func (s *statsStatements) UpdateUserDailyVisits(
ctx context.Context, txn *sql.Tx,
startTime, lastUpdate time.Time,
) error {
stmt := sqlutil.TxStmt(txn, s.updateUserDailyVisitsStmt)
startTime = startTime.Truncate(time.Hour * 24)
// edge case
if startTime.After(s.lastUpdate) {
startTime = startTime.AddDate(0, 0, -1)
}
_, err := stmt.ExecContext(ctx,
gomatrixserverlib.AsTimestamp(startTime),
gomatrixserverlib.AsTimestamp(lastUpdate),
gomatrixserverlib.AsTimestamp(time.Now()),
)
if err == nil {
s.lastUpdate = time.Now()
}
return err
}

View File

@ -95,6 +95,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil { if err != nil {
return nil, fmt.Errorf("NewPostgresNotificationTable: %w", err) return nil, fmt.Errorf("NewPostgresNotificationTable: %w", err)
} }
statsTable, err := NewSQLiteStatsTable(db, serverName)
if err != nil {
return nil, fmt.Errorf("NewSQLiteStatsTable: %w", err)
}
return &shared.Database{ return &shared.Database{
AccountDatas: accountDataTable, AccountDatas: accountDataTable,
Accounts: accountsTable, Accounts: accountsTable,
@ -107,6 +111,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
ThreePIDs: threePIDTable, ThreePIDs: threePIDTable,
Pushers: pusherTable, Pushers: pusherTable,
Notifications: notificationsTable, Notifications: notificationsTable,
Stats: statsTable,
ServerName: serverName, ServerName: serverName,
DB: db, DB: db,
Writer: writer, Writer: writer,

View File

@ -174,7 +174,7 @@ func Test_Devices(t *testing.T) {
newName := "new display name" newName := "new display name"
err = db.UpdateDevice(ctx, localpart, deviceWithID.ID, &newName) err = db.UpdateDevice(ctx, localpart, deviceWithID.ID, &newName)
assert.NoError(t, err, "unable to update device displayname") assert.NoError(t, err, "unable to update device displayname")
err = db.UpdateDeviceLastSeen(ctx, localpart, deviceWithID.ID, "127.0.0.1") err = db.UpdateDeviceLastSeen(ctx, localpart, deviceWithID.ID, "127.0.0.1", "Element Web")
assert.NoError(t, err, "unable to update device last seen") assert.NoError(t, err, "unable to update device last seen")
deviceWithID.DisplayName = newName deviceWithID.DisplayName = newName

View File

@ -18,9 +18,11 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/types"
) )
type AccountDataTable interface { type AccountDataTable interface {
@ -48,7 +50,7 @@ type DevicesTable interface {
SelectDeviceByID(ctx context.Context, localpart, deviceID string) (*api.Device, error) SelectDeviceByID(ctx context.Context, localpart, deviceID string) (*api.Device, error)
SelectDevicesByLocalpart(ctx context.Context, txn *sql.Tx, localpart, exceptDeviceID string) ([]api.Device, error) SelectDevicesByLocalpart(ctx context.Context, txn *sql.Tx, localpart, exceptDeviceID string) ([]api.Device, error)
SelectDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) SelectDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error)
UpdateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr string) error UpdateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr, userAgent string) error
} }
type KeyBackupTable interface { type KeyBackupTable interface {
@ -111,6 +113,11 @@ type NotificationTable interface {
SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error)
} }
type StatsTable interface {
UserStatistics(ctx context.Context, txn *sql.Tx) (*types.UserStatistics, *types.DatabaseEngine, error)
UpdateUserDailyVisits(ctx context.Context, txn *sql.Tx, startTime, lastUpdate time.Time) error
}
type NotificationFilter uint32 type NotificationFilter uint32
const ( const (

View File

@ -0,0 +1,319 @@
package tables_test
import (
"context"
"database/sql"
"fmt"
"reflect"
"testing"
"time"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/postgres"
"github.com/matrix-org/dendrite/userapi/storage/sqlite3"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
func mustMakeDBs(t *testing.T, dbType test.DBType) (
*sql.DB, tables.AccountsTable, tables.DevicesTable, tables.StatsTable, func(),
) {
t.Helper()
var (
accTable tables.AccountsTable
devTable tables.DevicesTable
statsTable tables.StatsTable
err error
)
connStr, close := test.PrepareDBConnectionString(t, dbType)
db, err := sqlutil.Open(&config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
}, nil)
if err != nil {
t.Fatalf("failed to open db: %s", err)
}
switch dbType {
case test.DBTypeSQLite:
accTable, err = sqlite3.NewSQLiteAccountsTable(db, "localhost")
if err != nil {
t.Fatalf("unable to create acc db: %v", err)
}
devTable, err = sqlite3.NewSQLiteDevicesTable(db, "localhost")
if err != nil {
t.Fatalf("unable to open device db: %v", err)
}
statsTable, err = sqlite3.NewSQLiteStatsTable(db, "localhost")
if err != nil {
t.Fatalf("unable to open stats db: %v", err)
}
case test.DBTypePostgres:
accTable, err = postgres.NewPostgresAccountsTable(db, "localhost")
if err != nil {
t.Fatalf("unable to create acc db: %v", err)
}
devTable, err = postgres.NewPostgresDevicesTable(db, "localhost")
if err != nil {
t.Fatalf("unable to open device db: %v", err)
}
statsTable, err = postgres.NewPostgresStatsTable(db, "localhost")
if err != nil {
t.Fatalf("unable to open stats db: %v", err)
}
}
return db, accTable, devTable, statsTable, close
}
func mustMakeAccountAndDevice(
t *testing.T,
ctx context.Context,
accDB tables.AccountsTable,
devDB tables.DevicesTable,
localpart string,
accType api.AccountType,
userAgent string,
) {
t.Helper()
appServiceID := ""
if accType == api.AccountTypeAppService {
appServiceID = util.RandomString(16)
}
_, err := accDB.InsertAccount(ctx, nil, localpart, "", appServiceID, accType)
if err != nil {
t.Fatalf("unable to create account: %v", err)
}
_, err = devDB.InsertDevice(ctx, nil, "deviceID", localpart, util.RandomString(16), nil, "", userAgent)
if err != nil {
t.Fatalf("unable to create device: %v", err)
}
}
func mustUpdateDeviceLastSeen(
t *testing.T,
ctx context.Context,
db *sql.DB,
localpart string,
timestamp time.Time,
) {
t.Helper()
_, err := db.ExecContext(ctx, "UPDATE device_devices SET last_seen_ts = $1 WHERE localpart = $2", gomatrixserverlib.AsTimestamp(timestamp), localpart)
if err != nil {
t.Fatalf("unable to update device last seen")
}
}
func mustUserUpdateRegistered(
t *testing.T,
ctx context.Context,
db *sql.DB,
localpart string,
timestamp time.Time,
) {
_, err := db.ExecContext(ctx, "UPDATE account_accounts SET created_ts = $1 WHERE localpart = $2", gomatrixserverlib.AsTimestamp(timestamp), localpart)
if err != nil {
t.Fatalf("unable to update device last seen")
}
}
// These tests must run sequentially, as they build up on each other
func Test_UserStatistics(t *testing.T) {
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, accDB, devDB, statsDB, close := mustMakeDBs(t, dbType)
defer close()
wantType := "SQLite"
if dbType == test.DBTypePostgres {
wantType = "Postgres"
}
t.Run(fmt.Sprintf("want %s database engine", wantType), func(t *testing.T) {
_, gotDB, err := statsDB.UserStatistics(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if wantType != gotDB.Engine { // can't use DeepEqual, as the Version might differ
t.Errorf("UserStatistics() got DB engine = %+v, want %s", gotDB.Engine, wantType)
}
})
t.Run("Want Users", func(t *testing.T) {
mustMakeAccountAndDevice(t, ctx, accDB, devDB, "user1", api.AccountTypeUser, "Element Android")
mustMakeAccountAndDevice(t, ctx, accDB, devDB, "user2", api.AccountTypeUser, "Element iOS")
mustMakeAccountAndDevice(t, ctx, accDB, devDB, "user3", api.AccountTypeUser, "Element web")
mustMakeAccountAndDevice(t, ctx, accDB, devDB, "user4", api.AccountTypeGuest, "Element Electron")
mustMakeAccountAndDevice(t, ctx, accDB, devDB, "user5", api.AccountTypeAdmin, "gecko")
mustMakeAccountAndDevice(t, ctx, accDB, devDB, "user6", api.AccountTypeAppService, "gecko")
gotStats, _, err := statsDB.UserStatistics(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wantStats := &types.UserStatistics{
RegisteredUsersByType: map[string]int64{
"native": 4,
"guest": 1,
"bridged": 1,
},
R30Users: map[string]int64{},
R30UsersV2: map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
},
AllUsers: 6,
NonBridgedUsers: 5,
DailyUsers: 6,
MonthlyUsers: 6,
}
if !reflect.DeepEqual(gotStats, wantStats) {
t.Errorf("UserStatistics() gotStats = \n%+v\nwant\n%+v", gotStats, wantStats)
}
})
t.Run("Users not active for one/two month", func(t *testing.T) {
mustUpdateDeviceLastSeen(t, ctx, db, "user1", time.Now().AddDate(0, -2, 0))
mustUpdateDeviceLastSeen(t, ctx, db, "user2", time.Now().AddDate(0, -1, 0))
gotStats, _, err := statsDB.UserStatistics(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wantStats := &types.UserStatistics{
RegisteredUsersByType: map[string]int64{
"native": 4,
"guest": 1,
"bridged": 1,
},
R30Users: map[string]int64{},
R30UsersV2: map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
},
AllUsers: 6,
NonBridgedUsers: 5,
DailyUsers: 4,
MonthlyUsers: 4,
}
if !reflect.DeepEqual(gotStats, wantStats) {
t.Errorf("UserStatistics() gotStats = \n%+v\nwant\n%+v", gotStats, wantStats)
}
})
/* R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart
*/
t.Run("R30Users tests", func(t *testing.T) {
mustUserUpdateRegistered(t, ctx, db, "user1", time.Now().AddDate(0, -2, 0))
mustUpdateDeviceLastSeen(t, ctx, db, "user1", time.Now())
mustUserUpdateRegistered(t, ctx, db, "user4", time.Now().AddDate(0, -2, 0))
mustUpdateDeviceLastSeen(t, ctx, db, "user4", time.Now())
startTime := time.Now().AddDate(0, 0, -2)
err := statsDB.UpdateUserDailyVisits(ctx, nil, startTime, startTime.Truncate(time.Hour*24).Add(time.Hour))
if err != nil {
t.Fatalf("unable to update daily visits stats: %v", err)
}
gotStats, _, err := statsDB.UserStatistics(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wantStats := &types.UserStatistics{
RegisteredUsersByType: map[string]int64{
"native": 3,
"bridged": 1,
},
R30Users: map[string]int64{
"all": 2,
"android": 1,
"electron": 1,
},
R30UsersV2: map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
},
AllUsers: 6,
NonBridgedUsers: 5,
DailyUsers: 5,
MonthlyUsers: 5,
}
if !reflect.DeepEqual(gotStats, wantStats) {
t.Errorf("UserStatistics() gotStats = \n%+v\nwant\n%+v", gotStats, wantStats)
}
})
/*
R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
most recent -> neueste
least recent -> älteste
*/
t.Run("R30UsersV2 tests", func(t *testing.T) {
// generate some data
for i := 100; i > 0; i-- {
mustUpdateDeviceLastSeen(t, ctx, db, "user1", time.Now().AddDate(0, 0, -i))
mustUpdateDeviceLastSeen(t, ctx, db, "user5", time.Now().AddDate(0, 0, -i))
startTime := time.Now().AddDate(0, 0, -i)
err := statsDB.UpdateUserDailyVisits(ctx, nil, startTime, startTime.Truncate(time.Hour*24).Add(time.Hour))
if err != nil {
t.Fatalf("unable to update daily visits stats: %v", err)
}
}
gotStats, _, err := statsDB.UserStatistics(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wantStats := &types.UserStatistics{
RegisteredUsersByType: map[string]int64{
"native": 3,
"bridged": 1,
},
R30Users: map[string]int64{
"all": 2,
"android": 1,
"electron": 1,
},
R30UsersV2: map[string]int64{
"ios": 0,
"android": 1,
"web": 1,
"electron": 0,
"all": 2,
},
AllUsers: 6,
NonBridgedUsers: 5,
DailyUsers: 3,
MonthlyUsers: 5,
}
if !reflect.DeepEqual(gotStats, wantStats) {
t.Errorf("UserStatistics() gotStats = \n%+v\nwant\n%+v", gotStats, wantStats)
}
})
})
}

View File

@ -0,0 +1,30 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
type UserStatistics struct {
RegisteredUsersByType map[string]int64
R30Users map[string]int64
R30UsersV2 map[string]int64
AllUsers int64
NonBridgedUsers int64
DailyUsers int64
MonthlyUsers int64
}
type DatabaseEngine struct {
Engine string
Version string
}

View File

@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/userapi/inthttp" "github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -104,5 +105,9 @@ func NewInternalAPI(
} }
time.AfterFunc(time.Minute, cleanOldNotifs) time.AfterFunc(time.Minute, cleanOldNotifs)
if base.Cfg.Global.ReportStats.Enabled {
go util.StartPhoneHomeCollector(time.Now(), base.Cfg, db)
}
return userAPI return userAPI
} }

View File

@ -0,0 +1,160 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"bytes"
"context"
"encoding/json"
"math"
"net/http"
"runtime"
"syscall"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type phoneHomeStats struct {
prevData timestampToRUUsage
stats map[string]interface{}
serverName gomatrixserverlib.ServerName
startTime time.Time
cfg *config.Dendrite
db storage.Statistics
isMonolith bool
client *http.Client
}
type timestampToRUUsage struct {
timestamp int64
usage syscall.Rusage
}
func StartPhoneHomeCollector(startTime time.Time, cfg *config.Dendrite, statsDB storage.Statistics) {
p := phoneHomeStats{
startTime: startTime,
serverName: cfg.Global.ServerName,
cfg: cfg,
db: statsDB,
isMonolith: cfg.IsMonolith,
client: &http.Client{
Timeout: time.Second * 30,
Transport: http.DefaultTransport,
},
}
// start initial run after 5min
time.AfterFunc(time.Minute*5, p.collect)
// run every 3 hours
ticker := time.NewTicker(time.Hour * 3)
for range ticker.C {
p.collect()
}
}
func (p *phoneHomeStats) collect() {
p.stats = make(map[string]interface{})
// general information
p.stats["homeserver"] = p.serverName
p.stats["monolith"] = p.isMonolith
p.stats["version"] = internal.VersionString()
p.stats["timestamp"] = time.Now().Unix()
p.stats["go_version"] = runtime.Version()
p.stats["go_arch"] = runtime.GOARCH
p.stats["go_os"] = runtime.GOOS
p.stats["num_cpu"] = runtime.NumCPU()
p.stats["num_go_routine"] = runtime.NumGoroutine()
p.stats["uptime_seconds"] = math.Floor(time.Since(p.startTime).Seconds())
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
defer cancel()
// cpu and memory usage information
err := getMemoryStats(p)
if err != nil {
logrus.WithError(err).Warn("unable to get memory/cpu stats, using defaults")
}
// configuration information
p.stats["federation_disabled"] = p.cfg.Global.DisableFederation
p.stats["nats_embedded"] = true
p.stats["nats_in_memory"] = p.cfg.Global.JetStream.InMemory
if len(p.cfg.Global.JetStream.Addresses) > 0 {
p.stats["nats_embedded"] = false
p.stats["nats_in_memory"] = false // probably
}
if len(p.cfg.Logging) > 0 {
p.stats["log_level"] = p.cfg.Logging[0].Level
} else {
p.stats["log_level"] = "info"
}
// message and room stats
// TODO: Find a solution to actually set these values
p.stats["total_room_count"] = 0
p.stats["daily_messages"] = 0
p.stats["daily_sent_messages"] = 0
p.stats["daily_e2ee_messages"] = 0
p.stats["daily_sent_e2ee_messages"] = 0
// user stats and DB engine
userStats, db, err := p.db.UserStatistics(ctx)
if err != nil {
logrus.WithError(err).Warn("unable to query userstats, using default values")
}
p.stats["database_engine"] = db.Engine
p.stats["database_server_version"] = db.Version
p.stats["total_users"] = userStats.AllUsers
p.stats["total_nonbridged_users"] = userStats.NonBridgedUsers
p.stats["daily_active_users"] = userStats.DailyUsers
p.stats["monthly_active_users"] = userStats.MonthlyUsers
for t, c := range userStats.RegisteredUsersByType {
p.stats["daily_user_type_"+t] = c
}
for t, c := range userStats.R30Users {
p.stats["r30_users_"+t] = c
}
for t, c := range userStats.R30UsersV2 {
p.stats["r30v2_users_"+t] = c
}
output := bytes.Buffer{}
if err = json.NewEncoder(&output).Encode(p.stats); err != nil {
logrus.WithError(err).Error("unable to encode anonymous stats")
return
}
logrus.Infof("Reporting stats to %s: %s", p.cfg.Global.ReportStats.Endpoint, output.String())
request, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Global.ReportStats.Endpoint, &output)
if err != nil {
logrus.WithError(err).Error("unable to create anonymous stats request")
return
}
request.Header.Set("User-Agent", "Dendrite/"+internal.VersionString())
_, err = p.client.Do(request)
if err != nil {
logrus.WithError(err).Error("unable to send anonymous stats")
return
}
}

47
userapi/util/stats.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !wasm && !windows
// +build !wasm,!windows
package util
import (
"syscall"
"time"
"github.com/sirupsen/logrus"
)
func getMemoryStats(p *phoneHomeStats) error {
oldUsage := p.prevData
newUsage := syscall.Rusage{}
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &newUsage); err != nil {
logrus.WithError(err).Error("unable to get usage")
return err
}
newData := timestampToRUUsage{timestamp: time.Now().Unix(), usage: newUsage}
p.prevData = newData
usedCPUTime := (newUsage.Utime.Sec + newUsage.Stime.Sec) - (oldUsage.usage.Utime.Sec + oldUsage.usage.Stime.Sec)
if usedCPUTime == 0 || newData.timestamp == oldUsage.timestamp {
p.stats["cpu_average"] = 0
} else {
// conversion to int64 required for GOARCH=386
p.stats["cpu_average"] = int64(usedCPUTime) / (newData.timestamp - oldUsage.timestamp) * 100
}
p.stats["memory_rss"] = newUsage.Maxrss
return nil
}

View File

@ -0,0 +1,20 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
// stub, since WASM doesn't support syscall.Getrusage
func getMemoryStats(p *phoneHomeStats) error {
return nil
}

View File

@ -0,0 +1,29 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !wasm
// +build !wasm
package util
import (
"runtime"
)
func getMemoryStats(p *phoneHomeStats) error {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
p.stats["memory_rss"] = memStats.Alloc
return nil
}