Pinecone demo updates

This commit is contained in:
Neil Alexander 2021-06-14 13:13:07 +01:00
parent 2c9a390fa6
commit bd9dec8e06
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
4 changed files with 60 additions and 58 deletions

View File

@ -10,7 +10,6 @@ import (
"io"
"io/ioutil"
"log"
"math"
"net"
"net/http"
"os"
@ -37,15 +36,14 @@ import (
userapiAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
"github.com/matrix-org/pinecone/router"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
"github.com/matrix-org/pinecone/types"
pineconeTypes "github.com/matrix-org/pinecone/types"
_ "golang.org/x/mobile/bind"
)
@ -57,19 +55,19 @@ const (
)
type DendriteMonolith struct {
logger logrus.Logger
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
StorageDirectory string
CacheDirectory string
staticPeerURI string
staticPeerMutex sync.RWMutex
staticPeerAttempts atomic.Uint32
listener net.Listener
httpServer *http.Server
processContext *process.ProcessContext
userAPI userapiAPI.UserInternalAPI
logger logrus.Logger
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
StorageDirectory string
CacheDirectory string
staticPeerURI string
staticPeerMutex sync.RWMutex
staticPeerAttempt chan struct{}
listener net.Listener
httpServer *http.Server
processContext *process.ProcessContext
userAPI userapiAPI.UserInternalAPI
}
func (m *DendriteMonolith) BaseURL() string {
@ -99,7 +97,9 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Unlock()
m.DisconnectType(pineconeRouter.PeerTypeRemote)
if uri != "" {
go m.staticPeerConnect()
go func() {
m.staticPeerAttempt <- struct{}{}
}()
}
}
@ -195,17 +195,27 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
}
func (m *DendriteMonolith) staticPeerConnect() {
m.staticPeerMutex.RLock()
uri := m.staticPeerURI
m.staticPeerMutex.RUnlock()
if uri == "" {
return
attempt := func() {
if m.PineconeRouter.PeerCount(router.PeerTypeRemote) == 0 {
m.staticPeerMutex.RLock()
uri := m.staticPeerURI
m.staticPeerMutex.RUnlock()
if uri == "" {
return
}
if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(m.staticPeerAttempts.Inc())))
time.AfterFunc(exp, m.staticPeerConnect)
} else {
m.staticPeerAttempts.Store(0)
for {
select {
case <-m.processContext.Context().Done():
case <-m.staticPeerAttempt:
attempt()
case <-time.After(time.Second * 5):
attempt()
}
}
}
@ -248,13 +258,6 @@ func (m *DendriteMonolith) Start() {
m.PineconeQUIC = pineconeSessions.NewSessions(logger, m.PineconeRouter)
m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter)
m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
if peertype == pineconeRouter.PeerTypeRemote {
m.staticPeerAttempts.Store(0)
time.AfterFunc(time.Second, m.staticPeerConnect)
}
})
prefix := hex.EncodeToString(pk)
cfg := &config.Dendrite{}
cfg.Defaults()
@ -359,8 +362,12 @@ func (m *DendriteMonolith) Start() {
},
Handler: h2c.NewHandler(pMux, h2s),
}
m.processContext = base.ProcessContext
m.staticPeerAttempt = make(chan struct{}, 1)
go m.staticPeerConnect()
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC))

View File

@ -23,7 +23,6 @@ import (
"fmt"
"io/ioutil"
"log"
"math"
"net"
"net/http"
"os"
@ -48,12 +47,11 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
"github.com/matrix-org/pinecone/router"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
pineconeTypes "github.com/matrix-org/pinecone/types"
"github.com/sirupsen/logrus"
)
@ -123,27 +121,23 @@ func main() {
pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
pMulticast.Start()
var staticPeerAttempts atomic.Uint32
var connectToStaticPeer func()
connectToStaticPeer = func() {
uri := *instancePeer
if uri == "" {
return
connectToStaticPeer := func() {
attempt := func() {
if pRouter.PeerCount(router.PeerTypeRemote) == 0 {
uri := *instancePeer
if uri == "" {
return
}
if err := conn.ConnectToPeer(pRouter, uri); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
if err := conn.ConnectToPeer(pRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc())))
time.AfterFunc(exp, connectToStaticPeer)
} else {
staticPeerAttempts.Store(0)
for {
attempt()
time.Sleep(time.Second * 5)
}
}
pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
if peertype == pineconeRouter.PeerTypeRemote && err != nil {
staticPeerAttempts.Store(0)
time.AfterFunc(time.Second, connectToStaticPeer)
}
})
go connectToStaticPeer()
cfg := &config.Dendrite{}
cfg.Defaults()
@ -257,6 +251,7 @@ func main() {
Handler: pMux,
}
go connectToStaticPeer()
go func() {
pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))

2
go.mod
View File

@ -25,7 +25,7 @@ require (
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
github.com/matrix-org/pinecone v0.0.0-20210602111459-5cb0e6aa1a6a
github.com/matrix-org/pinecone v0.0.0-20210614112651-5da1fab608c6
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646

4
go.sum
View File

@ -706,8 +706,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a h1:pV
github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161 h1:h1XVh05pLoC+nJjP3GIpj5wUsuC8WdHP3He0RTkRJTs=
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20210602111459-5cb0e6aa1a6a h1:BE/cfpyHO2ua1BK4Tibr+2oZCV3H1mC9G7g7Yvl1AmM=
github.com/matrix-org/pinecone v0.0.0-20210602111459-5cb0e6aa1a6a/go.mod h1:UQzJS6UVyVwfkr+RLrdvBB1vLyECqe3fLYNcbRxv8SA=
github.com/matrix-org/pinecone v0.0.0-20210614112651-5da1fab608c6 h1:ytVf81AkLmMAs0KeCYW6po0X3foMSKz0HccnipfsOVc=
github.com/matrix-org/pinecone v0.0.0-20210614112651-5da1fab608c6/go.mod h1:UQzJS6UVyVwfkr+RLrdvBB1vLyECqe3fLYNcbRxv8SA=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=