Compare commits

...

2 Commits

Author SHA1 Message Date
f81314adc1 Updated .gitignore 2022-12-10 15:36:36 +01:00
6b11549a99 Github sync 2022-12-10 15:35:16 +01:00
65 changed files with 5700 additions and 4165 deletions

57
.gitignore vendored Normal file
View File

@ -0,0 +1,57 @@
#### Python
*.pyc
#### C
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf

165
MutualFundsJSON.txt Normal file
View File

@ -0,0 +1,165 @@
/***************** MUTUAL FUNDS *******************************/
42[ "mutualFundPortfolioUpdate",
{"fund":"6796531e-8668-49d1-bb01-f07db4264ce6",
"tUpdate":
{"event":"Transaction Update",
"transactions":[{
"coin":"miko",
"type":0,
"userid":"6796531e-8668-49d1-bb01-f07db4264ce6",
"quantity":1,
"timestamp":1655806346312,
"completed":false,
"price":null,"fund":true
}],
"portfolio":null
}
}
]
42[ "mutualFundPortfolioUpdate",
{"fund":"6796531e-8668-49d1-bb01-f07db4264ce6",
"tUpdate":
{"event":"Transaction Update",
"transactions":[{
"coin":"miko","type":0,"userid":"6796531e-8668-49d1-bb01-f07db4264ce6","quantity":1,"timestamp":1655806346312,"completed":true,"price":32820.18,"fund":true}],
"portfolio":{
"miko":{"amount":352,"timestamp":1655806346312,"meanPurchasePrice":29645.69}
}
}
}]
42[ "mutualFundBalanceUpdate",
{"funds":
{"6796531e-8668-49d1-bb01-f07db4264ce6":4924583.6}
}
]
42[ "mutualFundRunningHistoryUpdate",
{"f9df4304-e084-494a-b918-9b5ab1c6842b":
{"volume":1000,"price":100000,"saleValue":100000,"networth":100000000,"members":1,"timestamp":1655806533376},
"851a75f1-19cf-4aad-b3f0-95ebf1973e4b":
{"volume":81000,"price":2810,"saleValue":2810,"networth":224663343.5,"members":4,"timestamp":1655806533380},
...
}
]
42[ "mutualFundStatUpdate",
{"funds":
{"f9df4304-e084-494a-b918-9b5ab1c6842b":
{"volume":1000,"price":100000,"saleValue":100000,"networth":100000000,"members":1},
"851a75f1-19cf-4aad-b3f0-95ebf1973e4b":
{"volume":81000,"price":2810,"saleValue":2810,"networth":224663343.5,"members":4},
...
}
}
]
42[ "mutualFundMembersUpdate",
{"type":"add",
"fund":"851a75f1-19cf-4aad-b3f0-95ebf1973e4b",
"username":"Bombshell Blondes Banking",
"userid":"726b5a5e-9dc0-438e-b6be-fb901a51f482"}
]
42[ "mutualFundOrderUpdate",
{"fund":"851a75f1-19cf-4aad-b3f0-95ebf1973e4b","buys":0,"sells":0}
]
42[" mutualFundMakePublicUpdate",
{"fund":"f9df4304-e084-494a-b918-9b5ab1c6842b"}
]
XHR:
{"success":true,"funds":
{"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":
{"id":"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f",
"name":"Miyatsuko",
"dateCreated":"2022-06-20T21:04:21.129Z",
"icon":"inanis",
"missionStatement":"Abayo Chocoball",
"tag":"造"
,"color":"e0ae2f",
"ceo":"10dcbbd5-f761-44f3-92ce-ea5b01f1d747",
"balance":1228441634.6,
"released":true,
"members":
[ {"id":"10dcbbd5-f761-44f3-92ce-ea5b01f1d747","username":"[造] Miyatsuko-Priestess Treasury","boardMember":true},
{"id":"135c986e-0cc3-45d1-ad9a-77725a99d44f","username":"[造] Touching Fluffy Tails Management Co.","boardMember":true},
...
],
"portfolio":
{ "kronii":{"amount":8,"timestamp":1655760060678,"meanPurchasePrice":16719.18},
"inanis":{"amount":8,"timestamp":1655760516709,"meanPurchasePrice":27564.07},
...
}
},
...,
...,
...
},
"bulletins":
{"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":[],
"7b24a25b-8f78-42b6-85ff-1d3f62c7132a":[
{"id":"3e7a8c16-2e44-42aa-91ee-57674378982f",
"fund":"7b24a25b-8f78-42b6-85ff-1d3f62c7132a",
"author":"f7c3b57a-8b4f-4715-9f37-dfcc81fb0c18",
"authorName":"[狐] Kitsune Holdings 🌽🦊🍔",
"timestamp":"2022-06-20T22:54:11.252Z",
"message":"OhaKOOOOOOOOOOOOOOOOOOON"
}
],
...
},
"fundHistory":{
"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":
[{"timestamp":"2022-06-20T21:04:21.129Z","volume":100000,"price":16000,"networth":1600000000,"members":1}],
"81570634-37bb-4e10-b2cb-ea1bf955fc53":
[{"timestamp":"2022-06-20T21:04:24.109Z","volume":1000,"price":25000,"networth":25000000,"members":1}],
...
},
"fundStats":{
"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":{
"volume":100000,
"price":16000,
"saleValue":16000,
"networth":1584327512,
"members":10,
"history":[
{"volume":100000,"price":16000,"saleValue":16000,"networth":1600000000,"members":1,"timestamp":1655759061132},
{"volume":100000,"price":16000,"saleValue":16000,"networth":1600000000,"members":8,"timestamp":1655759714668},
...
]
},
...
}
"orders":{
"779ee96f-5a24-4244-b1ce-a8cc540f0cbd":
{"buys":6980,"sells":0},
"6796531e-8668-49d1-bb01-f07db4264ce6":
{"buys":12157,"sells":0},
...
}
"fundPayouts":{},
"fundsToDissolve":{}

View File

@ -1,11 +0,0 @@
#include "api.h"
class nsfq_proxy_api {
public:
private:
/* HTTP wrapper to interface with the server. */
/* Websocket used to interface with the server. */
websocket_endpoint ws_endpoint;
}

View File

@ -1,22 +0,0 @@
#ifndef _API_H_
#define _API_H_
#include <iostream>
#include <string>
#include <stdlib.h>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/common/thread.hpp>
#include <websocketpp/common/memory.hpp>
#include <nlohmann/json.hpp>
#include "../parser/parser.h"
#include "../common/common.h"
#include "../safe_queue/safe_queue.h"
#include "../my_ssl/my_ssl.h"
#endif

View File

@ -0,0 +1,60 @@
#include "complete_db.h"
ws_msg_parsed<WS_EVENT_TRANSACTION> raw_msg_parse(nlohmann::json op) {
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = op["coin"];
rop.type = op["type"];
rop.userid = op["userid"];
rop.quantity = op["quantity"];
rop.timestamp = op["timestamp"];
rop.completed = op["completed"];
rop.timestamp = op["timestamp"];
rop.price = op["price"];
return rop;
}
int complete_db(long ts_up, unsigned int days, bool check) {
long tmp_ts;
std::string BASE_URL = "https://nasfaq.biz/api/getHistory?timestamp={}";
std::string TMP_URL;
nlohmann::json res_json;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> tmp;
nsfq_http::http_connector::connector c;
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
/* Update database if needed */
tmp_ts = ts_up;
for(int i = 0; i < days; i++) {
std::cout << "Fetching history for ts = " << tmp_ts << std::endl;
tmp_ts -= 60*60*24*1000;
TMP_URL = fmt::format(BASE_URL, tmp_ts);
c.get((const char*)TMP_URL.c_str());
res_json = nlohmann::json::parse(c.get_buffer());
tmp = {};
if(!res_json.empty()) {
for(auto const & x:res_json["history"]["transactions"]) {
tmp.transaction_list.push_back(raw_msg_parse(x));
}
}
if(check) db::push_uhistory(&C, tmp);
else db::push_history(&C, tmp);
}
return 0;
}

View File

@ -0,0 +1,7 @@
#include "../common/common.h"
#include "../http/http_connector.h"
#include "../sql/db_init.h"
#include "../sql/db_handle.h"
int complete_db(long ts_up, unsigned int days, bool check);

View File

@ -1,9 +0,0 @@
#include "client.h"
namespace proxy {
client::client(void) {
m_q = SafeQueue<std::string>;
m_endpoint =
}
}

View File

@ -1,24 +0,0 @@
#ifndef _CLIENT_H_
#define _CLIENT_H_
#include "../common/common.h"
#include "../ws/ssl_ws.h"
namespace proxy {
/**
NASFAQ proxy client.
*/
class client {
public:
client(void);
~client(void);
void init(void);
private:
SafeQueue<std::string> q;
ws::websocket_endpoint endpoint;
}
}
#endif

View File

@ -20,11 +20,12 @@ std::ostream& operator<< (std::ostream& stream, WS_MSG const & type) {
} }
/** /**
//TODO: GOES INTO pretty_print.cpp
Pretty prints COIN_PRICE events. Pretty prints COIN_PRICE events.
*/ */
template<> template<>
std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> const & op) { std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_COIN_PRICE> const & op) {
stream << "WS_EVENT_COIN_PRICE_UPDATE:" stream << "WS_EVENT_COIN_PRICE:"
<< " Coin: " << op.coin << " Coin: " << op.coin
<< " Price: " << op.price << " Price: " << op.price
<< " SaleValue: " << op.saleValue << " SaleValue: " << op.saleValue
@ -49,11 +50,46 @@ std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_TRANSACT
} }
/** /**
Pretty prints the transactions held in the WS_EVENT_HISTORY_UPDATE vector. Pretty prints portfolio_coin_t.
*/
std::ostream& operator<< (std::ostream& stream, portfolio_coin_t const & op) {
stream << "portfolio_coin_t:"
<< " Coin: " << op.coin
<< " Amount: " << op.amount
<< " Timestamp: " << op.ts
<< " MPP: " << op.mpp;
return stream;
}
/**
Pretty prints WS_EVENT_MF_PORTFOLIO events.
*/ */
template<> template<>
std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> const & op) { std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> const & op) {
stream << "WS_EVENT_HISTORY_UPDATE:\n"; stream << "WS_EVENT_MF_PORTFOLIO:"
<< " Fund: " << op.fund
<< " Event: " << op.event << std::endl
<< "Transactions: " << std::endl;
for(auto & element : op.transactions) {
stream << "\t" << element << std::endl;
}
stream << "Portfolio: " << std::endl;
for(auto & element : op.portfolio) {
stream << "\t" << element << std::endl;
}
return stream;
}
/**
Pretty prints the transactions held in the WS_EVENT_HISTORY vector.
*/
template<>
std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_HISTORY> const & op) {
stream << "WS_EVENT_HISTORY:\n";
for(auto & element : op.transaction_list) { for(auto & element : op.transaction_list) {
stream << "\t" << element << std::endl; stream << "\t" << element << std::endl;
} }

View File

@ -13,7 +13,7 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
//#include "../safe_queue/safe_queue.h" // TODO: REFACTOR ALL THIS TRASH HOLY SHIT
/*********************************************************************************** /***********************************************************************************
WEBSOCKET MESSAGE STRUCTURES WEBSOCKET MESSAGE STRUCTURES
@ -58,7 +58,7 @@ struct ws_msg_parsed;
coinPriceUpdate structure coinPriceUpdate structure
*/ */
template <> template <>
struct ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> { struct ws_msg_parsed<WS_EVENT_COIN_PRICE> {
std::string coin; std::string coin;
float price; float price;
float saleValue; float saleValue;
@ -66,7 +66,7 @@ struct ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> {
}; };
/** /**
Auxiliary type for WS_EVENT_HISTORY_UPDATE. Auxiliary type for WS_EVENT_HISTORY.
This holds individual transactions as handed by the websocket in a list. This holds individual transactions as handed by the websocket in a list.
*/ */
template <> template <>
@ -78,16 +78,43 @@ struct ws_msg_parsed<WS_EVENT_TRANSACTION> {
long timestamp; long timestamp;
bool completed; bool completed;
float price; float price;
bool fund;
}; };
/** /**
historyUpdate structure holding transactions. historyUpdate structure holding transactions.
*/ */
template<> template<>
struct ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> { struct ws_msg_parsed<WS_EVENT_HISTORY> {
std::vector<ws_msg_parsed<WS_EVENT_TRANSACTION>> transaction_list; std::vector<ws_msg_parsed<WS_EVENT_TRANSACTION>> transaction_list;
}; };
/***********************************************************************************
MUTUAL FUNDS
***********************************************************************************/
/**
Portfolio type TODO: Make this global, could be used with users' wallets
nasfaq::types::portfolio::coin
*/
typedef struct portfolio_coin_t {
std::string coin; //TODO: make this an enum
uint amount;
long ts;
float mpp;
} portfolio_coin_t ;
/**
Portfolio update
*/
template <>
struct ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> {
std::string fund;
std::string event; // TODO: type for this
std::vector<ws_msg_parsed<WS_EVENT_TRANSACTION>> transactions;
std::vector<portfolio_coin_t> portfolio;
};
/*********************************************************************************** /***********************************************************************************
WEBSOCKET MESSAGE STRUCTURES FUNCTIONS WEBSOCKET MESSAGE STRUCTURES FUNCTIONS
***********************************************************************************/ ***********************************************************************************/

View File

@ -0,0 +1,30 @@
#include "formatting.h"
/*
See https://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string
*/
void replace_all(std::string& str, const std::string& from, const std::string& to) {
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
}
}
/*
Splits a string "{...}, ..., {...}" in an array ["{...}", ..., "{...}"].
ONLY HANDLES DICTIONARIES OF DEPTH 1.
*/
std::vector<std::string> tokenize_json_array(std::string op, std::string token) {
int start = 0;
int end = op.find("}");
std::vector<std::string> ret;
while( end != -1 ) {
ret.push_back(op.substr(start, end - start + 1));
start = end + token.size() + 1; // + 1 accounts for the ",".
end = op.find(token, start);
}
return ret;
}

