Compare commits

...

2 Commits

Author SHA1 Message Date
f81314adc1 Updated .gitignore 2022-12-10 15:36:36 +01:00
6b11549a99 Github sync 2022-12-10 15:35:16 +01:00
65 changed files with 5700 additions and 4165 deletions

57
.gitignore vendored Normal file
View File

@ -0,0 +1,57 @@
#### Python
*.pyc
#### C
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf

165
MutualFundsJSON.txt Normal file
View File

@ -0,0 +1,165 @@
/***************** MUTUAL FUNDS *******************************/
42[ "mutualFundPortfolioUpdate",
{"fund":"6796531e-8668-49d1-bb01-f07db4264ce6",
"tUpdate":
{"event":"Transaction Update",
"transactions":[{
"coin":"miko",
"type":0,
"userid":"6796531e-8668-49d1-bb01-f07db4264ce6",
"quantity":1,
"timestamp":1655806346312,
"completed":false,
"price":null,"fund":true
}],
"portfolio":null
}
}
]
42[ "mutualFundPortfolioUpdate",
{"fund":"6796531e-8668-49d1-bb01-f07db4264ce6",
"tUpdate":
{"event":"Transaction Update",
"transactions":[{
"coin":"miko","type":0,"userid":"6796531e-8668-49d1-bb01-f07db4264ce6","quantity":1,"timestamp":1655806346312,"completed":true,"price":32820.18,"fund":true}],
"portfolio":{
"miko":{"amount":352,"timestamp":1655806346312,"meanPurchasePrice":29645.69}
}
}
}]
42[ "mutualFundBalanceUpdate",
{"funds":
{"6796531e-8668-49d1-bb01-f07db4264ce6":4924583.6}
}
]
42[ "mutualFundRunningHistoryUpdate",
{"f9df4304-e084-494a-b918-9b5ab1c6842b":
{"volume":1000,"price":100000,"saleValue":100000,"networth":100000000,"members":1,"timestamp":1655806533376},
"851a75f1-19cf-4aad-b3f0-95ebf1973e4b":
{"volume":81000,"price":2810,"saleValue":2810,"networth":224663343.5,"members":4,"timestamp":1655806533380},
...
}
]
42[ "mutualFundStatUpdate",
{"funds":
{"f9df4304-e084-494a-b918-9b5ab1c6842b":
{"volume":1000,"price":100000,"saleValue":100000,"networth":100000000,"members":1},
"851a75f1-19cf-4aad-b3f0-95ebf1973e4b":
{"volume":81000,"price":2810,"saleValue":2810,"networth":224663343.5,"members":4},
...
}
}
]
42[ "mutualFundMembersUpdate",
{"type":"add",
"fund":"851a75f1-19cf-4aad-b3f0-95ebf1973e4b",
"username":"Bombshell Blondes Banking",
"userid":"726b5a5e-9dc0-438e-b6be-fb901a51f482"}
]
42[ "mutualFundOrderUpdate",
{"fund":"851a75f1-19cf-4aad-b3f0-95ebf1973e4b","buys":0,"sells":0}
]
42[" mutualFundMakePublicUpdate",
{"fund":"f9df4304-e084-494a-b918-9b5ab1c6842b"}
]
XHR:
{"success":true,"funds":
{"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":
{"id":"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f",
"name":"Miyatsuko",
"dateCreated":"2022-06-20T21:04:21.129Z",
"icon":"inanis",
"missionStatement":"Abayo Chocoball",
"tag":"造"
,"color":"e0ae2f",
"ceo":"10dcbbd5-f761-44f3-92ce-ea5b01f1d747",
"balance":1228441634.6,
"released":true,
"members":
[ {"id":"10dcbbd5-f761-44f3-92ce-ea5b01f1d747","username":"[造] Miyatsuko-Priestess Treasury","boardMember":true},
{"id":"135c986e-0cc3-45d1-ad9a-77725a99d44f","username":"[造] Touching Fluffy Tails Management Co.","boardMember":true},
...
],
"portfolio":
{ "kronii":{"amount":8,"timestamp":1655760060678,"meanPurchasePrice":16719.18},
"inanis":{"amount":8,"timestamp":1655760516709,"meanPurchasePrice":27564.07},
...
}
},
...,
...,
...
},
"bulletins":
{"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":[],
"7b24a25b-8f78-42b6-85ff-1d3f62c7132a":[
{"id":"3e7a8c16-2e44-42aa-91ee-57674378982f",
"fund":"7b24a25b-8f78-42b6-85ff-1d3f62c7132a",
"author":"f7c3b57a-8b4f-4715-9f37-dfcc81fb0c18",
"authorName":"[狐] Kitsune Holdings 🌽🦊🍔",
"timestamp":"2022-06-20T22:54:11.252Z",
"message":"OhaKOOOOOOOOOOOOOOOOOOON"
}
],
...
},
"fundHistory":{
"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":
[{"timestamp":"2022-06-20T21:04:21.129Z","volume":100000,"price":16000,"networth":1600000000,"members":1}],
"81570634-37bb-4e10-b2cb-ea1bf955fc53":
[{"timestamp":"2022-06-20T21:04:24.109Z","volume":1000,"price":25000,"networth":25000000,"members":1}],
...
},
"fundStats":{
"4a4afb61-7a1e-4f99-bbec-fd50d0529c5f":{
"volume":100000,
"price":16000,
"saleValue":16000,
"networth":1584327512,
"members":10,
"history":[
{"volume":100000,"price":16000,"saleValue":16000,"networth":1600000000,"members":1,"timestamp":1655759061132},
{"volume":100000,"price":16000,"saleValue":16000,"networth":1600000000,"members":8,"timestamp":1655759714668},
...
]
},
...
}
"orders":{
"779ee96f-5a24-4244-b1ce-a8cc540f0cbd":
{"buys":6980,"sells":0},
"6796531e-8668-49d1-bb01-f07db4264ce6":
{"buys":12157,"sells":0},
...
}
"fundPayouts":{},
"fundsToDissolve":{}

View File

@ -1,11 +0,0 @@
#include "api.h"
class nsfq_proxy_api {
public:
private:
/* HTTP wrapper to interface with the server. */
/* Websocket used to interface with the server. */
websocket_endpoint ws_endpoint;
}

View File

@ -1,22 +0,0 @@
#ifndef _API_H_
#define _API_H_
#include <iostream>
#include <string>
#include <stdlib.h>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/common/thread.hpp>
#include <websocketpp/common/memory.hpp>
#include <nlohmann/json.hpp>
#include "../parser/parser.h"
#include "../common/common.h"
#include "../safe_queue/safe_queue.h"
#include "../my_ssl/my_ssl.h"
#endif

View File

@ -0,0 +1,60 @@
#include "complete_db.h"
ws_msg_parsed<WS_EVENT_TRANSACTION> raw_msg_parse(nlohmann::json op) {
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = op["coin"];
rop.type = op["type"];
rop.userid = op["userid"];
rop.quantity = op["quantity"];
rop.timestamp = op["timestamp"];
rop.completed = op["completed"];
rop.timestamp = op["timestamp"];
rop.price = op["price"];
return rop;
}
int complete_db(long ts_up, unsigned int days, bool check) {
long tmp_ts;
std::string BASE_URL = "https://nasfaq.biz/api/getHistory?timestamp={}";
std::string TMP_URL;
nlohmann::json res_json;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> tmp;
nsfq_http::http_connector::connector c;
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
/* Update database if needed */
tmp_ts = ts_up;
for(int i = 0; i < days; i++) {
std::cout << "Fetching history for ts = " << tmp_ts << std::endl;
tmp_ts -= 60*60*24*1000;
TMP_URL = fmt::format(BASE_URL, tmp_ts);
c.get((const char*)TMP_URL.c_str());
res_json = nlohmann::json::parse(c.get_buffer());
tmp = {};
if(!res_json.empty()) {
for(auto const & x:res_json["history"]["transactions"]) {
tmp.transaction_list.push_back(raw_msg_parse(x));
}
}
if(check) db::push_uhistory(&C, tmp);
else db::push_history(&C, tmp);
}
return 0;
}

View File

@ -0,0 +1,7 @@
#include "../common/common.h"
#include "../http/http_connector.h"
#include "../sql/db_init.h"
#include "../sql/db_handle.h"
int complete_db(long ts_up, unsigned int days, bool check);

View File

@ -1,9 +0,0 @@
#include "client.h"
namespace proxy {
client::client(void) {
m_q = SafeQueue<std::string>;
m_endpoint =
}
}

View File

@ -1,24 +0,0 @@
#ifndef _CLIENT_H_
#define _CLIENT_H_
#include "../common/common.h"
#include "../ws/ssl_ws.h"
namespace proxy {
/**
NASFAQ proxy client.
*/
class client {
public:
client(void);
~client(void);
void init(void);
private:
SafeQueue<std::string> q;
ws::websocket_endpoint endpoint;
}
}
#endif

View File

@ -20,11 +20,12 @@ std::ostream& operator<< (std::ostream& stream, WS_MSG const & type) {
} }
/** /**
//TODO: GOES INTO pretty_print.cpp
Pretty prints COIN_PRICE events. Pretty prints COIN_PRICE events.
*/ */
template<> template<>
std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> const & op) { std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_COIN_PRICE> const & op) {
stream << "WS_EVENT_COIN_PRICE_UPDATE:" stream << "WS_EVENT_COIN_PRICE:"
<< " Coin: " << op.coin << " Coin: " << op.coin
<< " Price: " << op.price << " Price: " << op.price
<< " SaleValue: " << op.saleValue << " SaleValue: " << op.saleValue
@ -49,11 +50,46 @@ std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_TRANSACT
} }
/** /**
Pretty prints the transactions held in the WS_EVENT_HISTORY_UPDATE vector. Pretty prints portfolio_coin_t.
*/
std::ostream& operator<< (std::ostream& stream, portfolio_coin_t const & op) {
stream << "portfolio_coin_t:"
<< " Coin: " << op.coin
<< " Amount: " << op.amount
<< " Timestamp: " << op.ts
<< " MPP: " << op.mpp;
return stream;
}
/**
Pretty prints WS_EVENT_MF_PORTFOLIO events.
*/ */
template<> template<>
std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> const & op) { std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> const & op) {
stream << "WS_EVENT_HISTORY_UPDATE:\n"; stream << "WS_EVENT_MF_PORTFOLIO:"
<< " Fund: " << op.fund
<< " Event: " << op.event << std::endl
<< "Transactions: " << std::endl;
for(auto & element : op.transactions) {
stream << "\t" << element << std::endl;
}
stream << "Portfolio: " << std::endl;
for(auto & element : op.portfolio) {
stream << "\t" << element << std::endl;
}
return stream;
}
/**
Pretty prints the transactions held in the WS_EVENT_HISTORY vector.
*/
template<>
std::ostream& operator<< (std::ostream& stream, ws_msg_parsed<WS_EVENT_HISTORY> const & op) {
stream << "WS_EVENT_HISTORY:\n";
for(auto & element : op.transaction_list) { for(auto & element : op.transaction_list) {
stream << "\t" << element << std::endl; stream << "\t" << element << std::endl;
} }

View File

@ -13,7 +13,7 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
//#include "../safe_queue/safe_queue.h" // TODO: REFACTOR ALL THIS TRASH HOLY SHIT
/*********************************************************************************** /***********************************************************************************
WEBSOCKET MESSAGE STRUCTURES WEBSOCKET MESSAGE STRUCTURES
@ -58,7 +58,7 @@ struct ws_msg_parsed;
coinPriceUpdate structure coinPriceUpdate structure
*/ */
template <> template <>
struct ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> { struct ws_msg_parsed<WS_EVENT_COIN_PRICE> {
std::string coin; std::string coin;
float price; float price;
float saleValue; float saleValue;
@ -66,7 +66,7 @@ struct ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> {
}; };
/** /**
Auxiliary type for WS_EVENT_HISTORY_UPDATE. Auxiliary type for WS_EVENT_HISTORY.
This holds individual transactions as handed by the websocket in a list. This holds individual transactions as handed by the websocket in a list.
*/ */
template <> template <>
@ -78,16 +78,43 @@ struct ws_msg_parsed<WS_EVENT_TRANSACTION> {
long timestamp; long timestamp;
bool completed; bool completed;
float price; float price;
bool fund;
}; };
/** /**
historyUpdate structure holding transactions. historyUpdate structure holding transactions.
*/ */
template<> template<>
struct ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> { struct ws_msg_parsed<WS_EVENT_HISTORY> {
std::vector<ws_msg_parsed<WS_EVENT_TRANSACTION>> transaction_list; std::vector<ws_msg_parsed<WS_EVENT_TRANSACTION>> transaction_list;
}; };
/***********************************************************************************
MUTUAL FUNDS
***********************************************************************************/
/**
Portfolio type TODO: Make this global, could be used with users' wallets
nasfaq::types::portfolio::coin
*/
typedef struct portfolio_coin_t {
std::string coin; //TODO: make this an enum
uint amount;
long ts;
float mpp;
} portfolio_coin_t ;
/**
Portfolio update
*/
template <>
struct ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> {
std::string fund;
std::string event; // TODO: type for this
std::vector<ws_msg_parsed<WS_EVENT_TRANSACTION>> transactions;
std::vector<portfolio_coin_t> portfolio;
};
/*********************************************************************************** /***********************************************************************************
WEBSOCKET MESSAGE STRUCTURES FUNCTIONS WEBSOCKET MESSAGE STRUCTURES FUNCTIONS
***********************************************************************************/ ***********************************************************************************/

View File

@ -0,0 +1,30 @@
#include "formatting.h"
/*
See https://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string
*/
void replace_all(std::string& str, const std::string& from, const std::string& to) {
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
}
}
/*
Splits a string "{...}, ..., {...}" in an array ["{...}", ..., "{...}"].
ONLY HANDLES DICTIONARIES OF DEPTH 1.
*/
std::vector<std::string> tokenize_json_array(std::string op, std::string token) {
int start = 0;
int end = op.find("}");
std::vector<std::string> ret;
while( end != -1 ) {
ret.push_back(op.substr(start, end - start + 1));
start = end + token.size() + 1; // + 1 accounts for the ",".
end = op.find(token, start);
}
return ret;
}

