SeismicDB is an actively maintained fork of the abandoned but brilliant TectonicDB project. It is a tool for efficiently storing highly compressed financial level 2 tick data, for example to record the Binance orderbook for crypto quant trading analysis. The system is market independent and can be used for stocks, futures, and crypto; the only thing that changes is the connector.
Quickstart Guide – How to record Binance’s Orderbook with SeismicDB
Installing and setting up SeismicDB is easy.
- Install Rust for your OS. SeismicDB is tested on Linux and macOS and should support any UNIX-like operating system; Windows users can run it under WSL.
- Run
cargo install seismicdb
- The project is now installed.
To use your new installation you can start a server with the following command:
sdb-server -f [path-to-output-folder] -i 1000 -a
The -f option is self-explanatory: the path to the folder where you want to store the .dtf files. The -i option is how often you want to automatically dump the tick data to the hard drive, measured in ticks; higher numbers should improve performance at the risk of data loss, and I recommend going no higher than 10,000. The -a flag is needed if you want the server to save your data automatically; if you don't pass it, you will need to issue a FLUSH command programmatically.
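For example, a concrete invocation (assuming the output folder ./dtf_data already exists; the folder name here is just illustrative) would be:
sdb-server -f ./dtf_data -i 1000 -a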
With the server up and running, we can now move on to building a connector. Currently the only up-to-date connector is written in Rust, but I do plan on updating the Python connector soon. The client is called sdb-client; you can include it as a dependency in a crate like so:
[dependencies]
sdb_client = "0.6.0"
sdb_core = "0.6.0"
For our project we will also need a few more dependencies. This code is for demonstration purposes only; in production you should always have some level of fault tolerance around the websockets. I am using a portion of my own proprietary Binance API connector, but for most use cases you could also use Binance-RS or similar.
With those dependencies in place, the full example program is as follows.
cargo.toml
[package]
name = "Seismic-Recorder"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.0", features = ["full"] }
serde_json = "*"
serde = { version = "*", features = ["derive"] }
sdb_client = "0.6.0"
sdb_core = "0.6.0"
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "*"
main.rs
use sdb_client::client::SeismicClient;
use sdb_core::dtf::update;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
mod binance_futures;
mod models;
// Example Binance connector.
// As currently built it only handles order deltas, not the initial book; you may want to
// grab the initial snapshot via the REST API (see the sketch after the code listings).
#[tokio::main]
async fn main() {
let mut sclient = SeismicClient::new("127.0.0.1", "9001").unwrap();
// Create a book per symbol. Creation fails if the book already exists;
// that error is safe to ignore, but handling other errors would be a good idea.
for sym in ["SOLUSDT", "BTCUSDT", "ETHUSDT", "BNBUSDT", "LTCUSDT", "EOSUSDT"] {
let _ = sclient.create_db(format!("binance_{}_PERP", sym).as_str());
}
let (tx, mut rx) = mpsc::channel(1000);
binance_futures::get_datasocket(r#"
{
"method": "SUBSCRIBE",
"params":
[
"btcusdt@depth",
"btcusdt@aggTrade",
"ethusdt@depth",
"ethusdt@aggTrade",
"bnbusdt@depth",
"bnbusdt@aggTrade"
],
"id": 1
}
"#.to_string(), tx).await;
while let Some(a) = rx.recv().await {
match a {
Message::Text(t) => {
let res: Result<models::binanceReq, serde_json::Error> = serde_json::from_str(&t);
match res {
Ok(d) => {
// If deserialization succeeded, route on the stream event type.
match d.data {
models::BinanceStreamData::depthUpdate { T, s, U, b, a, .. } => {
for i in b {
let price: f32 = i[0].parse().unwrap();
let size: f32 = i[1].parse().unwrap();
let upd = update::Update{
ts: T as u64,
seq: U as u32,
is_trade: false,
is_bid: true,
price,
size
};
// Passing discard_result = true improves throughput, but means you
// cannot handle insert errors on the client side.
sclient.insert(Some(format!("binance_{}_PERP", s).as_str()), &upd, true);
}
for i in a {
let price: f32 = i[0].parse().unwrap();
let size: f32 = i[1].parse().unwrap();
let upd = update::Update{
ts: T as u64,
seq: U as u32,
is_trade: false,
is_bid: false,
price,
size
};
sclient.insert(Some(format!("binance_{}_PERP", s).as_str()), &upd, true);
}
},
models::BinanceStreamData::aggTrade { E, s, p, q, T, .. } => {
let price: f32 = p.parse().unwrap();
let size: f32 = q.parse().unwrap();
let upd = update::Update{
ts: T as u64,
seq: E as u32, // E is the event time, truncated to u32 here
is_trade: true,
is_bid: false,
price,
size
};
sclient.insert(Some(format!("binance_{}_PERP", s).as_str()), &upd, true);
}
}
}
Err(_e) => {} // non-stream messages (e.g. the subscription ack) won't parse; ignore them
}
//println!("{:?}", res);
}
_ => {}
}
}
}
binance_futures.rs
use futures_util::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
// This code is part of a larger library; it has been adapted to function on its own.
pub async fn get_datasocket(subscription: String, txr: mpsc::Sender<Message>) {
tokio::spawn(async move {
loop {
// Demo only: unwrap will panic if the connection fails; production code should retry.
let (mut ws, _res) = connect_async("wss://fstream.binance.com/stream").await.unwrap();
let _ = ws.send(Message::Text(subscription.clone())).await;
let (mut tx, mut rx) = ws.split();
while let Some(msg) = rx.next().await {
let unwrapped = match msg {
Ok(m) => m,
Err(_) => break, // socket error: drop out and reconnect
};
let _ = txr.send(unwrapped.clone()).await;
match unwrapped {
// The pong must be awaited; an un-awaited future is dropped and never sent,
// and Binance will eventually close an unanswered connection.
Message::Ping(p) => { let _ = tx.send(Message::Pong(p)).await; }
_ => {}
}
}
// The socket closed or errored; loop around and reconnect.
}
});
}
models.rs
#![allow(non_camel_case_types, non_snake_case)] // names mirror Binance's JSON keys
use serde::{Serialize, Deserialize};
// The "e" field in each stream payload selects the enum variant.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "e")]
pub enum BinanceStreamData {
depthUpdate {
E: i64, // event time (ms)
T: i64, // transaction time (ms)
s: String, // symbol
U: i64, // first update ID in this event
u: i64, // final update ID in this event
pu: i64, // final update ID of the previous event
b: Vec<Vec<String>>, // bid deltas as [price, qty] string pairs
a: Vec<Vec<String>>, // ask deltas as [price, qty] string pairs
},
aggTrade {
E: i64, // event time (ms)
s: String, // symbol
a: i64, // aggregate trade ID
p: String, // price
q: String, // quantity
f: i64, // first trade ID
l: i64, // last trade ID
T: i64, // trade time (ms)
m: bool, // true if the buyer is the market maker
}
}
// Envelope for Binance combined streams: { "stream": "...", "data": { ... } }
#[derive(Debug, Serialize, Deserialize)]
pub struct binanceReq {
stream: String,
pub data: BinanceStreamData
}
The above code is a rudimentary connector that loads Binance futures tickers into SeismicDB. I do not recommend using it in production (no fault tolerance, no easy configuration); treat it simply as an example to build your own connectors upon. Please stay tuned for a future piece on data analysis.
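As the comment at the top of main.rs notes, the connector only records order deltas, so the initial book has to come from Binance's REST API. Here is a minimal sketch of seeding it, assuming you add reqwest = { version = "0.11", features = ["json"] } to [dependencies]; /fapi/v1/depth is Binance's futures depth-snapshot endpoint, and everything else reuses the client calls shown above.
use sdb_client::client::SeismicClient;
use sdb_core::dtf::update;

// Sketch only: fetch the initial BTCUSDT book and insert every level as a book update.
async fn seed_initial_book(sclient: &mut SeismicClient) -> Result<(), Box<dyn std::error::Error>> {
    let snap: serde_json::Value = reqwest::get(
        "https://fapi.binance.com/fapi/v1/depth?symbol=BTCUSDT&limit=1000",
    )
    .await?
    .json()
    .await?;
    let ts = snap["T"].as_u64().unwrap_or(0); // transaction time in ms
    // lastUpdateId can exceed u32::MAX; truncate it (see the seq hack below).
    let seq = snap["lastUpdateId"].as_u64().unwrap_or(0) as u32;
    for (side, is_bid) in [("bids", true), ("asks", false)] {
        if let Some(levels) = snap[side].as_array() {
            for level in levels {
                let upd = update::Update {
                    ts,
                    seq,
                    is_trade: false,
                    is_bid,
                    price: level[0].as_str().unwrap_or("0").parse()?,
                    size: level[1].as_str().unwrap_or("0").parse()?,
                };
                sclient.insert(Some("binance_BTCUSDT_PERP"), &upd, true);
            }
        }
    }
    Ok(())
}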
The main method is client.insert(book_name, &update, discard_result), which functions similarly to a PostgreSQL insert. The Update struct requires the fields shown below. In the future I will add an option, or potentially replace the u32 with a u64 in all cases; for now the comments show a hack you can use with OPNX or other exchanges that report u64 ID numbers.
let upd = update::Update{
ts: unix_epoch_ms, // u64: the Unix epoch in milliseconds
// seq is the trade / book ID and currently needs to be a u32. Some exchanges,
// like OPNX, report bigger numbers; you can get around that with this hack:
// let tid: u64 = i.tradeId.parse().unwrap();
// let tid: u32 = (tid % (u32::MAX as u64 + 1)) as u32;
seq: sequence_id,
is_trade: false, // bool: true if the update is a trade, false for an orderbook update
is_bid: true, // bool: whether an orderbook update is on the bid or the ask side
price: 123.45, // f32: current price
size: 1.0 // f32: size
};
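Putting it together, here is a minimal self-contained sketch of a single insert. The values are made up for illustration; the client address, book name, and calls all match the quickstart above.
use sdb_client::client::SeismicClient;
use sdb_core::dtf::update;

fn main() {
    let mut sclient = SeismicClient::new("127.0.0.1", "9001").unwrap();
    // Safe to ignore the error if the book already exists.
    let _ = sclient.create_db("binance_BTCUSDT_PERP");
    let upd = update::Update {
        ts: 1_700_000_000_000, // u64: ms since the Unix epoch
        seq: 42,               // u32: sequence / trade ID
        is_trade: true,        // a trade, not a book delta
        is_bid: false,
        price: 35000.5,
        size: 0.25,
    };
    // discard_result = true: fastest path, but insert errors are not reported back.
    sclient.insert(Some("binance_BTCUSDT_PERP"), &upd, true);
}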
Also, please feel free to ask any questions about this project in the comments below. If you are looking to store only trade-level tick data, I can also recommend checking out TimescaleDB.