package bitcoinity import ( "encoding/json" "fmt" "io" "sync/atomic" "time" "golang.org/x/net/websocket" ) type Websocket interface { Join(Topic, Handler) error Shutdown() error } type Handler func(*Message) type Message struct { Event Event `json:"event"` Topic Topic `json:"topic"` Ref *MessageId `json:"ref"` Payload json.RawMessage `json:"payload"` } type MarketPayload struct { Data MarketData `json:"data"` } type MarketData struct { Currency Currency `json:"currency"` Exchange Exchange `json:"exchange_name"` Trade MarketTrade `json:"trade"` } type MarketTrade struct { Amount float64 `json:"amount"` Date float64 `json:"date"` Exchange Exchange `json:"exchange_name"` Price float64 `json:"price"` } type realWebsocket struct { conn *websocket.Conn inbox chan *Message outbox chan outgoing pending map[MessageId]Handler subscriptions map[Topic]Handler nextMsgId int64 running atomic.Bool } type outgoing struct { Request *Message ResponseHandler Handler TopicHandler Handler } func newWebsocket(conn *websocket.Conn) *realWebsocket { ws := realWebsocket{ conn: conn, inbox: make(chan *Message), outbox: make(chan outgoing), pending: map[MessageId]Handler{}, subscriptions: map[Topic]Handler{}, } ws.running.Store(true) go ws.heartbeat() go ws.reader() go ws.mailman() return &ws } func (ws *realWebsocket) heartbeat() { for ws.running.Load() { ws.outbox <- outgoing{ Request: &Message{ Topic: "phoenix", Event: "heartbeat", }, } time.Sleep(time.Second * 30) } } func (ws *realWebsocket) reader() { var partial []byte failures := 0 for ws.running.Load() { buf := make([]byte, 16*1024) n, err := ws.conn.Read(buf) if err != nil { if err == io.EOF { ws.running.Store(false) return } fmt.Printf("websocket read: %v\n", err) continue } useful := buf[0:n] partial = append(partial, useful...) var msg Message err = json.Unmarshal(partial, &msg) if err != nil { failures += 1 continue } failures = 0 partial = nil ws.inbox <- &msg } } func (ws *realWebsocket) mailman() { for ws.running.Load() { select { case i := <-ws.inbox: ws.dispatch(i) case o := <-ws.outbox: _ = ws.send(o) } } } func (ws *realWebsocket) dispatch(m *Message) { var handler Handler if m.Ref != nil { handler = ws.pending[*m.Ref] delete(ws.pending, *m.Ref) } else if m.Topic != "" { handler = ws.subscriptions[m.Topic] } if handler != nil { handler(m) } } func (ws *realWebsocket) send(o outgoing) error { if o.Request.Ref == nil { msgId := ws.nextMsgId + 1 ws.nextMsgId += 1 msgIdStr := fmt.Sprintf("%d", msgId) o.Request.Ref = (*MessageId)(&msgIdStr) } req, err := json.Marshal(o.Request) if err != nil { return err } _, err = ws.conn.Write(req) if err != nil { return err } if o.ResponseHandler != nil { ws.pending[*o.Request.Ref] = o.ResponseHandler } if o.TopicHandler != nil { ws.subscriptions[o.Request.Topic] = o.TopicHandler } return nil } func (ws *realWebsocket) Join(topic Topic, topicHandler Handler) error { respChan, respHandler := oneShot() ws.outbox <- outgoing{ Request: &Message{ Event: EventPhxJoin, Topic: topic, Payload: EmptyPayload, }, ResponseHandler: respHandler, TopicHandler: topicHandler, } <-respChan return nil } func (ws *realWebsocket) Shutdown() error { ws.running.Store(false) err := ws.conn.Close() return err } func oneShot() (<-chan *Message, Handler) { pipe := make(chan *Message) return pipe, func(msg *Message) { pipe <- msg } }