From 42564e8ed63b36e2d72d630a65c82eb5d3ddf272 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 9 May 2017 09:05:05 +0100 Subject: [PATCH] Factor out creating/deleting/writing to kafka topics (#94) --- .../cmd/roomserver-integration-tests/main.go | 68 +++++------------ .../cmd/syncserver-integration-tests/main.go | 62 ++++----------- .../matrix-org/dendrite/common/test/kafka.go | 76 +++++++++++++++++++ 3 files changed, 107 insertions(+), 99 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/common/test/kafka.go diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go index 3f58658c..f11c1c89 100644 --- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go @@ -16,13 +16,15 @@ package main import ( "fmt" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" "os" "os/exec" "path/filepath" "strings" "time" + + "github.com/matrix-org/dendrite/common/test" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" ) var ( @@ -46,6 +48,15 @@ var ( testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s binary_parameters=yes", testDatabaseName)) ) +var exe = test.KafkaExecutor{ + ZookeeperURI: zookeeperURI, + KafkaDirectory: kafkaDir, + KafkaURI: kafkaURI, + // Send stdout and stderr to our stderr so that we see error messages from + // the kafka process. + OutputWriter: os.Stderr, +} + func defaulting(value, defaultValue string) string { if value == "" { value = defaultValue @@ -75,36 +86,6 @@ func createDatabase(database string) error { return cmd.Run() } -func createTopic(topic string) error { - cmd := exec.Command( - filepath.Join(kafkaDir, "bin", "kafka-topics.sh"), - "--create", - "--zookeeper", zookeeperURI, - "--replication-factor", "1", - "--partitions", "1", - "--topic", topic, - ) - // Send stdout and stderr to our stderr so that we see error messages from - // the kafka process. - cmd.Stdout = os.Stderr - cmd.Stderr = os.Stderr - return cmd.Run() -} - -func writeToTopic(topic string, data []string) error { - cmd := exec.Command( - filepath.Join(kafkaDir, "bin", "kafka-console-producer.sh"), - "--broker-list", kafkaURI, - "--topic", topic, - ) - // Send stdout and stderr to our stderr so that we see error messages from - // the kafka process. - cmd.Stdout = os.Stderr - cmd.Stderr = os.Stderr - cmd.Stdin = strings.NewReader(strings.Join(data, "\n")) - return cmd.Run() -} - // runAndReadFromTopic runs a command and waits for a number of messages to be // written to a kafka topic. It returns if the command exits, the number of // messages is reached or after a timeout. It kills the command before it returns. @@ -173,19 +154,6 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP return lines, nil } -func deleteTopic(topic string) error { - cmd := exec.Command( - filepath.Join(kafkaDir, "bin", "kafka-topics.sh"), - "--delete", - "--if-exists", - "--zookeeper", zookeeperURI, - "--topic", topic, - ) - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stderr - return cmd.Run() -} - // testRoomserver is used to run integration tests against a single roomserver. // It creates new kafka topics for the input and output of the roomserver. // It writes the input messages to the input kafka topic, formatting each message @@ -200,16 +168,16 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R inputTopic = "roomserverInput" outputTopic = "roomserverOutput" ) - deleteTopic(inputTopic) - if err := createTopic(inputTopic); err != nil { + exe.DeleteTopic(inputTopic) + if err := exe.CreateTopic(inputTopic); err != nil { panic(err) } - deleteTopic(outputTopic) - if err := createTopic(outputTopic); err != nil { + exe.DeleteTopic(outputTopic) + if err := exe.CreateTopic(outputTopic); err != nil { panic(err) } - if err := writeToTopic(inputTopic, canonicalJSONInput(input)); err != nil { + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { panic(err) } diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 4cf1096b..03e5196c 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/matrix-org/dendrite/common/test" "github.com/matrix-org/gomatrixserverlib" ) @@ -50,6 +51,15 @@ var ( const inputTopic = "syncserverInput" +var exe = test.KafkaExecutor{ + ZookeeperURI: zookeeperURI, + KafkaDirectory: kafkaDir, + KafkaURI: kafkaURI, + // Send stdout and stderr to our stderr so that we see error messages from + // the kafka process. + OutputWriter: os.Stderr, +} + var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"] roomserver_topic: "` + inputTopic + `" database: "` + testDatabase + `" @@ -85,52 +95,6 @@ func createDatabase(database string) error { return cmd.Run() } -// TODO: dupes roomserver integration tests. Factor out. -func createTopic(topic string) error { - cmd := exec.Command( - filepath.Join(kafkaDir, "bin", "kafka-topics.sh"), - "--create", - "--zookeeper", zookeeperURI, - "--replication-factor", "1", - "--partitions", "1", - "--topic", topic, - ) - // Send stdout and stderr to our stderr so that we see error messages from - // the kafka process. - cmd.Stdout = os.Stderr - cmd.Stderr = os.Stderr - return cmd.Run() -} - -// TODO: dupes roomserver integration tests. Factor out. -func writeToTopic(topic string, data []string) error { - cmd := exec.Command( - filepath.Join(kafkaDir, "bin", "kafka-console-producer.sh"), - "--broker-list", kafkaURI, - "--topic", topic, - ) - // Send stdout and stderr to our stderr so that we see error messages from - // the kafka process. - cmd.Stdout = os.Stderr - cmd.Stderr = os.Stderr - cmd.Stdin = strings.NewReader(strings.Join(data, "\n")) - return cmd.Run() -} - -// TODO: dupes roomserver integration tests. Factor out. -func deleteTopic(topic string) error { - cmd := exec.Command( - filepath.Join(kafkaDir, "bin", "kafka-topics.sh"), - "--delete", - "--if-exists", - "--zookeeper", zookeeperURI, - "--topic", topic, - ) - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stderr - return cmd.Run() -} - // TODO: dupes roomserver integration tests. Factor out. func canonicalJSONInput(jsonData []string) []string { for i := range jsonData { @@ -182,11 +146,11 @@ func doSyncRequest(done chan error, want []string, since string) func() { } func testSyncServer(input, want []string, since string) { - deleteTopic(inputTopic) - if err := createTopic(inputTopic); err != nil { + exe.DeleteTopic(inputTopic) + if err := exe.CreateTopic(inputTopic); err != nil { panic(err) } - if err := writeToTopic(inputTopic, canonicalJSONInput(input)); err != nil { + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { panic(err) } diff --git a/src/github.com/matrix-org/dendrite/common/test/kafka.go b/src/github.com/matrix-org/dendrite/common/test/kafka.go new file mode 100644 index 00000000..cbf24630 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/test/kafka.go @@ -0,0 +1,76 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "io" + "os/exec" + "path/filepath" + "strings" +) + +// KafkaExecutor executes kafka scripts. +type KafkaExecutor struct { + // The location of Zookeeper. Typically this is `localhost:2181`. + ZookeeperURI string + // The directory where Kafka is installed to. Used to locate kafka scripts. + KafkaDirectory string + // The location of the Kafka logs. Typically this is `localhost:9092`. + KafkaURI string + // Where stdout and stderr should be written to. Typically this is `os.Stderr`. + OutputWriter io.Writer +} + +// CreateTopic creates a new kafka topic. This is created with a single partition. +func (e *KafkaExecutor) CreateTopic(topic string) error { + cmd := exec.Command( + filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"), + "--create", + "--zookeeper", e.ZookeeperURI, + "--replication-factor", "1", + "--partitions", "1", + "--topic", topic, + ) + cmd.Stdout = e.OutputWriter + cmd.Stderr = e.OutputWriter + return cmd.Run() +} + +// WriteToTopic writes data to a kafka topic. +func (e *KafkaExecutor) WriteToTopic(topic string, data []string) error { + cmd := exec.Command( + filepath.Join(e.KafkaDirectory, "bin", "kafka-console-producer.sh"), + "--broker-list", e.KafkaURI, + "--topic", topic, + ) + cmd.Stdout = e.OutputWriter + cmd.Stderr = e.OutputWriter + cmd.Stdin = strings.NewReader(strings.Join(data, "\n")) + return cmd.Run() +} + +// DeleteTopic deletes a given kafka topic if it exists. +func (e *KafkaExecutor) DeleteTopic(topic string) error { + cmd := exec.Command( + filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"), + "--delete", + "--if-exists", + "--zookeeper", e.ZookeeperURI, + "--topic", topic, + ) + cmd.Stderr = e.OutputWriter + cmd.Stdout = e.OutputWriter + return cmd.Run() +}