Omicron提供数据持久化、时间(日历、triggers)、行情数据model、基础运算和基础量化因子
close()
async
¶
关闭与缓存的连接
Source code in omicron/__init__.py
async def close():
    """Close the connection to the cache.

    This is best effort: any exception raised while closing is deliberately
    ignored, so calling this during shutdown never fails.
    """
    try:
        await cache.close()
    except Exception:  # noqa
        pass
init(app_cache=5)
async
¶
初始化Omicron
初始化influxDB, 缓存等连接, 并加载日历和证券列表
上述初始化的连接,应该在程序退出时,通过调用close()关闭
Source code in omicron/__init__.py
async def init(app_cache: int = 5):
    """Initialize Omicron.

    Initializes the cache connection, the trading calendar (``tf``) and the
    security list, in that order. The connections opened here should be
    released with ``close()`` before the program exits.

    Args:
        app_cache: forwarded to ``cache.init`` as its ``app`` argument.
    """
    # `cache` is only read here, never rebound, so no `global` declaration is needed
    await cache.init(app=app_cache)
    await tf.init()

    # imported lazily; presumably to avoid a circular import at package load — TODO confirm
    from omicron.models.security import Security

    await Security.init()
Extensions package¶
decimals
¶
math_round(x, digits)
¶
由于浮点数的表示问题,很多语言的round函数与数学上的round函数不一致。下面的函数结果与数学上的一致。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
float |
要进行四舍五入的数字 |
required |
digits |
int |
小数点后保留的位数 |
required |
Source code in omicron/extensions/decimals.py
def math_round(x: float, digits: int):
    """Round `x` half away from zero to `digits` decimal places.

    Many languages' `round` differs from mathematical rounding because of
    binary floating-point representation (e.g. 2.675 is stored as
    2.67499999...). This function rounds the shortest decimal representation
    of `x` (what `str(x)` prints), so `math_round(2.675, 2) == 2.68` and
    `math_round(1.005, 2) == 1.01`.

    Args:
        x: the number to round.
        digits: number of decimal places to keep; a negative value rounds to
            tens, hundreds, ...

    Returns:
        float: the rounded value. NaN and infinities are returned unchanged.
    """
    import math
    from decimal import ROUND_HALF_UP, Decimal, localcontext

    if not math.isfinite(x):
        return float(x)

    digits = int(digits)
    # str() yields the shortest repr that round-trips, i.e. the decimal value the
    # user typed; rounding that avoids the 2.675 -> 2.67 artefact of x * 10**digits.
    value = Decimal(str(x))
    quantum = Decimal(1).scaleb(-digits)
    with localcontext() as ctx:
        # quantize() fails if the result needs more digits than the precision;
        # widen it for very large magnitudes.
        ctx.prec = max(ctx.prec, value.adjusted() + digits + 2)
        return float(value.quantize(quantum, rounding=ROUND_HALF_UP))
price_equal(x, y)
¶
判断股价是否相等
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
价格1 |
required | |
y |
价格2 |
required |
Returns:
Type | Description |
---|---|
bool |
如果相等则返回True,否则返回False |
Source code in omicron/extensions/decimals.py
def price_equal(x: float, y: float) -> bool:
    """Tell whether two stock prices are equal at cent precision.

    Args:
        x: price 1
        y: price 2

    Returns:
        True if both prices round to the same cent, otherwise False.
    """
    # After rounding to cents, equal prices differ only by float noise while
    # distinct prices differ by ~0.01 or more. Compare against half a cent:
    # the former `< 1e-2` threshold judged adjacent ticks equal, because
    # 10.02 - 10.01 == 0.00999999999999979 in floating point.
    return abs(math_round(x, 2) - math_round(y, 2)) < 5e-3
np
¶
Extension function related to numpy
array_math_round(arr, digits)
¶
将一维数组arr的数据进行四舍五入
numpy.around的函数并不是数学上的四舍五入,对1.5和2.5进行round的结果都会变成2,在金融领域计算中,我们必须使用数学意义上的四舍五入。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
ArrayLike |
输入数组 |
required |
digits |
int |
小数点后保留的位数 |
required |
Returns:
Type | Description |
---|---|
np.ndarray |
四舍五入后的一维数组 |
Source code in omicron/extensions/np.py
def array_math_round(arr: Union[float, ArrayLike], digits: int) -> np.ndarray:
    """Round every element of the 1-D array `arr` half away from zero.

    `numpy.around` rounds half to even (both 1.5 and 2.5 become 2), which is not
    acceptable for financial calculations; this applies `decimals.math_round`
    element-wise instead.

    Args:
        arr (ArrayLike): input array, or a single float.
        digits (int): number of decimal places to keep.

    Returns:
        np.ndarray: the rounded array (float64); a float when `arr` is a
        float scalar.
    """
    # a single float (including numpy float scalars such as np.float32) is
    # rounded directly
    if isinstance(arr, (float, np.floating)):
        return decimals.math_round(arr, digits)

    # otypes is required: without it np.vectorize raises ValueError on empty input
    f = np.vectorize(lambda x: decimals.math_round(x, digits), otypes=[float])
    return f(arr)
array_price_equal(price1, price2)
¶
判断两个价格数组是否相等
Parameters:
Name | Type | Description | Default |
---|---|---|---|
price1 |
ArrayLike |
价格数组 |
required |
price2 |
ArrayLike |
价格数组 |
required |
Returns:
Type | Description |
---|---|
np.ndarray |
判断结果 |
Source code in omicron/extensions/np.py
def array_price_equal(price1: ArrayLike, price2: ArrayLike) -> np.ndarray:
    """Compare two price arrays element-wise at cent precision.

    Args:
        price1 (ArrayLike): price array
        price2 (ArrayLike): price array

    Returns:
        np.ndarray: boolean array, True where the prices round to the same cent.
    """
    price1 = array_math_round(price1, 2)
    price2 = array_math_round(price2, 2)
    # half-cent tolerance: with `< 1e-2`, adjacent ticks such as 10.01 / 10.02
    # (difference 0.00999999999999979 in floating point) compared equal
    return np.abs(price1 - price2) < 5e-3
bars_since(condition, default=None)
¶
Return the number of bars since `condition` sequence was last `True`, or if never, return `default`.
Examples:
>>> condition = [True, True, False]
>>> bars_since(condition)
1
Source code in omicron/extensions/np.py
def bars_since(condition: Sequence[bool], default=None) -> int:
    """
    Return the number of bars since `condition` sequence was last `True`,
    or if never, return `default`.

    >>> condition = [True, True, False]
    >>> bars_since(condition)
    1
    """
    # walk backwards from the newest bar; the distance to the first truthy
    # entry is the number of bars elapsed since it
    for distance, flag in enumerate(reversed(condition)):
        if flag:
            return distance
    return default
bin_cut(arr, n)
¶
将数组arr切分成n份
todo: use padding + reshape to boost performance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
[type] |
[description] |
required |
n |
[type] |
[description] |
required |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in omicron/extensions/np.py
def bin_cut(arr: list, n: int):
    """Deal the elements of `arr` round-robin into `n` bins.

    Element i goes to bin `i % n`, so each bin keeps the original relative
    order. Bins that stay empty (when `arr` has fewer than `n` elements) are
    dropped from the result.

    todo: use padding + reshape to boost performance

    Args:
        arr (list): elements to distribute; any iterable works.
        n (int): number of bins.

    Returns:
        list: the non-empty bins, each one a list.
    """
    buckets = [[] for _ in range(n)]
    for position, item in enumerate(arr):
        buckets[position % n].append(item)
    return [bucket for bucket in buckets if bucket]
count_between(arr, start, end)
¶
计算数组中,start元素与end元素之间共有多少个元素
要求arr必须是已排序。计算结果会包含区间边界点。
Examples:
>>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
>>> count_between(arr, 20050104, 20050111)
6
>>> count_between(arr, 20050104, 20050109)
4
Source code in omicron/extensions/np.py
def count_between(arr, start, end):
    """Count the elements of `arr` that lie in the closed interval [`start`, `end`].

    `arr` must be sorted in ascending order. Both bounds are inclusive, and
    neither `start` nor `end` has to be a member of `arr`.

    Examples:
        >>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
        >>> count_between(arr, 20050104, 20050111)
        6
        >>> count_between(arr, 20050104, 20050109)
        4
        >>> count_between(arr, 20050108, 20050111)
        2
        >>> count_between(arr, 20050104, 20050120)
        6

    Args:
        arr: sorted array-like.
        start: lower bound (inclusive).
        end: upper bound (inclusive).

    Returns:
        int: number of elements in [start, end]; 0 for an empty `arr` or when
        `start > end`.
    """
    # side="left" gives the number of elements < start and side="right" the
    # number of elements <= end; their difference is the count in [start, end].
    # (The former "+1, then patch the borders" arithmetic over-counted when
    # `start` was not in `arr` and under-counted when `end > arr[-1]`.)
    pos_start = np.searchsorted(arr, start, side="left")
    pos_end = np.searchsorted(arr, end, side="right")
    return int(max(pos_end - pos_start, 0))
dataframe_to_structured_array(df, dtypes=None)
¶
convert dataframe (with all columns, and index possibly) to numpy structured arrays
len(dtypes) should be either equal to len(df.columns) or len(df.columns) + 1. In the latter case, it implies to include df.index into converted array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
DataFrame |
the one needs to be converted |
required |
dtypes |
List[Tuple] |
Defaults to None. If it's None, then dtypes of df is used, in such case, the index of df will not be converted. |
None |
Returns:
Type | Description |
---|---|
ArrayLike |
[description] |
Source code in omicron/extensions/np.py
def dataframe_to_structured_array(
    df: DataFrame, dtypes: List[Tuple] = None
) -> ArrayLike:
    """Convert a DataFrame (all columns, optionally its index) to a numpy structured array.

    `len(dtypes)` should be either `len(df.columns)` or `len(df.columns) + 1`.
    In the latter case `df.index` is included in the result: the single field
    of `dtypes` whose name is not a column of `df` receives the index values,
    whatever the index itself is named.

    Args:
        df: the DataFrame to convert.
        dtypes: (name, dtype) pairs of the result's fields. Defaults to None,
            in which case the columns' own dtypes are used and the index is not
            converted.

    Raises:
        ValueError: if `len(dtypes)` is neither allowed length, or when it is
            `len(df.columns) + 1` but not exactly one field name is missing from
            `df.columns`.

    Returns:
        ArrayLike: a structured array with one record per row of `df`.
    """
    v = df
    if dtypes is not None:
        dtypes_in_dict = {key: value for key, value in dtypes}
        col_len = len(df.columns)
        if len(dtypes) == col_len + 1:
            columns = set(df.columns)
            index_fields = [name for name in dtypes_in_dict if name not in columns]
            if len(index_fields) != 1:
                raise ValueError(
                    f"expect exactly one field for df.index in dtypes, got {index_fields}"
                )
            v = df.reset_index()
            # reset_index() puts the former index in the first column, labelled
            # "index" or the index's own name; relabel it by position so a named
            # index is handled too (renaming only "index" missed that case).
            v.columns = [index_fields[0], *v.columns[1:]]
        elif col_len != len(dtypes):
            raise ValueError(
                f"length of dtypes should be either {col_len} or {col_len + 1}, is {len(dtypes)}"
            )
        # re-arrange order of dtypes, in order to align with df.columns
        dtypes = [(name, dtypes_in_dict[name]) for name in v.columns]
    else:
        # `df.dtypes` is a Series, which numpy interprets as a plain `object`
        # dtype rather than a structured one; build (name, dtype) pairs instead.
        dtypes = [(str(name), dtype) for name, dtype in zip(df.columns, df.dtypes)]

    return np.array(np.rec.fromrecords(v.values), dtype=dtypes)
dict_to_numpy_array(d, dtype)
¶
convert dictionary to numpy array
Examples:
d = {"aaron": 5, "jack": 6} dtype = [("name", "S8"), ("score", "<i4")] dict_to_numpy_array(d, dtype) array([(b'aaron', 5), (b'jack', 6)], dtype=[('name', 'S8'), ('score', '<i4')])
Parameters:
Name | Type | Description | Default |
---|---|---|---|
d |
dict |
[description] |
required |
dtype |
List[Tuple] |
[description] |
required |
Returns:
Type | Description |
---|---|
np.array |
[description] |
Source code in omicron/extensions/np.py
def dict_to_numpy_array(d: dict, dtype: List[Tuple]) -> np.array:
    """Convert a dict into a 1-D structured numpy array, one record per item.

    Each (key, value) pair becomes one record, so `dtype` is expected to
    describe a key field followed by a value field.

    Examples:
        >>> d = {"aaron": 5, "jack": 6}
        >>> dtype = [("name", "S8"), ("score", "<i4")]
        >>> dict_to_numpy_array(d, dtype)
        array([(b'aaron', 5), (b'jack', 6)],
              dtype=[('name', 'S8'), ('score', '<i4')])

    Args:
        d (dict): the mapping to convert.
        dtype (List[Tuple]): structured dtype of the records.

    Returns:
        np.array: structured array of length `len(d)`.
    """
    records = d.items()
    return np.fromiter(records, dtype=dtype, count=len(records))
fill_nan(ts)
¶
将ts中的NaN替换为其前值
如果ts起头的元素为NaN,则用第一个非NaN元素替换。
如果所有元素都为NaN,则无法替换。
Examples:
>>> arr = np.arange(6, dtype=np.float32)
>>> arr[3:5] = np.NaN
>>> fill_nan(arr)
...
array([0., 1., 2., 2., 2., 5.], dtype=float32)
>>> arr = np.arange(6, dtype=np.float32)
>>> arr[0:2] = np.nan
>>> fill_nan(arr)
...
array([2., 2., 2., 3., 4., 5.], dtype=float32)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ts |
np.array |
[description] |
required |
Source code in omicron/extensions/np.py
def fill_nan(ts: np.ndarray):
    """Replace every NaN in `ts` with its closest preceding non-NaN value.

    Leading NaNs (which have no predecessor) are replaced with the first
    non-NaN element. The input array is left unmodified.

    Example:
        >>> arr = np.arange(6, dtype=np.float32)
        >>> arr[3:5] = np.nan
        >>> fill_nan(arr)
        ... # doctest: +NORMALIZE_WHITESPACE
        array([0., 1., 2., 2., 2., 5.], dtype=float32)
        >>> arr = np.arange(6, dtype=np.float32)
        >>> arr[0:2] = np.nan
        >>> fill_nan(arr)
        ... # doctest: +NORMALIZE_WHITESPACE
        array([2., 2., 2., 3., 4., 5.], dtype=float32)

    Args:
        ts (np.array): 1-D numeric array.

    Raises:
        ValueError: if all elements of `ts` are NaN (including an empty `ts`).

    Returns:
        np.array: a new array with the NaNs filled.
    """
    ts = np.asarray(ts)
    mask = np.isnan(ts)
    if np.all(mask):
        raise ValueError("all of ts are NaN")

    if mask[0]:
        # copy first: the caller's array must not be modified as a side effect
        ts = ts.copy()
        ts[0] = ts[np.argmax(~mask)]  # first non-NaN element
        mask[0] = False

    # for each position, the index of the last non-NaN element at or before it
    idx = np.where(~mask, np.arange(mask.size), 0)
    np.maximum.accumulate(idx, out=idx)
    return ts[idx]
find_runs(x)
¶
Find runs of consecutive items in an array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
ArrayLike |
the sequence to find runs in |
required |
Returns:
Type | Description |
---|---|
Tuple[np.ndarray, np.ndarray, np.ndarray] |
A tuple of unique values, start indices, and length of runs |
Source code in omicron/extensions/np.py
def find_runs(x: ArrayLike) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find runs of consecutive equal items in a 1-D sequence.

    Args:
        x: the sequence to find runs in.

    Raises:
        ValueError: if `x` is not one-dimensional.

    Returns:
        A tuple (values, starts, lengths): the value of each run, the index
        where it starts, and how many items it spans.
    """
    values = np.asanyarray(x)
    if values.ndim != 1:
        raise ValueError("only 1D array supported")

    size = values.shape[0]
    if size == 0:
        return np.array([]), np.array([]), np.array([])

    # an item opens a new run when it differs from its predecessor
    is_start = np.empty(size, dtype=bool)
    is_start[0] = True
    np.not_equal(values[:-1], values[1:], out=is_start[1:])

    starts = np.nonzero(is_start)[0]
    lengths = np.diff(np.append(starts, size))
    return values[is_start], starts, lengths
floor(arr, item)
¶
在数据arr中,找到小于等于item的那一个值。如果item小于所有arr元素的值,返回arr[0];如果item 大于所有arr元素的值,返回arr[-1]
与minute_frames_floor不同的是,本函数不做回绕与进位.
Examples:
>>> a = [3, 6, 9]
>>> floor(a, -1)
3
>>> floor(a, 9)
9
>>> floor(a, 10)
9
>>> floor(a, 4)
3
>>> floor(a,10)
9
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
required | ||
item |
required |
Source code in omicron/extensions/np.py
def floor(arr, item):
    """
    Return the largest element of the sorted `arr` that is <= `item`.

    If `item` is smaller than every element, `arr[0]` is returned; if it is
    larger than every element, `arr[-1]` is returned. Unlike
    `minute_frames_floor`, this function does no wrap-around or carry.

    Examples:
        >>> a = [3, 6, 9]
        >>> floor(a, -1)
        3
        >>> floor(a, 9)
        9
        >>> floor(a, 10)
        9
        >>> floor(a, 4)
        3
        >>> floor(a,10)
        9

    Args:
        arr: array sorted in ascending order.
        item: the value to look up.

    Returns:
        the matching element of `arr`.
    """
    # below the first element: clamp to it; otherwise the element just left of
    # the right-side insertion point is the last one <= item
    return (
        arr[0]
        if item < arr[0]
        else arr[np.searchsorted(arr, item, side="right") - 1]
    )
join_by_left(key, r1, r2, mask=True)
¶
左连接 r1, r2 by key
如果r1中存在r2中没有的行,则该行对应的r2中的那些字段将被mask,或者填充随机数。
same as numpy.lib.recfunctions.join_by(key, r1, r2, jointype='leftouter'), but allows r1 have duplicate keys
Examples:
>>> # to join the following
>>> # [[ 1, 2],
>>> # [ 1, 3], x [[1, 5],
>>> # [ 2, 3]] [4, 7]]
>>> # only first two rows in left will be joined
>>> r1 = np.array([(1, 2), (1,3), (2,3)], dtype=[('seq', 'i4'), ('score', 'i4')])
>>> r2 = np.array([(1, 5), (4,7)], dtype=[('seq', 'i4'), ('age', 'i4')])
>>> joined = join_by_left('seq', r1, r2)
>>> print(joined)
[(1, 2, 5) (1, 3, 5) (2, 3, --)]
>>> print(joined.dtype)
(numpy.record, [('seq', '<i4'), ('score', '<i4'), ('age', '<i4')])
>>> joined[2][2]
masked
>>> joined.tolist()[2][2] == None
True
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
join关键字 |
required | |
r1 |
数据集1 |
required | |
r2 |
数据集2 |
required |
Returns:
Type | Description |
---|---|
a numpy array |
Source code in omicron/extensions/np.py
def join_by_left(key, r1, r2, mask=True):
    """Left-join structured arrays `r1` and `r2` on the field `key`.

    Every row of `r1` appears in the result. A row whose key has no match in
    `r2` produces one output row whose `r2`-only fields are masked when `mask`
    is True; when `mask` is False those fields keep whatever uninitialized
    memory `np.recarray` allocated, i.e. arbitrary values.

    Same as numpy.lib.recfunctions.join_by(key, r1, r2, jointype='leftouter'),
    but allows r1 to have duplicate keys.
    [Reference: stackoverflow](https://stackoverflow.com/a/53261882/13395693)

    Examples:
        >>> # to join the following
        >>> # [[ 1, 2],
        >>> #  [ 1, 3],   x   [[1, 5],
        >>> #  [ 2, 3]]        [4, 7]]
        >>> # only first two rows in left will be joined
        >>> r1 = np.array([(1, 2), (1,3), (2,3)], dtype=[('seq', 'i4'), ('score', 'i4')])
        >>> r2 = np.array([(1, 5), (4,7)], dtype=[('seq', 'i4'), ('age', 'i4')])
        >>> joined = join_by_left('seq', r1, r2)
        >>> print(joined)
        [(1, 2, 5) (1, 3, 5) (2, 3, --)]
        >>> print(joined.dtype)
        (numpy.record, [('seq', '<i4'), ('score', '<i4'), ('age', '<i4')])
        >>> joined[2][2]
        masked
        >>> joined.tolist()[2][2] == None
        True

    Args:
        key: name of the join field; must exist in both `r1` and `r2`.
        r1: left structured array.
        r2: right structured array.
        mask: if True, return a masked array in which the `r2`-only fields of
            unmatched rows are masked.

    Returns:
        a numpy array (a masked array when `mask` is True). Each row of `r1`
        yields one output row per matching row of `r2`, or a single row when
        it has no match.
    """
    # figure out the dtype of the result array: all of r1's fields, then the
    # fields of r2 that r1 does not already have (so `key` appears once)
    descr1 = r1.dtype.descr
    descr2 = [d for d in r2.dtype.descr if d[0] not in r1.dtype.names]
    descrm = descr1 + descr2
    # figure out the fields we'll need from each array
    f1 = [d[0] for d in descr1]
    f2 = [d[0] for d in descr2]
    # cache the number of columns in f1
    ncol1 = len(f1)
    # get a dict of the rows of r2 grouped by key
    rows2 = {}
    for row2 in r2:
        rows2.setdefault(row2[key], []).append(row2)
    # figure out how many rows will be in the result
    nrowm = 0
    for k1 in r1[key]:
        if k1 in rows2:
            nrowm += len(rows2[k1])
        else:
            nrowm += 1
    # allocate the return array; np.recarray does not initialize its memory,
    # so unmatched r2 fields hold arbitrary values unless masked below
    _ret = np.recarray(nrowm, dtype=descrm)
    if mask:
        # start fully masked; assigning a field below unmasks it
        ret = np.ma.array(_ret, mask=True)
    else:
        ret = _ret
    # merge the data into the return array
    i = 0
    for row1 in r1:
        if row1[key] in rows2:
            # one output row per matching r2 row
            for row2 in rows2[row1[key]]:
                ret[i] = tuple(row1[f1]) + tuple(row2[f2])
                i += 1
        else:
            # no match: copy only r1's columns, leaving the r2 fields untouched
            for j in range(ncol1):
                ret[i][j] = row1[j]
            i += 1
    return ret
numpy_append_fields(base, names, data, dtypes)
¶
给现有的数组base增加新的字段
实现了numpy.lib.recfunctions.rec_append_fields的功能。提供这个功能,是因为rec_append_fields不能处理data元素的类型为Object的情况。
新增的数据列将顺序排列在其它列的右边。
Examples:
>>> # 新增单个字段
>>> import numpy
>>> old = np.array([i for i in range(3)], dtype=[('col1', '<f4')])
>>> new_list = [2 * i for i in range(3)]
>>> res = numpy_append_fields(old, 'new_col', new_list, [('new_col', '<f4')])
>>> print(res)
...
[(0., 0.) (1., 2.) (2., 4.)]
>>> # 新增多个字段
>>> data = [res['col1'].tolist(), res['new_col'].tolist()]
>>> print(numpy_append_fields(old, ('col3', 'col4'), data, [('col3', '<f4'), ('col4', '<f4')]))
...
[(0., 0., 0.) (1., 1., 2.) (2., 2., 4.)]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base |
[numpy.array] |
基础数组 |
required |
names |
[type] |
新增字段的名字,可以是字符串(单字段的情况),也可以是字符串列表 |
required |
data |
list |
增加的字段的数据,list类型 |
required |
dtypes |
[type] |
新增字段的dtype |
required |
Source code in omicron/extensions/np.py
def numpy_append_fields(
    base: np.ndarray, names: Union[str, List[str]], data: List, dtypes: List
) -> np.ndarray:
    """Return a copy of structured array `base` with extra fields appended.

    Provides what `numpy.lib.recfunctions.rec_append_fields` does, but also
    works when the elements of `data` are of Object type. The new fields are
    placed to the right of the existing ones.

    Example:
        >>> # append a single field
        >>> import numpy
        >>> old = np.array([i for i in range(3)], dtype=[('col1', '<f4')])
        >>> new_list = [2 * i for i in range(3)]
        >>> res = numpy_append_fields(old, 'new_col', new_list, [('new_col', '<f4')])
        >>> print(res)
        ... # doctest: +NORMALIZE_WHITESPACE
        [(0., 0.) (1., 2.) (2., 4.)]
        >>> # append several fields
        >>> data = [res['col1'].tolist(), res['new_col'].tolist()]
        >>> print(numpy_append_fields(old, ('col3', 'col4'), data, [('col3', '<f4'), ('col4', '<f4')]))
        ... # doctest: +NORMALIZE_WHITESPACE
        [(0., 0., 0.) (1., 1., 2.) (2., 2., 4.)]

    Args:
        base ([numpy.array]): the structured array to extend.
        names ([type]): name of the new field (a str), or a sequence of names.
        data (list): values of the new field; one list per name when several
            names are given.
        dtypes ([type]): list of (name, dtype) pairs for the new fields.

    Returns:
        np.ndarray: a new array holding `base`'s fields followed by the new ones.
    """
    # normalise the single-field call to the multi-field form
    if isinstance(names, str):
        names, data = [names], [data]

    merged = np.empty(base.shape, dtype=base.dtype.descr + dtypes)
    for field in base.dtype.names:
        merged[field] = base[field]
    for position, field in enumerate(names):
        merged[field] = data[position]
    return merged
remove_nan(ts)
¶
从ts中去除NaN
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ts |
np.array |
[description] |
required |
Returns:
Type | Description |
---|---|
np.array |
[description] |
Source code in omicron/extensions/np.py
def remove_nan(ts: np.ndarray) -> np.ndarray:
    """Return the elements of `ts` that are not NaN.

    `ts` is cast to float only to locate the NaNs (so object arrays holding
    None also work); the returned elements keep `ts`'s own dtype.

    Args:
        ts (np.array): input array.

    Returns:
        np.array: `ts` without its NaN elements.
    """
    is_nan = np.isnan(ts.astype(float))
    return ts[np.logical_not(is_nan)]
replace_zero(ts, replacement=None)
¶
将ts中的0替换为前值, 处理volume数据时常用用到
如果提供了replacement, 则替换为replacement
Source code in omicron/extensions/np.py
def replace_zero(ts: np.ndarray, replacement=None) -> np.ndarray:
    """Replace the zeros in `ts` with their preceding non-zero value.

    This is commonly needed for volume data. If `replacement` is given, zeros
    are replaced with it instead. Leading zeros (which have no predecessor)
    are replaced with the first non-zero element. The input array is left
    unmodified.

    Args:
        ts: 1-D numeric array.
        replacement: optional fixed value to substitute for zeros.

    Raises:
        ValueError: if `replacement` is None and all elements of `ts` are 0.

    Returns:
        np.ndarray: a new array without zeros.
    """
    if replacement is not None:
        return np.where(ts == 0, replacement, ts)

    ts = np.asarray(ts)
    mask = ts == 0
    if np.all(mask):
        raise ValueError("all of ts are 0")

    if mask[0]:
        # copy first: the caller's array must not be modified as a side effect
        ts = ts.copy()
        ts[0] = ts[np.argmax(~mask)]  # first non-zero element
        mask[0] = False

    # for each position, the index of the last non-zero element at or before it
    idx = np.where(~mask, np.arange(mask.size), 0)
    np.maximum.accumulate(idx, out=idx)
    return ts[idx]
rolling(x, win, func)
¶
对序列x进行窗口滑动计算。
如果func要实现的功能是argmax, argmin, max, mean, median, min, rank, std, sum, var等,请使用bottleneck中的move_argmax, move_argmin, move_max, move_mean, move_median, move_min, move_rank, move_std, move_sum, move_var。这些函数的性能更好。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
[type] |
[description] |
required |
win |
[type] |
[description] |
required |
func |
[type] |
[description] |
required |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in omicron/extensions/np.py
def rolling(x, win, func):
    """Apply `func` to every sliding window of length `win` over `x`.

    If `func` computes argmax, argmin, max, mean, median, min, rank, std, sum
    or var, prefer bottleneck's move_argmax, move_argmin, move_max,
    move_mean, move_median, move_min, move_rank, move_std, move_sum or
    move_var, which perform much better.

    Args:
        x: the input sequence.
        win: window length.
        func: callable applied to each window.

    Returns:
        np.ndarray: the results, one per window (`len(x) - win + 1` of them).
    """
    windows = sliding_window_view(x, window_shape=win)
    return np.array([func(window) for window in windows])
shift(arr, start, offset)
¶
在numpy数组arr中,找到start(或者最接近的一个),取offset对应的元素。
要求arr已排序。offset为正,表明向后移位;offset为负,表明向前移位
Examples:
>>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
>>> shift(arr, 20050104, 1)
20050105
>>> shift(arr, 20050105, -1)
20050104
>>> # 起始点已右越界,且向右shift,返回起始点
>>> shift(arr, 20050120, 1)
20050120
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
已排序的数组 |
required | |
start |
numpy可接受的数据类型 |
required | |
offset |
int |
[description] |
required |
Returns:
Type | Description |
---|---|
移位后得到的元素值 |
Source code in omicron/extensions/np.py
def shift(arr, start, offset):
    """Find `start` (or the closest element not greater than it) in the sorted `arr`
    and return the element `offset` positions away.

    A positive `offset` moves towards later elements, a negative one towards
    earlier elements. If the target position falls outside `arr` on either
    side, `start` itself is returned.

    Examples:
        >>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
        >>> shift(arr, 20050104, 1)
        20050105
        >>> shift(arr, 20050105, -1)
        20050104
        >>> # start is beyond the right end and shifting right: start is returned
        >>> shift(arr, 20050120, 1)
        20050120
        >>> # shifting left past the first element: start is returned
        >>> shift(arr, 20050104, -1)
        20050104

    Args:
        arr: array sorted in ascending order.
        start: the reference value (any type numpy can compare with `arr`).
        offset (int): number of positions to move.

    Returns:
        the element at the shifted position, or `start` when out of range.
    """
    pos = np.searchsorted(arr, start, side="right")
    target = pos + offset - 1
    # Out of range on either side returns `start`. A negative target must not
    # be used as an index, since Python/numpy would wrap it around to the end
    # of `arr` (e.g. shifting arr[0] by -1 used to return arr[-1]).
    if target < 0 or target >= len(arr):
        return start
    return arr[target]
smallest_n_argpos(ts, n)
¶
get smallest n (min->max) elements and return argpos which its value ordered in ascent
Examples:
>>> smallest_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
array([8, 7])
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ts |
np.array |
输入的数组 |
required |
n |
int |
取最小的n个元素 |
required |
Returns:
Type | Description |
---|---|
np.array |
[description] |
Source code in omicron/extensions/np.py
def smallest_n_argpos(ts: np.array, n: int) -> np.array:
    """Return the positions of the `n` smallest elements, smallest first.

    NaN values sort after every number, so they are only picked when fewer
    than `n` non-NaN elements exist.

    Example:
        >>> smallest_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
        array([8, 7])

    Args:
        ts (np.array): the input array.
        n (int): how many of the smallest elements to take.

    Returns:
        np.array: indices into `ts`, ordered by ascending value.
    """
    ascending_order = np.argsort(ts)
    return ascending_order[:n]
to_pydatetime(tm)
¶
将numpy.datetime64对象转换成为python的datetime对象
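numpy.ndarray.item()方法可用以将任何numpy对象转换成python对象,推荐在任何适用的地方使用.item()方法,而不是本方法。示例:
arr = np.array(['2022-09-08', '2022-09-09'], dtype='datetime64[s]')
arr.item(0) # output is datetime.datetime(2022, 9, 8, 0, 0)
arr[1].item() # output is datetime.datetime(2022, 9, 9, 0, 0)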
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tm |
the input numpy datetime object |
required |
Returns:
Type | Description |
---|---|
datetime.datetime |
python datetime object |
.. deprecated:: 2.0.0 use tm.item() instead
Source code in omicron/extensions/np.py
@deprecated("2.0.0", details="use `tm.item()` instead")
def to_pydatetime(tm: np.datetime64) -> datetime.datetime:
    """Convert a numpy.datetime64 into a naive python datetime (UTC).

    numpy.ndarray.item() converts any numpy object into the matching python
    object; prefer calling .item() wherever it applies rather than this
    function. For example:
    ```
    arr = np.array(['2022-09-08', '2022-09-09'], dtype='datetime64[s]')
    arr.item(0) # output is datetime.datetime(2022, 9, 8, 0, 0)
    arr[1].item() # output is datetime.datetime(2022, 9, 9, 0, 0)
    ```

    Args:
        tm : the input numpy datetime object

    Returns:
        python datetime object (naive, in UTC)
    """
    unix_epoch = np.datetime64(0, "s")
    one_second = np.timedelta64(1, "s")
    seconds_since_epoch = (tm - unix_epoch) / one_second
    # datetime.utcfromtimestamp() is deprecated since Python 3.12 and rejects
    # negative timestamps on some platforms; adding a timedelta to the naive
    # epoch yields the same naive UTC datetime without either problem.
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(
        seconds=float(seconds_since_epoch)
    )
top_n_argpos(ts, n)
¶
get top n (max->min) elements and return argpos which its value ordered in descent
Examples:
>>> top_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
array([3, 4])
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ts |
np.array |
[description] |
required |
n |
int |
[description] |
required |
Returns:
Type | Description |
---|---|
np.array |
[description] |
Source code in omicron/extensions/np.py
def top_n_argpos(ts: np.array, n: int) -> np.array:
    """Return the positions of the `n` largest elements, largest first.

    NaN values are ranked below every number, so they are only picked when
    fewer than `n` non-NaN elements exist.

    Example:
        >>> top_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
        array([3, 4])

    Args:
        ts (np.array): the input array.
        n (int): how many of the largest elements to take; 0 gives an empty result.

    Returns:
        np.array: indices into `ts`, ordered by descending value.
    """
    ts_ = np.copy(ts)
    # only floating arrays can hold NaN (and accept -inf)
    if np.issubdtype(ts_.dtype, np.floating):
        ts_[np.isnan(ts_)] = -np.inf
    # reverse the ascending order first, then take the head: unlike
    # `[-n:][::-1]`, this returns nothing for n == 0 instead of every index
    return np.argsort(ts_)[::-1][:n]
Notify package¶
dingtalk
¶
DingTalkMessage
¶
钉钉的机器人消息推送类,封装了常用的消息类型以及加密算法。需要在配置文件中配置钉钉的机器人的access_token;如果配置了加签,需要在配置文件中配置钉钉的机器人的secret;如果配置了自定义关键词,需要在配置文件中配置钉钉的机器人的keyword,多个关键词用英文逗号分隔。全部的配置文件示例如下,其中secret和keyword可以不配置,access_token必须配置:
notify:
  dingtalk_access_token: xxxx
  dingtalk_secret: xxxx
Source code in omicron/notify/dingtalk.py
class DingTalkMessage:
    """
    DingTalk robot message sender; wraps the message formats used here and
    the request-signing algorithm.

    Configuration is read from `cfg.notify`:
    - `dingtalk_access_token` (required): the robot's access token.
    - `dingtalk_secret` (optional): when present, every request URL is signed
      with it (the robot's "sign" security setting).

    NOTE(review): the original docs also mention a comma-separated `keyword`
    setting, but nothing in this class reads it.

    Full configuration example:

        notify:
            dingtalk_access_token: xxxx
            dingtalk_secret: xxxx
    """

    url = "https://oapi.dingtalk.com/robot/send"

    @classmethod
    def _get_access_token(cls):
        """Return the robot's access_token from config.

        Raises:
            ConfigError: if `dingtalk_access_token` is not configured.
        """
        if hasattr(cfg.notify, "dingtalk_access_token"):
            return cfg.notify.dingtalk_access_token
        else:
            logger.error(
                "Dingtalk not configured, please add the following items:\n"
                "notify:\n"
                " dingtalk_access_token: xxxx\n"
                " dingtalk_secret: xxxx\n"
            )
            raise ConfigError("dingtalk_access_token not found")

    @classmethod
    def _get_secret(cls):
        """Return the robot's signing secret, or None if not configured."""
        if hasattr(cfg.notify, "dingtalk_secret"):
            return cfg.notify.dingtalk_secret
        else:
            return None

    @classmethod
    def _get_url(cls):
        """Build the push URL; when a secret is configured, append timestamp and signature."""
        access_token = cls._get_access_token()
        url = f"{cls.url}?access_token={access_token}"
        secret = cls._get_secret()
        if secret:
            timestamp, sign = cls._get_sign(secret)
            # query parameters must be joined with "&"; the previous "×tamp="
            # (a mangled "&timestamp=") produced an unsigned, rejected request
            url = f"{url}&timestamp={timestamp}&sign={sign}"
        return url

    @classmethod
    def _get_sign(cls, secret: str):
        """Sign the current timestamp with `secret`.

        The signature is HMAC-SHA256 over "<timestamp>\\n<secret>" keyed by
        `secret`, base64-encoded and then URL-quoted.

        Returns:
            (timestamp in milliseconds as str, URL-quoted signature)
        """
        timestamp = str(round(time.time() * 1000))
        secret_enc = secret.encode("utf-8")
        string_to_sign = "{}\n{}".format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode("utf-8")
        hmac_code = hmac.new(
            secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return timestamp, sign

    @classmethod
    def _send(cls, msg):
        """Send `msg` synchronously.

        Returns:
            the decoded response body, or None on a non-200 HTTP status. A
            non-zero `errcode` in the response is logged but still returned.
        """
        url = cls._get_url()
        response = httpx.post(url, json=msg, timeout=30)

        if response.status_code != 200:
            logger.error(
                f"failed to send message, content: {msg}, response from Dingtalk: {response.content.decode()}"
            )
            return

        rsp = json.loads(response.content)
        if rsp.get("errcode") != 0:
            logger.error(
                f"failed to send message, content: {msg}, response from Dingtalk: {rsp}"
            )
        return response.content.decode()

    @classmethod
    async def _send_async(cls, msg):
        """Asynchronous counterpart of `_send`, with the same return convention."""
        url = cls._get_url()
        async with httpx.AsyncClient() as client:
            r = await client.post(url, json=msg, timeout=30)
            if r.status_code != 200:
                logger.error(
                    f"failed to send message, content: {msg}, response from Dingtalk: {r.content.decode()}"
                )
                return

            rsp = json.loads(r.content)
            if rsp.get("errcode") != 0:
                logger.error(
                    f"failed to send message, content: {msg}, response from Dingtalk: {rsp}"
                )
            return r.content.decode()

    @classmethod
    @deprecated("2.0.0", details="use function `ding` instead")
    def text(cls, content):
        """Send `content` as a plain-text message (deprecated; use `ding`)."""
        msg = {"text": {"content": content}, "msgtype": "text"}
        return cls._send(msg)
text(cls, content)
classmethod
¶
.. deprecated:: 2.0.0 use function ding instead
Source code in omicron/notify/dingtalk.py
@classmethod
@deprecated("2.0.0", details="use function `ding` instead")
def text(cls, content):
    """Send `content` as a plain-text DingTalk message (deprecated; use `ding`)."""
    return cls._send({"text": {"content": content}, "msgtype": "text"})
ding(msg)
¶
发送消息到钉钉机器人
支持发送纯文本消息和markdown格式的文本消息。如果要发送markdown格式的消息,请通过字典传入,必须包含"title"和"text"两个字段。更详细信息,请见钉钉开放平台文档
Important
必须在异步线程(即运行asyncio loop的线程)中调用此方法,否则会抛出异常。 此方法返回一个Awaitable,您可以等待它完成,也可以忽略返回值,此时它将作为一个后台任务执行,但完成的时间不确定。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg |
Union[str, dict] |
待发送消息。 |
required |
Returns:
Type | Description |
---|---|
Awaitable |
发送消息的后台任务。您可以使用此返回句柄来取消任务。 |
Source code in omicron/notify/dingtalk.py
# Strong references to in-flight send tasks. The event loop only keeps weak
# references, so a task whose handle the caller ignores could otherwise be
# garbage-collected before it finishes.
_pending_ding_tasks = set()


def ding(msg: Union[str, dict]) -> Awaitable:
    """Send a message through the DingTalk robot.

    Plain-text and markdown messages are supported. To send markdown, pass a
    dict containing both "title" and "text". For details see the
    [DingTalk open platform docs](https://open.dingtalk.com/document/orgapp-server/message-type).

    ???+ Important
        Must be called from the thread running the asyncio event loop,
        otherwise an exception is raised.
        Returns an Awaitable: you may await it, or ignore it and let it run as
        a background task whose completion time is not guaranteed.

    Args:
        msg: the message to send.

    Raises:
        TypeError: if `msg` is neither a str nor a dict.
        KeyError: if a dict `msg` lacks "title" or "text".

    Returns:
        the background task sending the message; it can be used to cancel it.
    """
    if isinstance(msg, str):
        msg_ = {"text": {"content": msg}, "msgtype": "text"}
    elif isinstance(msg, dict):
        msg_ = {
            "msgtype": "markdown",
            "markdown": {"title": msg["title"], "text": msg["text"]},
        }
    else:
        raise TypeError(f"msg must be str or dict, got {type(msg).__name__}")

    task = asyncio.create_task(DingTalkMessage._send_async(msg_))
    # keep the task alive until it completes, then drop the reference
    _pending_ding_tasks.add(task)
    task.add_done_callback(_pending_ding_tasks.discard)
    return task
mail
¶
compose(subject, plain_txt=None, html=None, attachment=None)
¶
编写MIME邮件。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subject |
str |
邮件主题 |
required |
plain_txt |
str |
纯文本格式的邮件内容 |
None |
html |
str |
html格式的邮件内容. Defaults to None. |
None |
attachment |
str |
附件文件名 |
None |
Returns:
Type | Description |
---|---|
EmailMessage |
MIME mail |
Source code in omicron/notify/mail.py
def compose(
    subject: str, plain_txt: str = None, html: str = None, attachment: str = None
) -> EmailMessage:
    """Compose a MIME email.

    When both `plain_txt` and `html` are given, the body is built as
    multipart/alternative, so clients that cannot render html still show the
    plain text.

    Args:
        subject (str): the mail subject.
        plain_txt (str, optional): plain-text body. Defaults to None.
        html (str, optional): html body. Defaults to None.
        attachment (str, optional): path of a file to attach. Defaults to None.

    Raises:
        ValueError: if neither `plain_txt` nor `html` is provided.

    Returns:
        MIME mail
    """
    import os

    msg = EmailMessage()
    msg["Subject"] = subject

    if html:
        if plain_txt:
            # Previously plain_txt went to `preamble`, which is never emitted for
            # a single-part message, so the plain text was silently lost.
            msg.set_content(plain_txt)
            msg.add_alternative(html, subtype="html")
        else:
            msg.set_content(html, subtype="html")
    elif plain_txt:
        msg.set_content(plain_txt)
    else:
        # an explicit exception: `assert` is stripped under `python -O`
        raise ValueError("Either plain_txt or html is required.")

    if attachment:
        ctype, encoding = mimetypes.guess_type(attachment)
        if ctype is None or encoding is not None:
            # unknown or compressed (e.g. .gz) file: send as opaque bytes
            ctype = "application/octet-stream"
        maintype, subtype = ctype.split("/", 1)
        with open(attachment, "rb") as f:
            msg.add_attachment(
                f.read(),
                maintype=maintype,
                subtype=subtype,
                # base name only: don't expose the sender's local directory layout
                filename=os.path.basename(attachment),
            )
    return msg
mail_notify(subject=None, body=None, msg=None, html=False, receivers=None)
¶
发送邮件通知。
发送者、接收者及邮件服务器等配置请通过cfg4py配置:
notify:
  mail_from: aaron_yang@jieyu.ai
  mail_to:
    - code@jieyu.ai
  mail_server: smtp.ym.163.com
验证密码请通过环境变量MAIL_PASSWORD来配置。
subject/body与msg必须提供其一。
Important
必须在异步线程(即运行asyncio loop的线程)中调用此方法,否则会抛出异常。 此方法返回一个Awaitable,您可以等待它完成,也可以忽略返回值,此时它将作为一个后台任务执行,但完成的时间不确定。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg |
EmailMessage |
[description]. Defaults to None. |
None |
subject |
str |
[description]. Defaults to None. |
None |
body |
str |
[description]. Defaults to None. |
None |
html |
bool |
body是否按html格式处理? Defaults to False. |
False |
receivers |
List[str], Optional |
接收者信息。如果不提供,将使用预先配置的接收者信息。 |
None |
Returns:
Type | Description |
---|---|
Awaitable |
发送消息的后台任务。您可以使用此返回句柄来取消任务。 |
Source code in omicron/notify/mail.py
def mail_notify(
    subject: str = None,
    body: str = None,
    msg: EmailMessage = None,
    html=False,
    receivers=None,
) -> Awaitable:
    """Send a mail notification.

    Sender, receivers and mail server are configured through cfg4py:
    ```
    notify:
        mail_from: aaron_yang@jieyu.ai
        mail_to:
            - code@jieyu.ai
        mail_server: smtp.ym.163.com
    ```
    The login password is read from the environment variable `MAIL_PASSWORD`.

    Provide either subject/body or msg, not both.

    ???+ Important
        Must be called from the thread running the asyncio event loop,
        otherwise an exception is raised.
        Returns an Awaitable: you may await it, or ignore it and let it run as
        a background task whose completion time is not guaranteed.

    Args:
        msg (EmailMessage, optional): a prepared message. Defaults to None.
        subject (str, optional): mail subject. Defaults to None.
        body (str, optional): mail body. Defaults to None.
        html (bool, optional): whether `body` is html. Defaults to False.
        receivers (List[str], Optional): recipients; when not provided, the
            configured `notify.mail_to` is used.

    Raises:
        TypeError: if both msg and subject/body are given, or neither is.

    Returns:
        the background task sending the mail; it can be used to cancel it.
    """
    # Same rule as send_mail(): any non-None subject/body conflicts with msg,
    # even an empty string (the former truthiness test let "" slip through).
    if msg is not None and (subject is not None or body is not None):
        raise TypeError("msg参数与subject/body只能提供其中之一")
    elif msg is None and subject is None and body is None:
        raise TypeError("必须提供msg参数或者subject/body参数")

    if msg is None:
        if html:
            msg = compose(subject, html=body)
        else:
            msg = compose(subject, plain_txt=body)

    cfg = cfg4py.get_instance()
    if not receivers:
        receivers = cfg.notify.mail_to

    password = os.environ.get("MAIL_PASSWORD")
    return send_mail(
        cfg.notify.mail_from, receivers, password, msg, host=cfg.notify.mail_server
    )
send_mail(sender, receivers, password, msg=None, host=None, port=25, cc=None, bcc=None, subject=None, body=None, username=None)
¶
发送邮件通知。
如果只发送简单的文本邮件,请使用 send_mail(sender, receivers, password, subject=subject, body=body)。如果要发送较复杂的带html和附件的邮件,请先调用compose()生成一个EmailMessage,然后再调用send_mail(sender, receivers, password, msg)来发送邮件。
Important
必须在异步线程(即运行asyncio loop的线程)中调用此方法,否则会抛出异常。 此方法返回一个Awaitable,您可以等待它完成,也可以忽略返回值,此时它将作为一个后台任务执行,但完成的时间不确定。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sender |
str |
[description] |
required |
receivers |
List[str] |
[description] |
required |
msg |
EmailMessage |
[description]. Defaults to None. |
None |
host |
str |
[description]. Defaults to None. |
None |
port |
int |
[description]. Defaults to 25. |
25 |
cc |
List[str] |
[description]. Defaults to None. |
None |
bcc |
List[str] |
[description]. Defaults to None. |
None |
subject |
str |
[description]. Defaults to None. |
None |
body |
str |
[description]. Defaults to None. |
None |
username |
str |
the username used to logon to mail server. if not provided, then sender is used. |
None |
Returns:
Type | Description |
---|---|
Awaitable |
发送消息的后台任务。您可以使用此返回句柄来取消任务。 |
Source code in omicron/notify/mail.py
# Strong references to in-flight SMTP tasks. The event loop only keeps weak
# references, so a task whose handle the caller ignores could otherwise be
# garbage-collected before it finishes.
_pending_mail_tasks = set()


@retry(aiosmtplib.errors.SMTPConnectError, tries=3, backoff=2, delay=30, logger=logger)
def send_mail(
    sender: str,
    receivers: List[str],
    password: str,
    msg: EmailMessage = None,
    host: str = None,
    port: int = 25,
    cc: List[str] = None,
    bcc: List[str] = None,
    subject: str = None,
    body: str = None,
    username: str = None,
) -> Awaitable:
    """Send a mail.

    For a simple text mail use
    `send_mail(sender, receivers, password, subject=subject, body=body)`. For
    mails with html or attachments, build an EmailMessage with `compose()`
    first, then call `send_mail(sender, receivers, password, msg)`.

    ???+ Important
        Must be called from the thread running the asyncio event loop,
        otherwise an exception is raised.
        Returns an Awaitable: you may await it, or ignore it and let it run as
        a background task whose completion time is not guaranteed.

    NOTE(review): this function only schedules the SMTP coroutine and returns
    at once, so connection errors are raised inside the task; the `@retry`
    decorator presumably never sees SMTPConnectError — confirm.

    Args:
        sender (str): sender address.
        receivers (List[str]): recipient addresses; a single str is accepted too.
        password (str): password used to log on to the mail server.
        msg (EmailMessage, optional): a prepared message. Defaults to None.
        host (str, optional): SMTP server. Defaults to None, in which case the
            domain part of `sender` is used.
        port (int, optional): SMTP port. Defaults to 25.
        cc (List[str], optional): Cc addresses. Defaults to None.
        bcc (List[str], optional): Bcc addresses. Defaults to None.
        subject (str, optional): mail subject. Defaults to None.
        body (str, optional): plain-text body. Defaults to None.
        username (str, optional): the username used to logon to mail server.
            if not provided, then `sender` is used.

    Raises:
        TypeError: if both msg and subject/body are given, or neither is.

    Returns:
        the background task sending the mail; it can be used to cancel it.
    """
    if msg is not None and (subject is not None or body is not None):
        raise TypeError("msg参数与subject/body只能提供其中之一")
    elif msg is None and subject is None and body is None:
        raise TypeError("必须提供msg参数或者subject/body参数")

    # `msg or ...` would also discard a caller's message that has no headers yet
    msg = msg if msg is not None else EmailMessage()

    if isinstance(receivers, str):
        receivers = [receivers]

    msg["From"] = sender
    msg["To"] = ", ".join(receivers)

    if subject:
        msg["subject"] = subject

    if body:
        msg.set_content(body)

    if cc:
        msg["Cc"] = ", ".join(cc)
    if bcc:
        msg["Bcc"] = ", ".join(bcc)

    username = username or sender

    if host is None:
        host = sender.split("@")[-1]

    task = asyncio.create_task(
        aiosmtplib.send(
            # log in with `username`; previously `sender` was always passed, so
            # the `username` argument had no effect
            msg, hostname=host, port=port, username=username, password=password
        )
    )
    # keep the task alive until it completes, then drop the reference
    _pending_mail_tasks.add(task)
    task.add_done_callback(_pending_mail_tasks.discard)
    return task