特别注意:图中箭头指向处一定要记得勾选上。否则得手动配置环境变量了哦。
Q:如何配置环境变量呢?
A:控制面板—系统与安全—系统—高级系统设置—环境变量—系统变量—双击 path—进入编辑环境变量窗口后在空白处填入 Python 所在路径—一路确定。
检查
本节虽是零基础友好文,但也有对一些知识点的深度拓展,有编程基础的看官也可以选择性观看哦!
C:\机器名\用户名>
>>>
2 #整数 (int)
3.1314526 #浮点数 (float)
True #布尔值 (bool)
"1" #字符串 (str)
[1,2,"a"] #列表(list)
(1,2,"a") #元组(tuple)
{"name":"小明"} #字典(dict)
var1= 12
var2 = 12
var3 = 13
print(var1==var2) #输出True
print(var1==var3) #输出False
var1==var2==
与运算:铁面无私,要求所有都True,否则输出结果就为False。
True and True #True
True and False #False
False and False #False
或运算:要求不高,只要有一个为True输出的结果就为True。
True or True #True
True or False #True
False or False #False
非运算:老是唱反调,输入True,它给你输出False,反之亦然。(特别注意:它是一个单目运算符)
not True #False
not False #True
'"'aaa'"abc"''""'aaa'aaa'"\you’re
"you\' re"
'\\
"you\\'re"
list1= [1,2,3,4,5]
list2 = ["AI悦创","GitChat","Fly"]
print(list1[2]) # 输出:3
print(list2[0]) #输出:AI悦创
# 示例二
lists = ['a','b','c']
lists.append('d')
print(lists)
print(len(lists))
lists.insert(0,'mm')
lists.pop()#删除最后一个元素
print(lists)
# 输出
['a', 'b', 'c', 'd']
4
['mm', 'a', 'b', 'c']
tup1=('aaa',1,'bbb',2)
>>> tup1=(1)
>>> type(tup1)
<class 'int'>
>>> tup2=(1,)
>>> type(tup2)
<class 'tuple'>
列表与元组的区别
偷偷告诉你哦:
l= [1, 2, 3]
l.__sizeof__()
64
tup = (1, 2, 3)
tup.__sizeof__()
48
l= []
l.__sizeof__() // 空列表的存储空间为 40 字节
40
l.append(1)
l.__sizeof__()
72 // 加入了元素 1 之后,列表为其分配了可以存储 4 个元素的空间 (72 - 40)/8 = 4
l.append(2)
l.__sizeof__()
72 // 由于之前分配了空间,所以加入元素 2,列表空间不变
l.append(3)
l.__sizeof__()
72 // 同上
l.append(4)
l.__sizeof__()
72 // 同上
l.append(5)
l.__sizeof__()
104 // 加入元素 5 之后,列表的空间不足,所以又额外分配了可以存储 4 个元素的空间
brands= {"Tencent":"腾讯","Baidu":"百度","Alibaba":"阿里巴巴"}
brands["Tencent"] #获取键值为"Tencent"的value
del brands["Tencent"] #删除腾讯
brands.values[] #得到所有的value值
brands.get("Tencent") # 获取键值为"Tencent"的value
set1={'a','aa','aaa','aaaa'} #{'aaa', 'aa', 'aaaa', 'a'}
set1=set(['a','aa','aaa','aaaa'])
print(set1) #{'aaaa', 'aa', 'a', 'aaa'}
注意:set(){}{ }
>>> s={}
>>> type(s)
<class 'dict'>
拓展
添加:append、insert。
>>> list=["a","b"]
>>> list.append("c") # append(元素),将元素添加到列表里
>>> print(list)
['a', 'b', 'c']
>>> list.insert(0,"d")#insert(索引,元素),将元素添加到指定位置
>>> print(list)
['d', 'a', 'b', 'c']
删除:remove()、pop(索引)、pop()
>>> list.remove("d")#remove(元素),删去list中看不顺眼的元素
>>> list
['a', 'b', 'c']
>>> list.pop(1)
'b'#被删掉的元素
>>> print(list)
['a', 'c']#pop(索引),删去制定位置的元素
>>> list.pop()
'c'#被删掉的元素
>>> print(list)#pop(),默认删去最后一个元素
['a']
修改:list [索引] = 元素
>>> list=['a','c']
>>> list[0]='b'#替换制定位置的元素
>>> print(list)
['b','c']
查找:list [索引]
>>> list=['b','c']
>>> list[1]#查找指定位置的元素
'c'
>>> tuple1=(1,2,3,4)
>>> tuple1[0]=5
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'tuple' object does not support item assignment#报错
a=1+1 #这里a就是一个变量,用来存储 1+1产生的2
拓展
b=a
假设开发商 = 内存,变量 = 房子,变量存储的值 = 住户,在 b=a 前,a=1 的大趋势使得开发商把 a 房子建造好了,当 b=a 复制时,开发商又马不停蹄的画了块内存建了 b 房子,且 b 房子和 a 房子里都住着数值 1,因此当 a=4,使得 a 房子换了新住户,但这不能影响到 b 房子住户——数值 1 的居住。
# 判断语句:if … else …
i = 1
if i == 1:
print("Yes,it is 1")
else:
print("No,it is not 1")
# if … else … 是经典的判断语句,需要注意的是在 if expression 后面有个冒号,同样在 else 后面也存在冒号。
for i in range(1,10):
print(i)
i= 1
while (i<10):
i += 1
if i!= 8:
continue
else:
break
def function(param): # function为函数名,param为参数
i = 1
return i # f返回值
为了讲解得更形象,我们来写一个 a+b 求和的函数。
def getsum(a,b): #定义函数名为getSum,参数为a,b
sum = a+b;
return sum; #返回a+b的和,sum
print(getsum(1, 2))
open()
open("abc.txt","r")
# open()为Python 内置的文件函数,用来打开文件,“abc.txt”为目标文件名,"r"代表以只读方式打开文件,其他的还有“w"和"a"模式
read()
file = open("abc.txt","r")
words = file.read()
当你不确定用哪个词的时候,用量化交易就行了。
########## GEMINI行情接口 ##########
## https://api.gemini.com/v1/pubticker/:symbol
import json
import requests
gemini_ticker = 'https://api.gemini.com/v1/pubticker/{}'
symbol = 'btcusd'
btc_data = requests.get(gemini_ticker.format(symbol)).json()
print(json.dumps(btc_data, indent=4))
########## 输出 ##########
{
"bid": "8825.88",
"ask": "8827.52",
"volume": {
"BTC": "910.0838782726",
"USD": "7972904.560901317851",
"timestamp": 1560643800000
},
"last": "8838.45"
}
import matplotlib.pyplot as plt
import pandas as pd
import requests
# 选择要获取的数据时间段
periods = '3600'
# 通过Http抓取btc历史价格数据
resp = requests.get('https://api.cryptowat.ch/markets/gemini/btcusd/ohlc',
params={
'periods': periods
})
data = resp.json()
# 转换成pandas data frame
df = pd.DataFrame(
data['result'][periods],
columns=[
'CloseTime',
'OpenPrice',
'HighPrice',
'LowPrice',
'ClosePrice',
'Volume',
'NA'])
# 输出DataFrame的头部几行
print(df.head())
# 绘制btc价格曲线
df['ClosePrice'].plot(figsize=(14, 7))
########### 输出 ###############
CloseTime OpenPrice HighPrice ... ClosePrice Volume NA
0 1558843200 8030.55 8046.30 ... 8011.20 11.642968 93432.459964
1 1558846800 8002.76 8050.33 ... 8034.48 8.575682 68870.145895
2 1558850400 8031.61 8036.14 ... 8000.00 15.659680 125384.519063
3 1558854000 8000.00 8016.29 ... 8001.46 38.171420 304342.048892
4 1558857600 8002.69 8023.11 ... 8009.24 3.582830 28716.385009
用红色杯子,去厨房泡一杯放了糖的37.5度的普洱茶。
泡厨房的茶,
要求:
类型=普洱;
杯子=红色;
放糖=True;
温度=37.5度。
厨房的茶泡
GET https://api.gemini.com/v1/pubticker/btcusd
POST https://api.restful.cn/accounts/delete/:username
指向一个资源
DELETE https://api.rest.cn/accounts/:username
POST https://api.gemini.com/v1/order/cancel
无状态一个 HTTP 请求完成一次完整操作
import requests
import json
import base64
import hmac
import hashlib
import datetime
import time
base_url = "https://api.sandbox.gemini.com"
endpoint = "/v1/order/new"
url = base_url + endpoint
gemini_api_key = "account-zmidXEwP72yLSSybXVvn"
gemini_api_secret = "375b97HfE7E4tL8YaP3SJ239Pky9".encode()
t = datetime.datetime.now()
payload_nonce = str(int(time.mktime(t.timetuple())*1000))
payload = {
"request": "/v1/order/new",
"nonce": payload_nonce,
"symbol": "btcusd",
"amount": "5",
"price": "3633.00",
"side": "buy",
"type": "exchange limit",
"options": ["maker-or-cancel"]
}
encoded_payload = json.dumps(payload).encode()
b64 = base64.b64encode(encoded_payload)
signature = hmac.new(gemini_api_secret, b64, hashlib.sha384).hexdigest()
request_headers = {
'Content-Type': "text/plain",
'Content-Length': "0",
'X-GEMINI-APIKEY': gemini_api_key,
'X-GEMINI-PAYLOAD': b64,
'X-GEMINI-SIGNATURE': signature,
'Cache-Control': "no-cache"
}
response = requests.post(url,
data=None,
headers=request_headers)
new_order = response.json()
print(new_order)
########## 输出 ##########
{'order_id': '239088767', 'id': '239088767', 'symbol': 'btcusd', 'exchange': 'gemini', 'avg_execution_price': '0.00', 'side': 'buy', 'type': 'exchange limit', 'timestamp': '1561956976', 'timestampms': 1561956976535, 'is_live': True, 'is_cancelled': False, 'is_hidden': False, 'was_forced': False, 'executed_amount': '0', 'remaining_amount': '5', 'options': ['maker-or-cancel'], 'price': '3633.00', 'original_amount': '5'}
import requests
import timeit
def get_orderbook():
orderbook = requests.get("https://api.gemini.com/v1/book/btcusd").json()
n = 10
latency = timeit.timeit('get_orderbook()', setup='from __main__ import get_orderbook', number=n) * 1.0 / n
print('Latency is {} ms'.format(latency * 1000))
###### 输出 #######
Latency is 196.67642089999663 ms
curl-w "TCP handshake: %{time_connect}s, SSL handshake: %{time_appconnect}s\n" -so /dev/null https://www.gemini.com
TCP handshake: 0.072758s, SSL handshake: 0.119409s
import websocket
import thread
# 在接收到服务器发送消息时调用
def on_message(ws, message):
print('Received: ' + message)
# 在和服务器建立完成连接时调用
def on_open(ws):
# 线程运行函数
def gao():
# 往服务器依次发送0-4,每次发送完休息0.01秒
for i in range(5):
time.sleep(0.01)
msg="{0}".format(i)
ws.send(msg)
print('Sent: ' + msg)
# 休息1秒用于接收服务器回复的消息
time.sleep(1)
# 关闭Websocket的连接
ws.close()
print("Websocket closed")
# 在另一个线程运行gao()函数
thread.start_new_thread(gao, ())
if __name__ == "__main__":
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
on_message = on_message,
on_open = on_open)
ws.run_forever()
#### 输出 #####
Sent: 0
Sent: 1
Received: 0
Sent: 2
Received: 1
Sent: 3
Received: 2
Sent: 4
Received: 3
Received: 4
Websocket closed
我们在请求的同时也在接受消息
import ssl
import websocket
import json
# 全局计数器
count = 5
def on_message(ws, message):
global count
print(message)
count -= 1
# 接收了5次消息之后关闭websocket连接
if count == 0:
ws.close()
if __name__ == "__main__":
ws = websocket.WebSocketApp(
"wss://api.gemini.com/v1/marketdata/btcusd?top_of_book=true&offers=true",
on_message=on_message)
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
###### 输出 #######
{"type":"update","eventId":7275473603,"socket_sequence":0,"events":[{"type":"change","reason":"initial","price":"11386.12","delta":"1.307","remaining":"1.307","side":"ask"}]}
{"type":"update","eventId":7275475120,"timestamp":1562380981,"timestampms":1562380981991,"socket_sequence":1,"events":[{"type":"change","side":"ask","price":"11386.62","remaining":"1","reason":"top-of-book"}]}
{"type":"update","eventId":7275475271,"timestamp":1562380982,"timestampms":1562380982387,"socket_sequence":2,"events":[{"type":"change","side":"ask","price":"11386.12","remaining":"1.3148","reason":"top-of-book"}]}
{"type":"update","eventId":7275475838,"timestamp":1562380986,"timestampms":1562380986270,"socket_sequence":3,"events":[{"type":"change","side":"ask","price":"11387.16","remaining":"0.072949","reason":"top-of-book"}]}
{"type":"update","eventId":7275475935,"timestamp":1562380986,"timestampms":1562380986767,"socket_sequence":4,"events":[{"type":"change","side":"ask","price":"11389.22","remaining":"0.06204196","reason":"top-of-book"}]}
import copy
import json
import ssl
import time
import websocket
class OrderBook(object):
BIDS = 'bid'
ASKS = 'ask'
def __init__(self, limit=20):
self.limit = limit
# (price, amount)
self.bids = {}
self.asks = {}
self.bids_sorted = []
self.asks_sorted = []
def insert(self, price, amount, direction):
if direction == self.BIDS:
if amount == 0:
if price in self.bids:
del self.bids[price]
else:
self.bids[price] = amount
elif direction == self.ASKS:
if amount == 0:
if price in self.asks:
del self.asks[price]
else:
self.asks[price] = amount
else:
print('WARNING: unknown direction {}'.format(direction))
def sort_and_truncate(self):
# sort
self.bids_sorted = sorted([(price, amount) for price, amount in self.bids.items()], reverse=True)
self.asks_sorted = sorted([(price, amount) for price, amount in self.asks.items()])
# truncate
self.bids_sorted = self.bids_sorted[:self.limit]
self.asks_sorted = self.asks_sorted[:self.limit]
# copy back to bids and asks
self.bids = dict(self.bids_sorted)
self.asks = dict(self.asks_sorted)
def get_copy_of_bids_and_asks(self):
return copy.deepcopy(self.bids_sorted), copy.deepcopy(self.asks_sorted)
class Crawler:
def __init__(self, symbol, output_file):
self.orderbook = OrderBook(limit=10)
self.output_file = output_file
self.ws = websocket.WebSocketApp('wss://api.gemini.com/v1/marketdata/{}'.format(symbol),
on_message = lambda ws, message: self.on_message(message))
self.ws.run_forever(sslopt={'cert_reqs': ssl.CERT_NONE})
def on_message(self, message):
# 对收到的信息进行处理,然后送给 orderbook
data = json.loads(message)
for event in data['events']:
price, amount, direction = float(event['price']), float(event['remaining']), event['side']
self.orderbook.insert(price, amount, direction)
# 整理 orderbook,排序,只选取我们需要的前几个
self.orderbook.sort_and_truncate()
# 输出到文件
with open(self.output_file, 'a+') as f:
bids, asks = self.orderbook.get_copy_of_bids_and_asks()
output = {
'bids': bids,
'asks': asks,
'ts': int(time.time() * 1000)
}
f.write(json.dumps(output) + '\n')
if __name__ == '__main__':
crawler = Crawler(symbol='BTCUSD', output_file='BTCUSD.txt')
###### 输出 #######
{"bids": [[11398.73, 0.96304843], [11398.72, 0.98914437], [11397.32, 1.0], [11396.13, 2.0], [11395.95, 2.0], [11395.87, 1.0], [11394.09, 0.11803397], [11394.08, 1.0], [11393.59, 0.1612581], [11392.96, 1.0]], "asks": [[11407.42, 1.30814001], [11407.92, 1.0], [11409.48, 2.0], [11409.66, 2.0], [11412.15, 0.525], [11412.42, 1.0], [11413.77, 0.11803397], [11413.99, 0.5], [11414.28, 1.0], [11414.72, 1.0]], "ts": 1562558996535}
{"bids": [[11398.73, 0.96304843], [11398.72, 0.98914437], [11397.32, 1.0], [11396.13, 2.0], [11395.95, 2.0], [11395.87, 1.0], [11394.09, 0.11803397], [11394.08, 1.0], [11393.59, 0.1612581], [11392.96, 1.0]], "asks": [[11407.42, 1.30814001], [11407.92, 1.0], [11409.48, 2.0], [11409.66, 2.0], [11412.15, 0.525], [11412.42, 1.0], [11413.77, 0.11803397], [11413.99, 0.5], [11414.28, 1.0], [11414.72, 1.0]], "ts": 1562558997377}
{"bids": [[11398.73, 0.96304843], [11398.72, 0.98914437], [11397.32, 1.0], [11396.13, 2.0], [11395.95, 2.0], [11395.87, 1.0], [11394.09, 0.11803397], [11394.08, 1.0], [11393.59, 0.1612581], [11392.96, 1.0]], "asks": [[11407.42, 1.30814001], [11409.48, 2.0], [11409.66, 2.0], [11412.15, 0.525], [11412.42, 1.0], [11413.77, 0.11803397], [11413.99, 0.5], [11414.28, 1.0], [11414.72, 1.0]], "ts": 1562558997765}
{"bids": [[11398.73, 0.96304843], [11398.72, 0.98914437], [11397.32, 1.0], [11396.13, 2.0], [11395.95, 2.0], [11395.87, 1.0], [11394.09, 0.11803397], [11394.08, 1.0], [11393.59, 0.1612581], [11392.96, 1.0]], "asks": [[11407.42, 1.30814001], [11409.48, 2.0], [11409.66, 2.0], [11412.15, 0.525], [11413.77, 0.11803397], [11413.99, 0.5], [11414.28, 1.0], [11414.72, 1.0]], "ts": 1562558998638}
{"bids": [[11398.73, 0.97131753], [11398.72, 0.98914437], [11397.32, 1.0], [11396.13, 2.0], [11395.95, 2.0], [11395.87, 1.0], [11394.09, 0.11803397], [11394.08, 1.0], [11393.59, 0.1612581], [11392.96, 1.0]], "asks": [[11407.42, 1.30814001], [11409.48, 2.0], [11409.66, 2.0], [11412.15, 0.525], [11413.77, 0.11803397], [11413.99, 0.5], [11414.28, 1.0], [11414.72, 1.0]], "ts": 1562558998645}
{"bids": [[11398.73, 0.97131753], [11398.72, 0.98914437], [11397.32, 1.0], [11396.13, 2.0], [11395.87, 1.0], [11394.09, 0.11803397], [11394.08, 1.0], [11393.59, 0.1612581], [11392.96, 1.0]], "asks": [[11407.42, 1.30814001], [11409.48, 2.0], [11409.66, 2.0], [11412.15, 0.525], [11413.77, 0.11803397], [11413.99, 0.5], [11414.28, 1.0], [11414.72, 1.0]], "ts": 1562558998748}
def assert_msg(condition, msg):
if not condition:
raise Exception(msg)
def read_file(filename):
# 获得文件绝对路径
filepath = path.join(path.dirname(__file__), filename)
# 判定文件是否存在
assert_msg(path.exists(filepath), "文件不存在")
# 读取CSV文件并返回
return pd.read_csv(filepath,
index_col=0,
parse_dates=True,
infer_datetime_format=True)
BTCUSD = read_file('BTCUSD_GEMINI.csv')
assert_msg(BTCUSD.__len__() > 0, '读取失败')
print(BTCUSD.head())
########## 输出 ##########
Time Symbol Open High Low Close Volume
Date
2019-07-08 00:00:00 BTCUSD 11475.07 11540.33 11469.53 11506.43 10.770731
2019-07-07 23:00:00 BTCUSD 11423.00 11482.72 11423.00 11475.07 32.996559
2019-07-07 22:00:00 BTCUSD 11526.25 11572.74 11333.59 11423.00 48.937730
2019-07-07 21:00:00 BTCUSD 11515.80 11562.65 11478.20 11526.25 25.323908
2019-07-07 20:00:00 BTCUSD 11547.98 11624.88 11423.94 11515.80 63.211972
class Backtest:
"""
Backtest回测类,用于读取历史行情数据、执行策略、模拟交易并估计
收益。
初始化的时候调用Backtest.run来时回测
instance, or `backtesting.backtesting.Backtest.optimize` to
optimize it.
"""
def __init__(self,
data: pd.DataFrame,
strategy_type: type(Strategy),
broker_type: type(ExchangeAPI),
cash: float = 10000,
commission: float = .0):
"""
构造回测对象。需要的参数包括:历史数据,策略对象,初始资金数量,手续费率等。
初始化过程包括检测输入类型,填充数据空值等。
参数:
:param data: pd.DataFrame pandas Dataframe格式的历史OHLCV数据
:param broker_type: type(ExchangeAPI) 交易所API类型,负责执行买卖操作以及账户状态的维护
:param strategy_type: type(Strategy) 策略类型
:param cash: float 初始资金数量
:param commission: float 每次交易手续费率。如2%的手续费此处为0.02
"""
assert_msg(issubclass(strategy_type, Strategy), 'strategy_type不是一个Strategy类型')
assert_msg(issubclass(broker_type, ExchangeAPI), 'strategy_type不是一个Strategy类型')
assert_msg(isinstance(commission, Number), 'commission不是浮点数值类型')
data = data.copy(False)
# 如果没有Volumn列,填充NaN
if 'Volume' not in data:
data['Volume'] = np.nan
# 验证OHLC数据格式
assert_msg(len(data.columns & {'Open', 'High', 'Low', 'Close', 'Volume'}) == 5,
("输入的`data`格式不正确,至少需要包含这些列:"
"'Open', 'High', 'Low', 'Close'"))
# 检查缺失值
assert_msg(not data[['Open', 'High', 'Low', 'Close']].max().isnull().any(),
('部分OHLC包含缺失值,请去掉那些行或者通过差值填充. '))
# 如果行情数据没有按照时间排序,重新排序一下
if not data.index.is_monotonic_increasing:
data = data.sort_index()
# 利用数据,初始化交易所对象和策略对象。
self._data = data # type: pd.DataFrame
self._broker = broker_type(data, cash, commission)
self._strategy = strategy_type(self._broker, self._data)
self._results = None
def run(self):
"""
运行回测,迭代历史数据,执行模拟交易并返回回测结果。
Run the backtest. Returns `pd.Series` with results and statistics.
Keyword arguments are interpreted as strategy parameters.
"""
strategy = self._strategy
broker = self._broker
# 策略初始化
strategy.init()
# 设定回测开始和结束位置
start = 100
end = len(self._data)
# 回测主循环,更新市场状态,然后执行策略
for i in range(start, end):
# 注意要先把市场状态移动到第i时刻,然后再执行策略。
broker.next(i)
strategy.next(i)
# 完成策略执行之后,计算结果并返回
self._results = self._compute_result(broker)
return self._results
def _compute_result(self, broker):
s = pd.Series()
s['初始市值'] = broker.initial_cash
s['结束市值'] = broker.market_value
s['收益'] = broker.market_value - broker.initial_cash
return s
def SMA(values, n):
"""
返回简单滑动平均
"""
return pd.Series(values).rolling(n).mean()
def crossover(series1, series2) -> bool:
"""
检查两个序列是否在结尾交叉
:param series1: 序列1
:param series2: 序列2
:return: 如果交叉返回True,反之False
"""
return series1[-2] < series2[-2] and series1[-1] > series2[-1]
def next(self, tick):
# 如果此时快线刚好越过慢线,买入全部
if crossover(self.sma1[:tick], self.sma2[:tick]):
self.buy()
# 如果是慢线刚好越过快线,卖出全部
elif crossover(self.sma2[:tick], self.sma1[:tick]):
self.sell()
# 否则,这个时刻不执行任何操作。
else:
pass
import abc
import numpy as np
from typing import Callable
class Strategy(metaclass=abc.ABCMeta):
"""
抽象策略类,用于定义交易策略。
如果要定义自己的策略类,需要继承这个基类,并实现两个抽象方法:
Strategy.init
Strategy.next
"""
def __init__(self, broker, data):
"""
构造策略对象。
@params broker: ExchangeAPI 交易API接口,用于模拟交易
@params data: list 行情数据数据
"""
self._indicators = []
self._broker = broker # type: _Broker
self._data = data # type: _Data
self._tick = 0
def I(self, func: Callable, *args) -> np.ndarray:
"""
计算买卖指标向量。买卖指标向量是一个数组,长度和历史数据对应;
用于判定这个时间点上需要进行"买"还是"卖"。
例如计算滑动平均:
def init():
self.sma = self.I(utils.SMA, self.data.Close, N)
"""
value = func(*args)
value = np.asarray(value)
assert_msg(value.shape[-1] == len(self._data.Close), '指示器长度必须和data长度相同')
self._indicators.append(value)
return value
@property
def tick(self):
return self._tick
@abc.abstractmethod
def init(self):
"""
初始化策略。在策略回测/执行过程中调用一次,用于初始化策略内部状态。
这里也可以预计算策略的辅助参数。比如根据历史行情数据:
计算买卖的指示器向量;
训练模型/初始化模型参数
"""
pass
@abc.abstractmethod
def next(self, tick):
"""
步进函数,执行第tick步的策略。tick代表当前的"时间"。比如data[tick]用于访问当前的市场价格。
"""
pass
def buy(self):
self._broker.buy()
def sell(self):
self._broker.sell()
@property
def data(self):
return self._data
from utils import assert_msg, crossover, SMA
class SmaCross(Strategy):
# 小窗口SMA的窗口大小,用于计算SMA快线
fast = 10
# 大窗口SMA的窗口大小,用于计算SMA慢线
slow = 20
def init(self):
# 计算历史上每个时刻的快线和慢线
self.sma1 = self.I(SMA, self.data.Close, self.fast)
self.sma2 = self.I(SMA, self.data.Close, self.slow)
def next(self, tick):
# 如果此时快线刚好越过慢线,买入全部
if crossover(self.sma1[:tick], self.sma2[:tick]):
self.buy()
# 如果是慢线刚好越过快线,卖出全部
elif crossover(self.sma2[:tick], self.sma1[:tick]):
self.sell()
# 否则,这个时刻不执行任何操作。
else:
pass
买到的数量= 投入的资金 * (1.0 - 手续费) / 价格
卖出的收益= 持有的数量 * 价格 * (1.0 - 手续费)
from utils import read_file, assert_msg, crossover, SMA
class ExchangeAPI:
def __init__(self, data, cash, commission):
assert_msg(0 < cash, "初始现金数量大于0,输入的现金数量:{}".format(cash))
assert_msg(0 <= commission <= 0.05, "合理的手续费率一般不会超过5%,输入的费率:{}".format(commission))
self._inital_cash = cash
self._data = data
self._commission = commission
self._position = 0
self._cash = cash
self._i = 0
@property
def cash(self):
"""
:return: 返回当前账户现金数量
"""
return self._cash
@property
def position(self):
"""
:return: 返回当前账户仓位
"""
return self._position
@property
def initial_cash(self):
"""
:return: 返回初始现金数量
"""
return self._inital_cash
@property
def market_value(self):
"""
:return: 返回当前市值
"""
return self._cash + self._position * self.current_price
@property
def current_price(self):
"""
:return: 返回当前市场价格
"""
return self._data.Close[self._i]
def buy(self):
"""
用当前账户剩余资金,按照市场价格全部买入
"""
self._position = float(self._cash / (self.current_price * (1 + self._commission)))
self._cash = 0.0
def sell(self):
"""
卖出当前账户剩余持仓
"""
self._cash += float(self._position * self.current_price * (1 - self._commission))
self._position = 0.0
def next(self, tick):
self._i = tick
def main():
BTCUSD = read_file('BTCUSD_GEMINI.csv')
ret = Backtest(BTCUSD, SmaCross, ExchangeAPI, 10000.0, 0.00).run()
print(ret)
if __name__ == '__main__':
main()
初始市值10000.000000
结束市值 576361.772884
收益 566361.772884
初始市值10000.000000
结束市值 2036.562001
收益 -7963.437999
作者 Pieter Hintjens 是一位大牛,他本人的经历也很传奇,2010 年诊断出胆管癌,并成功做了手术切除。但 2016 年 4 月,却发现癌症大面积扩散到了肺部,已经无法治疗。他写的最后一篇通信模式是关于死亡协议的,之后在比利时选择接受安乐死。
# 订阅者 1
import zmq
def run():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:6666')
socket.setsockopt_string(zmq.SUBSCRIBE, '')
print('client 1')
while True:
msg = socket.recv()
print("msg: %s" % msg)
if __name__ == '__main__':
run()
########## 输出 ##########
client 1
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'
# 订阅者 2
import zmq
def run():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:6666')
socket.setsockopt_string(zmq.SUBSCRIBE, '')
print('client 2')
while True:
msg = socket.recv()
print("msg: %s" % msg)
if __name__ == '__main__':
run()
########## 输出 ##########
client 2
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'
# 发布者
import time
import zmq
def run():
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:6666')
cnt = 1
while True:
time.sleep(1)
socket.send_string('server cnt {}'.format(cnt))
print('send {}'.format(cnt))
cnt += 1
if __name__ == '__main__':
run()
########## 输出 ##########
send 1
send 2
send 3
send 4
send 5
socket.setsockopt_string(zmq.SUBSCRIBE, '')
Python 之站在高层框架下的 SQLAIchemy 操作 MySQL(关系型数据库)
sudo apt-get install python3-dev
pip install mysqlclient
import MySQLdb
def test_pymysql():
conn = MySQLdb.connect(
host='localhost',
port=3306,
user='your_username',
passwd=your_password’,
db='mysql'
)
cur = conn.cursor()
cur.execute('''
CREATE TABLE price (
timestamp TIMESTAMP NOT NULL,
BTCUSD FLOAT(8,2),
PRIMARY KEY (timestamp)
);
''')
cur.execute('''
INSERT INTO price VALUES(
"2019-07-14 14:12:17",
11234.56
);
''')
conn.commit()
conn.close()
test_pymy
pip install peewee
import peewee
from peewee import *
db = MySQLDatabase('mysql', user='your_username', passwd=your_password’)
class Price(peewee.Model):
timestamp = peewee.DateTimeField(primary_key=True)
BTCUSD = peewee.FloatField()
class Meta:
database = db
def test_peewee():
Price.create_table()
price = Price(timestamp='2019-06-07 13:17:18', BTCUSD='12345.67')
price.save()
test_p
import MySQLdb
import numpy as np
def test_pymysql():
conn = MySQLdb.connect(
host='localhost',
port=3306,
user='your_username',
passwd='your_password',
db='mysql'
)
cur = conn.cursor()
cur.execute('''
SELECT
BTCUSD
FROM
price
WHERE
timestamp > now() - interval 60 minute
''')
BTCUSD = np.array(cur.fetchall())
print(BTCUSD.max(), BTCUSD.min())
conn.close()
test_pym
pip3 install Django
django-admin --version
########## 输出 ##########
2.2.3
django-admin startproject TradingMonitor
cd TradingMonitor/
python3 manage.py migrate
########## 输出 ##########
Applying contenttypes.0001_initial... OK
Applying auth.0001_initial... OK
Applying admin.0001_initial... OK
Applying admin.0002_logentry_remove_auto_add... OK
Applying admin.0003_logentry_add_action_flag_choices... OK
Applying contenttypes.0002_remove_content_type_name... OK
Applying auth.0002_alter_permission_name_max_length... OK
Applying auth.0003_alter_user_email_max_length... OK
Applying auth.0004_alter_user_username_opts... OK
Applying auth.0005_alter_user_last_login_null... OK
Applying auth.0006_require_contenttypes_0002... OK
Applying auth.0007_alter_validators_add_error_messages... OK
Applying auth.0008_alter_user_username_max_length... OK
Applying auth.0009_alter_user_last_name_max_length... OK
Applying auth.0010_alter_group_name_max_length... OK
Applying auth.0011_update_proxy_permissions... OK
Applying sessions.0001_initial... OK
TradingMonitor/
├── TradingMonitor
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── db.sqlite3
└── manage.py
python3 manage.py createsuperuser
########## 输出 ##########
Username (leave blank to use 'ubuntu'): admin
Email address:
Password:
Password (again):
Superuser created successfully.
python3 manage.py runserver
# TradingMonitor/models.py
from django.db import models
class Position(models.Model):
asset = models.CharField(max_length=10)
timestamp = models.DateTimeField()
amount = models.DecimalField(max_digits=10, decimal_places=3)
# TradingMonitor/views.py
from django.shortcuts import render
from .models import Position
def render_positions(request, asset):
positions = Position.objects.filter(asset = asset)
context = {'asset': asset, 'positions': positions}
return render(request, 'positions.html', context)
# TradingMonitor/templates/positions.html
<!DOCTYPE html>
<html lang="en-US">
<head>
<title>Positions for {{asset}}</title>
</head>
<body>
<h1>Positions for {{asset}}</h1>
<table>
<tr>
<th>Time</th>
<th>Amount</th>
</tr>
{% for position in positions %}
<tr>
<th>{{position.timestamp}}</th>
<th>{{position.amount}}</th>
</tr>
{% endfor %}
</table>
</body>
# TradingMonitor/urls.py
from django.contrib import admin
from django.urls import path
from . import views
urlpatterns = [
path('admin/', admin.site.urls),
path('positions/<str:asset>', views.render_positions),
]
mkdir TradingMonitor/migrations
touch TradingMonitor/migrations/__init__.py
TradingMonitor/
├── TradingMonitor
│ ├── migrations
│ └── __init__.py
│ ├── templates
│ └── positions.html
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ ├── models.py
│ ├── views.py
│ └── wsgi.py
├── db.sqlite3
└── manage.py
INSTALLED_APPS= [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'TradingMonitor', # 这里把我们的 app 加上
]
TEMPLATES= [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'TradingMonitor/templates')], # 这里把 templates 的目录加上
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
python manage.py makemigrations
########## 输出 ##########
Migrations for 'TradingMonitor':
TradingMonitor/migrations/0001_initial.py
- Create model Position
python manage.py migrate
########## 输出 ##########
Operations to perform:
Apply all migrations: TradingMonitor, admin, auth, contenttypes, sessions
Running migrations:
Applying TradingMonitor.0001_initial... OK
算法:深度优先搜索(DFS)和广度优先搜索(BFS)
算法:贪心和动态规划
作为 Python 语言,我确实不可能给你把每一种数据结构和算法都详细讲解一遍,但是,还是那句话,基础的数据结构和算法,一定是每个程序员的基本功。
在数据爆炸的互联网的今天,学习资料触手可及,时间就显得更加宝贵。我在这里列出这些纲要的目的,也是希望能够帮你节省时间,为你整理出适合入门学习、掌握的基础知识点,让你可以带着全局观更有针对性地去学习。
当然,一切可以取得成果的学习,都离不开我们自己付出的努力。也只有这样,掌握了数据结构和算法的你,才能在数学基础上对 Python 的理解更进一步。同时,在未来的项目设计中,这些思维亦会在无形之中,帮你设计出更高质量的系统和架构,可以说是终生受益的学习投资了。
希望你可以学会并且切实有所收获,我的公众号是:AI悦创(微信号:AI-YueChuang)
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我正在使用这个:4.times{|i|assert_not_equal("content#{i+2}".constantize,object.first_content)}我之前声明过局部变量content1content2content3content4content5我得到的错误NameError:wrongconstantnamecontent2这个错误是什么意思?我很确定我想要content2=\ 最佳答案 你必须用一个大字母来调用ruby常量:Content2而不是content2。Aconstantnamestart
这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
电脑0x0000001A蓝屏错误怎么U盘重装系统教学分享。有用户电脑开机之后遇到了系统蓝屏的情况。系统蓝屏问题很多时候都是系统bug,只有通过重装系统来进行解决。那么蓝屏问题如何通过U盘重装新系统来解决呢?来看看以下的详细操作方法教学吧。 准备工作: 1、U盘一个(尽量使用8G以上的U盘)。 2、一台正常联网可使用的电脑。 3、ghost或ISO系统镜像文件(Win10系统下载_Win10专业版_windows10正式版下载-系统之家)。 4、在本页面下载U盘启动盘制作工具:系统之家U盘启动工具。 U盘启动盘制作步骤: 注意:制作期间,U盘会被格式化,因此U盘中的重要文件请注
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的
本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决
需求:要创建虚拟机,就需要给他提供一个虚拟的磁盘,我们就在/opt目录下创建一个10G大小的raw格式的虚拟磁盘CentOS-7-x86_64.raw命令格式:qemu-imgcreate-f磁盘格式磁盘名称磁盘大小qemu-imgcreate-f磁盘格式-o?1.创建磁盘qemu-imgcreate-fraw/opt/CentOS-7-x86_64.raw10G执行效果#ls/opt/CentOS-7-x86_64.raw2.安装虚拟机使用virt-install命令,基于我们提供的系统镜像和虚拟磁盘来创建一个虚拟机,另外在创建虚拟机之前,提前打开vnc客户端,在创建虚拟机的时候,通过vnc