View File

@ -0,0 +1,16 @@
#ifndef _FORMATTING_H_
#define _FORMATTING_H_
#include <iostream>
#include <string>
#include <stdlib.h>
#include "../common/common.h"
#include <nlohmann/json.hpp>
void replace_all(std::string&, const std::string&, const std::string&);
std::vector<std::string> tokenize_json_array(std::string, std::string token = "}");
#endif

View File

@ -1,13 +1,20 @@
X(WS_EVENT_COIN_PRICE_UPDATE)
X(WS_EVENT_TRANSACTION)
X(WS_EVENT_HISTORY_UPDATE)
X(WS_EVENT_BROKER_FEE_UPDATE)
X(WS_EVENT_TODAY_PRICES_UPDATE)
X(WS_EVENT_LEADERBOARD_UPDATE)
X(WS_EVENT_FLOOR_UPDATE)
X(WS_EVENT_AUCTION_UPDATE)
X(WS_EVENT_BENCHMARK_UPDATE)
X(WS_EVENT_SUPERCHAT_UPDATE)
X(WS_EVENT_GACHA_UPDATE)
X(WS_EVENT_ADD_MESSAGE_GLOBAL)
X(WS_EVENT_UNKNOWN) X(WS_EVENT_UNKNOWN)
X(WS_EVENT_COIN_PRICE)
X(WS_EVENT_TRANSACTION)
X(WS_EVENT_HISTORY)
X(WS_EVENT_BROKER_FEE)
X(WS_EVENT_TODAY_PRICES)
X(WS_EVENT_LEADERBOARD)
X(WS_EVENT_FLOOR)
X(WS_EVENT_AUCTION)
X(WS_EVENT_BENCHMARK)
X(WS_EVENT_SUPERCHAT)
X(WS_EVENT_GACHA)
X(WS_EVENT_ADD_MESSAGE_GLOBAL)
X(WS_EVENT_MF_PORTFOLIO)
X(WS_EVENT_MF_BALANCE)
X(WS_EVENT_MF_RUNNINGHISTORY)
X(WS_EVENT_MF_STAT)
X(WS_EVENT_MF_MEMBERS)
X(WS_EVENT_MF_ORDER)
X(WS_EVENT_MF_MAKEPUBLIC)

View File