View File

@ -0,0 +1,16 @@
#ifndef _FORMATTING_H_
#define _FORMATTING_H_
#include <iostream>
#include <string>
#include <stdlib.h>
#include "../common/common.h"
#include <nlohmann/json.hpp>
void replace_all(std::string&, const std::string&, const std::string&);
std::vector<std::string> tokenize_json_array(std::string, std::string token = "}");
#endif

View File

@ -1,13 +1,20 @@
X(WS_EVENT_COIN_PRICE_UPDATE)
X(WS_EVENT_TRANSACTION)
X(WS_EVENT_HISTORY_UPDATE)
X(WS_EVENT_BROKER_FEE_UPDATE)
X(WS_EVENT_TODAY_PRICES_UPDATE)
X(WS_EVENT_LEADERBOARD_UPDATE)
X(WS_EVENT_FLOOR_UPDATE)
X(WS_EVENT_AUCTION_UPDATE)
X(WS_EVENT_BENCHMARK_UPDATE)
X(WS_EVENT_SUPERCHAT_UPDATE)
X(WS_EVENT_GACHA_UPDATE)
X(WS_EVENT_ADD_MESSAGE_GLOBAL)
X(WS_EVENT_UNKNOWN) X(WS_EVENT_UNKNOWN)
X(WS_EVENT_COIN_PRICE)
X(WS_EVENT_TRANSACTION)
X(WS_EVENT_HISTORY)
X(WS_EVENT_BROKER_FEE)
X(WS_EVENT_TODAY_PRICES)
X(WS_EVENT_LEADERBOARD)
X(WS_EVENT_FLOOR)
X(WS_EVENT_AUCTION)
X(WS_EVENT_BENCHMARK)
X(WS_EVENT_SUPERCHAT)
X(WS_EVENT_GACHA)
X(WS_EVENT_ADD_MESSAGE_GLOBAL)
X(WS_EVENT_MF_PORTFOLIO)
X(WS_EVENT_MF_BALANCE)
X(WS_EVENT_MF_RUNNINGHISTORY)
X(WS_EVENT_MF_STAT)
X(WS_EVENT_MF_MEMBERS)
X(WS_EVENT_MF_ORDER)
X(WS_EVENT_MF_MAKEPUBLIC)

View File

