프로그램/Python Project

[Python Project] 비트코인 자동거래 -로컬파일핸들러-

 

코드구조

  • class cleaner
    • clean_history() : 동작기록 삭제, bitmain에서 호출되며, 과거 기록 삭제에 사용
  • class key_loader
    • key_set() : 로컬파일 key.dat에서 key 불러옴 [acc,sec]형태로 반환 이거 안씀 왜만들었지?
    • key_save() : [acc,sec]형태로 파라미터를 받아 key.dat update
    • key_loader() :  로컬파일 key.dat에서 key 불러옴 [acc,sec]형태로 반환
  • class config_loader
    • loading() : config.dat에서 설정 복사해서 config(dictionary)생성, 반환
    • config_save() : config(dictionary) 파라미터로 받아 config.dat update
    • get, get_config() : 안씁니다. 초기 디버그용
  • class history_maker
    • init_history() : 해당 트레이드봇의 동작로그에 맨처음 적을 말 씁니다.
    • write_history() : 해당 트레이드봇의 동작로그 기록
    • load_history() : 해당 트레이드봇의 동작로그 불러옴
    • get_history_list() : 트레이드봇의 동작기록들 목록 로딩
    • open_history() : 해당 트레이드봇의 동작기록 열기(메모장으로) 
  • class fomula
    • read_raw_buy_parser() : 트레이드봇의 구매조건 읽어옴 raw_parser로 리스트 생성 후 반환
    • read_raw_sell_parser() : 트레이드봇의 판매조건 읽어옴 raw_parser로 리스트 생성 후 반환
    • raw_parser() : 위 두함수로 읽어온 결과 한줄한줄씩 해석가능 리스트로 바꾸어줌
    • read_buy_parser() : 트레이드봇의 구매조건을 읽어와 람다식 함수 리스트생성
    • read_sell_parser() : 트레이드봇의 판매조건을 읽어와 람다식 함수 리스트생성
    • make_fomula() : 아래 make_fomula_sentense을 한줄한줄 read_XX_parser로 읽어온 리스트에 적용
    • make_fomula_sentense() : read_XX_parser로 가져온 한줄에 대한 람다식 생성
    • raw_to_fom() : _criteria.dat에 저장가능한 형태로 변경
    • save_config() : 변경된 설정 저장
    • get_fomula() : 사용가능한 함수반환
    • get_operatator() : 사용가능한 연산자반환
    • get_fom_type() : fomula에 사용가능한 파라미터 수 리스트로 반환
  • init_fomula
    • read_raw_data() : 초기 진입 조건 _init_criteria.dat 읽어옴, list로 반환
    • save_init() : 초기 진입 조건 _init_criteria.dat 저장함
    • criteria_list() : 어 이거 왜 만들었지?

 


코드

### local_loader.py
### 21. 6. 28. add class local_loader, func gey_key()
### 21. 7. 3.  add umm... many class, func

import fileinput
import datetime
import sys
import os
import subprocess
import shutil
from algo_util import *

######################################################
###                                                ###
### This class is to clean previous log            ###
###                                                ###
######################################################
class cleaner :

    def clean_history() :
        pwd = os.getcwd()
        today=datetime.datetime.now()
        now = str(today.year)+"_"+str(today.month)+"_"+str(today.day)
        try :
            os.mkdir("./bot_history/"+now)
        except FileExistsError as e:
            print(e)
        os.chdir("./bot_history/")
        filelist = [ x for x in os.listdir() if ".log" in x ]
        for file in filelist :
            try :
                shutil.move(file,"./"+now)
            except shutil.Error as e:
                shutil.copy(file,"./"+now)
                os.remove(file)
                print(e)
        os.chdir(pwd)

######################################################
###                                                ###
### This class is to handle user's trading key     ###
###                                                ###
######################################################
class key_loader :

    def key_set() :
        with open("./key.dat","r") as f:
            data = f.read().split("\n")
            acc = data[0].split("=")[1].strip()
            sec = data[1].split("=")[1].strip()
        return [acc,sec]

    def key_save(value) :
        with fileinput.FileInput("./key.dat",inplace = True) as f:
            for line in f :
                if("acc" in line) :
                    line = line.replace(line,"acc="+value[0])
                elif("sec" in line) :
                    line = line.replace(line,"sec="+value[1])

    def key_loader() :
        ret = list()
        with open("./key.dat","r") as f:
            data = f.read().split("\n")
            for i in range(len(data)) :
                if("acc=" in data[i]) :
                    ret.append(data[i].split("=")[1])
                elif("sec=" in data[i]) :
                    ret.append(data[i].split("=")[1])

        if(len(ret)!=2) :
            log_maker.write_error_log("failed to load key")
        else :
            return ret
        