@ -1,110 +1,10 @@
#include "parser.h" #include "parser.h"
namespace parser { namespace parser {
WS_MSG msg_type_detect(std::string op) { /*******************************************************************************
WS_MSG ret; Parser object
*******************************************************************************/
if (op.substr(0, 30).find("coinPriceUpdate") != std::string::npos) {
ret = WS_EVENT_COIN_PRICE_UPDATE;
} else if (op.substr(0, 30).find("historyUpdate") != std::string::npos) {
ret = WS_EVENT_HISTORY_UPDATE;
} else if (op.substr(0, 30).find("todayPricespdate") != std::string::npos) {
ret = WS_EVENT_TODAY_PRICES_UPDATE;
} else if (op.substr(0, 30).find("brokerFeeUpdate") != std::string::npos) {
ret = WS_EVENT_BROKER_FEE_UPDATE;
} else {
ret = WS_EVENT_UNKNOWN;
}
return ret;
}
template <WS_MSG E>
ws_msg_parsed<E> parse(std::string rmsg) {};
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> raw_msg_parse<WS_EVENT_COIN_PRICE_UPDATE>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> rop;
rop.coin = jparsed["coin"];
rop.price = (jparsed["info"])["price"];
rop.saleValue = (jparsed["info"])["saleValue"];
rop.inCirculation = (jparsed["info"])["inCirculation"];
return rop;
}
/****************************************************************
Helper functions should go in common or something more general
****************************************************************/
/*
See https://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string
*/
static inline void replace_all(std::string& str, const std::string& from, const std::string& to) {
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
}
}
/*
Splits a string "{...}, ..., {...}" in an array ["{...}", ..., "{...}"].
ONLY HANDLES DICTIONARIES OF DEPTH 1.
*/
std::vector<std::string> tokenize_json_array(std::string op, std::string token = "}") {
int start = 0;
int end = op.find("}");
std::vector<std::string> ret;
while( end != -1 ) {
ret.push_back(op.substr(start, end - start + 1));
start = end + token.size() + 1; // + 1 accounts for the ",".
end = op.find(token, start);
}
return ret;
}
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> raw_msg_parse<WS_EVENT_TRANSACTION>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = jparsed["coin"];
rop.type = jparsed["type"];
rop.userid = jparsed["userid"];
rop.quantity = jparsed["quantity"];
rop.timestamp = jparsed["timestamp"];
rop.completed = jparsed["completed"];
rop.timestamp = jparsed["timestamp"];
rop.price = jparsed["price"];
return rop;
}
template<>
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> raw_msg_parse<WS_EVENT_HISTORY_UPDATE>(std::string rmsg) {
std::vector<std::string> raw_vect;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> rop;
/* Replace \" by " */
replace_all(rmsg, "\\\"", "\"");
/* Extract array */
raw_vect = tokenize_json_array(rmsg);
/* Create the output array by parsing each transaction elements */
for(auto & raw_tr : raw_vect) {
rop.transaction_list.push_back(raw_msg_parse<WS_EVENT_TRANSACTION>(raw_tr));
}
return rop;
}
parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C) parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C)
: m_metadata(metadata) : m_metadata(metadata)
, m_process_queue_state(false) , m_process_queue_state(false)
@ -112,7 +12,6 @@ parser::parser(ws::connection_metadata::ptr metadata, pqxx::connection* C)
{} {}
parser::~parser() { parser::~parser() {
} }
void parser::process_queue() { void parser::process_queue() {
@ -125,25 +24,35 @@ void parser::process_queue() {
while(m_process_queue_state) { while(m_process_queue_state) {
raw_data = m_metadata->pop_message(); raw_data = m_metadata->pop_message();
type = msg_type_detect(raw_data);
type = types::extract(raw_data);
// TODO: make a function for each of these cases and use a switch as in parser_aux
/* POSTGRESQL STUFF GOES HERE */ /* POSTGRESQL STUFF GOES HERE */
if (type == WS_EVENT_COIN_PRICE_UPDATE) { if (type == WS_EVENT_COIN_PRICE) {
/* 18 = 3 + 15 */ /* 18 = 3 + 15 */
raw_data = raw_data.substr(18, raw_data.length()-18); raw_data = raw_data.substr(18, raw_data.length()-18);
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> parsed_msg = raw_msg_parse<WS_EVENT_COIN_PRICE_UPDATE>(raw_data); ws_msg_parsed<WS_EVENT_COIN_PRICE> parsed_msg = single<WS_EVENT_COIN_PRICE>(raw_data);
std::cout << parsed_msg << std::endl; std::cout << parsed_msg << std::endl;
db::push_coin_price(m_connection, parsed_msg); db::push_coin_price(m_connection, parsed_msg);
} else if (type == WS_EVENT_HISTORY_UPDATE) { } else if (type == WS_EVENT_HISTORY) {
raw_data = raw_data.substr(18, raw_data.length()-18); raw_data = raw_data.substr(18, raw_data.length()-18);
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> parsed_msg = raw_msg_parse<WS_EVENT_HISTORY_UPDATE>(raw_data); ws_msg_parsed<WS_EVENT_HISTORY> parsed_msg = single<WS_EVENT_HISTORY>(raw_data);
std::cout << parsed_msg << std::endl; std::cout << parsed_msg << std::endl;
db::push_history(m_connection, parsed_msg); db::push_history(m_connection, parsed_msg);
//std::cout << "\x1B[31mTexting\033[0m\t\t" << std::endl; //std::cout << "\x1B[31mTexting\033[0m\t\t" << std::endl;
} }
else if (type == WS_EVENT_MF_PORTFOLIO) {
raw_data = raw_data.substr(28, raw_data.length()-28);
//raw_data = payload::extract(raw_data);
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> parsed_msg = single<WS_EVENT_MF_PORTFOLIO>(raw_data);
std::cout << parsed_msg << std::endl;
}
} }
} }
@ -168,9 +77,140 @@ void parser::process_queue_thread_join()
pthread_join(m_process_queue_thread, 0); pthread_join(m_process_queue_thread, 0);
} }
void parser::process_queue_stop() { void parser::process_queue_stop() {
m_process_queue_state = false; m_process_queue_state = false;
} }
/*******************************************************************************
Message parsing
*******************************************************************************/
//template <WS_MSG E>
//ws_msg_parsed<E> parse(std::string rmsg) {};
/*
Takes as input a raw string associated to a coinPriceUpdate ws event and returns a parsed representation of it.
*/
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE> single<WS_EVENT_COIN_PRICE>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_COIN_PRICE> rop;
rop.coin = jparsed["coin"];
rop.price = (jparsed["info"])["price"];
rop.saleValue = (jparsed["info"])["saleValue"];
rop.inCirculation = (jparsed["info"])["inCirculation"];
return rop;
} }
/*
Takes as input a raw string associated to a transaction ws event and returns a parsed representation of it.
*/
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> single<WS_EVENT_TRANSACTION>(std::string rmsg) {
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = jparsed["coin"];
rop.type = jparsed["type"];
rop.userid = jparsed["userid"];
rop.quantity = jparsed["quantity"];
rop.timestamp = jparsed["timestamp"];
rop.completed = jparsed["completed"];
rop.timestamp = jparsed["timestamp"];
rop.price = jparsed["price"];
// rop.fund = jparsed["fund"];
return rop;
}
/*
Same as above but takes a json as input.
*/
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> single_j<WS_EVENT_TRANSACTION>(nlohmann::json op) {
ws_msg_parsed<WS_EVENT_TRANSACTION> rop;
rop.coin = op["coin"];
rop.type = op["type"];
rop.userid = op["userid"];
rop.quantity = op["quantity"];
rop.timestamp = op["timestamp"];
rop.completed = op["completed"];
rop.timestamp = op["timestamp"];
if(op["price"] == nullptr) {
rop.price = 0;
} else {
rop.price = op["price"];
}
// rop.fund = op["fund"];
return rop;
}
/*
Takes as input a raw string associated to a historyUpdate ws event and returns a parsed representation of it.
*/
template<>
ws_msg_parsed<WS_EVENT_HISTORY> single<WS_EVENT_HISTORY>(std::string rmsg) {
std::vector<std::string> raw_vect;
ws_msg_parsed<WS_EVENT_HISTORY> rop;
/* Replace \" by " */
replace_all(rmsg, "\\\"", "\"");
/* Extract array */
raw_vect = tokenize_json_array(rmsg);
/* Create the output array by parsing each transaction elements */
for(auto & raw_tr : raw_vect) {
rop.transaction_list.push_back(single<WS_EVENT_TRANSACTION>(raw_tr));
}
return rop;
}
///////////////////////////////// TODO: PUT THIS FUCKING TRASH SOMEWHERE ELSE /////////
/*
Takes as input a json and returns its representation as portfolio_coin_t.
*/
portfolio_coin_t parse_portfolio_coin(std::string coin, nlohmann::json op) {
portfolio_coin_t rop;
rop.coin = coin;
rop.amount = op["amount"];
rop.ts = op["timestamp"];
rop.mpp = op["meanPurchasePrice"];
return rop;
}
////////////////////////////////////////////////////////////////////////////
/*
Takes as input a raw string associated to a mutualFundPortfolioUpdate as ws event and returns a parsed representation of it.
*/
template <>
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> single<WS_EVENT_MF_PORTFOLIO>(std::string rmsg) {
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> rop;
nlohmann::json jparsed = nlohmann::json::parse(rmsg); /* Check for errors and emptiness needed */
rop.fund = jparsed["fund"];
rop.event = (jparsed["tUpdate"])["event"];
for(auto & raw_tr : (jparsed["tUpdate"])["transactions"]) {
rop.transactions.push_back(single_j<WS_EVENT_TRANSACTION>(raw_tr));
}
if((jparsed["tUpdate"])["portfolio"] != NULL) {
for(auto & item : (jparsed["tUpdate"])["portfolio"].items()) {
rop.portfolio.push_back(parse_portfolio_coin(item.key(), item.value()));
}
}
return rop;
}
} //parser

View File

@ -9,24 +9,17 @@
#include "../sql/db_handle.h" #include "../sql/db_handle.h"
#include "../common/common.h" #include "../common/common.h"
#include "../common/formatting.h"
#include "../ws/ssl_ws.h" #include "../ws/ssl_ws.h"
#include "./parser_aux.h"
//TODO: Remove ws here and use a safequeue
namespace parser { namespace parser {
WS_MSG msg_type_detect(std::string); /*******************************************************************************
Parser object
template <WS_MSG E> *******************************************************************************/
ws_msg_parsed<E> raw_msg_parse(std::string);
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> raw_msg_parse<WS_EVENT_COIN_PRICE_UPDATE>(std::string);
template<>
ws_msg_parsed<WS_EVENT_TRANSACTION> raw_msg_parse<WS_EVENT_TRANSACTION>(std::string);
template<>
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> raw_msg_parse<WS_EVENT_HISTORY_UPDATE>(std::string);
class parser { class parser {
public: public:
parser(ws::connection_metadata::ptr, pqxx::connection*); parser(ws::connection_metadata::ptr, pqxx::connection*);
@ -45,7 +38,30 @@ private:
static void* process_queue_helper(void*); static void* process_queue_helper(void*);
}; };
} /*******************************************************************************
Message parsing
*******************************************************************************/
template <WS_MSG E>
ws_msg_parsed<E> single(std::string);
template <WS_MSG E>
ws_msg_parsed<E> single_j(nlohmann::json);
template <>
ws_msg_parsed<WS_EVENT_COIN_PRICE> single<WS_EVENT_COIN_PRICE>(std::string);
template<>
ws_msg_parsed<WS_EVENT_TRANSACTION> single<WS_EVENT_TRANSACTION>(std::string);
template <>
ws_msg_parsed<WS_EVENT_TRANSACTION> single_j<WS_EVENT_TRANSACTION>(nlohmann::json );
template<>
ws_msg_parsed<WS_EVENT_HISTORY> single<WS_EVENT_HISTORY>(std::string);
template <>
ws_msg_parsed<WS_EVENT_MF_PORTFOLIO> single<WS_EVENT_MF_PORTFOLIO>(std::string);
} // parser
#endif #endif

View File

@ -0,0 +1,57 @@
#include "parser_aux.h"
/*******************************************************************************
Types.
*******************************************************************************/
namespace parser::types {
/*
Takes a string as input and returns a WS_MSG corresponding to the underlying event type.
Returns WS_EVENT_UNKNOWN if no match is detected.
TODO: handle malformed strings, only one ",...
*/
WS_MSG extract(std::string op) {
WS_MSG ret;
std::size_t pos_beg;
std::size_t pos_end;
std::string raw_type;
// Find raw_type as first occurence of "RAW_TYPE" in op
pos_beg = op.find_first_of('"');
pos_end = op.find_first_of('"', pos_beg + 1);
raw_type = op.substr(pos_beg+1, pos_end - pos_beg-1);
switch(map_rawIdent[raw_type]) {
case(0): ret = WS_EVENT_UNKNOWN; break;
case(1): ret = WS_EVENT_COIN_PRICE; break;
case(2): ret = WS_EVENT_HISTORY; break;
case(3): ret = WS_EVENT_TODAY_PRICES; break;
case(4): ret = WS_EVENT_BROKER_FEE; break;
case(10): ret = WS_EVENT_MF_PORTFOLIO; break;
case(11): ret = WS_EVENT_MF_BALANCE; break;
case(12): ret = WS_EVENT_MF_RUNNINGHISTORY; break;
case(13): ret = WS_EVENT_MF_STAT; break;
case(14): ret = WS_EVENT_MF_MEMBERS; break;
case(15): ret = WS_EVENT_MF_ORDER; break;
case(16): ret = WS_EVENT_MF_MAKEPUBLIC; break;
default: ret = WS_EVENT_UNKNOWN;
}
return ret;
}
} // parser::type
namespace parser::payload {
std::string extract(std::string op) {
std::string ret;
std::size_t tmp;
// Find second occurence of ". Raw payloads are of the form 42["RAW_TYPE",PAYLOAD ]
tmp = op.find_first_of('"', 3) + 2; // Wew hopefully that doesn't break
ret = op.substr(tmp, op.length() - tmp);
return ret;
}
}

View File

@ -0,0 +1,41 @@
#ifndef _PARSER_AUX_H_
#define _PARSER_AUX_H_
#include "../common/common.h"
/*******************************************************************************
Types
*******************************************************************************/
namespace parser::types {
/*
Raw message identifiers and int conversion.
*/
static std::map<std::string, int> map_rawIdent = {
{"unknown", 0},
{"coinPriceUpdate", 1},
{"historyUpdate", 2},
{"todayPricespdate", 3},
{"brokerFeeUpdate", 4},
{"mutualFundPortfolioUpdate", 10},
{"mutualFundBalanceUpdate", 11},
{"mutualFundRunningHistoryUpdate", 12},
{"mutualFundStatUpdate" , 13},
{"mutualFundMembersUpdate" , 14},
{"mutualFundOrderUpdate" , 15},
{"mutualFundMakePublicUpdate" , 16}
};
/*
Takes a string as input and returns its corresponding WS_MSG type.
*/
WS_MSG extract(std::string);
} // parser::types
namespace parser::payload {
std::string extract(std::string);
} // parser::payload
#endif

View File

@ -1,50 +0,0 @@
#include "safe_queue.h"
template <typename T>
TestQueue<T>::TestQueue(void)
: q()
{}
template <typename T>
TestQueue<T>::~TestQueue(void)
{}
template <typename T>
SafeQueue<T>::SafeQueue(void)
: q()
, m()
, c()
{}
template <typename T>
SafeQueue<T>::~SafeQueue(void)
{}
// Add an element to the queue.
template <typename T>
void SafeQueue<T>::enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(t);
c.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
template <typename T>
T SafeQueue<T>::dequeue(void) {
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}
template <typename T>
bool SafeQueue<T>::empty(void) {
return q.empty();
}

View File

@ -1,36 +0,0 @@
#ifndef _SAFE_QUEUE_H_
#define _SAFE_QUEUE_H_
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class TestQueue
{
public:
TestQueue();
~TestQueue();
private:
std::queue<T> q;
};
// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void);
~SafeQueue(void);
void enqueue(T);
T dequeue(void);
bool empty(void);
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif

View File

