Add dendrite-upgrade-test (#1901)

* Add WIP binary for testing dendrite version upgrades

* WIP dendrite upgrade work

* Finish dendrite upgrade checks

* go mod tidy

* Review comments; print container logs on failure

* Linting
This commit is contained in:
kegsay 2021-07-07 12:06:17 +01:00 committed by GitHub
parent bcd3ef38d0
commit d72d634391
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1332 additions and 1 deletions

View File

@ -0,0 +1,430 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"regexp"
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/Masterminds/semver/v3"
"github.com/codeclysm/extract"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
)
var (
flagTempDir = flag.String("tmp", "tmp", "Path to temporary directory to dump tarballs to")
flagFrom = flag.String("from", "0.1.0", "The version to start from e.g '0.3.1'.")
flagTo = flag.String("to", "HEAD", "The version to end on e.g '0.3.3'.")
flagBuildConcurrency = flag.Int("build-concurrency", runtime.NumCPU(), "The amount of build concurrency when building images")
alphaNumerics = regexp.MustCompile("[^a-zA-Z0-9]+")
)
// Embed the Dockerfile to use when building dendrite versions.
// We cannot use the dockerfile associated with the repo with each version sadly due to changes in
// Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients
// due to the error:
// When using COPY with more than one source file, the destination must be a directory and end with a /
// We need to run a postgres anyway, so use the dockerfile associated with Complement instead.
const Dockerfile = `FROM golang:1.13-stretch as build
RUN apt-get update && apt-get install -y postgresql
WORKDIR /build
# Copy the build context to the repo as this is the right dendrite code. This is different to the
# Complement Dockerfile which wgets a branch.
COPY . .
RUN go build ./cmd/dendrite-monolith-server
RUN go build ./cmd/generate-keys
RUN go build ./cmd/generate-config
RUN ./generate-config --ci > dendrite.yaml
RUN ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
# Replace the connection string with a single postgres DB, using user/db = 'postgres' and no password
RUN sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml
# No password when connecting over localhost
RUN sed -i "s%127.0.0.1/32 md5%127.0.0.1/32 trust%g" /etc/postgresql/9.6/main/pg_hba.conf
# Bump up max conns for moar concurrency
RUN sed -i 's/max_connections = 100/max_connections = 2000/g' /etc/postgresql/9.6/main/postgresql.conf
RUN sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml
# This entry script starts postgres, waits for it to be up then starts dendrite
RUN echo '\
#!/bin/bash -eu \n\
pg_lsclusters \n\
pg_ctlcluster 9.6 main start \n\
\n\
until pg_isready \n\
do \n\
echo "Waiting for postgres"; \n\
sleep 1; \n\
done \n\
\n\
sed -i "s/server_name: localhost/server_name: ${SERVER_NAME}/g" dendrite.yaml \n\
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml \n\
' > run_dendrite.sh && chmod +x run_dendrite.sh
ENV SERVER_NAME=localhost
EXPOSE 8008 8448
CMD /build/run_dendrite.sh `
// downloadArchive downloads an arbitrary github archive of the form:
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz
// and re-tarballs it without the top-level directory which contains branch information. It inserts
// the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that
// you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile.
// Returns the tarball buffer on success.
func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []byte) (*bytes.Buffer, error) {
resp, err := cli.Get(archiveURL)
if err != nil {
return nil, err
}
// nolint:errcheck
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("got HTTP %d", resp.StatusCode)
}
if err = os.Mkdir(tmpDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("failed to make temporary directory: %s", err)
}
// nolint:errcheck
defer os.RemoveAll(tmpDir)
// dump the tarball temporarily, stripping the top-level directory
err = extract.Archive(context.Background(), resp.Body, tmpDir, func(inPath string) string {
// remove top level
segments := strings.Split(inPath, "/")
return strings.Join(segments[1:], "/")
})
if err != nil {
return nil, err
}
// add top level Dockerfile
err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err)
}
// now re-tarball it :/
var tarball bytes.Buffer
err = compress(tmpDir, &tarball)
if err != nil {
return nil, err
}
return &tarball, nil
}
// buildDendrite builds Dendrite on the branchOrTagName given. Returns the image ID or an error
func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir, branchOrTagName string) (string, error) {
log.Printf("%s: Downloading version %s to %s\n", branchOrTagName, branchOrTagName, tmpDir)
// pull an archive, this contains a top-level directory which screws with the build context
// which we need to fix up post download
u := fmt.Sprintf("https://github.com/matrix-org/dendrite/archive/%s.tar.gz", branchOrTagName)
tarball, err := downloadArchive(httpClient, tmpDir, u, []byte(Dockerfile))
if err != nil {
return "", fmt.Errorf("failed to download archive %s: %w", u, err)
}
log.Printf("%s: %s => %d bytes\n", branchOrTagName, u, tarball.Len())
log.Printf("%s: Building version %s\n", branchOrTagName, branchOrTagName)
res, err := dockerClient.ImageBuild(context.Background(), tarball, types.ImageBuildOptions{
Tags: []string{"dendrite-upgrade"},
})
if err != nil {
return "", fmt.Errorf("failed to start building image: %s", err)
}
// nolint:errcheck
defer res.Body.Close()
decoder := json.NewDecoder(res.Body)
// {"aux":{"ID":"sha256:247082c717963bc2639fc2daed08838d67811ea12356cd4fda43e1ffef94f2eb"}}
var imageID string
for decoder.More() {
var dl struct {
Stream string `json:"stream"`
Aux map[string]interface{} `json:"aux"`
}
if err := decoder.Decode(&dl); err != nil {
return "", fmt.Errorf("failed to decode build image output line: %w", err)
}
log.Printf("%s: %s", branchOrTagName, dl.Stream)
if dl.Aux != nil {
imgID, ok := dl.Aux["ID"]
if ok {
imageID = imgID.(string)
}
}
}
return imageID, nil
}
func getAndSortVersionsFromGithub(httpClient *http.Client) (semVers []*semver.Version, err error) {
u := "https://api.github.com/repos/matrix-org/dendrite/tags"
res, err := httpClient.Get(u)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
return nil, fmt.Errorf("%s returned HTTP %d", u, res.StatusCode)
}
resp := []struct {
Name string `json:"name"`
}{}
if err = json.NewDecoder(res.Body).Decode(&resp); err != nil {
return nil, err
}
for _, r := range resp {
v, err := semver.NewVersion(r.Name)
if err != nil {
continue // not a semver, that's ok and isn't an error, we allow tags that aren't semvers
}
semVers = append(semVers, v)
}
sort.Sort(semver.Collection(semVers))
return semVers, nil
}
func calculateVersions(cli *http.Client, from, to string) []string {
semvers, err := getAndSortVersionsFromGithub(cli)
if err != nil {
log.Fatalf("failed to collect semvers from github: %s", err)
}
// snip the lower bound depending on --from
if from != "" {
fromVer, err := semver.NewVersion(from)
if err != nil {
log.Fatalf("invalid --from: %s", err)
}
i := 0
for i = 0; i < len(semvers); i++ {
if semvers[i].LessThan(fromVer) {
continue
}
break
}
semvers = semvers[i:]
}
if to != "" && to != "HEAD" {
toVer, err := semver.NewVersion(to)
if err != nil {
log.Fatalf("invalid --to: %s", err)
}
var i int
for i = len(semvers) - 1; i >= 0; i-- {
if semvers[i].GreaterThan(toVer) {
continue
}
break
}
semvers = semvers[:i+1]
}
var versions []string
for _, sv := range semvers {
versions = append(versions, sv.Original())
}
if to == "HEAD" {
versions = append(versions, "HEAD")
}
return versions
}
func buildDendriteImages(httpClient *http.Client, dockerClient *client.Client, baseTempDir string, concurrency int, branchOrTagNames []string) map[string]string {
// concurrently build all versions, this can be done in any order. The mutex protects the map
branchToImageID := make(map[string]string)
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(concurrency)
ch := make(chan string, len(branchOrTagNames))
for _, branchName := range branchOrTagNames {
ch <- branchName
}
close(ch)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for branchName := range ch {
tmpDir := baseTempDir + alphaNumerics.ReplaceAllString(branchName, "")
imgID, err := buildDendrite(httpClient, dockerClient, tmpDir, branchName)
if err != nil {
log.Fatalf("%s: failed to build dendrite image: %s", branchName, err)
}
mu.Lock()
branchToImageID[branchName] = imgID
mu.Unlock()
}
}()
}
wg.Wait()
return branchToImageID
}
func runImage(dockerClient *client.Client, volumeName, version, imageID string) (csAPIURL, containerID string, err error) {
log.Printf("%s: running image %s\n", version, imageID)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
body, err := dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageID,
Env: []string{"SERVER_NAME=hs1"},
}, &container.HostConfig{
PublishAllPorts: true,
Mounts: []mount.Mount{
{
Type: mount.TypeVolume,
Source: volumeName,
Target: "/var/lib/postgresql/9.6/main",
},
},
}, nil, nil, "dendrite_upgrade_test_"+version)
if err != nil {
return "", "", fmt.Errorf("failed to ContainerCreate: %s", err)
}
containerID = body.ID
err = dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{})
if err != nil {
return "", "", fmt.Errorf("failed to ContainerStart: %s", err)
}
inspect, err := dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return "", "", err
}
csapiPortInfo, ok := inspect.NetworkSettings.Ports[nat.Port("8008/tcp")]
if !ok {
return "", "", fmt.Errorf("port 8008 not exposed - exposed ports: %v", inspect.NetworkSettings.Ports)
}
baseURL := fmt.Sprintf("http://localhost:%s", csapiPortInfo[0].HostPort)
versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL)
// hit /versions to check it is up
var lastErr error
for i := 0; i < 500; i++ {
res, err := http.Get(versionsURL)
if err != nil {
lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err)
time.Sleep(50 * time.Millisecond)
continue
}
if res.StatusCode != 200 {
lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status)
time.Sleep(50 * time.Millisecond)
continue
}
lastErr = nil
break
}
if lastErr != nil {
logs, err := dockerClient.ContainerLogs(context.Background(), containerID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
})
// ignore errors when cannot get logs, it's just for debugging anyways
if err == nil {
logbody, err := ioutil.ReadAll(logs)
if err == nil {
log.Printf("Container logs:\n\n%s\n\n", string(logbody))
}
}
}
return baseURL, containerID, lastErr
}
func destroyContainer(dockerClient *client.Client, containerID string) {
err := dockerClient.ContainerRemove(context.TODO(), containerID, types.ContainerRemoveOptions{
Force: true,
})
if err != nil {
log.Printf("failed to remove container %s : %s", containerID, err)
}
}
func loadAndRunTests(dockerClient *client.Client, volumeName, v string, branchToImageID map[string]string) error {
csAPIURL, containerID, err := runImage(dockerClient, volumeName, v, branchToImageID[v])
if err != nil {
return fmt.Errorf("failed to run container for branch %v: %v", v, err)
}
defer destroyContainer(dockerClient, containerID)
log.Printf("URL %s -> %s \n", csAPIURL, containerID)
if err = runTests(csAPIURL, v); err != nil {
return fmt.Errorf("failed to run tests on version %s: %s", v, err)
}
return nil
}
func verifyTests(dockerClient *client.Client, volumeName string, versions []string, branchToImageID map[string]string) error {
lastVer := versions[len(versions)-1]
csAPIURL, containerID, err := runImage(dockerClient, volumeName, lastVer, branchToImageID[lastVer])
if err != nil {
return fmt.Errorf("failed to run container for branch %v: %v", lastVer, err)
}
defer destroyContainer(dockerClient, containerID)
return verifyTestsRan(csAPIURL, versions)
}
func main() {
flag.Parse()
httpClient := &http.Client{
Timeout: 60 * time.Second,
}
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Fatalf("failed to make docker client: %s", err)
}
if *flagFrom == "" {
flag.Usage()
os.Exit(1)
}
versions := calculateVersions(httpClient, *flagFrom, *flagTo)
log.Printf("Testing dendrite versions: %v\n", versions)
branchToImageID := buildDendriteImages(httpClient, dockerClient, *flagTempDir, *flagBuildConcurrency, versions)
// make a shared postgres volume
volume, err := dockerClient.VolumeCreate(context.Background(), volume.VolumeCreateBody{
Name: "dendrite_upgrade_test",
})
if err != nil {
log.Fatalf("failed to make docker volume: %s", err)
}
failed := false
defer func() {
perr := recover()
log.Println("removing postgres volume")
verr := dockerClient.VolumeRemove(context.Background(), volume.Name, true)
if perr == nil {
perr = verr
}
if perr != nil {
panic(perr)
}
if failed {
os.Exit(1)
}
}()
// run through images sequentially
for _, v := range versions {
if err = loadAndRunTests(dockerClient, volume.Name, v, branchToImageID); err != nil {
log.Printf("failed to run tests for %v: %s\n", v, err)
failed = true
break
}
}
if err := verifyTests(dockerClient, volume.Name, versions, branchToImageID); err != nil {
log.Printf("failed to verify test results: %s", err)
failed = true
}
}

