197 lines
3.6 KiB
Go
197 lines
3.6 KiB
Go
package bitcoinity
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"golang.org/x/net/websocket"
|
|
)
|
|
|
|
type Websocket interface {
|
|
Join(Topic, Handler) error
|
|
Shutdown() error
|
|
}
|
|
|
|
// Handler is a callback that receives one Message. realWebsocket.dispatch
// invokes handlers synchronously, so a handler that blocks also blocks the
// goroutine that called dispatch.
type Handler func(*Message)
|
|
|
|
type Message struct {
|
|
Event Event `json:"event"`
|
|
Topic Topic `json:"topic"`
|
|
Ref *MessageId `json:"ref"`
|
|
Payload json.RawMessage `json:"payload"`
|
|
}
|
|
|
|
type MarketPayload struct {
|
|
Data MarketData `json:"data"`
|
|
}
|
|
|
|
type MarketData struct {
|
|
Currency Currency `json:"currency"`
|
|
Exchange Exchange `json:"exchange_name"`
|
|
Trade MarketTrade `json:"trade"`
|
|
}
|
|
|
|
type MarketTrade struct {
|
|
Amount float64 `json:"amount"`
|
|
Date float64 `json:"date"`
|
|
Exchange Exchange `json:"exchange_name"`
|
|
Price float64 `json:"price"`
|
|
}
|
|
|
|
type realWebsocket struct {
|
|
conn *websocket.Conn
|
|
inbox chan *Message
|
|
outbox chan outgoing
|
|
|
|
pending map[MessageId]Handler
|
|
subscriptions map[Topic]Handler
|
|
nextMsgId int64
|
|
|
|
running atomic.Bool
|
|
}
|
|
|
|
type outgoing struct {
|
|
Request *Message
|
|
ResponseHandler Handler
|
|
TopicHandler Handler
|
|
}
|
|
|
|
func newWebsocket(conn *websocket.Conn) *realWebsocket {
|
|
ws := realWebsocket{
|
|
conn: conn,
|
|
inbox: make(chan *Message),
|
|
outbox: make(chan outgoing),
|
|
pending: map[MessageId]Handler{},
|
|
subscriptions: map[Topic]Handler{},
|
|
}
|
|
ws.running.Store(true)
|
|
|
|
go ws.heartbeat()
|
|
go ws.reader()
|
|
go ws.mailman()
|
|
return &ws
|
|
}
|
|
|
|
func (ws *realWebsocket) heartbeat() {
|
|
for ws.running.Load() {
|
|
ws.outbox <- outgoing{
|
|
Request: &Message{
|
|
Topic: "phoenix",
|
|
Event: "heartbeat",
|
|
},
|
|
}
|
|
time.Sleep(time.Second * 30)
|
|
}
|
|
}
|
|
|
|
func (ws *realWebsocket) reader() {
|
|
var partial []byte
|
|
failures := 0
|
|
|
|
for ws.running.Load() {
|
|
buf := make([]byte, 16*1024)
|
|
n, err := ws.conn.Read(buf)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
ws.running.Store(false)
|
|
return
|
|
}
|
|
fmt.Printf("websocket read: %v\n", err)
|
|
continue
|
|
}
|
|
|
|
useful := buf[0:n]
|
|
partial = append(partial, useful...)
|
|
|
|
var msg Message
|
|
err = json.Unmarshal(partial, &msg)
|
|
if err != nil {
|
|
failures += 1
|
|
continue
|
|
}
|
|
failures = 0
|
|
partial = nil
|
|
ws.inbox <- &msg
|
|
}
|
|
}
|
|
|
|
func (ws *realWebsocket) mailman() {
|
|
for ws.running.Load() {
|
|
select {
|
|
case i := <-ws.inbox:
|
|
ws.dispatch(i)
|
|
case o := <-ws.outbox:
|
|
_ = ws.send(o)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ws *realWebsocket) dispatch(m *Message) {
|
|
var handler Handler
|
|
if m.Ref != nil {
|
|
handler = ws.pending[*m.Ref]
|
|
delete(ws.pending, *m.Ref)
|
|
} else if m.Topic != "" {
|
|
handler = ws.subscriptions[m.Topic]
|
|
}
|
|
if handler != nil {
|
|
handler(m)
|
|
}
|
|
}
|
|
|
|
func (ws *realWebsocket) send(o outgoing) error {
|
|
if o.Request.Ref == nil {
|
|
msgId := ws.nextMsgId + 1
|
|
ws.nextMsgId += 1
|
|
msgIdStr := fmt.Sprintf("%d", msgId)
|
|
o.Request.Ref = (*MessageId)(&msgIdStr)
|
|
}
|
|
req, err := json.Marshal(o.Request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = ws.conn.Write(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if o.ResponseHandler != nil {
|
|
ws.pending[*o.Request.Ref] = o.ResponseHandler
|
|
}
|
|
if o.TopicHandler != nil {
|
|
ws.subscriptions[o.Request.Topic] = o.TopicHandler
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ws *realWebsocket) Join(topic Topic, topicHandler Handler) error {
|
|
respChan, respHandler := oneShot()
|
|
ws.outbox <- outgoing{
|
|
Request: &Message{
|
|
Event: EventPhxJoin,
|
|
Topic: topic,
|
|
Payload: EmptyPayload,
|
|
},
|
|
ResponseHandler: respHandler,
|
|
TopicHandler: topicHandler,
|
|
}
|
|
<-respChan
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ws *realWebsocket) Shutdown() error {
|
|
ws.running.Store(false)
|
|
err := ws.conn.Close()
|
|
return err
|
|
}
|
|
|
|
func oneShot() (<-chan *Message, Handler) {
|
|
pipe := make(chan *Message)
|
|
return pipe, func(msg *Message) {
|
|
pipe <- msg
|
|
}
|
|
}
|