######################################################
###                                                ###
### This class is to handle config                 ###
###                                                ###
######################################################   
class config_loader :

    def loading() :
        config = dict()
        with open("./config.dat","r") as f:
            data = f.read().split("\n")

            for i in range(len(data)) :
                if(len(data[i])<3) :
                    continue
                if(data[i][0]=="#" or data[i]=="") :
                    continue
                else :
                    try :
                        data_s = data[i].split("=")[1].strip()
                        if(float(data_s) != int(data_s)) :
                            config[data[i].split("=")[0]] = float(data_s)
                        else :
                            config[data[i].split("=")[0]] = int(data_s)
                    except ValueError :
                        config[data[i].split("=")[0]] = data[i].split("=")[1].strip()

        return config

    def get(self,wh) :
        if(wh in self.config) :
            return self.config[wh]

    def config_save(cd) :
        setter = list(cd.keys())
        with fileinput.FileInput("./config.dat",inplace=True,backup=".bak") as f:
            for line in f :
                set_flag = False
                for i in range(len(setter)) :
                    if(setter[i] in line) :
                        set_flag = True
                        line = line.replace(line,setter[i]+"="+str(cd[setter[i]]))
                        sys.stdout.write(line+"\n")
                        del setter[i]
                        break
                if(set_flag==False) :
                    sys.stdout.write(line)
                if(len(setter) == 0 ) :
                    break


    def get_config() :
        config = dict()
        with open("./config.dat","r") as f:
            data = f.read().split("\n")

            for i in range(len(data)) :
                if ("=" in data[i]) :
                    config[data[i].split("=")[0].strip()] = data[i].split("=")[1].strip()

        return config
        
######################################################
###                                                ###
### This class is to write running log             ###
### error log writer is in utility.py              ###
###                                                ###
######################################################
class history_maker :

    def init_history(fiat) :
        now = datetime.datetime.now()
        try :
            with("./bot_history/"+fiat+"_history.log","w") as f:
                f.write("--------"+fiat+"--------\n")
                f.write(str(now)+"\n\n")
        except FileNotFoundError as e:
            os.mkdir("./bot_history")
            self.init_history(fiat)

    def write_history(fiat,string) :
        now = datetime.datetime.now()
        try :
            with open("./bot_history/"+fiat+"_history.log","a") as f :
                f.write("<"+str(now)+"> "+string+"\n\n")
        except FileNotFoundError as e:
            print(e)

    def load_history(fiat) :
        with open("./bot_history/"+fiat+"_history.log","a") as f:
            data = f.read().split("\n")
        return data

    def get_history_list() :
        list_tmp = [ x for x in os.listdir("./bot_history/") if x.endswith(".log")]
        for i in range(len(list_tmp)) :
            list_tmp[i] = list_tmp[i].split("_")[0]
        return list_tmp

    def open_history(fiat) :
        subprocess.Popen("notepad.exe ./bot_history/"+fiat+"_history.log")


