코드구조
- class cleaner
- clean_history() : 동작기록 삭제, bitmain에서 호출되며, 과거 기록 삭제에 사용
- class key_loader
- key_set() : 로컬파일 key.dat에서 key 불러옴 [acc,sec]형태로 반환 이거 안씀 왜만들었지?
- key_save() : [acc,sec]형태로 파라미터를 받아 key.dat update
- key_loader() : 로컬파일 key.dat에서 key 불러옴 [acc,sec]형태로 반환
- class config_loader
- loading() : config.dat에서 설정 복사해서 config(dictionary)생성, 반환
- config_save() : config(dictionary) 파라미터로 받아 config.dat update
- get, get_config() : 안씁니다. 초기 디버그용
- class history_maker
- init_history() : 해당 트레이드봇의 동작로그에 맨처음 적을 말 씁니다.
- write_history() : 해당 트레이드봇의 동작로그 기록
- load_history() : 해당 트레이드봇의 동작로그 불러옴
- get_history_list() : 트레이드봇의 동작기록들 목록 로딩
- open_history() : 해당 트레이드봇의 동작기록 열기(메모장으로)
- class fomula
- read_raw_buy_parser() : 트레이드봇의 구매조건 읽어옴 raw_parser로 리스트 생성 후 반환
- read_raw_sell_parser() : 트레이드봇의 판매조건 읽어옴 raw_parser로 리스트 생성 후 반환
- raw_parser() : 위 두함수로 읽어온 결과 한줄한줄씩 해석가능 리스트로 바꾸어줌
- read_buy_parser() : 트레이드봇의 구매조건을 읽어와 람다식 함수 리스트생성
- read_sell_parser() : 트레이드봇의 판매조건을 읽어와 람다식 함수 리스트생성
- make_fomula() : 아래 make_fomula_sentence를 read_XX_parser로 읽어온 리스트에 한줄한줄 적용
- make_fomula_sentence() : read_XX_parser로 가져온 한줄에 대한 람다식 생성
- raw_to_fom() : _criteria.dat에 저장가능한 형태로 변경
- save_config() : 변경된 설정 저장
- get_fomula() : 사용가능한 함수반환
- get_operator() : 사용가능한 연산자반환
- get_fom_type() : fomula에 사용가능한 파라미터 수 리스트로 반환
- class init_fomula
- read_raw_data() : 초기 진입 조건 _init_criteria.dat 읽어옴, list로 반환
- save_init() : 초기 진입 조건 _init_criteria.dat 저장함
- criteria_list() : 어 이거 왜 만들었지?
코드
### local_loader.py
### 21. 6. 28. add class local_loader, func get_key()
### 21. 7. 3. add umm... many class, func
import fileinput
import datetime
import sys
import os
import subprocess
import shutil
from algo_util import *
######################################################
### ###
### This class is to clean previous log ###
### ###
######################################################
class cleaner:
    """Archive the bot history logs left over from previous runs."""

    @staticmethod
    def clean_history():
        """Move every ``*.log`` file in ``./bot_history`` into a dated folder.

        The folder is ``./bot_history/<year>_<month>_<day>`` (no zero padding)
        and is created on demand, together with ``./bot_history`` itself.
        When a log with the same name was already archived today, the
        archived copy is overwritten and the reason is printed.

        The working directory is never changed, so a failure halfway through
        cannot leave the process stranded inside ``./bot_history``.
        """
        today = datetime.datetime.now()
        now = str(today.year) + "_" + str(today.month) + "_" + str(today.day)
        history_dir = "./bot_history"
        archive_dir = history_dir + "/" + now
        os.makedirs(archive_dir, exist_ok=True)

        for name in os.listdir(history_dir):
            source = os.path.join(history_dir, name)
            # Only regular log files are archived; the dated folders that
            # live next to them are left alone.
            if not name.endswith(".log") or not os.path.isfile(source):
                continue
            try:
                shutil.move(source, archive_dir)
            except shutil.Error as e:
                # shutil.move refuses to overwrite an existing destination:
                # replace the archived copy explicitly instead.
                shutil.copy(source, archive_dir)
                os.remove(source)
                print(e)
