Initial public commit.

This commit is contained in:
2024-03-17 02:10:05 -07:00
commit 23ed509200
20 changed files with 1298 additions and 0 deletions

44
bitcoinity/client.go Normal file
View File

@@ -0,0 +1,44 @@
package bitcoinity
import (
"context"
"github.com/carlmjohnson/requests"
"golang.org/x/net/websocket"
)
func GetWebsocket() (Websocket, error) {
origin := baseUrl
url := "wss://bitcoinity.org/webs_bridge/websocket?vsn=1.0.0"
conn, err := websocket.Dial(url, "", origin)
if err != nil {
return nil, err
}
ws := newWebsocket(conn)
return ws, nil
}
func GetTicker(
ctx context.Context, req *GetTickerRequest,
) (*GetTickerResponse, error) {
var resp GetTickerResponse
err := requests.New(commonConfig).
Path("/markets/get_ticker").
Param("currency", string(req.Currency)).
Param("exchange", string(req.Exchange)).
Param("span", string(req.Span)).
ToJSON(&resp).
Fetch(ctx)
if err != nil {
return nil, err
}
return &resp, nil
}
const baseUrl = "https://bitcoinity.org"
func commonConfig(rb *requests.Builder) {
rb.
BaseURL(baseUrl).
Accept("application/json;charset=utf-8")
}

File diff suppressed because one or more lines are too long

101
bitcoinity/model.go Normal file
View File

@@ -0,0 +1,101 @@
package bitcoinity
import (
"encoding/json"
"fmt"
)
var EmptyPayload json.RawMessage = []byte("{}")
type MessageId string
type Topic string
const TopicAll Topic = "all"
func MarketTopic(e Exchange, c Currency) Topic {
topic := fmt.Sprintf("webs:markets_%s_%s", e, c)
return Topic(topic)
}
type Event string
const (
EventPhxJoin Event = "phx_join"
EventPhxReply Event = "phx_reply"
EventNewMsg Event = "new_msg"
)
type GetTickerRequest struct {
Currency Currency
Exchange Exchange
Span Span
}
type GetTickerResponse struct {
TickerLife int `json:"ticker_life"`
VolumeResolution int `json:"volume_resolution"`
PriceChange string `json:"price_change"`
PriceHigh string `json:"price_high"`
PriceLow string `json:"price_low"`
Buys []Trade `json:"buys"`
Sells []Trade `json:"sells"`
Lasts []Trade `json:"lasts"`
Volume []Volume `json:"volume"`
}
type Currency string
const (
USD Currency = "USD"
EUR Currency = "EUR"
GBP Currency = "GBP"
AUD Currency = "AUD"
JPY Currency = "JPY"
CAD Currency = "CAD"
)
type Exchange string
const (
Coinbase Exchange = "coinbase"
Bitfinex Exchange = "bitfinex"
Bitstamp Exchange = "bitstamp"
Kraken Exchange = "kraken"
Gemini Exchange = "gemini"
)
type Span string
const (
Span10Minutes Span = "10m"
Span1Hour Span = "1h"
Span12Hours Span = "12h"
Span24Hours Span = "24h"
Span3Days Span = "3d"
Span7Days Span = "7d"
Span30Days Span = "30d"
Span6Months Span = "6m"
Span2Years Span = "2y"
SpanAll Span = "all"
)
type Trade struct {
Timestamp int64
Price float64
}
func (t *Trade) UnmarshalJSON(b []byte) error {
a := []interface{}{&t.Timestamp, &t.Price}
return json.Unmarshal(b, &a)
}
type Volume struct {
Timestamp int64
Size float64
}
func (v *Volume) UnmarshalJSON(b []byte) error {
a := []interface{}{&v.Timestamp, &v.Size}
return json.Unmarshal(b, &a)
}

23
bitcoinity/model_test.go Normal file
View File

@@ -0,0 +1,23 @@
package bitcoinity_test
import (
_ "embed"
"encoding/json"
"testing"
"code.humancabbage.net/moonmath/bitcoinity"
)
func TestUnmarshalGetTickerResponse(t *testing.T) {
var resp bitcoinity.GetTickerResponse
err := json.Unmarshal(getTickerJson, &resp)
if err != nil {
t.Errorf("failed to unmarshal get_ticker JSON: %v", err)
}
if resp.TickerLife != 604800 {
t.Errorf("expected TickerLife == 604800, not %v", resp.TickerLife)
}
}
//go:embed get_ticker.json
var getTickerJson []byte