######################################################
###                                                ###
### This script is for parse criteria              ###
### interact with algo_util and trading_bot        ###
### fomula express as form's of                    ###
### EX> A/a M B/b (& another fom)                  ###
###                                                ###
######################################################
class fomula :

    fomula_dict = {"0":"현재가","1":"최소가","2":"최대가","3":"평균가","4":"이동평균가","5":"마지막구매가","6":"마지막판매가"}
    fom_type_dict = {0:[],1:["0","1","2","3","5","6"],2:["4"]}
    operator_dict = {"1":"<","2":"<=","3":">","4":">=","5":"==","6":"!="}

    def read_raw_buy_parser(fiat="") :

        buy_criteria = list()
        
        with open("./criteria/"+fiat+"_buy_criteria.dat","r") as f:
            data = f.read().split("\n")
            for i in range(len(data)) :
                if(data[i].strip() == "") :
                    continue
                elif(data[i][0]=="#") :
                    continue
                else :
                    buy_criteria.append(fomula.raw_parser(data[i].split(":")[1]))
                                        
        return buy_criteria

    def read_raw_sell_parser(fiat="") :

        sell_criteria = list()
        
        with open("./criteria/"+fiat+"_sell_criteria.dat","r") as f:
            data = f.read().split("\n")
            for i in range(len(data)) :
                if(data[i].strip() == "") :
                    continue
                elif(data[i][0]=="#") :
                    continue
                else :
                    sell_criteria.append(fomula.raw_parser(data[i].split(":")[1]))
                                        
        return sell_criteria


    def raw_parser(line) :
        res = list()
        tmp_line = line.split("&")
        for i in range(len(tmp_line)) :
            apt_list = list()
            tmp  = tmp_line[i].strip()
            a1,fo,a2 = tmp.split(" ")
            if("/" in a1) :
                a1_args = a1.split("/")[1:]
                a1 = a1.split("/")[0]
                apt_list.append(fomula.fomula_dict[a1])
                apt_list.append(a1_args)
            else :
                apt_list.append(fomula.fomula_dict[a1])
                
            apt_list.append(fomula.operator_dict[fo])
            
            if("/" in a2) :
                a2_args = a2.split("/")[1:]
                a2 = a2.split("/")[0]
                apt_list.append(fomula.fomula_dict[a2])
                apt_list.append(a2_args)
            else :
                apt_list.append(fomula.fomula_dict[a2])
    
            
            res.append(apt_list)
        return res
        

    def read_buy_parser(fom_algo,arg,fiat="") :

        buy_criteria = list()
        
        with open("./criteria/"+fiat+"_buy_criteria.dat","r") as f:
            data = f.read().split("\n")
            for i in range(len(data)) :
                if(data[i].strip() == "") :
                    continue
                elif(data[i][0]=="#") :
                    continue
                else :
                    buy_criteria.append(fomula.make_fomula(data[i].split(":")[1],fom_algo,arg))            
        return buy_criteria

    def read_sell_parser(fom_algo, arg, fiat="") :    
        sell_criteria = list()
        with open("./criteria/"+fiat+"_sell_criteria.dat","r") as f:
            data = f.read().split("\n")
            for i in range(len(data)) :
                if(data[i].strip() == "") :
                    continue
                elif(data[i][0]=="#") :
                    continue
                else :
                    sell_criteria.append(fomula.make_fomula(data[i].split(":")[1],fom_algo,arg))
                                        
        return sell_criteria

    def make_fomula(fom,fom_algo,arg) :

        fom = fom.split("&")
        fom_list = list()

        for i in range(len(fom)) :
            fom_list.append(fomula.make_fomula_sentence(fom[i],fom_algo,arg))

        return fom_list

    def make_fomula_sentence(fom,fom_algo,arg) :
        fom = fom.strip()
        a1,fo,a2 = fom.split(" ")

        buy_money = arg["buying"]
        sell_money = arg["selling"]

        a1_args = [0,0]
        a2_args = [0,0]
        
        if("/" in a1) :
            a1_args = a1.split("/")[1:]
            a1 = a1.split("/")[0]
        if("/" in a2) :
            a2_args = a2.split("/")[1:]
            a2 = a2.split("/")[0]

        if(a1 == "0") :
            a1_foo = lambda x : fom_algo.current_val()
        elif(a1 == "1") :
            a1_foo = lambda x : fom_algo.min_val(x)*float((100-float(a1_args[0]))/100)
        elif(a1 == "2") :
            a1_foo = lambda x : fom_algo.max_val(x)*float((100-float(a1_args[0]))/100)
        elif(a1 == "3") :
            a1_foo = lambda x : fom_algo.avg_val(x)*float((100-float(a1_args[0]))/100)
        elif(a1 == "4") :
            if(a1_args[0] != 0) :
                a1_foo = lambda x : fom_algo.rolling_mean(x,a1_args[1])*float((100-float(a1_args[0]))/100)
            else :
                a1_foo = lambda x : fom_algo.rolling_mean(x)*float((100-float(a1_args[0]))/100)
        elif(a1 == "5") :
            a1_foo = lambda x : buy_money*float((100-float(a1_args[0]))/100)
        elif(a1 == "6") :
            a1_foo = lambda x : sell_money*float((100-float(a1_args[0]))/100)

        if(a2 == "0") :
            a2_foo = lambda x : fom_algo.current_val()
        elif(a2 == "1") :
            a2_foo = lambda x : fom_algo.min_val(x)*float((100-float(a2_args[0]))/100)
        elif(a2 == "2") :
            a2_foo = lambda x : fom_algo.max_val(x)*float((100-float(a2_args[0]))/100)
        elif(a2 == "3") :
            a2_foo = lambda x : fom_algo.avg_val(x)*float((100-float(a2_args[0]))/100)
        elif(a2 == "4") :
            if(a2_args[1]!=0) :
                a2_foo = lambda x : fom_algo.rolling_mean(x,a2_args[1])*float((100-float(a2_args[0]))/100)
            else :
                a2_foo = lambda x : fom_algo.rolling_mean(x)*float((100-float(a2_args[0]))/100)
        elif(a2 == "5") :
            a2_foo = lambda x : buy_money*float((100-float(a2_args[0]))/100)
        elif(a2 == "6") :
            a2_foo = lambda x : sell_money*float((100-float(a2_args[0]))/100)

        if(fo == "1") :
            fom = lambda x : True if a1_foo(x) < a2_foo(x) else False
        elif(fo == "2") :
            fom = lambda x : True if a1_foo(x) <= a2_foo(x) else False
        elif(fo == "3") :
            fom = lambda x : True if a1_foo(x) > a2_foo(x) else False
        elif(fo == "4") :
            fom = lambda x : True if a1_foo(x) >= a2_foo(x) else False
        elif(fo == "5") :
            fom = lambda x : True if a1_foo(x) == a2_foo(x) else False
        elif(fo == "6") :
            fom = lambda x : True if a1_foo(x) != a2_foo(x) else False

        return fom

    def raw_to_fom(raw) :
        output = ""
        
        if(raw==[]) :
            return ""
        
        rev_fomula = dict([(value,key) for key,value in fomula.fomula_dict.items()])
        rev_ope = dict([(value,key) for key,value in fomula.operator_dict.items()])
        for i in range(len(raw)):
            offset = 0
            output += rev_fomula[raw[i][0]]
            if(isinstance(raw[i][1], list)) :
                for j in range(len(raw[i][1])) :
                    output += "/"+raw[i][1][j]
                offset += 1
            output +=  " "
            output += rev_ope[raw[i][1+offset]] + " "
            output += rev_fomula[raw[i][2+offset]]
            if(isinstance(raw[i][-1],list)) :
                for j in range(len(raw[i][-1])) :
                    output += "/"+raw[i][-1][j]
            output += " & "
        output = output[:-3]
        return output


    def save_config(index,raw,bs_type,fiat="") :
        try :
            if(raw == []) :
                cnt = 1
                with fileinput.FileInput("./criteria/"+fiat+"_"+bs_type+"_criteria.dat",inplace=True) as f:
                    for line in f :
                        try :
                            find_index = line.split(":")[0].strip()
                            setter = line.split(":")[1].strip()
                        except Exception as e:
                            continue
                        if(str(index)+" :" in line) :
                            continue
                        line = line.replace(line,str(cnt)+" : "+setter)
                        cnt+=1
                        sys.stdout.write(line+"\n")
                return
            trans = str(index)+ " : " + fomula.raw_to_fom(raw)+"\n"
            setter = 0
            set_flag=False
            with fileinput.FileInput("./criteria/"+fiat+"_"+bs_type+"_criteria.dat", inplace=True) as f:
                for line in f :
                    setter = line.split(":")[0].strip()
                    if(str(index)==setter):
                        line = line.replace(line, trans)
                    sys.stdout.write(line)
                    set_flag=True
            if(int(setter)<index) :
                with open("./criteria/"+fiat+"_"+bs_type+"_criteria.dat","a") as f:
                    if(set_flag==True) :
                        f.write("\n"+trans)
                    else :
                        f.write(trans)
        except Exception as e:
            print(e)

        
    def get_fomula(self) :
        return self.fomula_dict

    def get_operator(self) :
        return self.operator_dict

    def get_fom_type(self) :
        return self.fom_type_dict
            