######################################################
### ###
### This class is to handle user's trading key ###
### ###
######################################################
class key_loader:
    """Read and update the trading key pair stored in ``./key.dat``.

    The file holds one ``acc=<value>`` line and one ``sec=<value>`` line.
    Values may themselves contain ``=`` (only the first ``=`` on a line
    separates the name from the value).
    """

    @staticmethod
    def key_set():
        """Return ``[acc, sec]`` taken from the first two lines of the file.

        Purely positional: line 1 is treated as ``acc`` and line 2 as ``sec``
        whatever their names are. Raises ``IndexError`` when either line is
        missing or has no ``=``.
        """
        with open("./key.dat", "r") as f:
            data = f.read().split("\n")
        acc = data[0].split("=", 1)[1].strip()
        sec = data[1].split("=", 1)[1].strip()
        return [acc, sec]

    @staticmethod
    def key_save(value):
        """Store ``value`` (``[acc, sec]``) in ``./key.dat``.

        Existing ``acc``/``sec`` lines are rewritten in place, every other
        line is kept, and an entry that is not in the file yet is appended.
        """
        # Built before the file is touched so a malformed ``value`` cannot
        # leave a half-written key file behind.
        replacements = {"acc": "acc=" + value[0], "sec": "sec=" + value[1]}
        written = set()
        with fileinput.FileInput("./key.dat", inplace=True) as f:
            for line in f:
                name, sep, _ = line.partition("=")
                name = name.strip()
                if sep and name in replacements:
                    line = replacements[name] + "\n"
                    written.add(name)
                elif not line.endswith("\n"):
                    line += "\n"
                # With inplace=True, stdout *is* the new file content: every
                # line has to be echoed or it is lost.
                sys.stdout.write(line)
        missing = [name for name in ("acc", "sec") if name not in written]
        if missing:
            with open("./key.dat", "a") as f:
                for name in missing:
                    f.write(replacements[name] + "\n")

    @staticmethod
    def key_loader():
        """Return ``[acc, sec]`` looked up by name, in that order.

        When either entry is missing the failure is reported through
        ``log_maker.write_error_log`` and ``None`` is returned.
        """
        found = dict()
        with open("./key.dat", "r") as f:
            data = f.read().split("\n")
        for line in data:
            name, sep, val = line.partition("=")
            name = name.strip()
            if sep and name in ("acc", "sec"):
                found[name] = val.strip()
        if len(found) != 2:
            log_maker.write_error_log("failed to load key")
            return None
        return [found["acc"], found["sec"]]
######################################################
### ###
### This class is to handle config ###
### ###
######################################################
class config_loader:
    """Load and update the ``key=value`` settings kept in ``./config.dat``."""

    @staticmethod
    def _parse_value(text):
        """Return ``text`` as an int, else a float, else the unchanged string."""
        for cast in (int, float):
            try:
                return cast(text)
            except ValueError:
                continue
        return text

    @staticmethod
    def loading():
        """Return the settings as a dict with numeric values converted.

        Lines shorter than three characters, lines starting with ``#`` and
        lines without ``=`` are ignored. Keys and values are stripped; only
        the first ``=`` separates them.
        """
        config = dict()
        with open("./config.dat", "r") as f:
            data = f.read().split("\n")
        for line in data:
            if len(line) < 3 or line[0] == "#":
                continue
            key, sep, raw_value = line.partition("=")
            if not sep:
                continue
            config[key.strip()] = config_loader._parse_value(raw_value.strip())
        return config

    def get(self, wh):
        """Return ``self.config[wh]``, or ``None`` when the key is absent.

        NOTE(review): nothing in this class assigns ``self.config``; the
        caller has to set it on the instance before calling this.
        """
        if wh in self.config:
            return self.config[wh]

    @staticmethod
    def config_save(cd):
        """Write the values of ``cd`` (a dict) back into ``./config.dat``.

        The first non-comment line whose key equals a key of ``cd`` is
        rewritten as ``key=value``; every other line is kept unchanged.
        Keys that do not occur in the file are not added. The previous
        content is kept as ``./config.dat.bak``.
        """
        pending = {str(key): value for key, value in cd.items()}
        with fileinput.FileInput("./config.dat", inplace=True, backup=".bak") as f:
            for line in f:
                key, sep, _ = line.partition("=")
                key = key.strip()
                if sep and not line.startswith("#") and key in pending:
                    line = key + "=" + str(pending.pop(key)) + "\n"
                # Keep echoing to the very end: with inplace=True any line
                # that is not written is dropped from the file.
                sys.stdout.write(line)

    @staticmethod
    def get_config():
        """Return every ``key=value`` line as stripped strings (debug helper).

        No type conversion and no comment filtering is done here.
        """
        config = dict()
        with open("./config.dat", "r") as f:
            data = f.read().split("\n")
        for line in data:
            key, sep, value = line.partition("=")
            if sep:
                config[key.strip()] = value.strip()
        return config