@ -1,122 +1,34 @@
#include "db_handle.h" #include "db_handle.h"
namespace db { namespace db {
namespace query { void commit(pqxx::connection *C, std::string query) {
/* Query templates for HISTORY. */ /* Create a transactional object. */
//TODO: Move these definitions to .h file. pqxx::work W(*C);
const std::string query_template_transaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\
"VALUES('{}', {}, '{}', {}, {}, {}, {});";
const std::string query_template_history_ts = "SELECT * FROM HISTORY WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string query_template_userid_ts = "SELECT DISTINCT USERID FROM HISTORY WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string query_template_slope_ts_top = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ /* Execute SQL query */
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ W.exec( query );
"ORDER BY TIMESTAMP ASC "\ W.commit();
"LIMIT 1;";
const std::string query_template_transactions_userid_ts = "SELECT * FROM HISTORY WHERE USERID = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
"ORDER BY TIMESTAMP DESC "\
"LIMIT 1;";
const std::string query_template_last_n_spaced_prices = "select PRICE, TIMESTAMP FROM last_n_spaced_prices('{}', {}, {});";
const std::string query_template_user = "INSERT INTO USERS(USERID, USERNAME, ICON, NETWORTH) "\
"VALUES('{}', E'{}', '{}', {}) "
"ON CONFLICT (USERID) DO UPDATE SET "\
"NETWORTH = {};";
const std::string query_template_userid_to_username = "select USERNAME FROM USERS WHERE USERID = '{}'";
/********************************************************************************
GLOBAL QUERIES
********************************************************************************/
std::string make_push_query_transaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op ) {
return fmt::format(query_template_transaction,
op.coin,
op.type,
op.userid,
op.quantity,
op.timestamp,
op.completed,
op.price);
} }
}
std::string make_pull_query_history_ts( long ts_high, long ts_low ) { namespace db::push {
return fmt::format(query_template_history_ts, ts_high, ts_low);
}
std::string make_pull_query_userid_list_ts( long ts_high, long ts_low ) {
return fmt::format(query_template_userid_ts, ts_high, ts_low);
}
std::string make_pull_query_transactions_userid_ts( std::string userid, long ts_high, long ts_low ) {
return fmt::format(query_template_transactions_userid_ts, userid, ts_high, ts_low);
}
std::string make_pull_query_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(query_template_slope_ts_top, coin, ts_high, ts_low);
}
std::string make_pull_query_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(query_template_slope_ts_bot, coin, ts_high, ts_low);
}
/* Query templates for COIN PRICE. */
const std::string query_template_coin_price = "INSERT INTO COINPRICE(COIN, PRICE, SALEVALUE, INCIRCULATION, LASTUPDATE) "\
"VALUES('{}', {}, {}, {}, {}) "\
"ON CONFLICT (COIN) DO UPDATE SET "\
"PRICE = {}, SALEVALUE = {}, INCIRCULATION = {}, LASTUPDATE = {};";
std::string make_push_query_coin_price( ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> op ) {
return fmt::format(query_template_coin_price,
op.coin,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000);
}
/* Query templates for math functions. */
std::string make_pull_query_last_n_spaced_prices( std::string coin, int nb_cycles, int delta) {
return fmt::format(query_template_last_n_spaced_prices,
coin,
nb_cycles,
delta);
}
std::string make_push_query_user(user_t user){
return fmt::format(query_template_user,
user.userid,
user.username,
user.icon,
user.networth,
user.networth);
}
std::string make_pull_query_userid_to_username(std::string userid) {
return fmt::format(query_template_userid_to_username,
userid);
}
}
/********************************************************************************
HISTORY
********************************************************************************/
/* Add transactions contained in history update to the database. */ /* Add transactions contained in history update to the database. */
void push_history( pqxx::connection* C, ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> op ) { void push_history( pqxx::connection* C, ws_msg_parsed<WS_EVENT_HISTORY> op ) {
std::string query; std::string query;
for(auto& element : op.transaction_list){ for(auto& element : op.transaction_list){
query += query::make_push_query_transaction(element); query += query::fmt::p_query_transaction(element);
}
db::commit(C, query);
}
/* Same as push_history but checks for unique timestamp. */
void push_uhistory( pqxx::connection* C, ws_msg_parsed<WS_EVENT_HISTORY> op ) {
std::string query;
for(auto& element : op.transaction_list){
query += query::make_push_query_utransaction(element);
} }
/* Create a transactional object. */ /* Create a transactional object. */
@ -127,17 +39,18 @@ namespace db {
W.commit(); W.commit();
} }
/* Returns the last cycle in form of a ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> struct. */ /* Returns the last cycle in form of a ws_msg_parsed<WS_EVENT_HISTORY> struct. */
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle(pqxx::connection* C) { ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle(pqxx::connection* C) {
std::string query; std::string query;
long ts_high, ts_low; long ts_high, ts_low;
ws_msg_parsed<WS_EVENT_TRANSACTION> tmp; ws_msg_parsed<WS_EVENT_TRANSACTION> tmp;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> rop; ws_msg_parsed<WS_EVENT_HISTORY> rop;
ts_high = time(NULL)*1000; ts_high = time(NULL)*1000;
ts_low = ts_high - 600*1000; ts_low = ts_high - 600*1000;
query = query::make_pull_query_history_ts(ts_high, ts_low); query = query::make_pull_query_history_ts(ts_high, ts_low);
//std::cout << query << std::endl;
/* Create a non-transactional object. */ /* Create a non-transactional object. */
pqxx::nontransaction N(*C); pqxx::nontransaction N(*C);
@ -178,15 +91,14 @@ namespace db {
for (pqxx::result::const_iterator c = R.begin(); c != R.end(); ++c) { for (pqxx::result::const_iterator c = R.begin(); c != R.end(); ++c) {
rop.push_back(c[0].as<std::string>()); rop.push_back(c[0].as<std::string>());
} }
return rop; return rop;
} }
/* Fetches all transactions by userid during period ts_low to ts_high. */ /* Fetches all transactions by userid during period ts_low to ts_high. */
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_transactions_userid_ts( pqxx::connection *C, std::string userid, long ts_high, long ts_low) { ws_msg_parsed<WS_EVENT_HISTORY> pull_transactions_userid_ts( pqxx::connection *C, std::string userid, long ts_high, long ts_low) {
std::string query; std::string query;
ws_msg_parsed<WS_EVENT_TRANSACTION> tmp; ws_msg_parsed<WS_EVENT_TRANSACTION> tmp;
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> rop; ws_msg_parsed<WS_EVENT_HISTORY> rop;
query = query::make_pull_query_transactions_userid_ts(userid, ts_high, ts_low); query = query::make_pull_query_transactions_userid_ts(userid, ts_high, ts_low);
@ -213,19 +125,21 @@ namespace db {
} }
/* Fetches last cycle's transactions for userid. */ /* Fetches last cycle's transactions for userid. */
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle_userid( pqxx::connection *C, std::string userid) { ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle_userid( pqxx::connection *C, std::string userid) {
long ts_high, ts_low; long ts_high, ts_low;
ws_msg_parsed<WS_EVENT_HISTORY> rop;
ts_high = time(NULL)*1000; ts_high = time(NULL)*1000;
ts_low = ts_high - 600*1000; ts_low = ts_high - 60*10*1000;
return pull_transactions_userid_ts(C, userid, ts_high, ts_low); rop = pull_transactions_userid_ts(C, userid, ts_high, ts_low);
return rop;
} }
/******************************************************************************** /********************************************************************************
COIN PRICE COIN PRICE
********************************************************************************/ ********************************************************************************/
void push_coin_price( pqxx::connection* C, ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> op) { void push_coin_price( pqxx::connection* C, ws_msg_parsed<WS_EVENT_COIN_PRICE> op) {
std::string query; std::string query;
query = query::make_push_query_coin_price(op); query = query::make_push_query_coin_price(op);

View File

@ -19,12 +19,13 @@ namespace db {
/******************************************************************************** /********************************************************************************
HISTORY HISTORY
********************************************************************************/ ********************************************************************************/
void push_history( pqxx::connection*, ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> ); void push_history( pqxx::connection*, ws_msg_parsed<WS_EVENT_HISTORY> );
void push_uhistory( pqxx::connection*, ws_msg_parsed<WS_EVENT_HISTORY> );
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle( pqxx::connection*); ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle( pqxx::connection*);
std::vector<std::string> pull_userid_list_ts( pqxx::connection *, long, long); std::vector<std::string> pull_userid_list_ts( pqxx::connection *, long, long);
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_transactions_userid_ts( pqxx::connection *, std::string, long, long); ws_msg_parsed<WS_EVENT_HISTORY> pull_transactions_userid_ts( pqxx::connection *, std::string, long, long);
ws_msg_parsed<WS_EVENT_HISTORY_UPDATE> pull_last_cycle_userid( pqxx::connection *, std::string); ws_msg_parsed<WS_EVENT_HISTORY> pull_last_cycle_userid( pqxx::connection *, std::string);
float pull_last_cycle_slope(pqxx::connection*, std::string); float pull_last_cycle_slope(pqxx::connection*, std::string);
@ -33,7 +34,7 @@ namespace db {
/******************************************************************************** /********************************************************************************
COIN PRICE COIN PRICE
********************************************************************************/ ********************************************************************************/
void push_coin_price( pqxx::connection*, ws_msg_parsed<WS_EVENT_COIN_PRICE_UPDATE> ); void push_coin_price( pqxx::connection*, ws_msg_parsed<WS_EVENT_COIN_PRICE> );
/******************************************************************************** /********************************************************************************
COIN PRICE COIN PRICE
@ -44,6 +45,7 @@ namespace db {
USERS USERS
********************************************************************************/ ********************************************************************************/
void push_users( pqxx::connection*, std::vector<user_t> ); void push_users( pqxx::connection*, std::vector<user_t> );
std::vector<std::string> pull_userids_ts(pqxx::connection *C, long ts_low, long ts_high);
std::string userid_to_username(pqxx::connection*, std::string userid); std::string userid_to_username(pqxx::connection*, std::string userid);
} }
#endif #endif

View File

@ -1,16 +1,33 @@
#include "db_init.h" #include "db_init.h"
namespace db { /*******************************************************************************
Wrapper for single querying
int create_table_history(pqxx::connection& C) { *******************************************************************************/
std::string query;
int single_query(pqxx::connection& C, std::string query) {
try { try {
/* Create a transactional object. */
pqxx::work W(C); pqxx::work W(C);
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
}
/* Create sql query for history table creation */ namespace db {
query = "CREATE TABLE IF NOT EXISTS HISTORY(" \ namespace create {
/*******************************************************************************
Table creation queries
*******************************************************************************/
namespace table {
int history(pqxx::connection& C) {
// TODO: make timestamp the primary key
// make sizes macros
std::string query = "CREATE TABLE IF NOT EXISTS HISTORY(" \
"ID SERIAL PRIMARY KEY NOT NULL," \ "ID SERIAL PRIMARY KEY NOT NULL," \
"COIN CHAR(32) NOT NULL,"\ "COIN CHAR(32) NOT NULL,"\
"TYPE INT,"\ "TYPE INT,"\
@ -19,51 +36,38 @@ int create_table_history(pqxx::connection& C) {
"TIMESTAMP BIGINT NOT NULL,"\ "TIMESTAMP BIGINT NOT NULL,"\
"COMPLETED BOOLEAN,"\ "COMPLETED BOOLEAN,"\
"PRICE REAL);"; "PRICE REAL);";
return single_query(C, query);
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
} }
int create_table_coin_price(pqxx::connection& C) { int coin_price(pqxx::connection& C) {
std::string query; std::string query = "CREATE TABLE IF NOT EXISTS COINPRICE(" \
try {
/* Create a transactional object. */
pqxx::work W(C);
/* Create sql query for history table creation */
query = "CREATE TABLE IF NOT EXISTS COINPRICE(" \
"COIN CHAR(32) PRIMARY KEY,"\ "COIN CHAR(32) PRIMARY KEY,"\
"PRICE REAL," \ "PRICE REAL," \
"SALEVALUE REAL," \ "SALEVALUE REAL," \
"INCIRCULATION INT, "\ "INCIRCULATION INT, "\
"LASTUPDATE BIGINT);"; "LASTUPDATE BIGINT);";
return single_query(C, query);
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
} }
int create_last_n_anchor_prices_function(pqxx::connection& C) { int users(pqxx::connection& C) {
std::string query; std::string query = "CREATE TABLE IF NOT EXISTS USERS(" \
"USERID CHAR(128) PRIMARY KEY,"\
"USERNAME TEXT NOT NULL,"\
"ICON CHAR(32) NOT NULL,"\
"NETWORTH REAL);";
return single_query(C, query);
try { }
/* Create a transactional object. */ } // db::create::table
pqxx::work W(C);
/*******************************************************************************
Function creation queries
*******************************************************************************/
namespace function {
int last_n_anchor_prices_function(pqxx::connection& C) {
/* Create sql query for spaced prices function, delta is in seconds*/ /* Create sql query for spaced prices function, delta is in seconds*/
query = "CREATE OR REPLACE FUNCTION last_n_spaced_prices(var_coin CHAR(32), var_n INT, var_delta INT) RETURNS SETOF HISTORY AS $$" \ std::string query = "CREATE OR REPLACE FUNCTION last_n_spaced_prices(var_coin CHAR(32), var_n INT, var_delta INT) RETURNS SETOF HISTORY AS $$" \
"DECLARE "\ "DECLARE "\
" row HISTORY%ROWTYPE; "\ " row HISTORY%ROWTYPE; "\
" cur_date BIGINT = 9223372036854775806; "\ " cur_date BIGINT = 9223372036854775806; "\
@ -90,48 +94,19 @@ int create_last_n_anchor_prices_function(pqxx::connection& C) {
" END LOOP; "\ " END LOOP; "\
"END; "\ "END; "\
"$$ LANGUAGE plpgsql; "; "$$ LANGUAGE plpgsql; ";
return single_query(C, query);
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
} }
} // db::create::function
int create_table_users(pqxx::connection& C) { /*******************************************************************************
std::string query; Database creation query
*******************************************************************************/
namespace database{
int all(void) {
try { try {
/* Create a transactional object. */ // TODO: GLOBAL INFO
pqxx::work W(C); pqxx::connection C("dbname = nasfaq user = steaky hostaddr = 127.0.0.1 port = 5432");
/* Create sql query for history table creation */
query = "CREATE TABLE IF NOT EXISTS USERS(" \
"USERID CHAR(128) PRIMARY KEY,"\
"USERNAME TEXT NOT NULL,"\
"ICON CHAR(32) NOT NULL,"\
"NETWORTH REAL);";
/* Execute SQL query */
W.exec( query );
W.commit();
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
}
int populate() {
try {
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) { if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl; std::cout << "Opened database successfully: " << C.dbname() << std::endl;
@ -140,13 +115,11 @@ int populate() {
return 1; return 1;
} }
/* Create tables. */ table::history(C);
create_table_history(C); table::coin_price(C);
create_table_coin_price(C); table::users(C);
create_table_users(C);
/* Create functions. */ function::last_n_anchor_prices_function(C);
create_last_n_anchor_prices_function(C);
} catch (const std::exception &e) { } catch (const std::exception &e) {
std::cerr << e.what() << std::endl; std::cerr << e.what() << std::endl;
@ -154,4 +127,6 @@ int populate() {
} }
return 0; return 0;
} }
} // db::create::database
} // db::create
} }

View File

@ -3,16 +3,9 @@
#include <iostream> #include <iostream>
#include <pqxx/pqxx> #include <pqxx/pqxx>
#include <fmt/core.h>
#include <ctime>
#include "../http/users.h" namespace db::create::database {
int all(void);
namespace db {
int create_table_history(pqxx::connection&);
int create_table_coin_price(pqxx::connection&);
int create_table_users(pqxx::connection&);
int populate();
} }
#endif #endif

View File

@ -0,0 +1,82 @@
#include "queries.h"
namespace db::queries::fmt {
std::string p_transaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op ) {
return fmt::format(p_transaction,
op.coin,
op.type,
op.userid,
op.quantity,
op.timestamp,
op.completed,
op.price);
}
std::string p_utransaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op ) {
return fmt::format(p_utransaction,
op.coin,
op.type,
op.userid,
op.quantity,
op.timestamp,
op.completed,
op.price,
op.timestamp);
}
std::string f_history_ts( long ts_high, long ts_low ) {
return fmt::format(f_history_ts, ts_high, ts_low);
}
std::string f_userid_list_ts( long ts_high, long ts_low ) {
return fmt::format(f_userid_ts, ts_high, ts_low);
}
std::string f_transactions_userid_ts( std::string userid, long ts_high, long ts_low ) {
return fmt::format(f_transactions_userid_ts, userid, ts_high, ts_low);
}
std::string f_top_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(f_slope_ts_top, coin, ts_high, ts_low);
}
std::string f_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low ) {
return fmt::format(f_slope_ts_bot, coin, ts_high, ts_low);
}
std::string p_coin_price( ws_msg_parsed<WS_EVENT_COIN_PRICE> op ) {
return fmt::format(p_coin_price,
op.coin,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000,
op.price,
op.saleValue,
op.inCirculation,
std::time(NULL)*1000);
}
/* Query templates for math functions. */
std::string f_last_n_spaced_prices( std::string coin, int nb_cycles, int delta) {
return fmt::format(f_last_n_spaced_prices,
coin,
nb_cycles,
delta);
}
std::string p_user(user_t user){
return fmt::format(p_user,
user.userid,
user.username,
user.icon,
user.networth,
user.networth);
}
std::string f_query_userid_to_username(std::string userid) {
return fmt::format(f_userid_to_username,
userid);
}
} // db::queries::fmt

75
connections/sql/queries.h Normal file
View File

@ -0,0 +1,75 @@
#ifndef _QUERIES_H_
#define _QUERIES_H_
#include "../common/common.h"
namespace db {
namespace query {
namespace tmplt {
/* Push a user in USERS */
const std::string p_user = "INSERT INTO USERS(USERID, USERNAME, ICON, NETWORTH) "\
"VALUES('{}', E'{}', '{}', {}) "
"ON CONFLICT (USERID) DO UPDATE SET "\
"NETWORTH = {};";
/* Push a transaction in HISTORY */
const std::string p_transaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\
"VALUES('{}', {}, '{}', {}, {}, {}, {});";
/* Push a transaction in HISTORY with unicity check (for db completion) */
const std::string p_utransaction = "INSERT INTO HISTORY(COIN, TYPE, USERID, QUANTITY, TIMESTAMP, COMPLETED, PRICE) "\
"SELECT '{}', {}, '{}', {}, {}, {}, {} " \
"WHERE NOT EXISTS (SELECT TIMESTAMP FROM HISTORY WHERE TIMESTAMP = {} limit 1);";
/* Update coin price. */
const std::string p_coin_price = "INSERT INTO COINPRICE(COIN, PRICE, SALEVALUE, INCIRCULATION, LASTUPDATE) "\
"VALUES('{}', {}, {}, {}, {}) "\
"ON CONFLICT (COIN) DO UPDATE SET "\
"PRICE = {}, SALEVALUE = {}, INCIRCULATION = {}, LASTUPDATE = {};";
const std::string f_history_ts = "SELECT * FROM HISTORY "\
"WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string f_userid_ts = "SELECT DISTINCT USERID FROM HISTORY " \
"WHERE TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string f_slope_ts_top = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
"ORDER BY TIMESTAMP ASC "\
"LIMIT 1;";
const std::string f_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
"ORDER BY TIMESTAMP DESC "\
"LIMIT 1;";
const std::string f_transactions_userid_ts = "SELECT * FROM HISTORY "\
"WHERE USERID = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {};";
const std::string f_last_n_spaced_prices = "select PRICE, TIMESTAMP FROM last_n_spaced_prices('{}', {}, {});";
const std::string f_id_to_username = "select USERNAME FROM USERS WHERE USERID = '{}'";
} // db::query::tmpt
namespace db::queries::fmt {
std::string p_transaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op );
std::string p_utransaction( ws_msg_parsed<WS_EVENT_TRANSACTION> op );
std::string p_coin_price( ws_msg_parsed<WS_EVENT_COIN_PRICE> op );
std::string p_user(user_t user);
std::string f_history_ts( long ts_high, long ts_low );
std::string f_userid_list_ts( long ts_high, long ts_low );
std::string f_transactions_userid_ts( std::string userid, long ts_high, long ts_low );
std::string f_top_price_cycle_ts( std::string coin, long ts_high, long ts_low );
std::string f_bot_price_cycle_ts( std::string coin, long ts_high, long ts_low );
std::string f_last_n_spaced_prices( std::string coin, int nb_cycles, int delta);
std::string f_query_userid_to_username(std::string userid);
} // db::query::fmt
#endif

View File

@ -0,0 +1,8 @@
g++ ../../common/common.cpp \
../../aux/complete_db.cpp \
../../http/http_connector.cpp \
../../http/users.cpp \
../../sql/db_init.cpp \
../../sql/db_handle.cpp \
./test_complete_db.cpp \
-o test_complete_db -lpqxx -lpq -lfmt -lcurl -lcrypto -lssl -lboost_system -lboost_random -lboost_thread -lpthread && ./test_complete_db

Binary file not shown.

View File

@ -0,0 +1,9 @@
#include "test_complete_db.h"
int main(void) {
long ts_up = 1655503200000;
unsigned int days = 30;
complete_db(ts_up, days, false);
return 1;
}

View File

@ -0,0 +1,2 @@
#include "../../aux/complete_db.h"

BIN
connections/test/db_init Executable file

Binary file not shown.

Binary file not shown.

View File

@ -2,10 +2,53 @@
#include <graphviz/cgraph.h> #include <graphviz/cgraph.h>
#include "../common/common.h" #include "../common/common.h"
#include "../sql/db_init.h"
#include "../sql/db_handle.h"
#include "../../maths/maths.h"
#include "../../maths/graph.h" #include "../../maths/graph.h"
std::map<std::string, std::map<std::string, float>> make_map(pqxx::connection *C) {
// Timestamps
long ts_high = time(NULL)*1000;
long ts_low = ts_high - 60*10*1000;
float thresh = 10;
/* Create graph (test zone) */
std::map<std::string, std::map<std::string, float>> res;
std::vector<std::string> userids = db::pull_userid_list_ts(C, ts_high, ts_low);
std::cout << "Number of userids active during delta_ts: " << userids.size() << std::endl;
res = norm::diff_map(C, userids, ts_high, ts_low, thresh);
return res;
}
void make_json(pqxx::connection *C, std::map<std::string, std::map<std::string, float>> op) {
norm::diff_save_json(C, op);
}
void make_graph(std::map<std::string, std::map<std::string, float>> op) {
make_graph_from_map(op);
}
int main(void) { int main(void) {
test(); /* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
}
std::map<std::string, std::map<std::string, float>> res;
res = make_map(&C);
make_json(&C, res);
//make_graph(res);
return 1; return 1;
} }

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,7 @@
g++ ../common/common.cpp \ g++ ../common/common.cpp \
../sql/db_init.cpp \
../sql/db_handle.cpp \
../../maths/graph.cpp \ ../../maths/graph.cpp \
../../maths/maths.cpp \
graph.cpp \ graph.cpp \
-L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o graph && ./graph -L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o graph && ./graph

0
connections/test/log.txt Normal file
View File

Binary file not shown.

View File

@ -2,7 +2,6 @@
#include "../http/http_connector.h" #include "../http/http_connector.h"
#include "../http/users.h" #include "../http/users.h"
#include "../ws/http_handshake.h" #include "../ws/http_handshake.h"
#include "../safe_queue/safe_queue.h"
#include "../ws/ssl_ws.h" #include "../ws/ssl_ws.h"
#include "../sql/db_init.h" #include "../sql/db_init.h"
#include "../sql/db_handle.h" #include "../sql/db_handle.h"
@ -36,7 +35,7 @@ int connect_ws() {
} }
/* Populate database if needed. */ /* Populate database if needed. */
db::populate(); db::create::database::all();
/* Open up database connection. */ /* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \ pqxx::connection C("dbname = nasfaq user = steaky \

View File

@ -0,0 +1,29 @@
#include "../common/common.h"
#include "../sql/db_init.h"
int connect_db() {
bool done = false;
std::string input;
/* Populate database if needed. */
db::create::database::all();
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
return 0;
}
int main(void) {
connect_db();
return 1;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -4,57 +4,76 @@ import networkx as nx
import pyvis import pyvis
import json import json
THRESHOLD = 10 MAX_DISPLAY = 3
THRESHOLD = 5
def make_net(N, json_path): fun = lambda x : 10-x
with open(json_path, 'r') as f:
json_path = "graph.json"
net = Network(height='980px', width='100%', bgcolor='#222222', font_color='white')
net.barnes_hut()
with open(json_path, 'r') as f:
data = json.load(f) data = json.load(f)
weighted_edges = [] sources, targets, weights = [], [], []
for parent, element in data.items():
for parent, element in data.items():
tmp_targets, tmp_weights = [], []
for child, weight in element.items(): for child, weight in element.items():
if parent == child: pass if parent == child: pass
elif weight > THRESHOLD: pass
else: else:
weighted_edges.append((parent, child, round(weight, 1))) adj_weight = fun(weight)
net.add_node(parent, parent, title = parent)
net.add_node(child, child, title = child)
# Add edges tmp_targets.append(child)
N.add_weighted_edges_from(weighted_edges) tmp_weights.append( round(adj_weight, 2) )
def nudge(pos, x_shift, y_shift): s1, s2 = (list(reversed(t)) for t in zip(*sorted(zip(tmp_weights, tmp_targets))))
return {n:(x + x_shift, y + y_shift) for n,(x,y) in pos.items()} for i in range(min(MAX_DISPLAY, len(s2))):
w = round(s1[i], 2)
if w >= THRESHOLD:
net.add_edge(parent, s2[i], value = w, title = s1[i])
net = nx.Graph() neighbor_map = net.get_adj_list()
make_net(net, "graph.json") edges = net.get_edges()
nodes = net.get_nodes()
pos = nx.circular_layout(net) N_nodes = len(nodes)
N_edges = len(edges)
nx.draw_networkx( weights=[[] for i in range(N_nodes)]
net, pos, edge_color='black', width=1, linewidths=1,
node_size=100, node_color='pink', alpha=0.9,
with_labels = False)
#labels={node: node for node in net.nodes()}
edge_labels = dict([((n1, n2), net[n1][n2]['weight']) #Associating weights to neighbors
for n1, n2 in net.edges]) for i in range(N_nodes): #Loop through nodes
for neighbor in neighbor_map[nodes[i]]: #and neighbors
for j in range(N_edges): #associate weights to the edge between node and neighbor
if (edges[j]['from']==nodes[i] and edges[j]['to']==neighbor) or \
(edges[j]['from']==neighbor and edges[j]['to']==nodes[i]):
weights[i].append(edges[j]['value'])
#nx.draw_networkx_edge_labels( for node,i in zip(net.nodes,range(N_nodes)):
# net, pos,
# edge_labels=edge_labels,
# font_color='red'
#)
node['value']=len(neighbor_map[node['id']])
node['weight']=[str(weights[i][k]) for k in range(len(weights[i]))]
list_neighbor=list(neighbor_map[node['id']])
pretty_net = Network(height='750px', width='100%', bgcolor='#222222', font_color='white') #Sort by score
pretty_net.barnes_hut() w_list = [node['weight'][k] for k in range(node['value'])]
pretty_net.from_nx(net) n_list = [list_neighbor[k] for k in range(node['value'])]
pretty_net.show_buttons(filter_=['physics'])
try:
s_weights, s_neighbors = (list(t) for t in zip(*sorted(zip(w_list, n_list))))
#Concatenating neighbors and weights
hover_str=[s_neighbors[k]+' '+ s_weights[k] for k in range(node['value'])]
#Setting up node title for hovering
node['title']+=' Neighbors:<br>'+'<br>'.join(hover_str)
except ValueError:
pass
pretty_net.show("graph.html") net.toggle_physics(True)
net.show("graph.html")
#plt.axis('off')
#plt.show()

View File

@ -1,10 +1,11 @@
g++ ../safe_queue/safe_queue.cpp \ g++ ../common/common.cpp \
../common/common.cpp \ ../common/formatting.cpp \
../http/http_connector.cpp \ ../http/http_connector.cpp \
../http/users.cpp \ ../http/users.cpp \
../ws/ssl_ws.cpp \ ../ws/ssl_ws.cpp \
../sql/db_init.cpp \ ../sql/db_init.cpp \
../sql/db_handle.cpp \ ../sql/db_handle.cpp \
../parser/parser_aux.cpp \
../parser/parser.cpp \ ../parser/parser.cpp \
../ws/http_handshake.cpp \ ../ws/http_handshake.cpp \
./main.cpp \ ./main.cpp \

View File

@ -0,0 +1,4 @@
g++ ../common/common.cpp \
../sql/db_init.cpp \
./main_db_init.cpp \
-L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o db_init && ./db_init

Binary file not shown.

View File

@ -1,62 +0,0 @@
#include "../common/common.h"
#include "../sql/db_init.h"
#include "../sql/db_handle.h"
#include "../../maths/maths.h"
#include "../../maths/graph.h"
int connect_db() {
bool done = false;
std::string input;
/* Populate database if needed. */
db::populate();
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
/* Loop and print data through parser*/
while(!done) {
input = std::cin.get();
if(input == "q" || input == "quit") {
done = true;
} else if (input == "t") {
std::string coin = "miko";
//float s = db::pull_last_cycle_slope(&C, coin);
float ws = optimizer::get_last_n_weighted_slope(&C, coin, 2, 600);
std::cout << "slope: " << ws << std::endl;
} else if (input == "d") {
long ts_low, ts_high;
float threshold;
std::vector<std::string> userids;
std::map<std::string, std::map<std::string, float>> result;
ts_high = time(NULL)*1000;
ts_low = ts_high - 600*1000;
threshold = 10;
userids = db::pull_userid_list_ts(&C, ts_high, ts_low);
result = norm::diff_map(&C, userids, ts_high, ts_low, threshold);
//make_graph_from_map(result);
norm::diff_save_json(&C, result);
}
}
return 0;
}
int main(void) {
connect_db();
return 1;
}

View File

@ -1,7 +0,0 @@
g++ ../common/common.cpp \
../sql/db_init.cpp \
../sql/db_handle.cpp \
../../maths/maths.cpp \
../../maths/graph.cpp \
sql.cpp \
-L/usr/lib -lgvc -lcgraph -lfmt -lpqxx -lpq -o sql && ./sql

View File

@ -6,8 +6,6 @@ namespace ws {
static const char *POLLING_URL_0 = "https://nasfaq.biz/socket/?EIO=4&transport=polling&t=Ny7z439"; static const char *POLLING_URL_0 = "https://nasfaq.biz/socket/?EIO=4&transport=polling&t=Ny7z439";
static const char *POLLING_URL_1 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z472"; static const char *POLLING_URL_1 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z472";
static const char *POLLING_URL_2 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4Bn&sid="; static const char *POLLING_URL_2 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4Bn&sid=";
static const char *POLLING_URL_3 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4Bp&sid=";
static const char *POLLING_URL_4 = "https://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=polling&t=Ny7z4EU&sid=";
nlohmann::json json_ret; nlohmann::json json_ret;
std::string sid; std::string sid;
@ -24,10 +22,6 @@ namespace ws {
/* Post "40" acknowledgement with sid. */ /* Post "40" acknowledgement with sid. */
c.post(POLLING_URL_2 + sid, "40"); c.post(POLLING_URL_2 + sid, "40");
/* Continue handshake (might be unneeded). See XHR trace. */
//c.get(POLLING_URL_3 + sid);
//c.get(POLLING_URL_4 + sid);
return sid; return sid;
} }
} }

View File

@ -67,6 +67,7 @@ namespace ws {
} }
} else if (payload.substr(0, 2) == "42") { } else if (payload.substr(0, 2) == "42") {
// Raw messages are of the form "42[PAYLOAD]"
std::string payload_format = payload.substr(3, payload.length() - 4); std::string payload_format = payload.substr(3, payload.length() - 4);
push_message(payload_format); push_message(payload_format);
} }
@ -85,11 +86,6 @@ namespace ws {
return m_status; return m_status;
} }
/**
Should it use the cdt variable?
If it doesn't lock this thread up maybe, otherwise it should be defined externally.
Isn't the connection in another thread anyways?
*/
bool connection_metadata::msg_queue_empty(void) { bool connection_metadata::msg_queue_empty(void) {
return m_msg_q.empty(); return m_msg_q.empty();
} }
@ -250,8 +246,17 @@ namespace ws {
std::cout << "> Error sending message: " << ec.message() << std::endl; std::cout << "> Error sending message: " << ec.message() << std::endl;
return; return;
} }
}
// metadata_it->second->record_sent_message(message); // TODO This should read to a buffer (void function)
std::string websocket_endpoint::read(int id) const {
con_list::const_iterator metadata_it = m_connection_list.find(id);
if(metadata_it == m_connection_list.end()) {
std::cout << "> No connection found with id " << id << std::endl;
return "";
}
return metadata_it->second->pop_message();
} }
connection_metadata::ptr websocket_endpoint::get_metadata(int id) const { connection_metadata::ptr websocket_endpoint::get_metadata(int id) const {
@ -262,12 +267,5 @@ namespace ws {
return metadata_it->second; return metadata_it->second;
} }
} }
std::string websocket_endpoint::get_queue_front(int id) const {
con_list::const_iterator metadata_it = m_connection_list.find(id);
if(metadata_it == m_connection_list.end()) {
return "";
} else {
return metadata_it->second->pop_message();
}
}
} }