@ -1,110 +1,10 @@
#include "parser.h" #include "parser.h"
namespace parser { namespace parser {
WS_MSG msg_type_detect(std::string op) { /*******************************************************************************
WS_MSG ret; Parser object
*******************************************************************************/
if (op.substr(0, 30).find("coinPriceUpdate") != std::string::npos) {
ret = WS_EVENT_COIN_PRICE_UPDATE;
} else if (op.substr(0, 30).find("historyUpdate") != std::string::npos) {
ret = WS_EVENT_HISTORY_UPDATE;
} else if (op.substr(0, 30).find("todayPricespdate") != std::string::npos) {
ret = WS_EVENT_TODAY_PRICES_UPDATE;
} else if (op.substr(0, 30).find("brokerFeeUpdate") != std::string::npos) {
ret = WS_EVENT_BROKER_FEE_UPDATE;
} else {
ret = WS_EVENT_UNKNOWN;
}
return ret;
}
template <WS_MSG E>
ws_msg_parsed<E> parse(std::string rmsg) {};
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> raw_msg_parse<WS_EVENT_COIN_PRICE_UPDATE>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> rop;
rop.coin = jparsed["coin"];
rop.price = (jparsed["info"])["price"];
rop.saleValue = (jparsed["info"])["saleValue"];
rop.inCirculation = (jparsed["info"])["inCirculation"];
return rop;
}
/****************************************************************
Helper functions should go in common or something more general
****************************************************************/
/*
See https://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string
*/
static inline void replace_all(std::string& str, const std::string& from, const std::string& to) {
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
}
}
/*
Splits a string "{...}, ..., {...}" in an array ["{...}", ..., "{...}"].
ONLY HANDLES DICTIONARIES OF DEPTH 1.
*/
std::vector<std::string> tokenize_json_array(std::string op, std::string token = "}") {
int start = 0;
int end = op.find("}");
std::vector<std::string> ret;
while( end != -1 ) {
ret.push_back(op.substr(start, end - start + 1));
start = end + token.size() + 1; // + 1 accounts for the ",".
end = op.find(token, start);
}
return ret;
}
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> raw_msg_parse<WS_EVENT_TRANSACTION>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = jparsed["coin"];
rop.type = jparsed["type"];
rop.userid = jparsed["userid"];
rop.quantity = jparsed["quantity"];
rop.timestamp = jparsed["timestamp"];
rop.completed = jparsed["completed"];
rop.timestamp = jparsed["timestamp"];
rop.price = jparsed["price"];
return rop;
}
template<>
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> raw_msg_parse<WS_EVENT_HISTORY_UPDATE>(std::string rmsg) {
std::vector<std::string> raw_vect;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> rop;
/* Replace \" by " */
replace_all(rmsg, "\\\"", "\"");
/* Extract array */
raw_vect = tokenize_json_array(rmsg);
/* Create the output array by parsing each transaction elements */
for(auto & raw_tr : raw_vect) {
rop.transaction_list.push_back(raw_msg_parse<WS_EVENT_TRANSACTION>(raw_tr));
}
return rop;
}
parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C) parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C)
: m_metadata(metadata) : m_metadata(metadata)
, m_process_queue_state(false) , m_process_queue_state(false)
@ -112,7 +12,6 @@ parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C)
{} {}
parser::~parser() { parser::~parser() {
} }
void parser::process_queue() { void parser::process_queue() {
@ -125,25 +24,35 @@ void parser::process_queue() {
while(m_process_queue_state) { while(m_process_queue_state) {
raw_data = m_metadata->pop_message(); raw_data = m_metadata->pop_message();
type = msg_type_detect(raw_data);
type = types::extract(raw_data);
// TODO: make a function for each of these cases and use a switch as in parser_aux
/* POSTGRESQL STUFF GOES HERE */ /* POSTGRESQL STUFF GOES HERE */
if (type == WS_EVENT_COIN_PRICE_UPDATE) { if (type == WS_EVENT_COIN_PRICE) {
/* 18 = 3 + 15 */ /* 18 = 3 + 15 */
raw_data = raw_data.substr(18, raw_data.length()-18); raw_data = raw_data.substr(18, raw_data.length()-18);
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> parsed_msg = raw_msg_parse<WS_EVENT_COIN_PRICE_UPDATE>(raw_data); ws_msg_parsed<WS_EVENT_COIN_PRICE> parsed_msg = single<WS_EVENT_COIN_PRICE>(raw_data);
std::cout << parsed_msg << std::endl; std::cout << parsed_msg << std::endl;
db::push_coin_price(m_connection, parsed_msg); db::push_coin_price(m_connection, parsed_msg);
} else if (type == WS_EVENT_HISTORY_UPDATE) { } else if (type == WS_EVENT_HISTORY) {
raw_data = raw_data.substr(18, raw_data.length()-18); raw_data = raw_data.substr(18, raw_data.length()-18);
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> parsed_msg = raw_msg_parse<WS_EVENT_HISTORY_UPDATE>(raw_data); ws_msg_parsed<WS_EVENT_HISTORY> parsed_msg = single<WS_EVENT_HISTORY>(raw_data);
std::cout << parsed_msg << std::endl; std::cout << parsed_msg << std::endl;
db::push_history(m_connection, parsed_msg); db::push_history(m_connection, parsed_msg);
//std::cout << "\x1B[31mTexting\033[0m\t\t" << std::endl; //std::cout << "\x1B[31mTexting\033[0m\t\t" << std::endl;
} }
else if (type == WS_EVENT_MF_PORTFOLIO) {
raw_data = raw_data.substr(28, raw_data.length()-28);
//raw_data = payload::extract(raw_data);
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> parsed_msg = single<WS_EVENT_MF_PORTFOLIO>(raw_data);
std::cout << parsed_msg << std::endl;
}
} }
} }
@ -168,9 +77,140 @@ void parser::process_queue_thread_join()
pthread_join(m_process_queue_thread, 0); pthread_join(m_process_queue_thread, 0);
} }
void parser::process_queue_stop() { void parser::process_queue_stop() {
m_process_queue_state = false; m_process_queue_state = false;
} }
/*******************************************************************************
Message parsing
*******************************************************************************/
//template <WS_MSG E>
//ws_msg_parsed<E> parse(std::string rmsg) {};
/*
Takes as input a raw string associated to a coinPriceUpdate ws event and returns a parsed representation of it.
*/
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE> single<WS_EVENT_COIN_PRICE>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_COIN_PRICE> rop;
rop.coin = jparsed["coin"];
rop.price = (jparsed["info"])["price"];
rop.saleValue = (jparsed["info"])["saleValue"];
rop.inCirculation = (jparsed["info"])["inCirculation"];
return rop;
} }
/*
Takes as input a raw string associated to a transaction ws event and returns a parsed representation of it.
*/
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> single<WS_EVENT_TRANSACTION>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = jparsed["coin"];
rop.type = jparsed["type"];
rop.userid = jparsed["userid"];
rop.quantity = jparsed["quantity"];
rop.timestamp = jparsed["timestamp"];
rop.completed = jparsed["completed"];
rop.timestamp = jparsed["timestamp"];
rop.price = jparsed["price"];
// rop.fund = jparsed["fund"];
return rop;
}
/*
Same as above but takes a json as input.
*/
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> single_j<WS_EVENT_TRANSACTION>(nlohmann::json op) {
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = op["coin"];
rop.type = op["type"];
rop.userid = op["userid"];
rop.quantity = op["quantity"];
rop.timestamp = op["timestamp"];
rop.completed = op["completed"];
rop.timestamp = op["timestamp"];
if(op["price"] == nullptr) {
rop.price = 0;
} else {
rop.price = op["price"];
}
// rop.fund = op["fund"];
return rop;
}
/*
Takes as input a raw string associated to a historyUpdate ws event and returns a parsed representation of it.
*/
template<>
ws_msg_parsed<WS_EVENT_HISTORY> single<WS_EVENT_HISTORY>(std::string rmsg) {
std::vector<std::string> raw_vect;
ws_msg_parsed<WS_EVENT_HISTORY> rop;
/* Replace \" by " */
replace_all(rmsg, "\\\"", "\"");
/* Extract array */
raw_vect = tokenize_json_array(rmsg);
/* Create the output array by parsing each transaction elements */
for(auto & raw_tr : raw_vect) {
rop.transaction_list.push_back(single<WS_EVENT_TRANSACTION>(raw_tr));
}
return rop;
}
///////////////////////////////// TODO: PUT THIS FUCKING TRASH SOMEWHERE ELSE /////////
/*
Takes as input a json and returns its representation as portfolio_coin_t.
*/
portfolio_coin_t parse_portfolio_coin(std::string coin, nlohmann::json op) {
portfolio_coin_t rop;
rop.coin = coin;
rop.amount = op["amount"];
rop.ts = op["timestamp"];
rop.mpp = op["meanPurchasePrice"];
return rop;
}
////////////////////////////////////////////////////////////////////////////
/*
Takes as input a raw string associated to a mutualFundPortfolioUpdate as ws event and returns a parsed representation of it.
*/
template <>
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> single<WS_EVENT_MF_PORTFOLIO>(std::string rmsg) {
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> rop;
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
rop.fund = jparsed["fund"];
rop.event = (jparsed["tUpdate"])["event"];
for(auto & raw_tr : (jparsed["tUpdate"])["transactions"]) {
rop.transactions.push_back(single_j<WS_EVENT_TRANSACTION>(raw_tr));
}
if((jparsed["tUpdate"])["portfolio"] != NULL) {
for(auto & item : (jparsed["tUpdate"])["portfolio"].items()) {
rop.portfolio.push_back(parse_portfolio_coin(item.key(), item.value()));
}
}
return rop;
}
} //parser

View File

@ -9,24 +9,17 @@
#include "../sql/db_handle.h" #include "../sql/db_handle.h"
#include "../common/common.h" #include "../common/common.h"
#include "../common/formatting.h"
#include "../ws/ssl_ws.h" #include "../ws/ssl_ws.h"
#include "./parser_aux.h"
//TODO: Remove ws here and use a safequeue
namespace parser { namespace parser {
WS_MSG msg_type_detect(std::string); /*******************************************************************************
Parser object
template <WS_MSG E> *******************************************************************************/
ws_msg_parsed<E> raw_msg_parse(std::string);
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> raw_msg_parse<WS_EVENT_COIN_PRICE_UPDATE>(std::string);
template<>
ws_msg_parsed<WS_EVENT_TRANSACTION> raw_msg_parse<WS_EVENT_TRANSACTION>(std::string);
template<>
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> raw_msg_parse<WS_EVENT_HISTORY_UPDATE>(std::string);
class parser { class parser {
public: public:
parser(ws::connection_metadata::ptr, pqxx::connection*); parser(ws::connection_metadata::ptr, pqxx::connection*);
@ -45,7 +38,30 @@ private:
static void* process_queue_helper(void*); static void* process_queue_helper(void*);
}; };
} /*******************************************************************************
Message parsing
*******************************************************************************/
template <WS_MSG E>
ws_msg_parsed<E> single(std::string);
template <WS_MSG E>
ws_msg_parsed<E> single_j(nlohmann::json);
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE> single<WS_EVENT_COIN_PRICE>(std::string);
template<>
ws_msg_parsed<WS_EVENT_TRANSACTION> single<WS_EVENT_TRANSACTION>(std::string);
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> single_j<WS_EVENT_TRANSACTION>(nlohmann::json );
template<>
ws_msg_parsed<WS_EVENT_HISTORY> single<WS_EVENT_HISTORY>(std::string);
template <>
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> single<WS_EVENT_MF_PORTFOLIO>(std::string);
} // parser
#endif #endif

View File

@ -0,0 +1,57 @@
#include "parser_aux.h"
/*******************************************************************************
Types.
*******************************************************************************/
namespace parser::types {
/*
Takes a string as input and returns a WS_MSG corresponding to the underlying event type.
Returns WS_EVENT_UNKNOWN if no match is detected.
TODO: handle malformed strings, only one ",...
*/
WS_MSG extract(std::string op) {
WS_MSG ret;
std::size_t pos_beg;
std::size_t pos_end;
std::string raw_type;
// Find raw_type as first occurence of "RAW_TYPE" in op
pos_beg = op.find_first_of('"');
pos_end = op.find_first_of('"', pos_beg + 1);
raw_type = op.substr(pos_beg+1, pos_end - pos_beg-1);
switch(map_rawIdent[raw_type]) {
case(0): ret = WS_EVENT_UNKNOWN; break;
case(1): ret = WS_EVENT_COIN_PRICE; break;
case(2): ret = WS_EVENT_HISTORY; break;
case(3): ret = WS_EVENT_TODAY_PRICES; break;
case(4): ret = WS_EVENT_BROKER_FEE; break;
case(10): ret = WS_EVENT_MF_PORTFOLIO; break;
case(11): ret = WS_EVENT_MF_BALANCE; break;
case(12): ret = WS_EVENT_MF_RUNNINGHISTORY; break;
case(13): ret = WS_EVENT_MF_STAT; break;
case(14): ret = WS_EVENT_MF_MEMBERS; break;
case(15): ret = WS_EVENT_MF_ORDER; break;
case(16): ret = WS_EVENT_MF_MAKEPUBLIC; break;
default: ret = WS_EVENT_UNKNOWN;
}
return ret;
}
} // parser::type
namespace parser::payload {
std::string extract(std::string op) {
std::string ret;
std::size_t tmp;
// Find second occurence of ". Raw payloads are of the form 42["RAW_TYPE",PAYLOAD ]
tmp = op.find_first_of('"', 3) + 2; // Wew hopefully that doesn't break
ret = op.substr(tmp, op.length() - tmp);
return ret;
}
}

View File

@ -0,0 +1,41 @@
#ifndef _PARSER_AUX_H_
#define _PARSER_AUX_H_
#include "../common/common.h"
/*******************************************************************************
Types
*******************************************************************************/
namespace parser::types {
/*
Raw message identifiers and int conversion.
*/
static std::map<std::string, int> map_rawIdent = {
{"unknown", 0},
{"coinPriceUpdate", 1},
{"historyUpdate", 2},
{"todayPricespdate", 3},
{"brokerFeeUpdate", 4},
{"mutualFundPortfolioUpdate", 10},
{"mutualFundBalanceUpdate", 11},
{"mutualFundRunningHistoryUpdate", 12},
{"mutualFundStatUpdate" , 13},
{"mutualFundMembersUpdate" , 14},
{"mutualFundOrderUpdate" , 15},
{"mutualFundMakePublicUpdate" , 16}
};
/*
Takes a string as input and returns its corresponding WS_MSG type.
*/
WS_MSG extract(std::string);
} // parser::types
namespace parser::payload {
std::string extract(std::string);
} // parser::payload
#endif

View File

@ -1,50 +0,0 @@
#include "safe_queue.h"
template <typename T>
TestQueue<T>::TestQueue(void)
: q()
{}
template <typename T>
TestQueue<T>::~TestQueue(void)
{}
template <typename T>
SafeQueue<T>::SafeQueue(void)
: q()
, m()
, c()
{}
template <typename T>
SafeQueue<T>::~SafeQueue(void)
{}
// Add an element to the queue.
template <typename T>
void SafeQueue<T>::enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(t);
c.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
template <typename T>
T SafeQueue<T>::dequeue(void) {
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}
template <typename T>
bool SafeQueue<T>::empty(void) {
return q.empty();
}

View File

@ -1,36 +0,0 @@
#ifndef _SAFE_QUEUE_H_
#define _SAFE_QUEUE_H_
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class TestQueue
{
public:
TestQueue();
~TestQueue();
private:
std::queue<T> q;
};
// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void);
~SafeQueue(void);
void enqueue(T);
T dequeue(void);
bool empty(void);
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif

View File

