diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 249afe60..ae926bab 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -112,6 +112,7 @@ listen: media_api: "localhost:7774" public_rooms_api: "localhost:7775" federation_sender: "localhost:7776" + appservice: "localhost:7777" # The configuration for tracing the dendrite components. tracing: diff --git a/src/github.com/matrix-org/dendrite/appservice/README.md b/src/github.com/matrix-org/dendrite/appservice/README.md new file mode 100644 index 00000000..5b00386d --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/README.md @@ -0,0 +1,10 @@ +# Application Service + +This component interfaces with external [Application +Services](https://matrix.org/docs/spec/application_service/unstable.html). +This includes any HTTP endpoints that Application Services call, as well as talking +to any HTTP endpoints that Application Services provide themselves. + +## Consumers + +This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services. \ No newline at end of file diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go new file mode 100644 index 00000000..2eecf8a2 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -0,0 +1,50 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package appservice + +import ( + "github.com/matrix-org/dendrite/appservice/consumers" + "github.com/matrix-org/dendrite/appservice/routing" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +// SetupAppServiceAPIComponent sets up and registers HTTP handlers for the AppServices +// component. +func SetupAppServiceAPIComponent( + base *basecomponent.BaseDendrite, + accountsDB *accounts.Database, + federation *gomatrixserverlib.FederationClient, + aliasAPI api.RoomserverAliasAPI, + queryAPI api.RoomserverQueryAPI, + transactionsCache *transactions.Cache, +) { + consumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, + ) + if err := consumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start app service's room server consumer") + } + + // Set up HTTP Endpoints + routing.Setup( + base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB, + federation, transactionsCache, + ) +} diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go new file mode 100644 index 00000000..d0b6a5ba --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -0,0 +1,144 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + + log "github.com/sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { + roomServerConsumer *common.ContinualConsumer + db *accounts.Database + query api.RoomserverQueryAPI + serverName string +} + +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *accounts.Database, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEventConsumer { + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + roomServerConsumer: &consumer, + db: store, + query: queryAPI, + serverName: string(cfg.Matrix.ServerName), + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from room servers +func (s *OutputRoomEventConsumer) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the room server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + + ev := output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "type": ev.Type(), + }).Info("appservice received event from roomserver") + + events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + if err != nil { + return err + } + + return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) +} + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEventConsumer) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + // If the event is a membership update (e.g. for a profile update), it won't + // show up in AddsStateEventIDs, so we need to add it manually + if event.Type() == "m.room.member" { + return []gomatrixserverlib.Event{event}, nil + } + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + result := []gomatrixserverlib.Event{} + missing := []string{} + for _, id := range addsStateEventIDs { + // Append the current event in the results if its ID is in the events list + if id == event.EventID() { + result = append(result, event) + } else { + // If the event isn't the current one, add it to the list of events + // to retrieve from the roomserver + missing = append(missing, id) + } + } + + // Request the missing events from the roomserver + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/appservice/routing/routing.go b/src/github.com/matrix-org/dendrite/appservice/routing/routing.go new file mode 100644 index 00000000..f0b8461d --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/routing/routing.go @@ -0,0 +1,61 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const pathPrefixApp = "/_matrix/app/r0" + +// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client +// to clients which need to make outbound HTTP requests. +func Setup( + apiMux *mux.Router, cfg config.Dendrite, // nolint: unparam + queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, // nolint: unparam + accountDB *accounts.Database, // nolint: unparam + federation *gomatrixserverlib.FederationClient, // nolint: unparam + transactionsCache *transactions.Cache, // nolint: unparam +) { + appMux := apiMux.PathPrefix(pathPrefixApp).Subrouter() + + appMux.Handle("/alias", + common.MakeExternalAPI("alias", func(req *http.Request) util.JSONResponse { + // TODO: Implement + return util.JSONResponse{ + Code: http.StatusOK, + JSON: nil, + } + }), + ).Methods(http.MethodGet, http.MethodOptions) + appMux.Handle("/user", + common.MakeExternalAPI("user", func(req *http.Request) util.JSONResponse { + // TODO: Implement + return util.JSONResponse{ + Code: http.StatusOK, + JSON: nil, + } + }), + ).Methods(http.MethodGet, http.MethodOptions) +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go new file mode 100644 index 00000000..3c537bea --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go @@ -0,0 +1,38 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/transactions" +) + +func main() { + cfg := basecomponent.ParseFlags() + base := basecomponent.NewBaseDendrite(cfg, "AppService") + + defer base.Close() // nolint: errcheck + accountDB := base.CreateAccountsDB() + federation := base.CreateFederationClient() + alias, _, query := base.CreateHTTPRoomserverAPIs() + cache := transactions.New() + + appservice.SetupAppServiceAPIComponent( + base, accountDB, federation, alias, query, cache, + ) + + base.SetupAndServeHTTP(string(base.Cfg.Listen.FederationSender)) +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 84c2b394..2d3bc09e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" @@ -65,6 +66,7 @@ func main() { mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query) + appservice.SetupAppServiceAPIComponent(base, accountDB, federation, alias, query, transactions.New()) httpHandler := common.WrapHandlerInCORS(base.APIMux)