######################################################
### ###
### This class is to write running log ###
### error log writer is in utility.py ###
### ###
######################################################
class history_maker:
    """Write and read the per-bot running logs under ``./bot_history``.

    Each bot logs to ``./bot_history/<fiat>_history.log``.
    """

    @staticmethod
    def init_history(fiat):
        """Start a fresh log for ``fiat`` with a header and the current time.

        Any existing log for ``fiat`` is overwritten; ``./bot_history`` is
        created when it does not exist yet.
        """
        now = datetime.datetime.now()
        os.makedirs("./bot_history", exist_ok=True)
        with open("./bot_history/" + fiat + "_history.log", "w") as f:
            f.write("--------" + fiat + "--------\n")
            f.write(str(now) + "\n\n")

    @staticmethod
    def write_history(fiat, string):
        """Append ``string`` with a timestamp to the log of ``fiat``.

        Best effort: when the log directory is missing the error is printed
        and the entry is dropped.
        """
        now = datetime.datetime.now()
        try:
            with open("./bot_history/" + fiat + "_history.log", "a") as f:
                f.write("<" + str(now) + "> " + string + "\n\n")
        except FileNotFoundError as e:
            print(e)

    @staticmethod
    def load_history(fiat):
        """Return the log of ``fiat`` as a list of lines."""
        with open("./bot_history/" + fiat + "_history.log", "r") as f:
            data = f.read().split("\n")
        return data

    @staticmethod
    def get_history_list():
        """Return the bot names that have a ``.log`` file in ``./bot_history``.

        ``<name>_history.log`` yields ``<name>`` even when the name contains
        underscores; any other ``.log`` file yields the text before its
        first underscore.
        """
        suffix = "_history.log"
        names = []
        for file_name in os.listdir("./bot_history/"):
            if not file_name.endswith(".log"):
                continue
            if file_name.endswith(suffix):
                names.append(file_name[:-len(suffix)])
            else:
                names.append(file_name.split("_")[0])
        return names

    @staticmethod
    def open_history(fiat):
        """Open the log of ``fiat`` in Notepad without waiting for it."""
        # Argument list instead of one command string, so a name containing
        # spaces stays a single argument.
        subprocess.Popen(["notepad.exe", "./bot_history/" + fiat + "_history.log"])