@ -1,122 +1,34 @@
#include "db_handle.h" #include "db_handle.h"
namespace db { namespace db {
namespace query { void commit(pqxx::connection *C, std::string query) {
/* Query templates for HISTORY. */ /* Create a transactional object. */
//TODO: Move these definitions to .h file. pqxx::work W(*C);
const std::string query_template_transaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\
"VALUES('{}', {}, '{}', {}, {}, {}, {});";
const std::string query_template_history_ts = "SELECT * FROM HISTORY WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string query_template_userid_ts = "SELECT DISTINCT USERID FROM HISTORY WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string query_template_slope_ts_top = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ /* Execute SQL query */
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ W.exec( query );
"ORDER BY TIMESTAMP ASC "\ W.commit();
"LIMIT 1;";
const std::string query_template_transactions_userid_ts = "SELECT * FROM HISTORY WHERE USERID = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
"ORDER BY TIMESTAMP DESC "\
"LIMIT 1;";
const std::string query_template_last_n_spaced_prices = "select PRICE, TIMESTAMP FROM last_n_spaced_prices('{}', {}, {});";
const std::string query_template_user = "INSERT INTO USERS(USERID, USERNAME, ICON, NETWORTH) "\
"VALUES('{}', E'{}', '{}', {}) "
"ON CONFLICT (USERID) DO UPDATE SET "\
"NETWORTH = {};";
const std::string query_template_userid_to_username = "select USERNAME FROM USERS WHERE USERID = '{}'";
/********************************************************************************
GLOBAL QUERIES
********************************************************************************/
std::string make_push_query_transaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op ) {
return fmt::format(query_template_transaction,
op.coin,
op.type,
op.userid,
op.quantity,
op.timestamp,
op.completed,
op.price);
}
std::string make_pull_query_history_ts( long ts_high, long ts_low ) {
return fmt::format(query_template_history_ts, ts_high, ts_low);
}
std::string make_pull_query_userid_list_ts( long ts_high, long ts_low ) {
return fmt::format(query_template_userid_ts, ts_high, ts_low);
}
std::string make_pull_query_transactions_userid_ts( std::string userid, long ts_high, long ts_low ) {
return fmt::format(query_template_transactions_userid_ts, userid, ts_high, ts_low);
}
std::string make_pull_query_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(query_template_slope_ts_top, coin, ts_high, ts_low);
}
std::string make_pull_query_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(query_template_slope_ts_bot, coin, ts_high, ts_low);
}
/* Query templates for COIN PRICE. */
const std::string query_template_coin_price = "INSERT INTO COINPRICE(COIN, PRICE, SALEVALUE, INCIRCULATION, LASTUPDATE) "\
"VALUES('{}', {}, {}, {}, {}) "\
"ON CONFLICT (COIN) DO UPDATE SET "\
"PRICE = {}, SALEVALUE = {}, INCIRCULATION = {}, LASTUPDATE = {};";
std::string make_push_query_coin_price( ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> op ) {
return fmt::format(query_template_coin_price,
op.coin,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000);
}
/* Query templates for math functions. */
std::string make_pull_query_last_n_spaced_prices( std::string coin, int nb_cycles, int delta) {
return fmt::format(query_template_last_n_spaced_prices,
coin,
nb_cycles,
delta);
}
std::string make_push_query_user(user_t user){
return fmt::format(query_template_user,
user.userid,
user.username,
user.icon,
user.networth,
user.networth);
}
std::string make_pull_query_userid_to_username(std::string userid) {
return fmt::format(query_template_userid_to_username,
userid);
}
} }
}
/******************************************************************************** namespace db::push {
HISTORY
********************************************************************************/
/* Add transactions contained in history update to the database. */ /* Add transactions contained in history update to the database. */
void push_history( pqxx::connection* C, ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> op ) { void push_history( pqxx::connection* C, ws_msg_parsed<WS_EVENT_HISTORY> op ) {
std::string query; std::string query;
for(auto& element : op.transaction_list){ for(auto& element : op.transaction_list){
query += query::make_push_query_transaction(element); query += query::fmt::p_query_transaction(element);
}
db::commit(C, query);
}
/* Same as push_history but checks for unique timestamp. */
void push_uhistory( pqxx::connection* C, ws_msg_parsed<WS_EVENT_HISTORY> op ) {
std::string query;
for(auto& element : op.transaction_list){
query += query::make_push_query_utransaction(element);
} }
/* Create a transactional object. */ /* Create a transactional object. */
@ -127,17 +39,18 @@ namespace db {
W.commit(); W.commit();
} }
/* Returns the last cycle in form of a ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> struct. */ /* Returns the last cycle in form of a ws_msg_parsed<WS_EVENT_HISTORY> struct. */
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle(pqxx::connection* C) { ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle(pqxx::connection* C) {
std::string query; std::string query;
long ts_high, ts_low; long ts_high, ts_low;
ws_msg_parsed<WS_EVENT_TRANSACTION> tmp; ws_msg_parsed<WS_EVENT_TRANSACTION> tmp;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> rop; ws_msg_parsed<WS_EVENT_HISTORY> rop;
ts_high = time(NULL)*1000; ts_high = time(NULL)*1000;
ts_low = ts_high - 600*1000; ts_low = ts_high - 600*1000;
query = query::make_pull_query_history_ts(ts_high, ts_low); query = query::make_pull_query_history_ts(ts_high, ts_low);
//std::cout << query << std::endl;
/* Create a non-transactional object. */ /* Create a non-transactional object. */
pqxx::nontransaction N(*C); pqxx::nontransaction N(*C);
@ -178,15 +91,14 @@ namespace db {
for (pqxx::result::const_iterator c = R.begin(); c != R.end(); ++c) { for (pqxx::result::const_iterator c = R.begin(); c != R.end(); ++c) {
rop.push_back(c[0].as<std::string>()); rop.push_back(c[0].as<std::string>());
} }
return rop; return rop;
} }
/* Fetches all transactions by userid during period ts_low to ts_high. */ /* Fetches all transactions by userid during period ts_low to ts_high. */
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_transactions_userid_ts( pqxx::connection *C, std::string userid, long ts_high, long ts_low) { ws_msg_parsed<WS_EVENT_HISTORY> pull_transactions_userid_ts( pqxx::connection *C, std::string userid, long ts_high, long ts_low) {
std::string query; std::string query;
ws_msg_parsed<WS_EVENT_TRANSACTION> tmp; ws_msg_parsed<WS_EVENT_TRANSACTION> tmp;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> rop; ws_msg_parsed<WS_EVENT_HISTORY> rop;
query = query::make_pull_query_transactions_userid_ts(userid, ts_high, ts_low); query = query::make_pull_query_transactions_userid_ts(userid, ts_high, ts_low);
@ -213,19 +125,21 @@ namespace db {
} }
/* Fetches last cycle's transactions for userid. */ /* Fetches last cycle's transactions for userid. */
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle_userid( pqxx::connection *C, std::string userid) { ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle_userid( pqxx::connection *C, std::string userid) {
long ts_high, ts_low; long ts_high, ts_low;
ws_msg_parsed<WS_EVENT_HISTORY> rop;
ts_high = time(NULL)*1000; ts_high = time(NULL)*1000;
ts_low = ts_high - 600*1000; ts_low = ts_high - 60*10*1000;
return pull_transactions_userid_ts(C, userid, ts_high, ts_low); rop = pull_transactions_userid_ts(C, userid, ts_high, ts_low);
return rop;
} }
/******************************************************************************** /********************************************************************************
COIN PRICE COIN PRICE
********************************************************************************/ ********************************************************************************/
void push_coin_price( pqxx::connection* C, ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> op) { void push_coin_price( pqxx::connection* C, ws_msg_parsed<WS_EVENT_COIN_PRICE> op) {
std::string query; std::string query;
query = query::make_push_query_coin_price(op); query = query::make_push_query_coin_price(op);

View File

@ -19,12 +19,13 @@ namespace db {
/******************************************************************************** /********************************************************************************
HISTORY HISTORY
********************************************************************************/ ********************************************************************************/
void push_history( pqxx::connection*, ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> ); void push_history( pqxx::connection*, ws_msg_parsed<WS_EVENT_HISTORY> );
void push_uhistory( pqxx::connection*, ws_msg_parsed<WS_EVENT_HISTORY> );
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle( pqxx::connection*); ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle( pqxx::connection*);
std::vector<std::string> pull_userid_list_ts( pqxx::connection *, long, long); std::vector<std::string> pull_userid_list_ts( pqxx::connection *, long, long);
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_transactions_userid_ts( pqxx::connection *, std::string, long, long); ws_msg_parsed<WS_EVENT_HISTORY> pull_transactions_userid_ts( pqxx::connection *, std::string, long, long);
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle_userid( pqxx::connection *, std::string); ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle_userid( pqxx::connection *, std::string);
float pull_last_cycle_slope(pqxx::connection*, std::string); float pull_last_cycle_slope(pqxx::connection*, std::string);
@ -33,7 +34,7 @@ namespace db {
/******************************************************************************** /********************************************************************************
COIN PRICE COIN PRICE
********************************************************************************/ ********************************************************************************/
void push_coin_price( pqxx::connection*, ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> ); void push_coin_price( pqxx::connection*, ws_msg_parsed<WS_EVENT_COIN_PRICE> );
/******************************************************************************** /********************************************************************************
COIN PRICE COIN PRICE
@ -44,6 +45,7 @@ namespace db {
USERS USERS
********************************************************************************/ ********************************************************************************/
void push_users( pqxx::connection*, std::vector<user_t> ); void push_users( pqxx::connection*, std::vector<user_t> );
std::vector<std::string> pull_userids_ts(pqxx::connection *C, long ts_low, long ts_high);
std::string userid_to_username(pqxx::connection*, std::string userid); std::string userid_to_username(pqxx::connection*, std::string userid);
} }
#endif #endif

View File

@ -1,137 +1,112 @@
#include "db_init.h" #include "db_init.h"
/*******************************************************************************
Wrapper for single querying
*******************************************************************************/
int single_query(pqxx::connection& C, std::string query) {
try {
pqxx::work W(C);
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
}
namespace db { namespace db {
namespace create {
int create_table_history(pqxx::connection& C) { /*******************************************************************************
std::string query; Table creation queries
*******************************************************************************/
namespace table {
try { int history(pqxx::connection& C) {
/* Create a transactional object. */ // TODO: make timestamp the primary key
pqxx::work W(C); // make sizes macros
std::string query = "CREATE TABLE IF NOT EXISTS HISTORY(" \
/* Create sql query for history table creation */ "ID SERIAL PRIMARY KEY NOT NULL," \
query = "CREATE TABLE IF NOT EXISTS HISTORY(" \ "COIN CHAR(32) NOT NULL,"\
"ID SERIAL PRIMARY KEY NOT NULL," \ "TYPE INT,"\
"COIN CHAR(32) NOT NULL,"\ "USERID CHAR(128) NOT NULL,"\
"TYPE INT,"\ "QUANTITY INT,"\
"USERID CHAR(128) NOT NULL,"\ "TIMESTAMP BIGINT NOT NULL,"\
"QUANTITY INT,"\ "COMPLETED BOOLEAN,"\
"TIMESTAMP BIGINT NOT NULL,"\ "PRICE REAL);";
"COMPLETED BOOLEAN,"\ return single_query(C, query);
"PRICE REAL);";
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
} }
int create_table_coin_price(pqxx::connection& C) { int coin_price(pqxx::connection& C) {
std::string query; std::string query = "CREATE TABLE IF NOT EXISTS COINPRICE(" \
"COIN CHAR(32) PRIMARY KEY,"\
try { "PRICE REAL," \
/* Create a transactional object. */ "SALEVALUE REAL," \
pqxx::work W(C); "INCIRCULATION INT, "\
"LASTUPDATE BIGINT);";
/* Create sql query for history table creation */ return single_query(C, query);
query = "CREATE TABLE IF NOT EXISTS COINPRICE(" \
"COIN CHAR(32) PRIMARY KEY,"\
"PRICE REAL," \
"SALEVALUE REAL," \
"INCIRCULATION INT, "\
"LASTUPDATE BIGINT);";
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
} }
int create_last_n_anchor_prices_function(pqxx::connection& C) { int users(pqxx::connection& C) {
std::string query; std::string query = "CREATE TABLE IF NOT EXISTS USERS(" \
"USERID CHAR(128) PRIMARY KEY,"\
try { "USERNAME TEXT NOT NULL,"\
/* Create a transactional object. */ "ICON CHAR(32) NOT NULL,"\
pqxx::work W(C); "NETWORTH REAL);";
return single_query(C, query);
/* Create sql query for spaced prices function, delta is in seconds*/
query = "CREATE OR REPLACE FUNCTION last_n_spaced_prices(var_coin CHAR(32), var_n INT, var_delta INT) RETURNS SETOF HISTORY AS $$" \
"DECLARE "\
" row HISTORY%ROWTYPE; "\
" cur_date BIGINT = 9223372036854775806; "\
" nb_dates INT = 0; "\
" nb_dates_max INT = var_n; "\
"BEGIN "\
" FOR row IN "\
" SELECT * "\
" FROM HISTORY "\
" WHERE HISTORY.COIN = var_coin "\
" ORDER BY TIMESTAMP DESC "\
" LOOP "\
" IF nb_dates = nb_dates_max "\
" THEN "\
" EXIT; "\
" END IF; "\
" "\
" IF row.TIMESTAMP <= cur_date - var_delta*1000 OR cur_date IS NULL "\
" THEN "\
" cur_date := row.TIMESTAMP; "\
" nb_dates := nb_dates + 1; "\
" RETURN NEXT row; "\
" END IF; "\
" END LOOP; "\
"END; "\
"$$ LANGUAGE plpgsql; ";
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
} }
} // db::create::table
int create_table_users(pqxx::connection& C) { /*******************************************************************************
std::string query; Function creation queries
*******************************************************************************/
try { namespace function {
/* Create a transactional object. */
pqxx::work W(C);
/* Create sql query for history table creation */
query = "CREATE TABLE IF NOT EXISTS USERS(" \
"USERID CHAR(128) PRIMARY KEY,"\
"USERNAME TEXT NOT NULL,"\
"ICON CHAR(32) NOT NULL,"\
"NETWORTH REAL);";
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
int last_n_anchor_prices_function(pqxx::connection& C) {
/* Create sql query for spaced prices function, delta is in seconds*/
std::string query = "CREATE OR REPLACE FUNCTION last_n_spaced_prices(var_coin CHAR(32), var_n INT, var_delta INT) RETURNS SETOF HISTORY AS $$" \
"DECLARE "\
" row HISTORY%ROWTYPE; "\
" cur_date BIGINT = 9223372036854775806; "\
" nb_dates INT = 0; "\
" nb_dates_max INT = var_n; "\
"BEGIN "\
" FOR row IN "\
" SELECT * "\
" FROM HISTORY "\
" WHERE HISTORY.COIN = var_coin "\
" ORDER BY TIMESTAMP DESC "\
" LOOP "\
" IF nb_dates = nb_dates_max "\
" THEN "\
" EXIT; "\
" END IF; "\
" "\
" IF row.TIMESTAMP <= cur_date - var_delta*1000 OR cur_date IS NULL "\
" THEN "\
" cur_date := row.TIMESTAMP; "\
" nb_dates := nb_dates + 1; "\
" RETURN NEXT row; "\
" END IF; "\
" END LOOP; "\
"END; "\
"$$ LANGUAGE plpgsql; ";
return single_query(C, query);
} }
} // db::create::function
/*******************************************************************************
Database creation query
*******************************************************************************/
namespace database{
int populate() { int all(void) {
try { try {
pqxx::connection C("dbname = nasfaq user = steaky \ // TODO: GLOBAL INFO
hostaddr = 127.0.0.1 port = 5432"); pqxx::connection C("dbname = nasfaq user = steaky hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) { if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl; std::cout << "Opened database successfully: " << C.dbname() << std::endl;
@ -140,13 +115,11 @@ int populate() {
return 1; return 1;
} }
/* Create tables. */ table::history(C);
create_table_history(C); table::coin_price(C);
create_table_coin_price(C); table::users(C);
create_table_users(C);
/* Create functions. */ function::last_n_anchor_prices_function(C);
create_last_n_anchor_prices_function(C);
} catch (const std::exception &e) { } catch (const std::exception &e) {
std::cerr << e.what() << std::endl; std::cerr << e.what() << std::endl;
@ -154,4 +127,6 @@ int populate() {
} }
return 0; return 0;
} }
} // db::create::database
} // db::create
} }

View File

@ -3,16 +3,9 @@
#include <iostream> #include <iostream>
#include <pqxx/pqxx> #include <pqxx/pqxx>
#include <fmt/core.h>
#include <ctime>
#include "../http/users.h" namespace db::create::database {
int all(void);
namespace db {
int create_table_history(pqxx::connection&);
int create_table_coin_price(pqxx::connection&);
int create_table_users(pqxx::connection&);
int populate();
} }
#endif #endif

View File

@ -0,0 +1,82 @@
#include "queries.h"
namespace db::queries::fmt {
std::string p_transaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op ) {
return fmt::format(p_transaction,
op.coin,
op.type,
op.userid,
op.quantity,
op.timestamp,
op.completed,
op.price);
}
std::string p_utransaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op ) {
return fmt::format(p_utransaction,
op.coin,
op.type,
op.userid,
op.quantity,
op.timestamp,
op.completed,
op.price,
op.timestamp);
}
std::string f_history_ts( long ts_high, long ts_low ) {
return fmt::format(f_history_ts, ts_high, ts_low);
}
std::string f_userid_list_ts( long ts_high, long ts_low ) {
return fmt::format(f_userid_ts, ts_high, ts_low);
}
std::string f_transactions_userid_ts( std::string userid, long ts_high, long ts_low ) {
return fmt::format(f_transactions_userid_ts, userid, ts_high, ts_low);
}
std::string f_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(f_slope_ts_top, coin, ts_high, ts_low);
}
std::string f_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(f_slope_ts_bot, coin, ts_high, ts_low);
}
std::string p_coin_price( ws_msg_parsed<WS_EVENT_COIN_PRICE> op ) {
return fmt::format(p_coin_price,
op.coin,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000);
}
/* Query templates for math functions. */
std::string f_last_n_spaced_prices( std::string coin, int nb_cycles, int delta) {
return fmt::format(f_last_n_spaced_prices,
coin,
nb_cycles,
delta);
}
std::string p_user(user_t user){
return fmt::format(p_user,
user.userid,
user.username,
user.icon,
user.networth,
user.networth);
}
std::string f_query_userid_to_username(std::string userid) {
return fmt::format(f_userid_to_username,
userid);
}
} // db::queries::fmt

75
connections/sql/queries.h Normal file
View File

@ -0,0 +1,75 @@
#ifndef _QUERIES_H_
#define _QUERIES_H_
#include "../common/common.h"
namespace db {
namespace query {
namespace tmplt {
/* Push a user in USERS */
const std::string p_user = "INSERT INTO USERS(USERID, USERNAME, ICON, NETWORTH) "\
"VALUES('{}', E'{}', '{}', {}) "
"ON CONFLICT (USERID) DO UPDATE SET "\
"NETWORTH = {};";
/* Push a transaction in HISTORY */
const std::string p_transaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\
"VALUES('{}', {}, '{}', {}, {}, {}, {});";
/* Push a transaction in HISTORY with unicity check (for db completion) */
const std::string p_utransaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\
"SELECT '{}', {}, '{}', {}, {}, {}, {} " \
"WHERE NOT EXISTS (SELECT TIMESTAMP FROM HISTORY WHERE TIMESTAMP = {} limit 1);";
/* Update coin price. */
const std::string p_coin_price = "INSERT INTO COINPRICE(COIN, PRICE, SALEVALUE, INCIRCULATION, LASTUPDATE) "\
"VALUES('{}', {}, {}, {}, {}) "\
"ON CONFLICT (COIN) DO UPDATE SET "\
"PRICE = {}, SALEVALUE = {}, INCIRCULATION = {}, LASTUPDATE = {};";
const std::string f_history_ts = "SELECT * FROM HISTORY "\
"WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string f_userid_ts = "SELECT DISTINCT USERID FROM HISTORY " \
"WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string f_slope_ts_top = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
"ORDER BY TIMESTAMP ASC "\
"LIMIT 1;";
const std::string f_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
"ORDER BY TIMESTAMP DESC "\
"LIMIT 1;";
const std::string f_transactions_userid_ts = "SELECT * FROM HISTORY "\
"WHERE USERID = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string f_last_n_spaced_prices = "select PRICE, TIMESTAMP FROM last_n_spaced_prices('{}', {}, {});";
const std::string f_id_to_username = "select USERNAME FROM USERS WHERE USERID = '{}'";
} // db::query::tmpt
namespace db::queries::fmt {
std::string p_transaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op );
std::string p_utransaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op );
std::string p_coin_price( ws_msg_parsed<WS_EVENT_COIN_PRICE> op );
std::string p_user(user_t user);
std::string f_history_ts( long ts_high, long ts_low );
std::string f_userid_list_ts( long ts_high, long ts_low );
std::string f_transactions_userid_ts( std::string userid, long ts_high, long ts_low );
std::string f_top_price_cycle_ts( std::string coin, long ts_high, long ts_low );
std::string f_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low );
std::string f_last_n_spaced_prices( std::string coin, int nb_cycles, int delta);
std::string f_query_userid_to_username(std::string userid);
} // db::query::fmt
#endif

View File

@ -0,0 +1,8 @@
g++ ../../common/common.cpp \
../../aux/complete_db.cpp \
../../http/http_connector.cpp \
../../http/users.cpp \
../../sql/db_init.cpp \
../../sql/db_handle.cpp \
./test_complete_db.cpp \
-o test_complete_db -lpqxx -lpq -lfmt -lcurl -lcrypto -lssl -lboost_system -lboost_random -lboost_thread -lpthread && ./test_complete_db

Binary file not shown.

View File

@ -0,0 +1,9 @@
#include "test_complete_db.h"
int main(void) {
long ts_up = 1655503200000;
unsigned int days = 30;
complete_db(ts_up, days, false);
return 1;
}

View File

@ -0,0 +1,2 @@
#include "../../aux/complete_db.h"

BIN
connections/test/db_init Executable file

Binary file not shown.

Binary file not shown.

View File

@ -2,10 +2,53 @@
#include <graphviz/cgraph.h> #include <graphviz/cgraph.h>
#include "../common/common.h" #include "../common/common.h"
#include "../sql/db_init.h"
#include "../sql/db_handle.h"
#include "../../maths/maths.h"
#include "../../maths/graph.h" #include "../../maths/graph.h"
std::map<std::string, std::map<std::string, float>> make_map(pqxx::connection *C) {
// Timestamps
long ts_high = time(NULL)*1000;
long ts_low = ts_high - 60*10*1000;
float thresh = 10;
/* Create graph (test zone) */
std::map<std::string, std::map<std::string, float>> res;
std::vector<std::string> userids = db::pull_userid_list_ts(C, ts_high, ts_low);
std::cout << "Number of userids active during delta_ts: " << userids.size() << std::endl;
res = norm::diff_map(C, userids, ts_high, ts_low, thresh);
return res;
}
void make_json(pqxx::connection *C, std::map<std::string, std::map<std::string, float>> op) {
norm::diff_save_json(C, op);
}
void make_graph(std::map<std::string, std::map<std::string, float>> op) {
make_graph_from_map(op);
}
int main(void) { int main(void) {
test(); /* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
}
std::map<std::string, std::map<std::string, float>> res;
res = make_map(&C);
make_json(&C, res);
//make_graph(res);
return 1; return 1;
} }

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,7 @@
g++ ../common/common.cpp \ g++ ../common/common.cpp \
../sql/db_init.cpp \
../sql/db_handle.cpp \
../../maths/graph.cpp \ ../../maths/graph.cpp \
../../maths/maths.cpp \
graph.cpp \ graph.cpp \
-L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o graph && ./graph -L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o graph && ./graph

0
connections/test/log.txt Normal file
View File

Binary file not shown.

View File

@ -2,7 +2,6 @@
#include "../http/http_connector.h" #include "../http/http_connector.h"
#include "../http/users.h" #include "../http/users.h"
#include "../ws/http_handshake.h" #include "../ws/http_handshake.h"
#include "../safe_queue/safe_queue.h"
#include "../ws/ssl_ws.h" #include "../ws/ssl_ws.h"
#include "../sql/db_init.h" #include "../sql/db_init.h"
#include "../sql/db_handle.h" #include "../sql/db_handle.h"
@ -36,7 +35,7 @@ int connect_ws() {
} }
/* Populate database if needed. */ /* Populate database if needed. */
db::populate(); db::create::database::all();
/* Open up database connection. */ /* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \ pqxx::connection C("dbname = nasfaq user = steaky \

View File

@ -0,0 +1,29 @@
#include "../common/common.h"
#include "../sql/db_init.h"
int connect_db() {
bool done = false;
std::string input;
/* Populate database if needed. */
db::create::database::all();
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
return 0;
}
int main(void) {
connect_db();
return 1;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -4,57 +4,76 @@ import networkx as nx
import pyvis import pyvis
import json import json
THRESHOLD = 10 MAX_DISPLAY = 3
THRESHOLD = 5
def make_net(N, json_path): fun = lambda x : 10-x
with open(json_path, 'r') as f:
data = json.load(f)
weighted_edges = [] json_path = "graph.json"
for parent, element in data.items():
for child, weight in element.items():
if parent == child: pass
elif weight > THRESHOLD: pass
else:
weighted_edges.append((parent, child, round(weight, 1)))
# Add edges net = Network(height='980px', width='100%', bgcolor='#222222', font_color='white')
N.add_weighted_edges_from(weighted_edges) net.barnes_hut()
def nudge(pos, x_shift, y_shift): with open(json_path, 'r') as f:
return {n:(x + x_shift, y + y_shift) for n,(x,y) in pos.items()} data = json.load(f)
net = nx.Graph() sources, targets, weights = [], [], []
make_net(net, "graph.json")
pos = nx.circular_layout(net) for parent, element in data.items():
tmp_targets, tmp_weights = [], []
for child, weight in element.items():
if parent == child: pass
else:
adj_weight = fun(weight)
net.add_node(parent, parent, title = parent)
net.add_node(child, child, title = child)
nx.draw_networkx( tmp_targets.append(child)
net, pos, edge_color='black', width=1, linewidths=1, tmp_weights.append( round(adj_weight, 2) )
node_size=100, node_color='pink', alpha=0.9,
with_labels = False)
#labels={node: node for node in net.nodes()}
edge_labels = dict([((n1, n2), net[n1][n2]['weight']) s1, s2 = (list(reversed(t)) for t in zip(*sorted(zip(tmp_weights, tmp_targets))))
for n1, n2 in net.edges]) for i in range(min(MAX_DISPLAY, len(s2))):
w = round(s1[i], 2)
if w >= THRESHOLD:
net.add_edge(parent, s2[i], value = w, title = s1[i])
#nx.draw_networkx_edge_labels( neighbor_map = net.get_adj_list()
# net, pos, edges = net.get_edges()
# edge_labels=edge_labels, nodes = net.get_nodes()
# font_color='red'
#)
N_nodes = len(nodes)
N_edges = len(edges)
pretty_net = Network(height='750px', width='100%', bgcolor='#222222', font_color='white') weights=[[] for i in range(N_nodes)]
pretty_net.barnes_hut()
pretty_net.from_nx(net)
pretty_net.show_buttons(filter_=['physics'])
#Associating weights to neighbors
for i in range(N_nodes): #Loop through nodes
for neighbor in neighbor_map[nodes[i]]: #and neighbors
for j in range(N_edges): #associate weights to the edge between node and neighbor
if (edges[j]['from']==nodes[i] and edges[j]['to']==neighbor) or \
(edges[j]['from']==neighbor and edges[j]['to']==nodes[i]):
weights[i].append(edges[j]['value'])
for node,i in zip(net.nodes,range(N_nodes)):
node['value']=len(neighbor_map[node['id']])
node['weight']=[str(weights[i][k]) for k in range(len(weights[i]))]
list_neighbor=list(neighbor_map[node['id']])
pretty_net.show("graph.html") #Sort by score
w_list = [node['weight'][k] for k in range(node['value'])]
n_list = [list_neighbor[k] for k in range(node['value'])]
try:
s_weights, s_neighbors = (list(t) for t in zip(*sorted(zip(w_list, n_list))))
#plt.axis('off') #Concatenating neighbors and weights
#plt.show() hover_str=[s_neighbors[k]+' '+ s_weights[k] for k in range(node['value'])]
#Setting up node title for hovering
node['title']+=' Neighbors:<br>'+'<br>'.join(hover_str)
except ValueError:
pass
net.toggle_physics(True)
net.show("graph.html")

View File

@ -1,10 +1,11 @@
g++ ../safe_queue/safe_queue.cpp \ g++ ../common/common.cpp \
../common/common.cpp \ ../common/formatting.cpp \
../http/http_connector.cpp \ ../http/http_connector.cpp \
../http/users.cpp \ ../http/users.cpp \
../ws/ssl_ws.cpp \ ../ws/ssl_ws.cpp \
../sql/db_init.cpp \ ../sql/db_init.cpp \
../sql/db_handle.cpp \ ../sql/db_handle.cpp \
../parser/parser_aux.cpp \
../parser/parser.cpp \ ../parser/parser.cpp \
../ws/http_handshake.cpp \ ../ws/http_handshake.cpp \
./main.cpp \ ./main.cpp \

View File

@ -0,0 +1,4 @@
g++ ../common/common.cpp \
../sql/db_init.cpp \
./main_db_init.cpp \
-L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o db_init && ./db_init

Binary file not shown.

View File

@ -1,62 +0,0 @@
#include "../common/common.h"
#include "../sql/db_init.h"
#include "../sql/db_handle.h"
#include "../../maths/maths.h"
#include "../../maths/graph.h"
int connect_db() {
bool done = false;
std::string input;
/* Populate database if needed. */
db::populate();
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
/* Loop and print data through parser*/
while(!done) {
input = std::cin.get();
if(input == "q" || input == "quit") {
done = true;
} else if (input == "t") {
std::string coin = "miko";
//float s = db::pull_last_cycle_slope(&C, coin);
float ws = optimizer::get_last_n_weighted_slope(&C, coin, 2, 600);
std::cout << "slope: " << ws << std::endl;
} else if (input == "d") {
long ts_low, ts_high;
float threshold;
std::vector<std::string> userids;
std::map<std::string, std::map<std::string, float>> result;
ts_high = time(NULL)*1000;
ts_low = ts_high - 600*1000;
threshold = 10;
userids = db::pull_userid_list_ts(&C, ts_high, ts_low);
result = norm::diff_map(&C, userids, ts_high, ts_low, threshold);
//make_graph_from_map(result);
norm::diff_save_json(&C, result);
}
}
return 0;
}
int main(void) {
connect_db();
return 1;
}

View File

@ -1,7 +0,0 @@
g++ ../common/common.cpp \
../sql/db_init.cpp \
../sql/db_handle.cpp \
../../maths/maths.cpp \
../../maths/graph.cpp \
sql.cpp \
-L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o sql && ./sql

View File

@ -6,8 +6,6 @@ namespace ws {
static const char *POLLING_URL_0 = "https://nasfaq.biz/socket/?EIO=4&transport=polling&t=Ny7z439"; static const char *POLLING_URL_0 = "https://nasfaq.biz/socket/?EIO=4&transport=polling&t=Ny7z439";
static const char *POLLING_URL_1 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z472"; static const char *POLLING_URL_1 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z472";
static const char *POLLING_URL_2 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4Bn&sid="; static const char *POLLING_URL_2 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4Bn&sid=";
static const char *POLLING_URL_3 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4Bp&sid=";
static const char *POLLING_URL_4 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4EU&sid=";
nlohmann::json json_ret; nlohmann::json json_ret;
std::string sid; std::string sid;
@ -24,10 +22,6 @@ namespace ws {
/* Post "40" acknowledgement with sid. */ /* Post "40" acknowledgement with sid. */
c.post(POLLING_URL_2 + sid, "40"); c.post(POLLING_URL_2 + sid, "40");
/* Continue handshake (might be unneeded). See XHR trace. */
//c.get(POLLING_URL_3 + sid);
//c.get(POLLING_URL_4 + sid);
return sid; return sid;
} }
} }

View File

@ -67,6 +67,7 @@ namespace ws {
} }
} else if (payload.substr(0, 2) == "42") { } else if (payload.substr(0, 2) == "42") {
// Raw messages are of the form "42[PAYLOAD]"
std::string payload_format = payload.substr(3, payload.length() - 4); std::string payload_format = payload.substr(3, payload.length() - 4);
push_message(payload_format); push_message(payload_format);
} }
@ -85,11 +86,6 @@ namespace ws {
return m_status; return m_status;
} }
/**
Should it use the cdt variable?
If it doesn't lock this thread up maybe, otherwise it should be defined externally.
Isn't the connection in another thread anyways?
*/
bool connection_metadata::msg_queue_empty(void) { bool connection_metadata::msg_queue_empty(void) {
return m_msg_q.empty(); return m_msg_q.empty();
} }
@ -250,8 +246,17 @@ namespace ws {
std::cout << "> Error sending message: " << ec.message() << std::endl; std::cout << "> Error sending message: " << ec.message() << std::endl;
return; return;
} }
}
// metadata_it->second->record_sent_message(message); // TODO This should read to a buffer (void function)
std::string websocket_endpoint::read(int id) const {
con_list::const_iterator metadata_it = m_connection_list.find(id);
if(metadata_it == m_connection_list.end()) {
std::cout << "> No connection found with id " << id << std::endl;
return "";
}
return metadata_it->second->pop_message();
} }
connection_metadata::ptr websocket_endpoint::get_metadata(int id) const { connection_metadata::ptr websocket_endpoint::get_metadata(int id) const {
@ -262,12 +267,5 @@ namespace ws {
return metadata_it->second; return metadata_it->second;
} }
} }
std::string websocket_endpoint::get_queue_front(int id) const {
con_list::const_iterator metadata_it = m_connection_list.find(id);
if(metadata_it == m_connection_list.end()) {
return "";
} else {
return metadata_it->second->pop_message();
}
}
} }

