moonmath/bitcoinity/websocket.go

197 lines
3.6 KiB
Go

package bitcoinity
import (
"encoding/json"
"fmt"
"io"
"sync/atomic"
"time"
"golang.org/x/net/websocket"
)
type Websocket interface {
Join(Topic, Handler) error
Shutdown() error
}
type Handler func(*Message)
type Message struct {
Event Event `json:"event"`
Topic Topic `json:"topic"`
Ref *MessageId `json:"ref"`
Payload json.RawMessage `json:"payload"`
}
type MarketPayload struct {
Data MarketData `json:"data"`
}
type MarketData struct {
Currency Currency `json:"currency"`
Exchange Exchange `json:"exchange_name"`
Trade MarketTrade `json:"trade"`
}
type MarketTrade struct {
Amount float64 `json:"amount"`
Date float64 `json:"date"`
Exchange Exchange `json:"exchange_name"`
Price float64 `json:"price"`
}
type realWebsocket struct {
conn *websocket.Conn
inbox chan *Message
outbox chan outgoing
pending map[MessageId]Handler
subscriptions map[Topic]Handler
nextMsgId int64
running atomic.Bool
}
type outgoing struct {
Request *Message
ResponseHandler Handler
TopicHandler Handler
}
func newWebsocket(conn *websocket.Conn) *realWebsocket {
ws := realWebsocket{
conn: conn,
inbox: make(chan *Message),
outbox: make(chan outgoing),
pending: map[MessageId]Handler{},
subscriptions: map[Topic]Handler{},
}
ws.running.Store(true)
go ws.heartbeat()
go ws.reader()
go ws.mailman()
return &ws
}
func (ws *realWebsocket) heartbeat() {
for ws.running.Load() {
ws.outbox <- outgoing{
Request: &Message{
Topic: "phoenix",
Event: "heartbeat",
},
}
time.Sleep(time.Second * 30)
}
}
func (ws *realWebsocket) reader() {
var partial []byte
failures := 0
for ws.running.Load() {
buf := make([]byte, 16*1024)
n, err := ws.conn.Read(buf)
if err != nil {
if err == io.EOF {
ws.running.Store(false)
return
}
fmt.Printf("websocket read: %v\n", err)
continue
}
useful := buf[0:n]
partial = append(partial, useful...)
var msg Message
err = json.Unmarshal(partial, &msg)
if err != nil {
failures += 1
continue
}
failures = 0
partial = nil
ws.inbox <- &msg
}
}
func (ws *realWebsocket) mailman() {
for ws.running.Load() {
select {
case i := <-ws.inbox:
ws.dispatch(i)
case o := <-ws.outbox:
_ = ws.send(o)
}
}
}
func (ws *realWebsocket) dispatch(m *Message) {
var handler Handler
if m.Ref != nil {
handler = ws.pending[*m.Ref]
delete(ws.pending, *m.Ref)
} else if m.Topic != "" {
handler = ws.subscriptions[m.Topic]
}
if handler != nil {
handler(m)
}
}
func (ws *realWebsocket) send(o outgoing) error {
if o.Request.Ref == nil {
msgId := ws.nextMsgId + 1
ws.nextMsgId += 1
msgIdStr := fmt.Sprintf("%d", msgId)
o.Request.Ref = (*MessageId)(&msgIdStr)
}
req, err := json.Marshal(o.Request)
if err != nil {
return err
}
_, err = ws.conn.Write(req)
if err != nil {
return err
}
if o.ResponseHandler != nil {
ws.pending[*o.Request.Ref] = o.ResponseHandler
}
if o.TopicHandler != nil {
ws.subscriptions[o.Request.Topic] = o.TopicHandler
}
return nil
}
func (ws *realWebsocket) Join(topic Topic, topicHandler Handler) error {
respChan, respHandler := oneShot()
ws.outbox <- outgoing{
Request: &Message{
Event: EventPhxJoin,
Topic: topic,
Payload: EmptyPayload,
},
ResponseHandler: respHandler,
TopicHandler: topicHandler,
}
<-respChan
return nil
}
func (ws *realWebsocket) Shutdown() error {
ws.running.Store(false)
err := ws.conn.Close()
return err
}
func oneShot() (<-chan *Message, Handler) {
pipe := make(chan *Message)
return pipe, func(msg *Message) {
pipe <- msg
}
}