######################################################
### ###
### This class parses the trading criteria ###
### interacts with algo_util and trading_bot ###
### a fomula is expressed in the form ###
### EX> A/a M B/b (& another fom) ###
### ###
######################################################
class fomula:
    """Parse, evaluate and store the buy/sell criteria of a trade bot.

    Criteria live in ``./criteria/<fiat>_<buy|sell>_criteria.dat``, one rule
    per line in the form ``<index> : <clause> & <clause> ...``. A clause is
    ``A OP B`` where ``A``/``B`` are operand codes (keys of ``fomula_dict``),
    optionally followed by ``/``-separated arguments, and ``OP`` is a key of
    ``operator_dict``. Blank lines and lines starting with ``#`` are ignored.
    """

    # Operand code -> display name. The names are runtime strings; in order
    # they mean: current, minimum, maximum, average and moving-average price,
    # last buy price, last sell price.
    fomula_dict = {"0":"현재가","1":"최소가","2":"최대가","3":"평균가","4":"이동평균가","5":"마지막구매가","6":"마지막판매가"}
    # Lists of operand codes keyed by an integer.
    # NOTE(review): the key looks like the number of "/" arguments offered
    # for those operands - confirm against the UI that reads it.
    fom_type_dict = {0:[],1:["0","1","2","3","5","6"],2:["4"]}
    # Operator code -> comparison symbol.
    operator_dict = {"1":"<","2":"<=","3":">","4":">=","5":"==","6":"!="}
    # Operator code -> comparison, same codes as ``operator_dict``.
    _COMPARATORS = {
        "1": lambda a, b: a < b,
        "2": lambda a, b: a <= b,
        "3": lambda a, b: a > b,
        "4": lambda a, b: a >= b,
        "5": lambda a, b: a == b,
        "6": lambda a, b: a != b,
    }

    @staticmethod
    def _criteria_bodies(bs_type, fiat):
        """Return the text after the index of every rule line in a criteria file."""
        with open("./criteria/" + fiat + "_" + bs_type + "_criteria.dat", "r") as f:
            data = f.read().split("\n")
        bodies = []
        for line in data:
            if line.strip() == "" or line[0] == "#":
                continue
            # A rule line without ":" raises IndexError rather than being
            # skipped silently.
            bodies.append(line.split(":", 1)[1])
        return bodies

    @staticmethod
    def read_raw_buy_parser(fiat=""):
        """Return the buy rules of ``fiat``, each parsed by ``raw_parser``."""
        return [fomula.raw_parser(body) for body in fomula._criteria_bodies("buy", fiat)]

    @staticmethod
    def read_raw_sell_parser(fiat=""):
        """Return the sell rules of ``fiat``, each parsed by ``raw_parser``."""
        return [fomula.raw_parser(body) for body in fomula._criteria_bodies("sell", fiat)]

    @staticmethod
    def _describe_operand(token):
        """Return ``[name]`` or ``[name, [args...]]`` for one operand token."""
        code, *args = token.split("/")
        described = [fomula.fomula_dict[code]]
        if "/" in token:
            described.append(args)
        return described

    @staticmethod
    def raw_parser(line):
        """Turn one rule body into a list of readable clauses.

        Each clause becomes ``[name, (args), symbol, name, (args)]`` where an
        ``args`` list is present only when that operand was written with
        ``/`` arguments. Raises ``KeyError`` for an unknown code and
        ``ValueError`` when a clause does not have exactly three parts.
        """
        res = list()
        for clause in line.split("&"):
            a1, fo, a2 = clause.split()
            apt_list = fomula._describe_operand(a1)
            apt_list.append(fomula.operator_dict[fo])
            apt_list.extend(fomula._describe_operand(a2))
            res.append(apt_list)
        return res

    @staticmethod
    def read_buy_parser(fom_algo, arg, fiat=""):
        """Return the buy rules of ``fiat`` as lists of predicate functions."""
        return [fomula.make_fomula(body, fom_algo, arg)
                for body in fomula._criteria_bodies("buy", fiat)]

    @staticmethod
    def read_sell_parser(fom_algo, arg, fiat=""):
        """Return the sell rules of ``fiat`` as lists of predicate functions."""
        return [fomula.make_fomula(body, fom_algo, arg)
                for body in fomula._criteria_bodies("sell", fiat)]

    @staticmethod
    def make_fomula(fom, fom_algo, arg):
        """Return one predicate per ``&``-separated clause of ``fom``."""
        return [fomula.make_fomula_sentence(clause, fom_algo, arg)
                for clause in fom.split("&")]

    @staticmethod
    def _operand_fn(token, fom_algo, arg):
        """Return a one-argument function computing the value of one operand.

        The first ``/`` argument is a percentage taken off the value (the
        result is multiplied by ``(100 - percent) / 100``); it is ignored for
        operand ``0``. For operand ``4`` a second argument, when present, is
        forwarded to ``fom_algo.rolling_mean`` as its second argument.
        """
        buy_money = arg["buying"]
        sell_money = arg["selling"]
        code, *args = token.split("/")
        if code == "0":
            return lambda x: fom_algo.current_val()
        scale = float((100 - float(args[0])) / 100) if args else 1.0
        if code == "1":
            return lambda x: fom_algo.min_val(x) * scale
        if code == "2":
            return lambda x: fom_algo.max_val(x) * scale
        if code == "3":
            return lambda x: fom_algo.avg_val(x) * scale
        if code == "4":
            if len(args) > 1:
                # NOTE(review): forwarded as the raw text token, exactly as
                # before - confirm rolling_mean converts it itself.
                window = args[1]
                return lambda x: fom_algo.rolling_mean(x, window) * scale
            return lambda x: fom_algo.rolling_mean(x) * scale
        if code == "5":
            return lambda x: buy_money * scale
        if code == "6":
            return lambda x: sell_money * scale
        raise ValueError("unknown fomula code: " + code)

    @staticmethod
    def make_fomula_sentence(fom, fom_algo, arg):
        """Return a predicate ``f(x) -> bool`` for one ``A OP B`` clause.

        ``fom_algo`` must provide ``current_val()``, ``min_val(x)``,
        ``max_val(x)``, ``avg_val(x)`` and ``rolling_mean(x[, n])``; ``arg``
        must have the keys ``"buying"`` and ``"selling"``. Raises
        ``ValueError`` for a malformed clause, an unknown operand or
        operator code, or a non-numeric percentage.
        """
        a1, fo, a2 = fom.split()
        a1_foo = fomula._operand_fn(a1, fom_algo, arg)
        a2_foo = fomula._operand_fn(a2, fom_algo, arg)
        if fo not in fomula._COMPARATORS:
            raise ValueError("unknown operator code: " + fo)
        compare = fomula._COMPARATORS[fo]
        return lambda x: True if compare(a1_foo(x), a2_foo(x)) else False

    @staticmethod
    def raw_to_fom(raw):
        """Convert ``raw_parser`` output back to the text stored in the file.

        Returns ``""`` for an empty rule. Raises ``KeyError`` when a name or
        symbol is not one of the known ones.
        """
        if not raw:
            return ""
        rev_fomula = {value: key for key, value in fomula.fomula_dict.items()}
        rev_ope = {value: key for key, value in fomula.operator_dict.items()}
        clauses = []
        for entry in raw:
            left = rev_fomula[entry[0]]
            pos = 1
            if isinstance(entry[1], list):
                left += "".join("/" + str(a) for a in entry[1])
                pos = 2
            symbol = rev_ope[entry[pos]]
            right = rev_fomula[entry[pos + 1]]
            if len(entry) > pos + 2 and isinstance(entry[pos + 2], list):
                right += "".join("/" + str(a) for a in entry[pos + 2])
            clauses.append(left + " " + symbol + " " + right)
        return " & ".join(clauses)

    @staticmethod
    def _without_entry(lines, target):
        """Drop rule ``target`` and renumber the remaining rules from 1."""
        kept = []
        cnt = 1
        for line in lines:
            if line.startswith("#"):
                # Comments are not rules: keep them untouched.
                kept.append(line)
                continue
            label, sep, body = line.partition(":")
            if not sep:
                # Blank or malformed lines are dropped while renumbering.
                continue
            # Exact index comparison, so removing rule 1 leaves rule 11 alone.
            if label.strip() == target:
                continue
            kept.append(str(cnt) + " : " + body.strip())
            cnt += 1
        return kept

    @staticmethod
    def _with_entry(lines, target, new_line):
        """Replace rule ``target`` by ``new_line``, or append it when absent."""
        result = []
        replaced = False
        for line in lines:
            if not line.startswith("#") and line.partition(":")[0].strip() == target:
                result.append(new_line)
                replaced = True
            else:
                result.append(line)
        if not replaced:
            result.append(new_line)
        return result

    @staticmethod
    def save_config(index, raw, bs_type, fiat=""):
        """Store, replace or delete rule ``index`` of a criteria file.

        ``raw`` is a rule in ``raw_parser`` form; an empty ``raw`` deletes
        rule ``index`` and renumbers the rest. ``bs_type`` is the ``buy`` /
        ``sell`` part of the file name. The new content is built completely
        in memory before the file is rewritten. Best effort: failures are
        printed, not raised.
        """
        path = "./criteria/" + fiat + "_" + bs_type + "_criteria.dat"
        target = str(index)
        try:
            with open(path, "r") as f:
                lines = [line.rstrip("\n") for line in f]
            if not raw:
                lines = fomula._without_entry(lines, target)
            else:
                trans = target + " : " + fomula.raw_to_fom(raw)
                lines = fomula._with_entry(lines, target, trans)
            with open(path, "w") as f:
                f.writelines(line + "\n" for line in lines)
        except (OSError, KeyError, IndexError, TypeError, ValueError) as e:
            print(e)

    def get_fomula(self):
        """Return the operand code -> name table."""
        return self.fomula_dict

    def get_operator(self):
        """Return the operator code -> symbol table."""
        return self.operator_dict

    def get_fom_type(self):
        """Return the ``fom_type_dict`` table."""
        return self.fom_type_dict
