From e226d564ecb6dadac3cdcd3e85782d09c846739a Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 9 May 2017 15:58:31 +0100 Subject: [PATCH] Fix a race with sync server integration tests (#95) --- .../cmd/syncserver-integration-tests/main.go | 155 +++++++++++------- 1 file changed, 99 insertions(+), 56 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 03e5196c..7e5ae648 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -22,6 +22,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "time" "github.com/matrix-org/dendrite/common/test" @@ -60,6 +61,23 @@ var exe = test.KafkaExecutor{ OutputWriter: os.Stderr, } +var ( + lastRequestMutex sync.Mutex + lastRequestErr error +) + +func setLastRequestError(err error) { + lastRequestMutex.Lock() + defer lastRequestMutex.Unlock() + lastRequestErr = err +} + +func getLastRequestError() error { + lastRequestMutex.Lock() + defer lastRequestMutex.Unlock() + return lastRequestErr +} + var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"] roomserver_topic: "` + inputTopic + `" database: "` + testDatabase + `" @@ -107,53 +125,56 @@ func canonicalJSONInput(jsonData []string) []string { return jsonData } -func doSyncRequest(done chan error, want []string, since string) func() { - return func() { - cli := &http.Client{ - Timeout: 5 * time.Second, - } - res, err := cli.Get("http://" + syncserverAddr + "/api/_matrix/client/r0/sync?access_token=@alice:localhost&since=" + since) +// doSyncRequest does a /sync request and returns an error if it fails or doesn't +// return the wanted string. +func doSyncRequest(syncServerURL, want string) error { + cli := &http.Client{ + Timeout: 5 * time.Second, + } + res, err := cli.Get(syncServerURL) + if err != nil { + return err + } + if res.StatusCode != 200 { + return fmt.Errorf("/sync returned HTTP status %d", res.StatusCode) + } + defer res.Body.Close() + resBytes, err := ioutil.ReadAll(res.Body) + if err != nil { + return err + } + jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes) + if err != nil { + return err + } + if string(jsonBytes) != want { + return fmt.Errorf("/sync returned wrong bytes. Expected:\n%s\n\nGot:\n%s", want, string(jsonBytes)) + } + return nil +} + +// syncRequestUntilSuccess blocks and performs the same /sync request over and over until +// the response returns the wanted string, where it will close the given channel and return. +// It will keep track of the last error in `lastRequestErr`. +func syncRequestUntilSuccess(done chan error, want string, since string) { + for { + err := doSyncRequest( + "http://"+syncserverAddr+"/api/_matrix/client/r0/sync?access_token=@alice:localhost&since="+since, + want, + ) if err != nil { - done <- err - return + setLastRequestError(err) + time.Sleep(1 * time.Second) // don't tightloop + continue } - if res.StatusCode != 200 { - done <- fmt.Errorf("/sync returned HTTP status %d", res.StatusCode) - return - } - defer res.Body.Close() - resBytes, err := ioutil.ReadAll(res.Body) - if err != nil { - done <- err - return - } - jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes) - if err != nil { - done <- err - return - } - if string(jsonBytes) != want[0] { - fmt.Println("Expected:") - fmt.Println(want[0]) - fmt.Println("Got:") - fmt.Println(string(jsonBytes)) - done <- fmt.Errorf("/sync returned wrong bytes") - return - } - // all good, clean up close(done) + return } } -func testSyncServer(input, want []string, since string) { - exe.DeleteTopic(inputTopic) - if err := exe.CreateTopic(inputTopic); err != nil { - panic(err) - } - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { - panic(err) - } - +// prepareSyncServer creates the database and config file needed for the sync server to run. +// It also prepares the CLI command to execute. +func prepareSyncServer() *exec.Cmd { if err := createDatabase(testDatabaseName); err != nil { panic(err) } @@ -171,36 +192,58 @@ func testSyncServer(input, want []string, since string) { ) cmd.Stderr = os.Stderr cmd.Stdout = os.Stderr + return cmd +} + +func testSyncServer(input, want []string, since string) { + // Write the logs to kafka so the sync server has some data to work with. + exe.DeleteTopic(inputTopic) + if err := exe.CreateTopic(inputTopic); err != nil { + panic(err) + } + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { + panic(err) + } + + cmd := prepareSyncServer() if err := cmd.Start(); err != nil { panic("failed to start sync server: " + err.Error()) } done := make(chan error, 1) - // TODO: Waiting 1s is racey. Maybe keep hitting it until it doesn't get Connection Refused? - time.AfterFunc(1*time.Second, doSyncRequest(done, want, since)) + // We need to wait for the sync server to: + // - have created the tables + // - be listening on the given port + // - have consumed the kafka logs + // before we begin hitting it with /sync requests. We don't get told when it has done + // all these things, so we just continually hit /sync until it returns the right bytes. + // We can't even wait for the first valid 200 OK response because it's possible to race + // with consuming the kafka logs (so the /sync response will be missing events and + // therefore fail the test). + go syncRequestUntilSuccess(done, want[0], since) - // wait for it to die or timeout + // wait for the sync server to exit or our test timeout to expire go func() { - cmdErr := cmd.Wait() - if cmdErr != nil { - exitErr, ok := cmdErr.(*exec.ExitError) - if ok { - fmt.Println("\nSYNC SERVER ERROR: (", exitErr, ")") - fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:") - fmt.Println(" export PGHOST=/var/run/postgresql\n") - } - } - done <- cmdErr + done <- cmd.Wait() }() select { case <-time.After(timeout): + if reqErr := getLastRequestError(); reqErr != nil { + fmt.Println("Last /sync request error:") + fmt.Println(reqErr) + } + if err := cmd.Process.Kill(); err != nil { panic(err) } panic("dendrite-sync-api-server timed out") - case err := <-done: + case err, open := <-done: cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns. - if err != nil { + if open { // channel is closed on success + fmt.Println("=============================================================================================") + fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:") + fmt.Println(" export PGHOST=/var/run/postgresql\n") + fmt.Println("=============================================================================================") panic(err) } }