class init_fomula :

    def read_raw_data() :
        crit = list()
        with open("./criteria/_init_criteria.dat","r") as f:
            data = f.read().split("\n")
            for i in range(len(data)) :
                if(i == 0):
                    tmp_line = data[i].split(" ")
                    crit.append([tmp_line[0].strip(),[tmp_line[1].split("/")[0].strip(),tmp_line[1].split("/")[1].strip()],tmp_line[2].strip()])
                elif(i == 1) :
                    tmp_line = data[i].split(" ")
                    crit.append([tmp_line[0].strip(),[tmp_line[1].split("/")[0].strip(),tmp_line[1].split("/")[1].strip()]])
                
        return crit

    def save_init(res) :
        with fileinput.FileInput("./criteria/_init_criteria.dat", inplace = True) as f :
            cnt = 0 
            for line in f:
                if(cnt == 0) :
                    write_line = res[cnt][0]+" "+res[cnt][1][0]+"/"+res[cnt][1][1]+" "+res[cnt][2]
                    line = line.replace(line,write_line+"\n")
                    sys.stdout.write(line)
                elif(cnt == 1) :
                    write_line = res[cnt][0]+" "+res[cnt][1][0]+"/"+res[cnt][1][1]
                    line = line.replace(line,write_line+"\n")
                    sys.stdout.write(line)
                cnt+=1 

    def criteria_list() :
        with open("./criteria/_init_criteria.dat","r") as f:
            data = f.read().split("\n")
        return data

변경점

  • 21. 6. 28. add class local_loader, func gey_key()
  • 21. 7. 3. add many class(fomula, init_fomual, history_maker, config_loader,cleaner)