class init_fomula:
    """Read and store the initial entry criteria in ``./criteria/_init_criteria.dat``.

    Line 1 of the file is ``<a> <b>/<c> <d>`` and line 2 is ``<a> <b>/<c>``.
    """

    @staticmethod
    def read_raw_data():
        """Return the first two lines as ``[[a, [b, c], d], [a, [b, c]]]``.

        Raises ``IndexError`` when one of the lines that is present does not
        have the expected parts.
        """
        crit = list()
        with open("./criteria/_init_criteria.dat", "r") as f:
            data = f.read().split("\n")
        for i, line in enumerate(data[:2]):
            tmp_line = line.split()
            pair = tmp_line[1].split("/")
            entry = [tmp_line[0].strip(), [pair[0].strip(), pair[1].strip()]]
            if i == 0:
                # Only the first line carries a third field.
                entry.append(tmp_line[2].strip())
            crit.append(entry)
        return crit

    @staticmethod
    def save_init(res):
        """Write ``res`` (same shape as ``read_raw_data`` returns) to the file.

        The first line, and the second when ``res`` has one, are replaced;
        any further lines already in the file are kept. The lines are written
        even when the file was empty or did not exist.
        """
        # Formatted before the file is opened so a malformed ``res`` cannot
        # leave a half-written file behind.
        new_lines = [res[0][0] + " " + res[0][1][0] + "/" + res[0][1][1] + " " + res[0][2]]
        if len(res) > 1:
            new_lines.append(res[1][0] + " " + res[1][1][0] + "/" + res[1][1][1])
        path = "./criteria/_init_criteria.dat"
        try:
            with open(path, "r") as f:
                old_lines = f.read().split("\n")
        except FileNotFoundError:
            old_lines = []
        kept = old_lines[len(new_lines):]
        with open(path, "w") as f:
            f.write("\n".join(new_lines) + "\n")
            f.write("\n".join(kept))

    @staticmethod
    def criteria_list():
        """Return the raw lines of the file."""
        with open("./criteria/_init_criteria.dat", "r") as f:
            data = f.read().split("\n")
        return data
변경점
- 21. 6. 28. add class local_loader, func get_key()
- 21. 7. 3. add many classes (fomula, init_fomula, history_maker, config_loader, cleaner)
'프로그램 > Python Project' 카테고리의 다른 글
[Python] comcbt 중복문제 제거하기 (0) | 2023.03.05 |
---|---|
[Python Project] 비트코인 자동거래 -시현- (24) | 2021.07.04 |
[Python Project] 비트코인 자동거래 -사전준비- (6) | 2021.06.28 |
[Python Project] 이상형 월드컵 나만의 UI만들기 -시현- (1) | 2021.06.27 |
[Python Project] 이상형 월드컵 나만의 UI만들기 - 기능도- (0) | 2021.06.27 |