Commit c5c8e088 authored by wozz's avatar wozz

update for numblocks subscription

parent dc91efe3
......@@ -28,6 +28,8 @@ func (scm *stratum_command_msg) FromJsonString(in_string string) {
type stratum_command_resp struct {
Id int `json:"id"`
Result interface{} `json:"result"`
Params interface{} `json:"params"`
Method string `json:"method"`
}
func (scr *stratum_command_resp) FromJsonString(in_string string) {
......@@ -43,6 +45,7 @@ type incoming_conn struct {
Conn net.Conn
MsgIds map[int]stratum_command_msg
ConnId int
Subscriptions map[string]struct{}
}
type cache_resp struct {
......@@ -96,6 +99,7 @@ func GetCacheKey(i interface{}) (string, bool) {
func CacheManager(ch *CommHub) {
TransactionCache := make(map[interface{}]string)
UnspentCache := make(map[interface{}]unspent_cache)
NumBlocks := 0
for {
select {
case cr := <-ch.CacheReq:
......@@ -115,7 +119,7 @@ func CacheManager(ch *CommHub) {
if keyok {
cached_data, ok := UnspentCache[key]
if ok {
log.Println("Data in Cache:", cached_data.Data)
log.Println("Data in Cache:", strings.TrimSpace(cached_data.Data))
if cached_data.ExpTime < int(time.Now().Unix()) {
log.Println("Data in Cache Expired:", int(time.Now().Unix()) - cached_data.ExpTime)
} else {
......@@ -123,6 +127,16 @@ func CacheManager(ch *CommHub) {
}
}
}
case "blockchain.numblocks.subscribe":
sr := stratum_command_resp{
Id: 1,
Result: NumBlocks,
}
msg, err := json.Marshal(sr)
if err != nil {
log.Fatal("json error numblock subscription", err)
}
resp = cache_resp{true, string(msg)}
}
cr.retChan <-resp
case cl := <-ch.CacheLoad:
......@@ -138,6 +152,14 @@ func CacheManager(ch *CommHub) {
unsp_data := unspent_cache{cl.Resp, exptime}
UnspentCache[key] = unsp_data
}
case "blockchain.numblocks.subscribe":
switch p := cl.StCmdMsg.Params.(type) {
case []interface{}:
switch q := p[0].(type) {
case int:
NumBlocks = q
}
}
}
}
}
......@@ -145,7 +167,7 @@ func CacheManager(ch *CommHub) {
func checkCache(stcmd stratum_command_msg, ch *CommHub) cache_resp {
switch stcmd.Method {
case "blockchain.transaction.get", "blockchain.address.listunspent":
case "blockchain.transaction.get", "blockchain.address.listunspent", "blockchain.numblocks.subscribe":
log.Println("Checking Cache for Method:", stcmd.Method)
cacheRespChan := make(chan cache_resp)
ch.CacheReq <-cache_req{stcmd, cacheRespChan}
......@@ -159,7 +181,7 @@ func checkCache(stcmd stratum_command_msg, ch *CommHub) cache_resp {
func loadCache(stcmd stratum_command_msg, resp string, ch *CommHub) {
switch stcmd.Method {
case "blockchain.transaction.get", "blockchain.address.listunspent":
case "blockchain.transaction.get", "blockchain.address.listunspent", "blockchain.numblocks.subscribe":
log.Println("Add to Cache", strings.TrimSpace(resp))
ch.CacheLoad <- cache_load{stcmd, resp}
}
......@@ -171,17 +193,28 @@ func serverRespHandler(ch *CommHub) {
case m := <-ch.ServerIn:
var s stratum_command_resp
s.FromJsonString(m)
for _, n := range ch.IncomingConns {
log.Println("conn number:", n.ConnId)
stcmdmsg, ok := n.MsgIds[s.Id]
if ok {
loadCache(stcmdmsg, m, ch)
s.Id = stcmdmsg.Id
newMsg, err := json.Marshal(s)
if err != nil {
log.Fatal("JSON Error,", err)
if s.Method == "blockchain.numblocks.subscribe" {
stcmdmsg := stratum_command_msg{1, s.Params, s.Method}
loadCache(stcmdmsg, m, ch)
for _, n := range ch.IncomingConns {
_, found := n.Subscriptions["blockchain.numblocks.subscribe"]
if found {
n.Conn.Write(append([]byte(m), []byte("\n")...))
}
}
} else {
for _, n := range ch.IncomingConns {
log.Println("conn number:", n.ConnId)
stcmdmsg, ok := n.MsgIds[s.Id]
if ok {
loadCache(stcmdmsg, m, ch)
s.Id = stcmdmsg.Id
newMsg, err := json.Marshal(s)
if err != nil {
log.Fatal("JSON Error,", err)
}
n.Conn.Write(append(newMsg, []byte("\n")...))
}
n.Conn.Write(append(newMsg, []byte("\n")...))
}
}
}
......@@ -210,11 +243,35 @@ func handleConnection(inConn incoming_conn, ch *CommHub) {
stratCmd.FromJsonString(message)
if stratCmd.Method == "server.version" {
log.Println("Client version message")
sr := stratum_command_resp{stratCmd.Id, "1.0"}
sr := stratum_command_resp{
Id: stratCmd.Id,
Result: "1.0",
}
msg, _ := json.Marshal(sr)
inConn.Conn.Write(append(msg, []byte("\n")...))
continue
}
if stratCmd.Method == "blockchain.numblocks.subscribe" {
_, already_sub := inConn.Subscriptions["blockchain.numblocks.subscribe"]
if already_sub {
CacheResp := checkCache(stratCmd, ch)
if CacheResp.CacheOK {
var sr stratum_command_resp
sr.FromJsonString(CacheResp.CacheVal)
sr.Id = stratCmd.Id
newMsg, err := json.Marshal(sr)
if err != nil {
log.Fatal("json marshal err", err)
}
inConn.Conn.Write(append(newMsg, []byte("\n")...))
} else {
log.Println("ERROR: cached response not found (should be found)")
}
continue
} else {
inConn.Subscriptions["blockchain.numblocks.subscribe"] = struct{}{}
}
}
log.Println("Method Call:", stratCmd.Method)
log.Println("Params:", stratCmd.Params)
CacheResp := checkCache(stratCmd, ch)
......@@ -310,6 +367,7 @@ func main () {
Conn: conn,
MsgIds: make(map[int]stratum_command_msg),
ConnId: connId,
Subscriptions: make(map[string]struct{}),
}
commhub.IncomingConns = append(commhub.IncomingConns, []incoming_conn{incConn}...)
connId++
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment