diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f76c937 --- /dev/null +++ b/.gitignore @@ -0,0 +1,54 @@ +##### C + +# Prerequisites +*.d + +# Object files +*.o +*.ko +*.obj +*.elf + +# Linker output +*.ilk +*.map +*.exp + +# Precompiled Headers +*.gch +*.pch + +# Libraries +*.lib +*.a +*.la +*.lo + +# Shared objects (inc. Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf diff --git a/MutualFundsJSON.txt b/MutualFundsJSON.txt new file mode 100644 index 0000000..62b1cac --- /dev/null +++ b/MutualFundsJSON.txt @@ -0,0 +1,165 @@ + + + + + + + + + + + + + + + +/***************** MUTUAL FUNDS *******************************/ + +42[ "mutualFundPortfolioUpdate", + {"fund":"6796531e-8668-49d1-bb01-f07db4264ce6", + "tUpdate": + {"event":"Transaction Update", + "transactions":[{ + "coin":"miko", + "type":0, + "userid":"6796531e-8668-49d1-bb01-f07db4264ce6", + "quantity":1, + "timestamp":1655806346312, + "completed":false, + "price":null,"fund":true + }], + "portfolio":null + } + } +] + +42[ "mutualFundPortfolioUpdate", + {"fund":"6796531e-8668-49d1-bb01-f07db4264ce6", + "tUpdate": + {"event":"Transaction Update", + "transactions":[{ + "coin":"miko","type":0,"userid":"6796531e-8668-49d1-bb01-f07db4264ce6","quantity":1,"timestamp":1655806346312,"completed":true,"price":32820.18,"fund":true}], + "portfolio":{ + "miko":{"amount":352,"timestamp":1655806346312,"meanPurchasePrice":29645.69} + } + } + }] + +42[ "mutualFundBalanceUpdate", + {"funds": + {"6796531e-8668-49d1-bb01-f07db4264ce6":4924583.6} + } +] + +42[ "mutualFundRunningHistoryUpdate", + {"f9df4304-e084-494a-b918-9b5ab1c6842b": + {"volume":1000,"price":100000,"saleValue":100000,"networth":100000000,"members":1,"timestamp":1655806533376}, + "851a75f1-19cf-4aad-b3f0-95ebf1973e4b": + {"volume":81000,"price":2810,"saleValue":2810,"networth":224663343.5,"members":4,"timestamp":1655806533380}, + ... + } +] + +42[ "mutualFundStatUpdate", + {"funds": + {"f9df4304-e084-494a-b918-9b5ab1c6842b": + {"volume":1000,"price":100000,"saleValue":100000,"networth":100000000,"members":1}, + "851a75f1-19cf-4aad-b3f0-95ebf1973e4b": + {"volume":81000,"price":2810,"saleValue":2810,"networth":224663343.5,"members":4}, + ... + } + } +] + +42[ "mutualFundMembersUpdate", + {"type":"add", + "fund":"851a75f1-19cf-4aad-b3f0-95ebf1973e4b", + "username":"Bombshell Blondes Banking", + "userid":"726b5a5e-9dc0-438e-b6be-fb901a51f482"} +] + +42[ "mutualFundOrderUpdate", + {"fund":"851a75f1-19cf-4aad-b3f0-95ebf1973e4b","buys":0,"sells":0} +] + +42[" mutualFundMakePublicUpdate", + {"fund":"f9df4304-e084-494a-b918-9b5ab1c6842b"} +] + + +XHR: + +{"success":true,"funds": +{"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f": + {"id":"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f", + "name":"Miyatsuko", + "dateCreated":"2022-06-20T21:04:21.129Z", + "icon":"inanis", + "missionStatement":"Abayo Chocoball", + "tag":"造" + ,"color":"e0ae2f", + "ceo":"10dcbbd5-f761-44f3-92ce-ea5b01f1d747", + "balance":1228441634.6, + "released":true, + "members": + [ {"id":"10dcbbd5-f761-44f3-92ce-ea5b01f1d747","username":"[造] Miyatsuko-Priestess Treasury","boardMember":true}, + {"id":"135c986e-0cc3-45d1-ad9a-77725a99d44f","username":"[造] Touching Fluffy Tails Management Co.","boardMember":true}, + ... + ], + "portfolio": + { "kronii":{"amount":8,"timestamp":1655760060678,"meanPurchasePrice":16719.18}, + "inanis":{"amount":8,"timestamp":1655760516709,"meanPurchasePrice":27564.07}, + ... + } + }, + ..., + ..., + ... +}, +"bulletins": + {"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":[], + "7b24a25b-8f78-42b6-85ff-1d3f62c7132a":[ + {"id":"3e7a8c16-2e44-42aa-91ee-57674378982f", + "fund":"7b24a25b-8f78-42b6-85ff-1d3f62c7132a", + "author":"f7c3b57a-8b4f-4715-9f37-dfcc81fb0c18", + "authorName":"[狐] Kitsune Holdings 🌽🦊🍔", + "timestamp":"2022-06-20T22:54:11.252Z", + "message":"OhaKOOOOOOOOOOOOOOOOOOON" + } + ], + ... + }, + +"fundHistory":{ + "4a4afb61-7a1e-4f99-bbec-fd50d0529c5f": + [{"timestamp":"2022-06-20T21:04:21.129Z","volume":100000,"price":16000,"networth":1600000000,"members":1}], + "81570634-37bb-4e10-b2cb-ea1bf955fc53": + [{"timestamp":"2022-06-20T21:04:24.109Z","volume":1000,"price":25000,"networth":25000000,"members":1}], + ... + }, +"fundStats":{ + "4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":{ + "volume":100000, + "price":16000, + "saleValue":16000, + "networth":1584327512, + "members":10, + "history":[ + {"volume":100000,"price":16000,"saleValue":16000,"networth":1600000000,"members":1,"timestamp":1655759061132}, + {"volume":100000,"price":16000,"saleValue":16000,"networth":1600000000,"members":8,"timestamp":1655759714668}, + ... + ] + }, + ... +} + + +"orders":{ + "779ee96f-5a24-4244-b1ce-a8cc540f0cbd": + {"buys":6980,"sells":0}, + "6796531e-8668-49d1-bb01-f07db4264ce6": + {"buys":12157,"sells":0}, + ... +} +"fundPayouts":{}, +"fundsToDissolve":{} diff --git a/connections/api/api.cpp b/connections/api/api.cpp deleted file mode 100644 index 348c5d8..0000000 --- a/connections/api/api.cpp +++ /dev/null @@ -1,11 +0,0 @@ -#include "api.h" - -class nsfq_proxy_api { -public: - -private: - /* HTTP wrapper to interface with the server. */ - - /* Websocket used to interface with the server. */ - websocket_endpoint ws_endpoint; -} diff --git a/connections/api/api.h b/connections/api/api.h deleted file mode 100644 index 224a668..0000000 --- a/connections/api/api.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef _API_H_ -#define _API_H_ - -#include -#include -#include - -#include -#include - -#include -#include - -#include - -#include "../parser/parser.h" -#include "../common/common.h" -#include "../safe_queue/safe_queue.h" -#include "../my_ssl/my_ssl.h" - -#endif - diff --git a/connections/aux/complete_db.cpp b/connections/aux/complete_db.cpp new file mode 100644 index 0000000..55cb9b1 --- /dev/null +++ b/connections/aux/complete_db.cpp @@ -0,0 +1,60 @@ +#include "complete_db.h" + + +ws_msg_parsed raw_msg_parse(nlohmann::json op) { + + ws_msg_parsed rop; + rop.coin = op["coin"]; + rop.type = op["type"]; + rop.userid = op["userid"]; + rop.quantity = op["quantity"]; + rop.timestamp = op["timestamp"]; + rop.completed = op["completed"]; + rop.timestamp = op["timestamp"]; + rop.price = op["price"]; + + return rop; +} + +int complete_db(long ts_up, unsigned int days, bool check) { + long tmp_ts; + std::string BASE_URL = "https://nasfaq.biz/api/getHistory?timestamp={}"; + std::string TMP_URL; + nlohmann::json res_json; + ws_msg_parsed tmp; + nsfq_http::http_connector::connector c; + + /* Open up database connection. */ + pqxx::connection C("dbname = nasfaq user = steaky \ + hostaddr = 127.0.0.1 port = 5432"); + + if (C.is_open()) { + std::cout << "Opened database successfully: " << C.dbname() << std::endl; + } else { + std::cout << "Can't open database" << std::endl; + return 1; + } + + /* Update database if needed */ + tmp_ts = ts_up; + for(int i = 0; i < days; i++) { + std::cout << "Fetching history for ts = " << tmp_ts << std::endl; + tmp_ts -= 60*60*24*1000; + TMP_URL = fmt::format(BASE_URL, tmp_ts); + c.get((const char*)TMP_URL.c_str()); + + + res_json = nlohmann::json::parse(c.get_buffer()); + + tmp = {}; + if(!res_json.empty()) { + for(auto const & x:res_json["history"]["transactions"]) { + tmp.transaction_list.push_back(raw_msg_parse(x)); + } + } + + if(check) db::push_uhistory(&C, tmp); + else db::push_history(&C, tmp); + } + return 0; +} diff --git a/connections/aux/complete_db.h b/connections/aux/complete_db.h new file mode 100644 index 0000000..0323658 --- /dev/null +++ b/connections/aux/complete_db.h @@ -0,0 +1,7 @@ +#include "../common/common.h" +#include "../http/http_connector.h" +#include "../sql/db_init.h" +#include "../sql/db_handle.h" + +int complete_db(long ts_up, unsigned int days, bool check); + diff --git a/connections/client/client.cpp b/connections/client/client.cpp deleted file mode 100644 index c242dbc..0000000 --- a/connections/client/client.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "client.h" - -namespace proxy { - client::client(void) { - m_q = SafeQueue; - m_endpoint = - - } -} diff --git a/connections/client/client.h b/connections/client/client.h deleted file mode 100644 index 917f027..0000000 --- a/connections/client/client.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef _CLIENT_H_ -#define _CLIENT_H_ - -#include "../common/common.h" -#include "../ws/ssl_ws.h" - -namespace proxy { - /** - NASFAQ proxy client. - */ - class client { - public: - client(void); - ~client(void); - - void init(void); - private: - SafeQueue q; - ws::websocket_endpoint endpoint; - } -} - -#endif - diff --git a/connections/common/common.cpp b/connections/common/common.cpp index 65d41dd..3d519fa 100644 --- a/connections/common/common.cpp +++ b/connections/common/common.cpp @@ -20,11 +20,12 @@ std::ostream& operator<< (std::ostream& stream, WS_MSG const & type) { } /** + //TODO: GOES INTO pretty_print.cpp Pretty prints COIN_PRICE events. */ template<> -std::ostream& operator<< (std::ostream& stream, ws_msg_parsed const & op) { - stream << "WS_EVENT_COIN_PRICE_UPDATE:" +std::ostream& operator<< (std::ostream& stream, ws_msg_parsed const & op) { + stream << "WS_EVENT_COIN_PRICE:" << " Coin: " << op.coin << " Price: " << op.price << " SaleValue: " << op.saleValue @@ -49,11 +50,46 @@ std::ostream& operator<< (std::ostream& stream, ws_msg_parsed -std::ostream& operator<< (std::ostream& stream, ws_msg_parsed const & op) { - stream << "WS_EVENT_HISTORY_UPDATE:\n"; +std::ostream& operator<< (std::ostream& stream, ws_msg_parsed const & op) { + stream << "WS_EVENT_MF_PORTFOLIO:" + << " Fund: " << op.fund + << " Event: " << op.event << std::endl + << "Transactions: " << std::endl; + + for(auto & element : op.transactions) { + stream << "\t" << element << std::endl; + } + + stream << "Portfolio: " << std::endl; + + for(auto & element : op.portfolio) { + stream << "\t" << element << std::endl; + } + + return stream; +} + +/** + Pretty prints the transactions held in the WS_EVENT_HISTORY vector. +*/ +template<> +std::ostream& operator<< (std::ostream& stream, ws_msg_parsed const & op) { + stream << "WS_EVENT_HISTORY:\n"; for(auto & element : op.transaction_list) { stream << "\t" << element << std::endl; } diff --git a/connections/common/common.h b/connections/common/common.h index 03adbef..4d95197 100644 --- a/connections/common/common.h +++ b/connections/common/common.h @@ -13,7 +13,7 @@ #include -//#include "../safe_queue/safe_queue.h" +// TODO: REFACTOR ALL THIS TRASH HOLY SHIT /*********************************************************************************** WEBSOCKET MESSAGE STRUCTURES @@ -58,7 +58,7 @@ struct ws_msg_parsed; coinPriceUpdate structure */ template <> -struct ws_msg_parsed { +struct ws_msg_parsed { std::string coin; float price; float saleValue; @@ -66,7 +66,7 @@ struct ws_msg_parsed { }; /** - Auxiliary type for WS_EVENT_HISTORY_UPDATE. + Auxiliary type for WS_EVENT_HISTORY. This holds individual transactions as handed by the websocket in a list. */ template <> @@ -78,16 +78,43 @@ struct ws_msg_parsed { long timestamp; bool completed; float price; + bool fund; }; /** historyUpdate structure holding transactions. */ template<> -struct ws_msg_parsed { +struct ws_msg_parsed { std::vector> transaction_list; }; +/*********************************************************************************** + MUTUAL FUNDS +***********************************************************************************/ +/** + Portfolio type TODO: Make this global, could be used with users' wallets + nasfaq::types::portfolio::coin +*/ +typedef struct portfolio_coin_t { + std::string coin; //TODO: make this an enum + uint amount; + long ts; + float mpp; +} portfolio_coin_t ; + + +/** + Portfolio update +*/ +template <> +struct ws_msg_parsed { + std::string fund; + std::string event; // TODO: type for this + std::vector> transactions; + std::vector portfolio; +}; + /*********************************************************************************** WEBSOCKET MESSAGE STRUCTURES FUNCTIONS ***********************************************************************************/ diff --git a/connections/common/formatting.cpp b/connections/common/formatting.cpp new file mode 100644 index 0000000..ac312a5 --- /dev/null +++ b/connections/common/formatting.cpp @@ -0,0 +1,30 @@ +#include "formatting.h" + +/* + See https://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string +*/ +void replace_all(std::string& str, const std::string& from, const std::string& to) { + size_t start_pos = 0; + while((start_pos = str.find(from, start_pos)) != std::string::npos) { + str.replace(start_pos, from.length(), to); + start_pos += to.length(); // Handles case where 'to' is a substring of 'from' + } +} + +/* + Splits a string "{...}, ..., {...}" in an array ["{...}", ..., "{...}"]. + ONLY HANDLES DICTIONARIES OF DEPTH 1. +*/ +std::vector tokenize_json_array(std::string op, std::string token) { + int start = 0; + int end = op.find("}"); + std::vector ret; + + while( end != -1 ) { + ret.push_back(op.substr(start, end - start + 1)); + start = end + token.size() + 1; // + 1 accounts for the ",". + end = op.find(token, start); + } + + return ret; +} diff --git a/connections/common/formatting.h b/connections/common/formatting.h new file mode 100644 index 0000000..d1dbd7f --- /dev/null +++ b/connections/common/formatting.h @@ -0,0 +1,16 @@ +#ifndef _FORMATTING_H_ +#define _FORMATTING_H_ + +#include +#include +#include + +#include "../common/common.h" +#include + +void replace_all(std::string&, const std::string&, const std::string&); + +std::vector tokenize_json_array(std::string, std::string token = "}"); + +#endif + diff --git a/connections/common/ws_msg.def b/connections/common/ws_msg.def index ce0eb6f..e413824 100644 --- a/connections/common/ws_msg.def +++ b/connections/common/ws_msg.def @@ -1,13 +1,20 @@ -X(WS_EVENT_COIN_PRICE_UPDATE) -X(WS_EVENT_TRANSACTION) -X(WS_EVENT_HISTORY_UPDATE) -X(WS_EVENT_BROKER_FEE_UPDATE) -X(WS_EVENT_TODAY_PRICES_UPDATE) -X(WS_EVENT_LEADERBOARD_UPDATE) -X(WS_EVENT_FLOOR_UPDATE) -X(WS_EVENT_AUCTION_UPDATE) -X(WS_EVENT_BENCHMARK_UPDATE) -X(WS_EVENT_SUPERCHAT_UPDATE) -X(WS_EVENT_GACHA_UPDATE) -X(WS_EVENT_ADD_MESSAGE_GLOBAL) X(WS_EVENT_UNKNOWN) +X(WS_EVENT_COIN_PRICE) +X(WS_EVENT_TRANSACTION) +X(WS_EVENT_HISTORY) +X(WS_EVENT_BROKER_FEE) +X(WS_EVENT_TODAY_PRICES) +X(WS_EVENT_LEADERBOARD) +X(WS_EVENT_FLOOR) +X(WS_EVENT_AUCTION) +X(WS_EVENT_BENCHMARK) +X(WS_EVENT_SUPERCHAT) +X(WS_EVENT_GACHA) +X(WS_EVENT_ADD_MESSAGE_GLOBAL) +X(WS_EVENT_MF_PORTFOLIO) +X(WS_EVENT_MF_BALANCE) +X(WS_EVENT_MF_RUNNINGHISTORY) +X(WS_EVENT_MF_STAT) +X(WS_EVENT_MF_MEMBERS) +X(WS_EVENT_MF_ORDER) +X(WS_EVENT_MF_MAKEPUBLIC) diff --git a/connections/parser/parser.cpp b/connections/parser/parser.cpp index 48b5ffc..7e24f48 100644 --- a/connections/parser/parser.cpp +++ b/connections/parser/parser.cpp @@ -1,110 +1,10 @@ #include "parser.h" - namespace parser { -WS_MSG msg_type_detect(std::string op) { - WS_MSG ret; - - if (op.substr(0, 30).find("coinPriceUpdate") != std::string::npos) { - ret = WS_EVENT_COIN_PRICE_UPDATE; - } else if (op.substr(0, 30).find("historyUpdate") != std::string::npos) { - ret = WS_EVENT_HISTORY_UPDATE; - } else if (op.substr(0, 30).find("todayPricespdate") != std::string::npos) { - ret = WS_EVENT_TODAY_PRICES_UPDATE; - } else if (op.substr(0, 30).find("brokerFeeUpdate") != std::string::npos) { - ret = WS_EVENT_BROKER_FEE_UPDATE; - } else { - ret = WS_EVENT_UNKNOWN; - } - return ret; -} - -template -ws_msg_parsed parse(std::string rmsg) {}; - -template <> -ws_msg_parsed raw_msg_parse(std::string rmsg) { - nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */ - - ws_msg_parsed rop; - rop.coin = jparsed["coin"]; - rop.price = (jparsed["info"])["price"]; - rop.saleValue = (jparsed["info"])["saleValue"]; - rop.inCirculation = (jparsed["info"])["inCirculation"]; - - return rop; -} - -/**************************************************************** - Helper functions should go in common or something more general -****************************************************************/ - -/* - See https://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string -*/ -static inline void replace_all(std::string& str, const std::string& from, const std::string& to) { - size_t start_pos = 0; - while((start_pos = str.find(from, start_pos)) != std::string::npos) { - str.replace(start_pos, from.length(), to); - start_pos += to.length(); // Handles case where 'to' is a substring of 'from' - } -} - -/* - Splits a string "{...}, ..., {...}" in an array ["{...}", ..., "{...}"]. - ONLY HANDLES DICTIONARIES OF DEPTH 1. -*/ -std::vector tokenize_json_array(std::string op, std::string token = "}") { - int start = 0; - int end = op.find("}"); - std::vector ret; - - while( end != -1 ) { - ret.push_back(op.substr(start, end - start + 1)); - start = end + token.size() + 1; // + 1 accounts for the ",". - end = op.find(token, start); - } - - return ret; -} - -template <> -ws_msg_parsed raw_msg_parse(std::string rmsg) { - nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */ - - ws_msg_parsed rop; - rop.coin = jparsed["coin"]; - rop.type = jparsed["type"]; - rop.userid = jparsed["userid"]; - rop.quantity = jparsed["quantity"]; - rop.timestamp = jparsed["timestamp"]; - rop.completed = jparsed["completed"]; - rop.timestamp = jparsed["timestamp"]; - rop.price = jparsed["price"]; - - return rop; -} - -template<> -ws_msg_parsed raw_msg_parse(std::string rmsg) { - std::vector raw_vect; - ws_msg_parsed rop; - - /* Replace \" by " */ - replace_all(rmsg, "\\\"", "\""); - - /* Extract array */ - raw_vect = tokenize_json_array(rmsg); - - /* Create the output array by parsing each transaction elements */ - for(auto & raw_tr : raw_vect) { - rop.transaction_list.push_back(raw_msg_parse(raw_tr)); - } - - return rop; -} - +/******************************************************************************* + Parser object +*******************************************************************************/ parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C) : m_metadata(metadata) , m_process_queue_state(false) @@ -112,7 +12,6 @@ parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C) {} parser::~parser() { - } void parser::process_queue() { @@ -125,25 +24,35 @@ void parser::process_queue() { while(m_process_queue_state) { raw_data = m_metadata->pop_message(); - type = msg_type_detect(raw_data); + + type = types::extract(raw_data); + + // TODO: make a function for each of these cases and use a switch as in parser_aux /* POSTGRESQL STUFF GOES HERE */ - if (type == WS_EVENT_COIN_PRICE_UPDATE) { + if (type == WS_EVENT_COIN_PRICE) { /* 18 = 3 + 15 */ raw_data = raw_data.substr(18, raw_data.length()-18); - ws_msg_parsed parsed_msg = raw_msg_parse(raw_data); + ws_msg_parsed parsed_msg = single(raw_data); std::cout << parsed_msg << std::endl; db::push_coin_price(m_connection, parsed_msg); - } else if (type == WS_EVENT_HISTORY_UPDATE) { + } else if (type == WS_EVENT_HISTORY) { raw_data = raw_data.substr(18, raw_data.length()-18); - ws_msg_parsed parsed_msg = raw_msg_parse(raw_data); + ws_msg_parsed parsed_msg = single(raw_data); std::cout << parsed_msg << std::endl; db::push_history(m_connection, parsed_msg); //std::cout << "\x1B[31mTexting\033[0m\t\t" << std::endl; } + else if (type == WS_EVENT_MF_PORTFOLIO) { + raw_data = raw_data.substr(28, raw_data.length()-28); + //raw_data = payload::extract(raw_data); + ws_msg_parsed parsed_msg = single(raw_data); + + std::cout << parsed_msg << std::endl; + } } } @@ -168,9 +77,140 @@ void parser::process_queue_thread_join() pthread_join(m_process_queue_thread, 0); } - void parser::process_queue_stop() { m_process_queue_state = false; } +/******************************************************************************* + Message parsing +*******************************************************************************/ +//template +//ws_msg_parsed parse(std::string rmsg) {}; + +/* + Takes as input a raw string associated to a coinPriceUpdate ws event and returns a parsed representation of it. +*/ +template <> +ws_msg_parsed single(std::string rmsg) { + nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */ + + ws_msg_parsed rop; + rop.coin = jparsed["coin"]; + rop.price = (jparsed["info"])["price"]; + rop.saleValue = (jparsed["info"])["saleValue"]; + rop.inCirculation = (jparsed["info"])["inCirculation"]; + + return rop; } + +/* + Takes as input a raw string associated to a transaction ws event and returns a parsed representation of it. +*/ +template <> +ws_msg_parsed single(std::string rmsg) { + nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */ + + ws_msg_parsed rop; + + rop.coin = jparsed["coin"]; + rop.type = jparsed["type"]; + rop.userid = jparsed["userid"]; + rop.quantity = jparsed["quantity"]; + rop.timestamp = jparsed["timestamp"]; + rop.completed = jparsed["completed"]; + rop.timestamp = jparsed["timestamp"]; + rop.price = jparsed["price"]; + // rop.fund = jparsed["fund"]; + + return rop; +} + +/* + Same as above but takes a json as input. +*/ +template <> +ws_msg_parsed single_j(nlohmann::json op) { + + ws_msg_parsed rop; + + rop.coin = op["coin"]; + rop.type = op["type"]; + rop.userid = op["userid"]; + rop.quantity = op["quantity"]; + rop.timestamp = op["timestamp"]; + rop.completed = op["completed"]; + rop.timestamp = op["timestamp"]; + if(op["price"] == nullptr) { + rop.price = 0; + } else { + rop.price = op["price"]; + } + // rop.fund = op["fund"]; + + return rop; +} + +/* + Takes as input a raw string associated to a historyUpdate ws event and returns a parsed representation of it. +*/ +template<> +ws_msg_parsed single(std::string rmsg) { + std::vector raw_vect; + ws_msg_parsed rop; + + /* Replace \" by " */ + replace_all(rmsg, "\\\"", "\""); + + /* Extract array */ + raw_vect = tokenize_json_array(rmsg); + + /* Create the output array by parsing each transaction elements */ + for(auto & raw_tr : raw_vect) { + rop.transaction_list.push_back(single(raw_tr)); + } + + return rop; +} +///////////////////////////////// TODO: PUT THIS FUCKING TRASH SOMEWHERE ELSE ///////// +/* + Takes as input a json and returns its representation as portfolio_coin_t. +*/ +portfolio_coin_t parse_portfolio_coin(std::string coin, nlohmann::json op) { + portfolio_coin_t rop; + + rop.coin = coin; + rop.amount = op["amount"]; + rop.ts = op["timestamp"]; + rop.mpp = op["meanPurchasePrice"]; + + return rop; +} +//////////////////////////////////////////////////////////////////////////// + +/* + Takes as input a raw string associated to a mutualFundPortfolioUpdate as ws event and returns a parsed representation of it. +*/ +template <> +ws_msg_parsed single(std::string rmsg) { + ws_msg_parsed rop; + + nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */ + + rop.fund = jparsed["fund"]; + rop.event = (jparsed["tUpdate"])["event"]; + + for(auto & raw_tr : (jparsed["tUpdate"])["transactions"]) { + + rop.transactions.push_back(single_j(raw_tr)); + } + + if((jparsed["tUpdate"])["portfolio"] != NULL) { + for(auto & item : (jparsed["tUpdate"])["portfolio"].items()) { + rop.portfolio.push_back(parse_portfolio_coin(item.key(), item.value())); + } + } + + return rop; +} + +} //parser diff --git a/connections/parser/parser.h b/connections/parser/parser.h index ef8cc2e..30b0dd9 100644 --- a/connections/parser/parser.h +++ b/connections/parser/parser.h @@ -9,24 +9,17 @@ #include "../sql/db_handle.h" #include "../common/common.h" +#include "../common/formatting.h" #include "../ws/ssl_ws.h" +#include "./parser_aux.h" +//TODO: Remove ws here and use a safequeue + namespace parser { -WS_MSG msg_type_detect(std::string); - -template -ws_msg_parsed raw_msg_parse(std::string); - -template <> -ws_msg_parsed raw_msg_parse(std::string); - -template<> -ws_msg_parsed raw_msg_parse(std::string); - -template<> -ws_msg_parsed raw_msg_parse(std::string); - +/******************************************************************************* + Parser object +*******************************************************************************/ class parser { public: parser(ws::connection_metadata::ptr, pqxx::connection*); @@ -45,7 +38,30 @@ private: static void* process_queue_helper(void*); }; -} +/******************************************************************************* + Message parsing +*******************************************************************************/ +template +ws_msg_parsed single(std::string); +template +ws_msg_parsed single_j(nlohmann::json); + +template <> +ws_msg_parsed single(std::string); + +template<> +ws_msg_parsed single(std::string); + +template <> +ws_msg_parsed single_j(nlohmann::json ); + +template<> +ws_msg_parsed single(std::string); + +template <> +ws_msg_parsed single(std::string); + +} // parser #endif diff --git a/connections/parser/parser_aux.cpp b/connections/parser/parser_aux.cpp new file mode 100644 index 0000000..64c68b6 --- /dev/null +++ b/connections/parser/parser_aux.cpp @@ -0,0 +1,57 @@ +#include "parser_aux.h" + +/******************************************************************************* + Types. +*******************************************************************************/ +namespace parser::types { +/* + Takes a string as input and returns a WS_MSG corresponding to the underlying event type. + Returns WS_EVENT_UNKNOWN if no match is detected. + TODO: handle malformed strings, only one ",... +*/ +WS_MSG extract(std::string op) { + WS_MSG ret; + std::size_t pos_beg; + std::size_t pos_end; + std::string raw_type; + + // Find raw_type as first occurence of "RAW_TYPE" in op + pos_beg = op.find_first_of('"'); + pos_end = op.find_first_of('"', pos_beg + 1); + raw_type = op.substr(pos_beg+1, pos_end - pos_beg-1); + + + switch(map_rawIdent[raw_type]) { + + case(0): ret = WS_EVENT_UNKNOWN; break; + case(1): ret = WS_EVENT_COIN_PRICE; break; + case(2): ret = WS_EVENT_HISTORY; break; + case(3): ret = WS_EVENT_TODAY_PRICES; break; + case(4): ret = WS_EVENT_BROKER_FEE; break; + + case(10): ret = WS_EVENT_MF_PORTFOLIO; break; + case(11): ret = WS_EVENT_MF_BALANCE; break; + case(12): ret = WS_EVENT_MF_RUNNINGHISTORY; break; + case(13): ret = WS_EVENT_MF_STAT; break; + case(14): ret = WS_EVENT_MF_MEMBERS; break; + case(15): ret = WS_EVENT_MF_ORDER; break; + case(16): ret = WS_EVENT_MF_MAKEPUBLIC; break; + + default: ret = WS_EVENT_UNKNOWN; + } + return ret; +} +} // parser::type + +namespace parser::payload { + std::string extract(std::string op) { + std::string ret; + std::size_t tmp; + + // Find second occurence of ". Raw payloads are of the form 42["RAW_TYPE",PAYLOAD ] + tmp = op.find_first_of('"', 3) + 2; // Wew hopefully that doesn't break + ret = op.substr(tmp, op.length() - tmp); + + return ret; + } +} diff --git a/connections/parser/parser_aux.h b/connections/parser/parser_aux.h new file mode 100644 index 0000000..dde5f65 --- /dev/null +++ b/connections/parser/parser_aux.h @@ -0,0 +1,41 @@ +#ifndef _PARSER_AUX_H_ +#define _PARSER_AUX_H_ + +#include "../common/common.h" + +/******************************************************************************* + Types +*******************************************************************************/ +namespace parser::types { + +/* + Raw message identifiers and int conversion. +*/ +static std::map map_rawIdent = { + {"unknown", 0}, + {"coinPriceUpdate", 1}, + {"historyUpdate", 2}, + {"todayPricespdate", 3}, + {"brokerFeeUpdate", 4}, + {"mutualFundPortfolioUpdate", 10}, + {"mutualFundBalanceUpdate", 11}, + {"mutualFundRunningHistoryUpdate", 12}, + {"mutualFundStatUpdate" , 13}, + {"mutualFundMembersUpdate" , 14}, + {"mutualFundOrderUpdate" , 15}, + {"mutualFundMakePublicUpdate" , 16} +}; + +/* + Takes a string as input and returns its corresponding WS_MSG type. +*/ +WS_MSG extract(std::string); + +} // parser::types + +namespace parser::payload { + std::string extract(std::string); + + } // parser::payload +#endif + diff --git a/connections/safe_queue/safe_queue.cpp b/connections/safe_queue/safe_queue.cpp deleted file mode 100644 index 7501531..0000000 --- a/connections/safe_queue/safe_queue.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include "safe_queue.h" - -template -TestQueue::TestQueue(void) - : q() -{} - -template -TestQueue::~TestQueue(void) -{} - -template -SafeQueue::SafeQueue(void) - : q() - , m() - , c() -{} - -template -SafeQueue::~SafeQueue(void) -{} - -// Add an element to the queue. -template -void SafeQueue::enqueue(T t) -{ - std::lock_guard lock(m); - q.push(t); - c.notify_one(); -} - -// Get the "front"-element. -// If the queue is empty, wait till a element is avaiable. -template -T SafeQueue::dequeue(void) { - std::unique_lock lock(m); - while(q.empty()) - { - // release lock as long as the wait and reaquire it afterwards. - c.wait(lock); - } - T val = q.front(); - q.pop(); - return val; -} - -template -bool SafeQueue::empty(void) { - return q.empty(); -} diff --git a/connections/safe_queue/safe_queue.h b/connections/safe_queue/safe_queue.h deleted file mode 100644 index c972965..0000000 --- a/connections/safe_queue/safe_queue.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef _SAFE_QUEUE_H_ -#define _SAFE_QUEUE_H_ - -#include -#include -#include - -// A threadsafe-queue. -template -class TestQueue -{ -public: - TestQueue(); - ~TestQueue(); -private: - std::queue q; -}; - -// A threadsafe-queue. -template -class SafeQueue -{ -public: - SafeQueue(void); - ~SafeQueue(void); - - void enqueue(T); - T dequeue(void); - bool empty(void); -private: - std::queue q; - mutable std::mutex m; - std::condition_variable c; -}; -#endif - diff --git a/connections/sql/db_handle.cpp b/connections/sql/db_handle.cpp index 46df4a2..913e04a 100644 --- a/connections/sql/db_handle.cpp +++ b/connections/sql/db_handle.cpp @@ -1,122 +1,34 @@ #include "db_handle.h" namespace db { - namespace query { - /* Query templates for HISTORY. */ - //TODO: Move these definitions to .h file. - const std::string query_template_transaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\ - "VALUES('{}', {}, '{}', {}, {}, {}, {});"; - const std::string query_template_history_ts = "SELECT * FROM HISTORY WHERE TIMESTAMP <= {} AND TIMESTAMP > {};"; - const std::string query_template_userid_ts = "SELECT DISTINCT USERID FROM HISTORY WHERE TIMESTAMP <= {} AND TIMESTAMP > {};"; + void commit(pqxx::connection *C, std::string query) { + /* Create a transactional object. */ + pqxx::work W(*C); - const std::string query_template_slope_ts_top = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ - "WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ - "ORDER BY TIMESTAMP ASC "\ - "LIMIT 1;"; - const std::string query_template_transactions_userid_ts = "SELECT * FROM HISTORY WHERE USERID = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {};"; - - const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ - "WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ - "ORDER BY TIMESTAMP DESC "\ - "LIMIT 1;"; - - const std::string query_template_last_n_spaced_prices = "select PRICE, TIMESTAMP FROM last_n_spaced_prices('{}', {}, {});"; - - const std::string query_template_user = "INSERT INTO USERS(USERID, USERNAME, ICON, NETWORTH) "\ - "VALUES('{}', E'{}', '{}', {}) " - "ON CONFLICT (USERID) DO UPDATE SET "\ - "NETWORTH = {};"; - - const std::string query_template_userid_to_username = "select USERNAME FROM USERS WHERE USERID = '{}'"; - - /******************************************************************************** - GLOBAL QUERIES - ********************************************************************************/ - - std::string make_push_query_transaction( ws_msg_parsed op ) { - return fmt::format(query_template_transaction, - op.coin, - op.type, - op.userid, - op.quantity, - op.timestamp, - op.completed, - op.price); - } - - std::string make_pull_query_history_ts( long ts_high, long ts_low ) { - return fmt::format(query_template_history_ts, ts_high, ts_low); - } - - std::string make_pull_query_userid_list_ts( long ts_high, long ts_low ) { - return fmt::format(query_template_userid_ts, ts_high, ts_low); - } - - std::string make_pull_query_transactions_userid_ts( std::string userid, long ts_high, long ts_low ) { - return fmt::format(query_template_transactions_userid_ts, userid, ts_high, ts_low); - } - - std::string make_pull_query_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ) { - return fmt::format(query_template_slope_ts_top, coin, ts_high, ts_low); - } - - std::string make_pull_query_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ) { - return fmt::format(query_template_slope_ts_bot, coin, ts_high, ts_low); - } - - - /* Query templates for COIN PRICE. */ - const std::string query_template_coin_price = "INSERT INTO COINPRICE(COIN, PRICE, SALEVALUE, INCIRCULATION, LASTUPDATE) "\ - "VALUES('{}', {}, {}, {}, {}) "\ - "ON CONFLICT (COIN) DO UPDATE SET "\ - "PRICE = {}, SALEVALUE = {}, INCIRCULATION = {}, LASTUPDATE = {};"; - - std::string make_push_query_coin_price( ws_msg_parsed op ) { - return fmt::format(query_template_coin_price, - op.coin, - op.price, - op.saleValue, - op.inCirculation, - std::time(NULL)*1000, - op.price, - op.saleValue, - op.inCirculation, - std::time(NULL)*1000); - } - - /* Query templates for math functions. */ - std::string make_pull_query_last_n_spaced_prices( std::string coin, int nb_cycles, int delta) { - return fmt::format(query_template_last_n_spaced_prices, - coin, - nb_cycles, - delta); - } - - std::string make_push_query_user(user_t user){ - return fmt::format(query_template_user, - user.userid, - user.username, - user.icon, - user.networth, - user.networth); - } - - std::string make_pull_query_userid_to_username(std::string userid) { - return fmt::format(query_template_userid_to_username, - userid); - } + /* Execute SQL query */ + W.exec( query ); + W.commit(); } +} - /******************************************************************************** - HISTORY - ********************************************************************************/ - +namespace db::push { /* Add transactions contained in history update to the database. */ - void push_history( pqxx::connection* C, ws_msg_parsed op ) { + void push_history( pqxx::connection* C, ws_msg_parsed op ) { std::string query; for(auto& element : op.transaction_list){ - query += query::make_push_query_transaction(element); + query += query::fmt::p_query_transaction(element); + } + + db::commit(C, query); + } + + /* Same as push_history but checks for unique timestamp. */ + void push_uhistory( pqxx::connection* C, ws_msg_parsed op ) { + std::string query; + + for(auto& element : op.transaction_list){ + query += query::make_push_query_utransaction(element); } /* Create a transactional object. */ @@ -127,17 +39,18 @@ namespace db { W.commit(); } - /* Returns the last cycle in form of a ws_msg_parsed struct. */ - ws_msg_parsed pull_last_cycle(pqxx::connection* C) { + /* Returns the last cycle in form of a ws_msg_parsed struct. */ + ws_msg_parsed pull_last_cycle(pqxx::connection* C) { std::string query; long ts_high, ts_low; ws_msg_parsed tmp; - ws_msg_parsed rop; + ws_msg_parsed rop; ts_high = time(NULL)*1000; ts_low = ts_high - 600*1000; query = query::make_pull_query_history_ts(ts_high, ts_low); + //std::cout << query << std::endl; /* Create a non-transactional object. */ pqxx::nontransaction N(*C); @@ -178,15 +91,14 @@ namespace db { for (pqxx::result::const_iterator c = R.begin(); c != R.end(); ++c) { rop.push_back(c[0].as()); } - return rop; } /* Fetches all transactions by userid during period ts_low to ts_high. */ - ws_msg_parsed pull_transactions_userid_ts( pqxx::connection *C, std::string userid, long ts_high, long ts_low) { + ws_msg_parsed pull_transactions_userid_ts( pqxx::connection *C, std::string userid, long ts_high, long ts_low) { std::string query; ws_msg_parsed tmp; - ws_msg_parsed rop; + ws_msg_parsed rop; query = query::make_pull_query_transactions_userid_ts(userid, ts_high, ts_low); @@ -213,19 +125,21 @@ namespace db { } /* Fetches last cycle's transactions for userid. */ - ws_msg_parsed pull_last_cycle_userid( pqxx::connection *C, std::string userid) { + ws_msg_parsed pull_last_cycle_userid( pqxx::connection *C, std::string userid) { long ts_high, ts_low; + ws_msg_parsed rop; ts_high = time(NULL)*1000; - ts_low = ts_high - 600*1000; + ts_low = ts_high - 60*10*1000; - return pull_transactions_userid_ts(C, userid, ts_high, ts_low); + rop = pull_transactions_userid_ts(C, userid, ts_high, ts_low); + return rop; } /******************************************************************************** COIN PRICE ********************************************************************************/ - void push_coin_price( pqxx::connection* C, ws_msg_parsed op) { + void push_coin_price( pqxx::connection* C, ws_msg_parsed op) { std::string query; query = query::make_push_query_coin_price(op); diff --git a/connections/sql/db_handle.h b/connections/sql/db_handle.h index 05e0181..68333d2 100644 --- a/connections/sql/db_handle.h +++ b/connections/sql/db_handle.h @@ -19,12 +19,13 @@ namespace db { /******************************************************************************** HISTORY ********************************************************************************/ - void push_history( pqxx::connection*, ws_msg_parsed ); + void push_history( pqxx::connection*, ws_msg_parsed ); + void push_uhistory( pqxx::connection*, ws_msg_parsed ); - ws_msg_parsed pull_last_cycle( pqxx::connection*); + ws_msg_parsed pull_last_cycle( pqxx::connection*); std::vector pull_userid_list_ts( pqxx::connection *, long, long); - ws_msg_parsed pull_transactions_userid_ts( pqxx::connection *, std::string, long, long); - ws_msg_parsed pull_last_cycle_userid( pqxx::connection *, std::string); + ws_msg_parsed pull_transactions_userid_ts( pqxx::connection *, std::string, long, long); + ws_msg_parsed pull_last_cycle_userid( pqxx::connection *, std::string); float pull_last_cycle_slope(pqxx::connection*, std::string); @@ -33,7 +34,7 @@ namespace db { /******************************************************************************** COIN PRICE ********************************************************************************/ - void push_coin_price( pqxx::connection*, ws_msg_parsed ); + void push_coin_price( pqxx::connection*, ws_msg_parsed ); /******************************************************************************** COIN PRICE @@ -44,6 +45,7 @@ namespace db { USERS ********************************************************************************/ void push_users( pqxx::connection*, std::vector ); + std::vector pull_userids_ts(pqxx::connection *C, long ts_low, long ts_high); std::string userid_to_username(pqxx::connection*, std::string userid); } #endif diff --git a/connections/sql/db_init.cpp b/connections/sql/db_init.cpp index fd14ecd..285179a 100644 --- a/connections/sql/db_init.cpp +++ b/connections/sql/db_init.cpp @@ -1,137 +1,112 @@ #include "db_init.h" +/******************************************************************************* + Wrapper for single querying +*******************************************************************************/ + +int single_query(pqxx::connection& C, std::string query) { + try { + pqxx::work W(C); + W.exec( query ); + W.commit(); + } catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + return 1; + } + return 0; +} + namespace db { +namespace create { -int create_table_history(pqxx::connection& C) { - std::string query; +/******************************************************************************* + Table creation queries +*******************************************************************************/ +namespace table { - try { - /* Create a transactional object. */ - pqxx::work W(C); - - /* Create sql query for history table creation */ - query = "CREATE TABLE IF NOT EXISTS HISTORY(" \ - "ID SERIAL PRIMARY KEY NOT NULL," \ - "COIN CHAR(32) NOT NULL,"\ - "TYPE INT,"\ - "USERID CHAR(128) NOT NULL,"\ - "QUANTITY INT,"\ - "TIMESTAMP BIGINT NOT NULL,"\ - "COMPLETED BOOLEAN,"\ - "PRICE REAL);"; - - /* Execute SQL query */ - W.exec( query ); - W.commit(); - } catch (const std::exception &e) { - std::cerr << e.what() << std::endl; - return 1; - } - return 0; +int history(pqxx::connection& C) { + // TODO: make timestamp the primary key + // make sizes macros + std::string query = "CREATE TABLE IF NOT EXISTS HISTORY(" \ + "ID SERIAL PRIMARY KEY NOT NULL," \ + "COIN CHAR(32) NOT NULL,"\ + "TYPE INT,"\ + "USERID CHAR(128) NOT NULL,"\ + "QUANTITY INT,"\ + "TIMESTAMP BIGINT NOT NULL,"\ + "COMPLETED BOOLEAN,"\ + "PRICE REAL);"; + return single_query(C, query); } -int create_table_coin_price(pqxx::connection& C) { - std::string query; - - try { - /* Create a transactional object. */ - pqxx::work W(C); - - /* Create sql query for history table creation */ - query = "CREATE TABLE IF NOT EXISTS COINPRICE(" \ - "COIN CHAR(32) PRIMARY KEY,"\ - "PRICE REAL," \ - "SALEVALUE REAL," \ - "INCIRCULATION INT, "\ - "LASTUPDATE BIGINT);"; - - /* Execute SQL query */ - W.exec( query ); - W.commit(); - } catch (const std::exception &e) { - std::cerr << e.what() << std::endl; - return 1; - } - return 0; +int coin_price(pqxx::connection& C) { + std::string query = "CREATE TABLE IF NOT EXISTS COINPRICE(" \ + "COIN CHAR(32) PRIMARY KEY,"\ + "PRICE REAL," \ + "SALEVALUE REAL," \ + "INCIRCULATION INT, "\ + "LASTUPDATE BIGINT);"; + return single_query(C, query); } -int create_last_n_anchor_prices_function(pqxx::connection& C) { - std::string query; - - try { - /* Create a transactional object. */ - pqxx::work W(C); - - /* Create sql query for spaced prices function, delta is in seconds*/ - query = "CREATE OR REPLACE FUNCTION last_n_spaced_prices(var_coin CHAR(32), var_n INT, var_delta INT) RETURNS SETOF HISTORY AS $$" \ - "DECLARE "\ - " row HISTORY%ROWTYPE; "\ - " cur_date BIGINT = 9223372036854775806; "\ - " nb_dates INT = 0; "\ - " nb_dates_max INT = var_n; "\ - "BEGIN "\ - " FOR row IN "\ - " SELECT * "\ - " FROM HISTORY "\ - " WHERE HISTORY.COIN = var_coin "\ - " ORDER BY TIMESTAMP DESC "\ - " LOOP "\ - " IF nb_dates = nb_dates_max "\ - " THEN "\ - " EXIT; "\ - " END IF; "\ - " "\ - " IF row.TIMESTAMP <= cur_date - var_delta*1000 OR cur_date IS NULL "\ - " THEN "\ - " cur_date := row.TIMESTAMP; "\ - " nb_dates := nb_dates + 1; "\ - " RETURN NEXT row; "\ - " END IF; "\ - " END LOOP; "\ - "END; "\ - "$$ LANGUAGE plpgsql; "; - - /* Execute SQL query */ - W.exec( query ); - W.commit(); - } catch (const std::exception &e) { - std::cerr << e.what() << std::endl; - return 1; - } - return 0; +int users(pqxx::connection& C) { + std::string query = "CREATE TABLE IF NOT EXISTS USERS(" \ + "USERID CHAR(128) PRIMARY KEY,"\ + "USERNAME TEXT NOT NULL,"\ + "ICON CHAR(32) NOT NULL,"\ + "NETWORTH REAL);"; + return single_query(C, query); } +} // db::create::table -int create_table_users(pqxx::connection& C) { - std::string query; - - try { - /* Create a transactional object. */ - pqxx::work W(C); - - /* Create sql query for history table creation */ - query = "CREATE TABLE IF NOT EXISTS USERS(" \ - "USERID CHAR(128) PRIMARY KEY,"\ - "USERNAME TEXT NOT NULL,"\ - "ICON CHAR(32) NOT NULL,"\ - "NETWORTH REAL);"; - - /* Execute SQL query */ - W.exec( query ); - W.commit(); - } catch (const std::exception &e) { - std::cerr << e.what() << std::endl; - return 1; - } - return 0; +/******************************************************************************* + Function creation queries +*******************************************************************************/ +namespace function { +int last_n_anchor_prices_function(pqxx::connection& C) { + /* Create sql query for spaced prices function, delta is in seconds*/ + std::string query = "CREATE OR REPLACE FUNCTION last_n_spaced_prices(var_coin CHAR(32), var_n INT, var_delta INT) RETURNS SETOF HISTORY AS $$" \ + "DECLARE "\ + " row HISTORY%ROWTYPE; "\ + " cur_date BIGINT = 9223372036854775806; "\ + " nb_dates INT = 0; "\ + " nb_dates_max INT = var_n; "\ + "BEGIN "\ + " FOR row IN "\ + " SELECT * "\ + " FROM HISTORY "\ + " WHERE HISTORY.COIN = var_coin "\ + " ORDER BY TIMESTAMP DESC "\ + " LOOP "\ + " IF nb_dates = nb_dates_max "\ + " THEN "\ + " EXIT; "\ + " END IF; "\ + " "\ + " IF row.TIMESTAMP <= cur_date - var_delta*1000 OR cur_date IS NULL "\ + " THEN "\ + " cur_date := row.TIMESTAMP; "\ + " nb_dates := nb_dates + 1; "\ + " RETURN NEXT row; "\ + " END IF; "\ + " END LOOP; "\ + "END; "\ + "$$ LANGUAGE plpgsql; "; + return single_query(C, query); } +} // db::create::function +/******************************************************************************* + Database creation query +*******************************************************************************/ +namespace database{ -int populate() { +int all(void) { try { - pqxx::connection C("dbname = nasfaq user = steaky \ - hostaddr = 127.0.0.1 port = 5432"); + // TODO: GLOBAL INFO + pqxx::connection C("dbname = nasfaq user = steaky hostaddr = 127.0.0.1 port = 5432"); if (C.is_open()) { std::cout << "Opened database successfully: " << C.dbname() << std::endl; @@ -140,13 +115,11 @@ int populate() { return 1; } - /* Create tables. */ - create_table_history(C); - create_table_coin_price(C); - create_table_users(C); + table::history(C); + table::coin_price(C); + table::users(C); - /* Create functions. */ - create_last_n_anchor_prices_function(C); + function::last_n_anchor_prices_function(C); } catch (const std::exception &e) { std::cerr << e.what() << std::endl; @@ -154,4 +127,6 @@ int populate() { } return 0; } +} // db::create::database +} // db::create } diff --git a/connections/sql/db_init.h b/connections/sql/db_init.h index 20bf5b0..96d9721 100644 --- a/connections/sql/db_init.h +++ b/connections/sql/db_init.h @@ -3,16 +3,9 @@ #include #include -#include -#include -#include "../http/users.h" - -namespace db { - int create_table_history(pqxx::connection&); - int create_table_coin_price(pqxx::connection&); - int create_table_users(pqxx::connection&); - int populate(); +namespace db::create::database { + int all(void); } #endif diff --git a/connections/sql/queries.cpp b/connections/sql/queries.cpp new file mode 100644 index 0000000..ec6740e --- /dev/null +++ b/connections/sql/queries.cpp @@ -0,0 +1,82 @@ +#include "queries.h" + +namespace db::queries::fmt { + +std::string p_transaction( ws_msg_parsed op ) { + return fmt::format(p_transaction, + op.coin, + op.type, + op.userid, + op.quantity, + op.timestamp, + op.completed, + op.price); +} + +std::string p_utransaction( ws_msg_parsed op ) { + return fmt::format(p_utransaction, + op.coin, + op.type, + op.userid, + op.quantity, + op.timestamp, + op.completed, + op.price, + op.timestamp); +} + +std::string f_history_ts( long ts_high, long ts_low ) { + return fmt::format(f_history_ts, ts_high, ts_low); +} + +std::string f_userid_list_ts( long ts_high, long ts_low ) { + return fmt::format(f_userid_ts, ts_high, ts_low); +} + +std::string f_transactions_userid_ts( std::string userid, long ts_high, long ts_low ) { + return fmt::format(f_transactions_userid_ts, userid, ts_high, ts_low); +} + +std::string f_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ) { + return fmt::format(f_slope_ts_top, coin, ts_high, ts_low); +} + +std::string f_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ) { + return fmt::format(f_slope_ts_bot, coin, ts_high, ts_low); +} + +std::string p_coin_price( ws_msg_parsed op ) { + return fmt::format(p_coin_price, + op.coin, + op.price, + op.saleValue, + op.inCirculation, + std::time(NULL)*1000, + op.price, + op.saleValue, + op.inCirculation, + std::time(NULL)*1000); +} + +/* Query templates for math functions. */ +std::string f_last_n_spaced_prices( std::string coin, int nb_cycles, int delta) { + return fmt::format(f_last_n_spaced_prices, + coin, + nb_cycles, + delta); +} + +std::string p_user(user_t user){ + return fmt::format(p_user, + user.userid, + user.username, + user.icon, + user.networth, + user.networth); +} + +std::string f_query_userid_to_username(std::string userid) { + return fmt::format(f_userid_to_username, + userid); +} +} // db::queries::fmt diff --git a/connections/sql/queries.h b/connections/sql/queries.h new file mode 100644 index 0000000..bf5f4c7 --- /dev/null +++ b/connections/sql/queries.h @@ -0,0 +1,75 @@ +#ifndef _QUERIES_H_ +#define _QUERIES_H_ + +#include "../common/common.h" + +namespace db { +namespace query { +namespace tmplt { + /* Push a user in USERS */ + const std::string p_user = "INSERT INTO USERS(USERID, USERNAME, ICON, NETWORTH) "\ + "VALUES('{}', E'{}', '{}', {}) " + "ON CONFLICT (USERID) DO UPDATE SET "\ + "NETWORTH = {};"; + + /* Push a transaction in HISTORY */ + const std::string p_transaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\ + "VALUES('{}', {}, '{}', {}, {}, {}, {});"; + + /* Push a transaction in HISTORY with unicity check (for db completion) */ + const std::string p_utransaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\ + "SELECT '{}', {}, '{}', {}, {}, {}, {} " \ + "WHERE NOT EXISTS (SELECT TIMESTAMP FROM HISTORY WHERE TIMESTAMP = {} limit 1);"; + + /* Update coin price. */ + const std::string p_coin_price = "INSERT INTO COINPRICE(COIN, PRICE, SALEVALUE, INCIRCULATION, LASTUPDATE) "\ + "VALUES('{}', {}, {}, {}, {}) "\ + "ON CONFLICT (COIN) DO UPDATE SET "\ + "PRICE = {}, SALEVALUE = {}, INCIRCULATION = {}, LASTUPDATE = {};"; + + const std::string f_history_ts = "SELECT * FROM HISTORY "\ + "WHERE TIMESTAMP <= {} AND TIMESTAMP > {};"; + + const std::string f_userid_ts = "SELECT DISTINCT USERID FROM HISTORY " \ + "WHERE TIMESTAMP <= {} AND TIMESTAMP > {};"; + + const std::string f_slope_ts_top = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ + "WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ + "ORDER BY TIMESTAMP ASC "\ + "LIMIT 1;"; + + const std::string f_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ + "WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ + "ORDER BY TIMESTAMP DESC "\ + "LIMIT 1;"; + + const std::string f_transactions_userid_ts = "SELECT * FROM HISTORY "\ + "WHERE USERID = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {};"; + + const std::string f_last_n_spaced_prices = "select PRICE, TIMESTAMP FROM last_n_spaced_prices('{}', {}, {});"; + + + const std::string f_id_to_username = "select USERNAME FROM USERS WHERE USERID = '{}'"; +} // db::query::tmpt + +namespace db::queries::fmt { + std::string p_transaction( ws_msg_parsed op ); + std::string p_utransaction( ws_msg_parsed op ); + std::string p_coin_price( ws_msg_parsed op ); + std::string p_user(user_t user); + + std::string f_history_ts( long ts_high, long ts_low ); + std::string f_userid_list_ts( long ts_high, long ts_low ); + std::string f_transactions_userid_ts( std::string userid, long ts_high, long ts_low ); + std::string f_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ); + std::string f_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ); + std::string f_last_n_spaced_prices( std::string coin, int nb_cycles, int delta); + std::string f_query_userid_to_username(std::string userid); +} // db::query::fmt + + + + + +#endif + diff --git a/connections/test/aux/complete_db.sh b/connections/test/aux/complete_db.sh new file mode 100755 index 0000000..3c39f1c --- /dev/null +++ b/connections/test/aux/complete_db.sh @@ -0,0 +1,8 @@ +g++ ../../common/common.cpp \ + ../../aux/complete_db.cpp \ + ../../http/http_connector.cpp \ + ../../http/users.cpp \ + ../../sql/db_init.cpp \ + ../../sql/db_handle.cpp \ + ./test_complete_db.cpp \ + -o test_complete_db -lpqxx -lpq -lfmt -lcurl -lcrypto -lssl -lboost_system -lboost_random -lboost_thread -lpthread && ./test_complete_db diff --git a/connections/test/aux/test_complete_db b/connections/test/aux/test_complete_db new file mode 100755 index 0000000..c890db4 Binary files /dev/null and b/connections/test/aux/test_complete_db differ diff --git a/connections/test/aux/test_complete_db.cpp b/connections/test/aux/test_complete_db.cpp new file mode 100644 index 0000000..f3b0aba --- /dev/null +++ b/connections/test/aux/test_complete_db.cpp @@ -0,0 +1,9 @@ +#include "test_complete_db.h" + +int main(void) { + long ts_up = 1655503200000; + unsigned int days = 30; + + complete_db(ts_up, days, false); + return 1; +} diff --git a/connections/test/aux/test_complete_db.h b/connections/test/aux/test_complete_db.h new file mode 100644 index 0000000..de77440 --- /dev/null +++ b/connections/test/aux/test_complete_db.h @@ -0,0 +1,2 @@ +#include "../../aux/complete_db.h" + diff --git a/connections/test/db_init b/connections/test/db_init new file mode 100755 index 0000000..15c3b25 Binary files /dev/null and b/connections/test/db_init differ diff --git a/connections/test/graph b/connections/test/graph index b0e1b9c..88baa10 100755 Binary files a/connections/test/graph and b/connections/test/graph differ diff --git a/connections/test/graph.cpp b/connections/test/graph.cpp index e7d61fa..89af5dc 100644 --- a/connections/test/graph.cpp +++ b/connections/test/graph.cpp @@ -2,10 +2,53 @@ #include #include "../common/common.h" +#include "../sql/db_init.h" +#include "../sql/db_handle.h" +#include "../../maths/maths.h" #include "../../maths/graph.h" +std::map> make_map(pqxx::connection *C) { + // Timestamps + long ts_high = time(NULL)*1000; + long ts_low = ts_high - 60*10*1000; + float thresh = 10; + + /* Create graph (test zone) */ + std::map> res; + std::vector userids = db::pull_userid_list_ts(C, ts_high, ts_low); + std::cout << "Number of userids active during delta_ts: " << userids.size() << std::endl; + + res = norm::diff_map(C, userids, ts_high, ts_low, thresh); + + return res; +} + +void make_json(pqxx::connection *C, std::map> op) { + norm::diff_save_json(C, op); +} + + +void make_graph(std::map> op) { + make_graph_from_map(op); +} + + int main(void) { - test(); + /* Open up database connection. */ + pqxx::connection C("dbname = nasfaq user = steaky \ + hostaddr = 127.0.0.1 port = 5432"); + + if (C.is_open()) { + std::cout << "Opened database successfully: " << C.dbname() << std::endl; + } else { + std::cout << "Can't open database" << std::endl; + } + + std::map> res; + + res = make_map(&C); + make_json(&C, res); + //make_graph(res); return 1; } diff --git a/connections/test/graph.html b/connections/test/graph.html index 465f39b..a9edce6 100644 --- a/connections/test/graph.html +++ b/connections/test/graph.html @@ -13,7 +13,7 @@ #mynetwork { width: 100%; - height: 750px; + height: 980px; background-color: #222222; border: 1px solid lightgray; position: relative; @@ -21,82 +21,7 @@ } - #loadingBar { - position:absolute; - top:0px; - left:0px; - width: 100%; - height: 750px; - background-color:rgba(200,200,200,0.8); - -webkit-transition: all 0.5s ease; - -moz-transition: all 0.5s ease; - -ms-transition: all 0.5s ease; - -o-transition: all 0.5s ease; - transition: all 0.5s ease; - opacity:1; - } - #bar { - position:absolute; - top:0px; - left:0px; - width:20px; - height:20px; - margin:auto auto auto auto; - border-radius:11px; - border:2px solid rgba(30,30,30,0.05); - background: rgb(0, 173, 246); /* Old browsers */ - box-shadow: 2px 0px 4px rgba(0,0,0,0.4); - } - - #border { - position:absolute; - top:10px; - left:10px; - width:500px; - height:23px; - margin:auto auto auto auto; - box-shadow: 0px 0px 4px rgba(0,0,0,0.2); - border-radius:10px; - } - - #text { - position:absolute; - top:8px; - left:530px; - width:30px; - height:50px; - margin:auto auto auto auto; - font-size:22px; - color: #000000; - } - - div.outerBorder { - position:relative; - top:400px; - width:600px; - height:44px; - margin:auto auto auto auto; - border:8px solid rgba(0,0,0,0.1); - background: rgb(252,252,252); /* Old browsers */ - background: -moz-linear-gradient(top, rgba(252,252,252,1) 0%, rgba(237,237,237,1) 100%); /* FF3.6+ */ - background: -webkit-gradient(linear, left top, left bottom, color-stop(0%,rgba(252,252,252,1)), color-stop(100%,rgba(237,237,237,1))); /* Chrome,Safari4+ */ - background: -webkit-linear-gradient(top, rgba(252,252,252,1) 0%,rgba(237,237,237,1) 100%); /* Chrome10+,Safari5.1+ */ - background: -o-linear-gradient(top, rgba(252,252,252,1) 0%,rgba(237,237,237,1) 100%); /* Opera 11.10+ */ - background: -ms-linear-gradient(top, rgba(252,252,252,1) 0%,rgba(237,237,237,1) 100%); /* IE10+ */ - background: linear-gradient(to bottom, rgba(252,252,252,1) 0%,rgba(237,237,237,1) 100%); /* W3C */ - filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#fcfcfc', endColorstr='#ededed',GradientType=0 ); /* IE6-9 */ - border-radius:72px; - box-shadow: 0px 0px 10px rgba(0,0,0,0.2); - } - - - - #config { - float: left; - width: 400px; - height: 600px; - } @@ -107,17 +32,6 @@
-
-
-
0%
-
-
-
-
-
- - -