View File

@ -1,23 +1,13 @@
#ifndef _SSL_WS_H_ #ifndef _SSL_WS_H_
#define _SSL_WS_H_ #define _SSL_WS_H_
#include <iostream>
#include <string>
#include <stdlib.h>
//#include "../parser/parser.h"
#include "../common/common.h" #include "../common/common.h"
#include "../safe_queue/safe_queue.h"
#include <websocketpp/config/asio_client.hpp> #include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp> #include <websocketpp/client.hpp>
#include <websocketpp/common/thread.hpp> #include <websocketpp/common/thread.hpp>
#include <websocketpp/common/memory.hpp> #include <websocketpp/common/memory.hpp>
#include <nlohmann/json.hpp>
namespace ws { namespace ws {
typedef websocketpp::client<websocketpp::config::asio_tls_client> client; typedef websocketpp::client<websocketpp::config::asio_tls_client> client;
typedef std::shared_ptr<boost::asio::ssl::context> context_ptr; typedef std::shared_ptr<boost::asio::ssl::context> context_ptr;
@ -49,7 +39,6 @@ namespace ws {
std::string m_uri; std::string m_uri;
std::string m_server; std::string m_server;
std::string m_error_reason; std::string m_error_reason;
//std::vector<std::string> m_messages;
client *m_endpoint; client *m_endpoint;
std::queue<std::string> m_msg_q; std::queue<std::string> m_msg_q;
mutable std::mutex m_msg_q_mutex; mutable std::mutex m_msg_q_mutex;
@ -68,7 +57,7 @@ namespace ws {
void close(int, websocketpp::close::status::value, std::string); void close(int, websocketpp::close::status::value, std::string);
void send(int, std::string); void send(int, std::string);
connection_metadata::ptr get_metadata(int) const; connection_metadata::ptr get_metadata(int) const;
std::string get_queue_front(int) const; std::string read(int) const;
private: private:
typedef std::map<int, connection_metadata::ptr> con_list; typedef std::map<int, connection_metadata::ptr> con_list;
client m_endpoint; client m_endpoint;

View File

@ -0,0 +1,90 @@
#include "../../common/common.h"
#include "../../http/http_connector.h"
#include "../../ws/http_handshake.h"
#include "../../ws/ssl_ws.h"
int connect_ws() {
bool done = false;
std::string input;
ws::websocket_endpoint endpoint;
/* Get session id */
std::string sid = ws::http_handshake::get_sid();
/* Start connection */
std::string uri = "wss://nasfaq.biz/socket/?user=314d0bda-d7f0-4636-aed7-5ea02743604b&EIO=4&transport=websocket&sid=" + sid;
int id = endpoint.connect(uri);
if(id != -1) {
std::cout << ">Created nasfaq websocket connection" << std::endl;
} else {
return -1;
}
sleep(1);
/* Display connection metadata */
ws::connection_metadata::ptr metadata = endpoint.get_metadata(id);
if (metadata) {
std::cout << *metadata << std::endl;
} else {
std::cout << ">Unknown connection id " << id << std::endl;
}
/* Populate database if needed. */
db::populate();
/* Open up database connection. */
pqxx::connection C("dbname = nasfaq user = steaky \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
std::cout << "Opened database successfully: " << C.dbname() << std::endl;
} else {
std::cout << "Can't open database" << std::endl;
return 1;
}
/* Update users. */
db::push_users(&C, users::get_users());
/* Set up parser */
parser::parser p(metadata, &C);
/* Loop and print data through parser*/
while(!done) {
input = std::cin.get();
if(input == "q" || input == "quit") {
done = true;
} else if (input == "p") {
std::cout << "PROCESSING QUEUE " << std::endl;
p.process_queue_start();
} else if (input == "s") {
std::cout << "STOPPING QUEUE PROCESSING" << std::endl;
p.process_queue_stop();
} else if (input == "t") {
std::cout << "TEST" << std::endl;
auto ret = db::pull_last_cycle(&C);
std::cout << ret << std::endl;
}
}
/* Close websocket */
std::stringstream ss(input);
std::string cmd;
int close_code = websocketpp::close::status::normal;
std::string reason;
ss >> cmd >> id >> close_code;
std::getline(ss, reason);
endpoint.close(id, close_code, reason);
return 0;
}
int main(void) {
connect_ws();
return 1;
}

11
connections/ws/test/run.sh Executable file
View File

@ -0,0 +1,11 @@
g++ ../safe_queue/safe_queue.cpp \
../common/common.cpp \
../http/http_connector.cpp \
../http/users.cpp \
../ws/ssl_ws.cpp \
../sql/db_init.cpp \
../sql/db_handle.cpp \
../parser/parser.cpp \
../ws/http_handshake.cpp \
./main.cpp \
-o main -lpqxx -lpq -lfmt -lcurl -lcrypto -lssl -lboost_system -lboost_random -lboost_thread -lpthread && ./main

View File

@ -1,5 +1,7 @@
#include "maths.h" #include "maths.h"
#define DIFF_ALL_MIN_CYCLE_SIZE 0
namespace query { namespace query {
const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\ const std::string query_template_slope_ts_bot = "SELECT PRICE, TIMESTAMP FROM HISTORY "\
"WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\ "WHERE COIN = '{}' AND TIMESTAMP <= {} AND TIMESTAMP > {} "\
@ -56,36 +58,63 @@ namespace norm {
return rop; return rop;
} }
std::map<std::string, float> diff_all(pqxx::connection *C, cycle_t base_cycle, std::map<std::string, cycle_t> cycles, std::vector<std::string> userid_list, std::string userid, long ts_high, long ts_low, float threshold) { std::map<std::string, float> diff_all(
std::map<std::string, float> result; pqxx::connection *C,
cycle_t tmp1, tmp2; cycle_t base_cycle,
float tmp_res; std::map<std::string, cycle_t> cycles,
long ts_high,
long ts_low,
float threshold){
for(auto const& id : userid_list) { std::map<std::string, float> result;
tmp2 = cycles.find(id)->second; std::string tmp_uid;
if (cycle_size(tmp2) >= 10) { cycle_t tmp_c;
tmp_res = norm(diff_vector(tmp1, tmp2)); float tmp_f;
if (tmp_res < threshold) {
result[id] = tmp_res; for(std::map<std::string,cycle_t>::iterator iter = cycles.begin(); iter != cycles.end(); ++iter) {
std::cout << "Difference between " << userid << " and " << id << " : " << result[id] << std::endl; tmp_uid = iter->first;
} tmp_c = iter->second;
tmp_f = norm(diff_vector(base_cycle, tmp_c));
if(tmp_f && tmp_f < threshold){
result.insert(std::pair<std::string, float>(tmp_uid, tmp_f));
} }
} }
return result; return result;
} }
std::map<std::string, std::map<std::string, float>> diff_map(pqxx::connection *C, std::vector<std::string> userids, long ts_high, long ts_low, float threshold){ std::map<std::string, std::map<std::string, float>> diff_map(
pqxx::connection *C,
std::vector<std::string> userids,
long ts_high,
long ts_low,
float threshold){
std::map<std::string, std::map<std::string, float>> result; std::map<std::string, std::map<std::string, float>> result;
std::map<std::string, cycle_t> cycles; std::map<std::string, cycle_t> cycles;
std::map<std::string, float> tmp; std::map<std::string, float> tmp;
cycle_t tmp_c;
int c_size;
unsigned int pos;
std::string userid;
/* Prepare cycles from db. */ /* Prepare cycles from db. */
for(auto const& id : userids) { pos = 0;
cycles.insert( std::pair<std::string, cycle_t>(id, history_to_cycle(db::pull_last_cycle_userid(C, id))) ); for(auto const& userid:userids) {
tmp_c = history_to_cycle(db::pull_last_cycle_userid(C, userid));
c_size = cycle_size(tmp_c);
/* Only keep players with large cycles */
if (c_size >= DIFF_ALL_MIN_CYCLE_SIZE) {
cycles.insert(std::pair<std::string, cycle_t>(userid, tmp_c));
}
pos++;
} }
for (auto& userid:userids) { /* Compute norms and diffs */
tmp = norm::diff_all(C, cycles[userid], cycles, userids, userid, ts_high, ts_low, threshold); for(std::map<std::string,cycle_t>::iterator iter = cycles.begin(); iter != cycles.end(); ++iter) {
userid = iter->first;
tmp = norm::diff_all(C, cycles[userid], cycles, ts_high, ts_low, threshold);
result.insert( std::pair<std::string, std::map<std::string, float>>( userid, tmp )); result.insert( std::pair<std::string, std::map<std::string, float>>( userid, tmp ));
} }

View File

@ -14,14 +14,32 @@ namespace norm {
float norm(cycle_t); float norm(cycle_t);
cycle_t history_to_cycle(ws_msg_parsed<WS_EVENT_HISTORY_UPDATE>); cycle_t history_to_cycle(ws_msg_parsed<WS_EVENT_HISTORY_UPDATE>);
cycle_t diff_vector(cycle_t, cycle_t); cycle_t diff_vector(cycle_t, cycle_t);
std::map<std::string, float> diff_all(pqxx::connection *C, cycle_t base_cycle, std::map<std::string, cycle_t> cycles, std::vector<std::string> userid_list, std::string userid, long ts_high, long ts_low, float threshold); std::map<std::string, float> diff_all(
std::map<std::string, std::map<std::string, float>> diff_map(pqxx::connection *C, std::vector<std::string> userid_list, long, long, float); pqxx::connection *C,
void diff_save_json( pqxx::connection *, std::map<std::string, std::map<std::string, float>>); cycle_t base_cycle,
std::map<std::string, cycle_t> cycles,
std::vector<std::string> userid_list,
long ts_high,
long ts_low,
float threshold);
std::map<std::string, std::map<std::string, float>> diff_map(
pqxx::connection *C,
std::vector<std::string> userid_list,
long,
long,
float);
void diff_save_json(
pqxx::connection *,
std::map<std::string,std::map<std::string, float>>);
} }
namespace optimizer{ namespace optimizer{
std::tuple<long, long> get_last_nth_cycle_ts(int); std::tuple<long, long> get_last_nth_cycle_ts(int);
float get_last_n_weighted_slope(pqxx::connection*, std::string, int, int); float get_last_n_weighted_slope(
pqxx::connection*,
std::string,
int,
int);
} }
#endif #endif

Binary file not shown.

View File

@ -0,0 +1,7 @@
import json
USERID = "314d0bda-d7f0-4636-aed7-5ea02743604b"
USERID_P1 = "10dcbbd5-f761-44f3-92ce-ea5b01f1d747"
USERID_P2 = "193ecdf2-689e-40da-b245-d28db3a02d2b"
URL_API = "https://nasfaq.biz/api/"
COINS = ['aki', 'amelia', 'aqua', 'ayame', 'azki', 'botan', 'calliope', 'choco', 'civia', 'coco', 'flare', 'fubuki', 'gura', 'haato', 'himemoriluna', 'hololive', 'inanis', 'iofi', 'kanata', 'kiara', 'korone', 'lamy', 'marine', 'matsuri', 'mel', 'melfissa', 'miko', 'mio', 'moona', 'nana', 'nene', 'noel', 'okayu', 'ollie', 'pekora', 'polka', 'reine', 'risu', 'roboco', 'rushia', 'shion', 'sora', 'subaru', 'suisei', 'towa', 'ui', 'watame', 'laplus', 'lui', 'koyori', 'chloe', 'iroha', 'irys', 'sana', 'fauna', 'kronii', 'mumei', 'baelz']

View File

@ -0,0 +1,35 @@
import psycopg2
from common import *
import psycopg2.extras
def get_db_connection():
return psycopg2.connect(cursor_factory=psycopg2.extras.RealDictCursor,
database="nasfaq", user = "steaky", host = "127.0.0.1", port = "5432")
def fetch_history(cursor, ts_low, ts_high, coins = COINS, userids = None):
if coins == None: return {}
query = "SELECT * FROM HISTORY WHERE (TIMESTAMP > {} AND TIMESTAMP <= {}) AND (".format(ts_low, ts_high)
# Filter by coins
for i in range(len(coins)):
if i == len(coins) - 1:
query += "COIN = '{}' )".format(coins[i])
else:
query += "COIN = '{}' OR ".format(coins[i])
# Filter by userids
if userids: query += "AND ("
for i in range(len(userids)):
if i == len(userids) - 1:
query += "userid = '{}' )".format(userids[i])
else:
query += "userid = '{}' OR ".format(userids[i])
query += ";"
print(query)
cursor.execute(query)
res = cursor.fetchall()
return res

View File

@ -0,0 +1,29 @@
from common import *
import requests
def fetch_coin_prices():
url = URL_API + "getMarketInfo?all"
r = requests.get(url)
return r.json()['coinInfo']['data']
def fetch_coin_history():
url = URL_API + "getMarketInfo?all&history"
r = requests.get(url)
return r.json()['coinInfo']['data']
def fetch_coin_qty_all():
rop = {}
url = URL_API + "getUserWallet?userid=" + USERID
r = requests.get(url)
for key, item in r.json()['wallet']['coins'].items():
rop[key] = item['amt']
return rop
def fetch_balance():
url = URL_API + "getUserWallet?userid=" + USERID
r = requests.get(url)
return r.json()['wallet']['balance']

50
plots/coin_prices/main.py Normal file
View File

@ -0,0 +1,50 @@
import matplotlib.pyplot as plt
import matplotlib.dates as md
import datetime as dt
import time
import random
from common import *
from db_handle import *
from http_handle import *
if __name__ == '__main__':
C = get_db_connection()
cur = C.cursor()
## Fetch data
days = 0
delta = 30
ts_high = time.mktime(time.localtime()) * 1000 - days*24*60*60*1000
ts_low = ts_high - delta*60*60*24*1000
coins = ['subaru']
userids = ['c9b0830c-8cf7-4e1d-b8f3-60bbc85160fa', '990400b5-f0e2-4b91-8aa6-5a74dc16bcc5']
hist = fetch_history(cur, ts_low, ts_high, coins, userids)
# Analyse data
T0, T1, Y0, Y1 = [], [], [], []
for tr in hist:
if tr['userid'][:5] == userids[0][:5]:
Y0.append(tr["quantity"])
T0.append(tr["timestamp"]// 1000)
else:
Y1.append(tr["quantity"])
T1.append(tr["timestamp"]// 1000)
dates0 = [dt.datetime.fromtimestamp(ts) for ts in T0]
X0 = md.date2num(dates0)
dates1 = [dt.datetime.fromtimestamp(ts) for ts in T1]
X1 = md.date2num(dates1)
ax=plt.gca()
xfmt = md.DateFormatter('%H:%M:%S')
ax.xaxis.set_major_formatter(xfmt)
# Plot data
plt.plot(X1, Y1, "r+")
plt.plot(X0, Y0, "b+")
plt.show()

View File

@ -0,0 +1 @@
SELECT * FROM HISTORY WHERE (TIMESTAMP > 1655399889000.0 AND TIMESTAMP <= 1655486289000.0) AND (COIN = 'civia' )AND (userid = '10dcbbd5-f761-44f3-92ce-ea5b01f1d747' OR userid = '193ecdf2-689e-40da-b245-d28db3a02d2b' );