196
bitcoinity/websocket.go Normal file
View File

@@ -0,0 +1,196 @@
package bitcoinity
import (
"encoding/json"
"fmt"
"io"
"sync/atomic"
"time"
"golang.org/x/net/websocket"
)
type Websocket interface {
Join(Topic, Handler) error
Shutdown() error
}
type Handler func(*Message)
type Message struct {
Event Event `json:"event"`
Topic Topic `json:"topic"`
Ref *MessageId `json:"ref"`
Payload json.RawMessage `json:"payload"`
}
type MarketPayload struct {
Data MarketData `json:"data"`
}
type MarketData struct {
Currency Currency `json:"currency"`
Exchange Exchange `json:"exchange_name"`
Trade MarketTrade `json:"trade"`
}
type MarketTrade struct {
Amount float64 `json:"amount"`
Date float64 `json:"date"`
Exchange Exchange `json:"exchange_name"`
Price float64 `json:"price"`
}
type realWebsocket struct {
conn *websocket.Conn
inbox chan *Message
outbox chan outgoing
pending map[MessageId]Handler
subscriptions map[Topic]Handler
nextMsgId int64
running atomic.Bool
}
type outgoing struct {
Request *Message
ResponseHandler Handler
TopicHandler Handler
}
func newWebsocket(conn *websocket.Conn) *realWebsocket {
ws := realWebsocket{
conn: conn,
inbox: make(chan *Message),
outbox: make(chan outgoing),
pending: map[MessageId]Handler{},
subscriptions: map[Topic]Handler{},
}
ws.running.Store(true)
go ws.heartbeat()
go ws.reader()
go ws.mailman()
return &ws
}
func (ws *realWebsocket) heartbeat() {
for ws.running.Load() {
ws.outbox <- outgoing{
Request: &Message{
Topic: "phoenix",
Event: "heartbeat",
},
}
time.Sleep(time.Second * 30)
}
}
func (ws *realWebsocket) reader() {
var partial []byte
failures := 0
for ws.running.Load() {
buf := make([]byte, 16*1024)
n, err := ws.conn.Read(buf)
if err != nil {
if err == io.EOF {
ws.running.Store(false)
return
}
fmt.Printf("websocket read: %v\n", err)
continue
}
useful := buf[0:n]
partial = append(partial, useful...)
var msg Message
err = json.Unmarshal(partial, &msg)
if err != nil {
failures += 1
continue
}
failures = 0
partial = nil
ws.inbox <- &msg
}
}
func (ws *realWebsocket) mailman() {
for ws.running.Load() {
select {
case i := <-ws.inbox:
ws.dispatch(i)
case o := <-ws.outbox:
_ = ws.send(o)
}
}
}
func (ws *realWebsocket) dispatch(m *Message) {
var handler Handler
if m.Ref != nil {
handler = ws.pending[*m.Ref]
delete(ws.pending, *m.Ref)
} else if m.Topic != "" {
handler = ws.subscriptions[m.Topic]
}
if handler != nil {
handler(m)
}
}
func (ws *realWebsocket) send(o outgoing) error {
if o.Request.Ref == nil {
msgId := ws.nextMsgId + 1
ws.nextMsgId += 1
msgIdStr := fmt.Sprintf("%d", msgId)
o.Request.Ref = (*MessageId)(&msgIdStr)
}
req, err := json.Marshal(o.Request)
if err != nil {
return err
}
_, err = ws.conn.Write(req)
if err != nil {
return err
}
if o.ResponseHandler != nil {
ws.pending[*o.Request.Ref] = o.ResponseHandler
}
if o.TopicHandler != nil {
ws.subscriptions[o.Request.Topic] = o.TopicHandler
}
return nil
}
func (ws *realWebsocket) Join(topic Topic, topicHandler Handler) error {
respChan, respHandler := oneShot()
ws.outbox <- outgoing{
Request: &Message{
Event: EventPhxJoin,
Topic: topic,
Payload: EmptyPayload,
},
ResponseHandler: respHandler,
TopicHandler: topicHandler,
}
<-respChan
return nil
}
func (ws *realWebsocket) Shutdown() error {
ws.running.Store(false)
err := ws.conn.Close()
return err
}
func oneShot() (<-chan *Message, Handler) {
pipe := make(chan *Message)
return pipe, func(msg *Message) {
pipe <- msg
}
}