View File

@ -1,23 +1,13 @@
#ifndef _SSL_WS_H_ #ifndef _SSL_WS_H_
#define _SSL_WS_H_ #define _SSL_WS_H_
#include <iostream>
#include <string>
#include <stdlib.h>
//#include "../parser/parser.h"
#include "../common/common.h" #include "../common/common.h"
#include "../safe_queue/safe_queue.h"
#include <websocketpp/config/asio_client.hpp> #include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp> #include <websocketpp/client.hpp>
#include <websocketpp/common/thread.hpp> #include <websocketpp/common/thread.hpp>
#include <websocketpp/common/memory.hpp> #include <websocketpp/common/memory.hpp>
#include <nlohmann/json.hpp>
namespace ws { namespace ws {
typedef websocketpp::client<websocketpp::config::asio_tls_client> client; typedef websocketpp::client<websocketpp::config::asio_tls_client> client;
typedef std::shared_ptr<boost::asio::ssl::context> context_ptr; typedef std::shared_ptr<boost::asio::ssl::context> context_ptr;
@ -49,7 +39,6 @@ namespace ws {
std::string m_uri; std::string m_uri;
std::string m_server; std::string m_server;
std::string m_error_reason; std::string m_error_reason;
//std::vector<std::string> m_messages;
client *m_endpoint; client *m_endpoint;
std::queue<std::string> m_msg_q; std::queue<std::string> m_msg_q;
mutable std::mutex m_msg_q_mutex; mutable std::mutex m_msg_q_mutex;
@ -68,7 +57,7 @@ namespace ws {
void close(int, websocketpp::close::status::value, std::string); void close(int, websocketpp::close::status::value, std::string);
void send(int, std::string); void send(int, std::string);
connection_metadata::ptr get_metadata(int) const; connection_metadata::ptr get_metadata(int) const;
std::string get_queue_front(int) const; std::string read(int) const;
private: private:
typedef std::map<int, connection_metadata::ptr> con_list; typedef std::map<int, connection_metadata::ptr> con_list;
client m_endpoint; client m_endpoint;

View File

@ -0,0 +1,90 @@
#include "../../common/common.h"
#include "../../http/http_connector.h"
#include "../../ws/http_handshake.h"
#include "../../ws/ssl_ws.h"
int connect_ws() {
bool done = false;
std::string input;
ws::websocket_endpoint endpoint;
/* Get session id */
std::string sid = ws::http_handshake::get_sid();
/* Start connection */
std::string uri = "wss://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=websocket&sid=" + sid;
int id = endpoint.connect(uri);
if(id != -1) {
std::cout << ">Created nasfaq websocket connection" << std::endl;
} else {
return -1;
}
sleep(1);
/* Display connection metadata */
ws::connection_metadata::ptr metadata = endpoint.get_metadata(id);
if (metadata) {
std::cout << *metadata << std::endl;
} else {
std::cout << ">Unknown connection id " << id << std::endl;
}
/* Populate database if needed. */
db::populate();
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
/* Update users. */
db::push_users(&C, users::get_users());
/* Set up parser */
parser::parser p(metadata, &C);
/* Loop and print data through parser*/
while(!done) {
input = std::cin.get();
if(input == "q" || input == "quit") {
done = true;
} else if (input == "p") {
std::cout << "PROCESSING QUEUE " << std::endl;
p.process_queue_start();
} else if (input == "s") {
std::cout << "STOPPING QUEUE PROCESSING" << std::endl;
p.process_queue_stop();
} else if (input == "t") {
std::cout << "TEST" << std::endl;
auto ret = db::pull_last_cycle(&C);
std::cout << ret << std::endl;
}
}
/* Close websocket */
std::stringstream ss(input);
std::string cmd;
int close_code = websocketpp::close::status::normal;
std::string reason;
ss >> cmd >> id >> close_code;
std::getline(ss, reason);
endpoint.close(id, close_code, reason);
return 0;
}
int main(void) {
connect_ws();
return 1;
}

11
connections/ws/test/run.sh Executable file
View File

@ -0,0 +1,11 @@
g++ ../safe_queue/safe_queue.cpp \
../common/common.cpp \
../http/http_connector.cpp \
../http/users.cpp \
../ws/ssl_ws.cpp \
../sql/db_init.cpp \
../sql/db_handle.cpp \
../parser/parser.cpp \
../ws/http_handshake.cpp \
./main.cpp \
-o main -lpqxx -lpq -lfmt -lcurl -lcrypto -lssl -lboost_system -lboost_random -lboost_thread -lpthread && ./main

View File

@ -1,5 +1,7 @@
#include "maths.h" #include "maths.h"
#define DIFF_ALL_MIN_CYCLE_SIZE 0
namespace query { namespace query {
const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ "WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
@ -56,36 +58,63 @@ namespace norm {
return rop; return rop;
} }
std::map<std::string, float> diff_all(pqxx::connection *C, cycle_t base_cycle, std::map<std::string, cycle_t> cycles, std::vector<std::string> userid_list, std::string userid, long ts_high, long ts_low, float threshold) { std::map<std::string, float> diff_all(
std::map<std::string, float> result; pqxx::connection *C,
cycle_t tmp1, tmp2; cycle_t base_cycle,
float tmp_res; std::map<std::string, cycle_t> cycles,
long ts_high,
long ts_low,
float threshold){
for(auto const& id : userid_list) { std::map<std::string, float> result;
tmp2 = cycles.find(id)->second; std::string tmp_uid;
if (cycle_size(tmp2) >= 10) { cycle_t tmp_c;
tmp_res = norm(diff_vector(tmp1, tmp2)); float tmp_f;
if (tmp_res < threshold) {
result[id] = tmp_res; for(std::map<std::string,cycle_t>::iterator iter = cycles.begin(); iter != cycles.end(); ++iter) {
std::cout << "Difference between " << userid << " and " << id << " : " << result[id] << std::endl; tmp_uid = iter->first;
} tmp_c = iter->second;
tmp_f = norm(diff_vector(base_cycle, tmp_c));
if(tmp_f && tmp_f < threshold){
result.insert(std::pair<std::string, float>(tmp_uid, tmp_f));
} }
} }
return result; return result;
} }
std::map<std::string, std::map<std::string, float>> diff_map(pqxx::connection *C, std::vector<std::string> userids, long ts_high, long ts_low, float threshold){ std::map<std::string, std::map<std::string, float>> diff_map(
pqxx::connection *C,
std::vector<std::string> userids,
long ts_high,
long ts_low,
float threshold){
std::map<std::string, std::map<std::string, float>> result; std::map<std::string, std::map<std::string, float>> result;
std::map<std::string, cycle_t> cycles; std::map<std::string, cycle_t> cycles;
std::map<std::string, float> tmp; std::map<std::string, float> tmp;
cycle_t tmp_c;
int c_size;
unsigned int pos;
std::string userid;
/* Prepare cycles from db. */ /* Prepare cycles from db. */
for(auto const& id : userids) { pos = 0;
cycles.insert( std::pair<std::string, cycle_t>(id, history_to_cycle(db::pull_last_cycle_userid(C, id))) ); for(auto const& userid:userids) {
tmp_c = history_to_cycle(db::pull_last_cycle_userid(C, userid));
c_size = cycle_size(tmp_c);
/* Only keep players with large cycles */
if (c_size >= DIFF_ALL_MIN_CYCLE_SIZE) {
cycles.insert(std::pair<std::string, cycle_t>(userid, tmp_c));
}
pos++;
} }
for (auto& userid:userids) { /* Compute norms and diffs */
tmp = norm::diff_all(C, cycles[userid], cycles, userids, userid, ts_high, ts_low, threshold); for(std::map<std::string,cycle_t>::iterator iter = cycles.begin(); iter != cycles.end(); ++iter) {
userid = iter->first;
tmp = norm::diff_all(C, cycles[userid], cycles, ts_high, ts_low, threshold);
result.insert( std::pair<std::string, std::map<std::string, float>>( userid, tmp )); result.insert( std::pair<std::string, std::map<std::string, float>>( userid, tmp ));
} }

View File

@ -14,14 +14,32 @@ namespace norm {
float norm(cycle_t); float norm(cycle_t);
cycle_t history_to_cycle(ws_msg_parsed<WS_EVENT_HISTORY_UPDATE>); cycle_t history_to_cycle(ws_msg_parsed<WS_EVENT_HISTORY_UPDATE>);
cycle_t diff_vector(cycle_t, cycle_t); cycle_t diff_vector(cycle_t, cycle_t);
std::map<std::string, float> diff_all(pqxx::connection *C, cycle_t base_cycle, std::map<std::string, cycle_t> cycles, std::vector<std::string> userid_list, std::string userid, long ts_high, long ts_low, float threshold); std::map<std::string, float> diff_all(
std::map<std::string, std::map<std::string, float>> diff_map(pqxx::connection *C, std::vector<std::string> userid_list, long, long, float); pqxx::connection *C,
void diff_save_json( pqxx::connection *, std::map<std::string, std::map<std::string, float>>); cycle_t base_cycle,
std::map<std::string, cycle_t> cycles,
std::vector<std::string> userid_list,
long ts_high,
long ts_low,
float threshold);
std::map<std::string, std::map<std::string, float>> diff_map(
pqxx::connection *C,
std::vector<std::string> userid_list,
long,
long,
float);
void diff_save_json(
pqxx::connection *,
std::map<std::string,std::map<std::string, float>>);
} }
namespace optimizer{ namespace optimizer{
std::tuple<long, long> get_last_nth_cycle_ts(int); std::tuple<long, long> get_last_nth_cycle_ts(int);
float get_last_n_weighted_slope(pqxx::connection*, std::string, int, int); float get_last_n_weighted_slope(
pqxx::connection*,
std::string,
int,
int);
} }
#endif #endif

Binary file not shown.

View File

@ -0,0 +1,7 @@
import json
USERID = "314d0bda-d7f0-4636-aed7-5ea02743604b"
USERID_P1 = "10dcbbd5-f761-44f3-92ce-ea5b01f1d747"
USERID_P2 = "193ecdf2-689e-40da-b245-d28db3a02d2b"
URL_API = "https://nasfaq.biz/api/"
COINS = ['aki', 'amelia', 'aqua', 'ayame', 'azki', 'botan', 'calliope', 'choco', 'civia', 'coco', 'flare', 'fubuki', 'gura', 'haato', 'himemoriluna', 'hololive', 'inanis', 'iofi', 'kanata', 'kiara', 'korone', 'lamy', 'marine', 'matsuri', 'mel', 'melfissa', 'miko', 'mio', 'moona', 'nana', 'nene', 'noel', 'okayu', 'ollie', 'pekora', 'polka', 'reine', 'risu', 'roboco', 'rushia', 'shion', 'sora', 'subaru', 'suisei', 'towa', 'ui', 'watame', 'laplus', 'lui', 'koyori', 'chloe', 'iroha', 'irys', 'sana', 'fauna', 'kronii', 'mumei', 'baelz']

View File

@ -0,0 +1,35 @@
import psycopg2
from common import *
import psycopg2.extras
def get_db_connection():
return psycopg2.connect(cursor_factory=psycopg2.extras.RealDictCursor,
database="nasfaq", user = "steaky", host = "127.0.0.1", port = "5432")
def fetch_history(cursor, ts_low, ts_high, coins = COINS, userids = None):
if coins == None: return {}
query = "SELECT * FROM HISTORY WHERE (TIMESTAMP > {} AND TIMESTAMP <= {}) AND (".format(ts_low, ts_high)
# Filter by coins
for i in range(len(coins)):
if i == len(coins) - 1:
query += "COIN = '{}' )".format(coins[i])
else:
query += "COIN = '{}' OR ".format(coins[i])
# Filter by userids
if userids: query += "AND ("
for i in range(len(userids)):
if i == len(userids) - 1:
query += "userid = '{}' )".format(userids[i])
else:
query += "userid = '{}' OR ".format(userids[i])
query += ";"
print(query)
cursor.execute(query)
res = cursor.fetchall()
return res

View File

@ -0,0 +1,29 @@
from common import *
import requests
def fetch_coin_prices():
url = URL_API + "getMarketInfo?all"
r = requests.get(url)
return r.json()['coinInfo']['data']
def fetch_coin_history():
url = URL_API + "getMarketInfo?all&history"
r = requests.get(url)
return r.json()['coinInfo']['data']
def fetch_coin_qty_all():
rop = {}
url = URL_API + "getUserWallet?userid=" + USERID
r = requests.get(url)
for key, item in r.json()['wallet']['coins'].items():
rop[key] = item['amt']
return rop
def fetch_balance():
url = URL_API + "getUserWallet?userid=" + USERID
r = requests.get(url)
return r.json()['wallet']['balance']

50
plots/coin_prices/main.py Normal file
View File

@ -0,0 +1,50 @@
import matplotlib.pyplot as plt
import matplotlib.dates as md
import datetime as dt
import time
import random
from common import *
from db_handle import *
from http_handle import *
if __name__ == '__main__':
C = get_db_connection()
cur = C.cursor()
## Fetch data
days = 0
delta = 30
ts_high = time.mktime(time.localtime()) * 1000 - days*24*60*60*1000
ts_low = ts_high - delta*60*60*24*1000
coins = ['subaru']
userids = ['c9b0830c-8cf7-4e1d-b8f3-60bbc85160fa', '990400b5-f0e2-4b91-8aa6-5a74dc16bcc5']
hist = fetch_history(cur, ts_low, ts_high, coins, userids)
# Analyse data
T0, T1, Y0, Y1 = [], [], [], []
for tr in hist:
if tr['userid'][:5] == userids[0][:5]:
Y0.append(tr["quantity"])
T0.append(tr["timestamp"]// 1000)
else:
Y1.append(tr["quantity"])
T1.append(tr["timestamp"]// 1000)
dates0 = [dt.datetime.fromtimestamp(ts) for ts in T0]
X0 = md.date2num(dates0)
dates1 = [dt.datetime.fromtimestamp(ts) for ts in T1]
X1 = md.date2num(dates1)
ax=plt.gca()
xfmt = md.DateFormatter('%H:%M:%S')
ax.xaxis.set_major_formatter(xfmt)
# Plot data
plt.plot(X1, Y1, "r+")
plt.plot(X0, Y0, "b+")
plt.show()

View File

@ -0,0 +1 @@
SELECT * FROM HISTORY WHERE (TIMESTAMP > 1655399889000.0 AND TIMESTAMP <= 1655486289000.0) AND (COIN = 'civia' )AND (userid = '10dcbbd5-f761-44f3-92ce-ea5b01f1d747' OR userid = '193ecdf2-689e-40da-b245-d28db3a02d2b' );