The Stream infrastructure is based on NATS allowing you to subscribe based on topic and interest. For example, you can subscribe to all trade messages for all cryptocurrency Exchanges, trade messages for a single market on a single Exchange, or even trade messages for a single market pair across all Exchanges.
You can check out CryptoQuote Stream in the browser using this example.
You can subscribe to a single ticker on an Exchange. The topic below would stream you BTC/ETH trades from the Binance crypto Exchange:
Given the above topic, you would receive all trades for BTC/USD for all supported Exchanges.
One of the more powerful features of CryptoQuote Stream is the ability to subscribe to a full exchange. The topic below streams all trades from the Binance crypto Exchange:
.
Below is some bare-bones example code that will connect and subscribe to CryptoQuote Stream.
go get github.com/nats-io/nats.go/
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
)
func main() {
// Connect to a server
nc, _ := nats.Connect("nats://username:[email protected]:4222")
// Simple Async Subscriber
nc.Subscribe("hose.trade.>", func(m *nats.Msg) {
fmt.Printf("Received a Trade: %s\n", string(m.Data))
})
// block
chann := make(chan struct{})
<-chann
}
pip install nats
import asyncio
import time
import signal
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
def run(loop):
nc = NATS()
options = {
"servers": ["nats://username:[email protected]:4222"],
"io_loop": loop,
}
yield from nc.connect(**options)
@asyncio.coroutine
def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# "*" matches any token, at any level of the subject.
# list to all markets on binance
yield from nc.subscribe("hose.*.binance.>", cb=message_handler)
# full hose
# yield from nc.subscribe("hose.>", cb=message_handler)
# listen to BTCUSD system wide
yield from nc.subscribe("hose.*.*.btcusd", cb=message_handler)
# listen to all Trades
# yield from nc.subscribe("hose.trades.>", cb=message_handler)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
try:
loop.run_forever()
finally:
loop.close()
npm install [email protected]
const nats = require('nats');
let exchange = process.argv[2] || 'gdax';
let natsHost = process.env.NATS_HOST || 'nats-01.cryptoquote.io';
const nc = nats.connect(`nats://username:pass@${natsHost}`);
setTimeout(() => {
nc.subscribe(`hose.*.${exchange}.>`, function (msg) {
console.log('Received a message: ' + msg);
});
}, 1500);
// NATS Client v2 Example:
const { connect, StringCodec } = require("nats");
const sc = StringCodec();
(async () => {
const nc = await connect({ user: 'USER', pass: 'PASS', servers: ['nats-01.cryptoquote.io'] });
const sub = nc.subscribe('hose.trade.*.btcusd');
for await (const m of sub) {
let msg = JSON.parse(sc.decode(m.data))
console.log('msg', msg)
}
})();
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.5.0</version>
</dependency>
package main;
import java.nio.charset.StandardCharsets;
import com.google.gson.Gson;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
class Message {
public String updateType;
public String symbol;
public long updated;
public String toString() {
return symbol + " updated at " + updated + " for event " + updateType;
}
}
public class Firehose {
public static void main(String... a) {
try {
Connection nc = Nats.connect("nats://user:[email protected]");
Gson gson = new Gson();
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String json = new String(msg.getData(), StandardCharsets.UTF_8);
Message updateMsg = gson.fromJson(json, Message.class);
// Use the object
System.out.println(updateMsg);
});
// Subscribe to all trades
d.subscribe("hose.trade.>");
} catch (Exception e) {
e.printStackTrace();
}
}
}