View File

@ -0,0 +1,57 @@
package main
import (
"archive/tar"
"compress/gzip"
"io"
"os"
"path/filepath"
"strings"
)
// From https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726
// Modified to strip off top-level when compressing
func compress(src string, buf io.Writer) error {
// tar > gzip > buf
zr := gzip.NewWriter(buf)
tw := tar.NewWriter(zr)
// walk through every file in the folder
_ = filepath.Walk(src, func(file string, fi os.FileInfo, e error) error {
// generate tar header
header, err := tar.FileInfoHeader(fi, file)
if err != nil {
return err
}
// must provide real name
// (see https://golang.org/src/archive/tar/common.go?#L626)
header.Name = strings.TrimPrefix(filepath.ToSlash(file), src+"/")
// write header
if err := tw.WriteHeader(header); err != nil {
return err
}
// if not a dir, write file content
if !fi.IsDir() {
data, err := os.Open(file)
if err != nil {
return err
}
if _, err := io.Copy(tw, data); err != nil {
return err
}
}
return nil
})
// produce tar
if err := tw.Close(); err != nil {
return err
}
// produce gzip
if err := zr.Close(); err != nil {
return err
}
//
return nil
}

View File

@ -0,0 +1,192 @@
package main
import (
"fmt"
"log"
"strings"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
)
const userPassword = "this_is_a_long_password"
type user struct {
userID string
localpart string
client *gomatrix.Client
}
// runTests performs the following operations:
// - register alice and bob with branch name muxed into the localpart
// - create a DM room for the 2 users and exchange messages
// - create/join a public #global room and exchange messages
func runTests(baseURL, branchName string) error {
// register 2 users
users := []user{
{
localpart: "alice" + branchName,
},
{
localpart: "bob" + branchName,
},
}
for i, u := range users {
client, err := gomatrix.NewClient(baseURL, "", "")
if err != nil {
return err
}
resp, err := client.RegisterDummy(&gomatrix.ReqRegister{
Username: strings.ToLower(u.localpart),
Password: userPassword,
})
if err != nil {
return fmt.Errorf("failed to register %s: %s", u.localpart, err)
}
client, err = gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken)
if err != nil {
return err
}
users[i].client = client
users[i].userID = resp.UserID
}
// create DM room, join it and exchange messages
createRoomResp, err := users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{
Preset: "trusted_private_chat",
Invite: []string{users[1].userID},
IsDirect: true,
})
if err != nil {
return fmt.Errorf("failed to create DM room: %s", err)
}
dmRoomID := createRoomResp.RoomID
if _, err = users[1].client.JoinRoom(dmRoomID, "", nil); err != nil {
return fmt.Errorf("failed to join DM room: %s", err)
}
msgs := []struct {
client *gomatrix.Client
text string
}{
{
client: users[0].client, text: "1: " + branchName,
},
{
client: users[1].client, text: "2: " + branchName,
},
{
client: users[0].client, text: "3: " + branchName,
},
{
client: users[1].client, text: "4: " + branchName,
},
}
for _, msg := range msgs {
_, err = msg.client.SendText(dmRoomID, msg.text)
if err != nil {
return fmt.Errorf("failed to send text in dm room: %s", err)
}
}
// attempt to create/join the shared public room
publicRoomID := ""
createRoomResp, err = users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{
RoomAliasName: "global",
Preset: "public_chat",
})
if err != nil { // this is okay and expected if the room already exists and the aliases clash
// try to join it
_, domain, err2 := gomatrixserverlib.SplitID('@', users[0].userID)
if err2 != nil {
return fmt.Errorf("failed to split user ID: %s, %s", users[0].userID, err2)
}
joinRoomResp, err2 := users[0].client.JoinRoom(fmt.Sprintf("#global:%s", domain), "", nil)
if err2 != nil {
return fmt.Errorf("alice failed to join public room: %s", err2)
}
publicRoomID = joinRoomResp.RoomID
} else {
publicRoomID = createRoomResp.RoomID
}
if _, err = users[1].client.JoinRoom(publicRoomID, "", nil); err != nil {
return fmt.Errorf("bob failed to join public room: %s", err)
}
// send messages
for _, msg := range msgs {
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
if err != nil {
return fmt.Errorf("failed to send text in public room: %s", err)
}
}
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
return nil
}
// verifyTestsRan checks that the HS has the right rooms/messages
func verifyTestsRan(baseURL string, branchNames []string) error {
log.Println("Verifying tests....")
// check we can login as all users
var resp *gomatrix.RespLogin
for _, branchName := range branchNames {
client, err := gomatrix.NewClient(baseURL, "", "")
if err != nil {
return err
}
userLocalparts := []string{
"alice" + branchName,
"bob" + branchName,
}
for _, userLocalpart := range userLocalparts {
resp, err = client.Login(&gomatrix.ReqLogin{
Type: "m.login.password",
User: strings.ToLower(userLocalpart),
Password: userPassword,
})
if err != nil {
return fmt.Errorf("failed to login as %s: %s", userLocalpart, err)
}
if resp.AccessToken == "" {
return fmt.Errorf("failed to login, bad response: %+v", resp)
}
}
}
log.Println(" accounts exist: OK")
client, err := gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken)
if err != nil {
return err
}
_, domain, err := gomatrixserverlib.SplitID('@', client.UserID)
if err != nil {
return err
}
u := client.BuildURL("directory", "room", fmt.Sprintf("#global:%s", domain))
r := struct {
RoomID string `json:"room_id"`
}{}
err = client.MakeRequest("GET", u, nil, &r)
if err != nil {
return fmt.Errorf("failed to /directory: %s", err)
}
if r.RoomID == "" {
return fmt.Errorf("/directory lookup returned no room ID")
}
log.Println(" public room exists: OK")
history, err := client.Messages(r.RoomID, client.Store.LoadNextBatch(client.UserID), "", 'b', 100)
if err != nil {
return fmt.Errorf("failed to get /messages: %s", err)
}
// we expect 4 messages per version
msgCount := 0
for _, ev := range history.Chunk {
if ev.Type == "m.room.message" {
msgCount += 1
}
}
wantMsgCount := len(branchNames) * 4
if msgCount != wantMsgCount {
return fmt.Errorf("got %d messages in global room, want %d", msgCount, wantMsgCount)
}
log.Println(" messages exist: OK")
return nil
}

10
go.mod
View File

@ -3,12 +3,19 @@ module github.com/matrix-org/dendrite
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/Masterminds/semver/v3 v3.1.1
github.com/Shopify/sarama v1.28.0
github.com/codeclysm/extract v2.2.0+incompatible
github.com/containerd/containerd v1.5.2 // indirect
github.com/docker/docker v20.10.7+incompatible
github.com/docker/go-connections v0.4.0
github.com/getsentry/sentry-go v0.10.0
github.com/gologme/log v1.2.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/h2non/filetype v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect
github.com/lib/pq v1.9.0
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-circuit v0.4.0
@ -28,6 +35,7 @@ require (
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
github.com/morikuni/aec v1.0.0 // indirect
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
github.com/opentracing/opentracing-go v1.2.0
@ -41,7 +49,7 @@ require (
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa
go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1

644
go.sum

File diff suppressed because it is too large Load Diff