Serialize
Serializer and Deserializer¶
DataframeDeserializer¶
Source code in omicron/dal/influx/serialize.py
```python
class DataframeDeserializer(Serializer):
    def __init__(
        self,
        sort_values: Union[str, List[str]] = None,
        encoding: str = "utf-8",
        names: List[str] = None,
        usecols: Union[List[int], List[str]] = None,
        dtype: dict = None,
        time_col: Union[int, str] = None,
        sep: str = ",",
        header: Union[int, List[int], str] = "infer",
        engine: str = None,
        infer_datetime_format=True,
        lineterminator: str = None,
        converters: dict = None,
        skipfooter=0,
        index_col: Union[int, str, List[int], List[str], bool] = None,
        skiprows: Union[int, List[int], Callable] = None,
        **kwargs,
    ):
        """constructor a deserializer which convert a csv-like bytes array to pandas.DataFrame

        the args are the same as pandas.read_csv. for details, please refer to the official doc: [pandas.read_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)

        for performance consideration, please specify the following args:

            - engine = 'c' or 'pyarrow' when possible. Be noticed that 'pyarrow' is the fastest (multi-threaded supported) but may be error-prone. Only use it when you have thoroughly tested.
            - specify dtype when possible

        use `usecols` to specify the columns to read, and `names` to specify the column names (i.e., rename the columns), otherwise, the column names will be inferred from the first line.

        when `names` is specified, it has to be as same length as actual columns of the data. If this causes column renaming, then you should always use column name specified in `names` to access the data (instead of which in `usecols`).

        Examples:
            >>> data = ",result,table,_time,code,name\\r\\n,_result,0,2019-01-01T09:31:00Z,000002.XSHE,国联证券"
            >>> des = DataframeDeserializer(names=["_", "result", "table", "frame", "code", "name"], usecols=["frame", "code", "name"])
            >>> des(data)
                              frame         code  name
            0  2019-01-01T09:31:00Z  000002.XSHE  国联证券

        Args:
            sort_values: sort the dataframe by the specified columns
            encoding: if the data is bytes, then encoding is required, due to pandas.read_csv only handle string array
            sep: the separator/delimiter of each fields
            header: the row number of the header, default is 'infer'
            names: the column names of the dataframe
            index_col: the column number or name of the index column
            usecols: the column name of the columns to use
            dtype: the dtype of the columns
            engine: the engine of the csv file, default is None
            converters: specify converter for columns.
            skiprows: the row number to skip
            skipfooter: the row number to skip at the end of the file
            time_col: the columns to parse as dates
            infer_datetime_format: whether to infer the datetime format
            lineterminator: the line terminator of the csv file, only valid when engine is 'c'
            kwargs: other arguments
        """
        self.sort_values = sort_values
        self.encoding = encoding
        self.sep = sep
        self.header = header
        self.names = names
        self.index_col = index_col
        self.usecols = usecols
        self.dtype = dtype
        self.engine = engine
        self.converters = converters or {}
        self.skiprows = skiprows
        self.skipfooter = skipfooter
        self.infer_datetime_format = infer_datetime_format
        self.lineterminator = lineterminator
        self.kwargs = kwargs

        if names is not None:
            self.header = 0

        if time_col is not None:
            self.converters[time_col] = lambda x: ciso8601.parse_datetime_as_naive(x)

    def __call__(self, data: Union[str, bytes]) -> pd.DataFrame:
        if isinstance(data, str):
            # treat data as string
            stream = io.StringIO(data)
        else:
            stream = io.StringIO(data.decode(self.encoding))

        df = pd.read_csv(
            stream,
            sep=self.sep,
            header=self.header,
            names=self.names,
            index_col=self.index_col,
            usecols=self.usecols,
            dtype=self.dtype,
            engine=self.engine,
            converters=self.converters,
            skiprows=self.skiprows,
            skipfooter=self.skipfooter,
            infer_datetime_format=self.infer_datetime_format,
            lineterminator=self.lineterminator,
            **self.kwargs,
        )

        if self.usecols:
            df = df[list(self.usecols)]

        if self.sort_values is not None:
            return df.sort_values(self.sort_values)
        else:
            return df
```
__init__(self, sort_values=None, encoding='utf-8', names=None, usecols=None, dtype=None, time_col=None, sep=',', header='infer', engine=None, infer_datetime_format=True, lineterminator=None, converters=None, skipfooter=0, index_col=None, skiprows=None, **kwargs) special ¶
Construct a deserializer that converts a CSV-like string or bytes array into a pandas.DataFrame.

The arguments are the same as those of pandas.read_csv; for details, refer to the official documentation: [pandas.read_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html).

For performance, specify the following arguments when possible:

- engine = 'c' or 'pyarrow'. Note that 'pyarrow' is the fastest (it supports multi-threading) but can be error-prone; use it only after thorough testing.
- dtype

Use `usecols` to specify the columns to read, and `names` to specify the column names (i.e. rename the columns); otherwise the column names are inferred from the first line.

When `names` is specified, it must have the same length as the actual columns of the data. If this renames columns, always access the data by the names given in `names` (not by those in `usecols`).

Examples:

    >>> data = ",result,table,_time,code,name\r\n,_result,0,2019-01-01T09:31:00Z,000002.XSHE,国联证券"
    >>> des = DataframeDeserializer(names=["_", "result", "table", "frame", "code", "name"], usecols=["frame", "code", "name"])
    >>> des(data)
                      frame         code  name
    0  2019-01-01T09:31:00Z  000002.XSHE  国联证券
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sort_values | Union[str, List[str]] | sort the dataframe by the specified columns | None |
| encoding | str | if the data is bytes, an encoding is required, since pandas.read_csv only handles string input | 'utf-8' |
| sep | str | the separator/delimiter between fields | ',' |
| header | Union[int, List[int], str] | the row number of the header; default is 'infer' | 'infer' |
| names | List[str] | the column names of the dataframe | None |
| index_col | Union[int, str, List[int], List[str], bool] | the column number or name of the index column | None |
| usecols | Union[List[int], List[str]] | the names (or indices) of the columns to use | None |
| dtype | dict | the dtype of the columns | None |
| engine | str | the parser engine to use; default is None | None |
| converters | dict | converters for specific columns | None |
| skiprows | Union[int, List[int], Callable] | the row number(s) to skip | None |
| skipfooter | int | the number of rows to skip at the end of the file | 0 |
| time_col | Union[int, str] | the column to parse as dates | None |
| infer_datetime_format | bool | whether to infer the datetime format | True |
| lineterminator | str | the line terminator of the csv file, only valid when engine is 'c' | None |
| kwargs | | other arguments passed to pandas.read_csv | {} |
Source code in omicron/dal/influx/serialize.py
```python
def __init__(
    self,
    sort_values: Union[str, List[str]] = None,
    encoding: str = "utf-8",
    names: List[str] = None,
    usecols: Union[List[int], List[str]] = None,
    dtype: dict = None,
    time_col: Union[int, str] = None,
    sep: str = ",",
    header: Union[int, List[int], str] = "infer",
    engine: str = None,
    infer_datetime_format=True,
    lineterminator: str = None,
    converters: dict = None,
    skipfooter=0,
    index_col: Union[int, str, List[int], List[str], bool] = None,
    skiprows: Union[int, List[int], Callable] = None,
    **kwargs,
):
    """constructor a deserializer which convert a csv-like bytes array to pandas.DataFrame

    the args are the same as pandas.read_csv. for details, please refer to the official doc: [pandas.read_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)

    for performance consideration, please specify the following args:

        - engine = 'c' or 'pyarrow' when possible. Be noticed that 'pyarrow' is the fastest (multi-threaded supported) but may be error-prone. Only use it when you have thoroughly tested.
        - specify dtype when possible

    use `usecols` to specify the columns to read, and `names` to specify the column names (i.e., rename the columns), otherwise, the column names will be inferred from the first line.

    when `names` is specified, it has to be as same length as actual columns of the data. If this causes column renaming, then you should always use column name specified in `names` to access the data (instead of which in `usecols`).

    Examples:
        >>> data = ",result,table,_time,code,name\\r\\n,_result,0,2019-01-01T09:31:00Z,000002.XSHE,国联证券"
        >>> des = DataframeDeserializer(names=["_", "result", "table", "frame", "code", "name"], usecols=["frame", "code", "name"])
        >>> des(data)
                          frame         code  name
        0  2019-01-01T09:31:00Z  000002.XSHE  国联证券

    Args:
        sort_values: sort the dataframe by the specified columns
        encoding: if the data is bytes, then encoding is required, due to pandas.read_csv only handle string array
        sep: the separator/delimiter of each fields
        header: the row number of the header, default is 'infer'
        names: the column names of the dataframe
        index_col: the column number or name of the index column
        usecols: the column name of the columns to use
        dtype: the dtype of the columns
        engine: the engine of the csv file, default is None
        converters: specify converter for columns.
        skiprows: the row number to skip
        skipfooter: the row number to skip at the end of the file
        time_col: the columns to parse as dates
        infer_datetime_format: whether to infer the datetime format
        lineterminator: the line terminator of the csv file, only valid when engine is 'c'
        kwargs: other arguments
    """
    self.sort_values = sort_values
    self.encoding = encoding
    self.sep = sep
    self.header = header
    self.names = names
    self.index_col = index_col
    self.usecols = usecols
    self.dtype = dtype
    self.engine = engine
    self.converters = converters or {}
    self.skiprows = skiprows
    self.skipfooter = skipfooter
    self.infer_datetime_format = infer_datetime_format
    self.lineterminator = lineterminator
    self.kwargs = kwargs

    if names is not None:
        self.header = 0

    if time_col is not None:
        self.converters[time_col] = lambda x: ciso8601.parse_datetime_as_naive(x)
```
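To make the constructor options concrete, here is a minimal usage sketch. It is not taken from the library's tests: the CSV payload, column names, and sample rows below are invented for illustration. It feeds an InfluxDB-style annotated CSV as bytes to a DataframeDeserializer, renames all columns via `names`, keeps three of them via `usecols`, parses the `frame` column through `time_col`, and sorts the result.

```python
# Hypothetical usage sketch; the payload and column names are invented for illustration.
from omicron.dal.influx.serialize import DataframeDeserializer

payload = (
    b",result,table,_time,code,name\r\n"
    b",_result,0,2019-01-01T09:32:00Z,000002.XSHE,XYZ\r\n"
    b",_result,0,2019-01-01T09:31:00Z,000001.XSHE,ABC\r\n"
)

des = DataframeDeserializer(
    names=["_", "result", "table", "frame", "code", "name"],  # rename all six raw columns
    usecols=["frame", "code", "name"],                        # keep only the renamed columns we need
    time_col="frame",                                         # parse this column with ciso8601
    sort_values="frame",                                      # sort rows by the parsed time column
    engine="c",                                               # the fast, well-tested parser engine
)

df = des(payload)   # bytes are decoded with `encoding`, then handed to pandas.read_csv
print(df)           # three columns; "frame" holds datetime objects; rows sorted by "frame"
```

Note that because `names` is given, the deserializer sets `header = 0`, so the original header row is consumed and the data is accessed through the renamed columns.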
NumpyDeserializer¶
Source code in omicron/dal/influx/serialize.py
````python
class NumpyDeserializer(Serializer):
    def __init__(
        self,
        dtype: List[tuple] = "float",
        sort_values: Union[str, List[str]] = None,
        use_cols: Union[List[str], List[int]] = None,
        parse_date: Union[int, str] = "_time",
        sep: str = ",",
        encoding: str = "utf-8",
        skip_rows: Union[int, List[int]] = 1,
        header_line: int = 1,
        comments: str = "#",
        converters: Mapping[int, Callable] = None,
    ):
        """construct a deserializer, which will convert a csv like multiline string/bytes array to a numpy array

        the data to be deserialized will be first split into array of fields, then use use_cols to select which fields to use, and re-order them by the order of use_cols. After that, the fields will be converted to numpy array and converted into dtype.

        by default dtype is float, which means the data will be converted to float. If you need to convert to a numpy structured array, then you can specify the dtype as a list of tuples, e.g.

        ```
        dtype = [('col_1', 'datetime64[s]'), ('col_2', '<U12'), ('col_3', '<U4')]
        ```

        by default, the deserializer will try to convert every line from the very first line, if the very first lines contains comments and headers, these lines should be skipped by deserializer, you should set skip_rows to number of lines to skip.

        for more information, please refer to [numpy.loadtxt](https://numpy.org/doc/stable/reference/generated/numpy.loadtxt.html)

        Args:
            dtype: dtype of the output numpy array.
            sort_values: sort the output numpy array by the specified columns. If it's a string, then it's the name of the column, if it's a list of strings, then it's the names of the columns.
            use_cols: use only the specified columns. If it's a list of strings, then it's the names of the columns (presented in raw data header line), if it's a list of integers, then it's the column index.
            parse_date: by default we'll convert "_time" column into python datetime.datetime. Set it to None to turn off the conversion. ciso8601 is default parser. If you need to parse date but just don't like ciso8601, then you can turn off default parser (by set parse_date to None), and specify your own parser in converters.
            sep: separator of each field
            encoding: if the input is bytes, then encoding is used to decode the bytes to string.
            skip_rows: required by np.loadtxt, skip the first n lines
            header_line: which line contains header, started from 1. If you specify use_cols by list of string, then header line must be specified.
            comments: required by np.loadtxt, skip the lines starting with this string
            converters: required by np.loadtxt, a dict of column name to converter function.
        """
        self.dtype = dtype
        self.use_cols = use_cols
        self.sep = sep
        self.encoding = encoding
        self.skip_rows = skip_rows
        self.comments = comments
        self.converters = converters or {}
        self.sort_values = sort_values
        self.parse_date = parse_date
        self.header_line = header_line

        if header_line is None:
            assert parse_date is None or isinstance(
                parse_date, int
            ), "parse_date must be an integer if data contains no header"

            assert use_cols is None or isinstance(
                use_cols[0], int
            ), "use_cols must be a list of integers if data contains no header"

            if len(self.converters) > 1:
                assert all(
                    [isinstance(x, int) for x in self.converters.keys()]
                ), "converters must be a dict of column index to converter function, if there's no header"

        self._parsed_headers = None

    def _parse_header_once(self, stream):
        """parse header and convert use_cols, if columns is specified in string. And if parse_date is required, add it into converters

        Args:
            stream : [description]

        Raises:
            SerializationError: [description]
        """
        if self.header_line is None or self._parsed_headers is not None:
            return

        try:
            line = stream.readlines(self.header_line)[-1]
            cols = line.strip().split(self.sep)
            self._parsed_headers = cols

            use_cols = self.use_cols
            if use_cols is not None and isinstance(use_cols[0], str):
                self.use_cols = [cols.index(col) for col in self.use_cols]

            # convert keys of converters to int
            converters = {cols.index(k): v for k, v in self.converters.items()}
            self.converters = converters

            if isinstance(self.parse_date, str):
                parse_date = cols.index(self.parse_date)
                if parse_date in self.converters.keys():
                    logger.debug(
                        "specify duplicated converter in both parse_date and converters for col %s, use converters.",
                        self.parse_date,
                    )
                else:  # add parse_date to converters
                    self.converters[
                        parse_date
                    ] = lambda x: ciso8601.parse_datetime_as_naive(x)

            stream.seek(0)
        except (IndexError, ValueError):
            if line.strip() == "":
                content = "".join(stream.readlines()).strip()
                if len(content) > 0:
                    raise SerializationError(
                        f"specified heder line {self.header_line} is empty"
                    )
                else:
                    raise EmptyResult()
            else:
                raise SerializationError(f"bad header[{self.header_line}]: {line}")

    def __call__(self, data: bytes) -> np.ndarray:
        if self.encoding and isinstance(data, bytes):
            stream = io.StringIO(data.decode(self.encoding))
        else:
            stream = io.StringIO(data)

        try:
            self._parse_header_once(stream)
        except EmptyResult:
            return np.empty((0,), dtype=self.dtype)

        arr = np.loadtxt(
            stream.readlines(),
            delimiter=self.sep,
            skiprows=self.skip_rows,
            dtype=self.dtype,
            usecols=self.use_cols,
            converters=self.converters,
            encoding=self.encoding,
        )

        # if only one record is returned, the shape is sometimes ()
        if arr.shape == tuple():
            arr = arr.reshape((-1,))

        if self.sort_values is not None and arr.size > 1:
            return np.sort(arr, order=self.sort_values)
        else:
            return arr
````
__init__(self, dtype='float', sort_values=None, use_cols=None, parse_date='_time', sep=',', encoding='utf-8', skip_rows=1, header_line=1, comments='#', converters=None) special ¶
Construct a deserializer that converts a CSV-like multiline string or bytes array into a numpy array.

The data to be deserialized is first split into an array of fields, then `use_cols` selects which fields to use and re-orders them in the order given by `use_cols`. After that, the fields are converted to a numpy array of the given `dtype`.

By default `dtype` is float, which means the data is converted to floats. If you need a numpy structured array, specify the dtype as a list of tuples, e.g.

```
dtype = [('col_1', 'datetime64[s]'), ('col_2', '<U12'), ('col_3', '<U4')]
```

By default the deserializer tries to convert every line, starting from the very first one. If the first lines contain comments or a header, they should be skipped by the deserializer: set skip_rows to the number of lines to skip.

For more information, refer to [numpy.loadtxt](https://numpy.org/doc/stable/reference/generated/numpy.loadtxt.html).
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dtype | List[tuple] | dtype of the output numpy array | 'float' |
| sort_values | Union[str, List[str]] | sort the output numpy array by the specified columns. If a string, it is the name of a single column; if a list of strings, it is the names of the columns. | None |
| use_cols | Union[List[str], List[int]] | use only the specified columns. If a list of strings, they are column names (as present in the raw data header line); if a list of integers, they are column indices. | None |
| parse_date | Union[int, str] | by default the "_time" column is converted into python datetime.datetime; set it to None to turn the conversion off. ciso8601 is the default parser. If you need to parse dates but don't want ciso8601, turn off the default parser (by setting parse_date to None) and specify your own parser in converters. | '_time' |
| sep | str | separator of each field | ',' |
| encoding | str | if the input is bytes, encoding is used to decode it to a string | 'utf-8' |
| skip_rows | Union[int, List[int]] | required by np.loadtxt; skip the first n lines | 1 |
| header_line | int | which line contains the header, counted from 1. If use_cols is given as a list of strings, the header line must be specified. | 1 |
| comments | str | required by np.loadtxt; skip lines starting with this string | '#' |
| converters | Mapping[int, Callable] | required by np.loadtxt; a dict mapping columns to converter functions | None |
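As a concrete illustration of these parameters, here is a minimal sketch; it is not from the library's documentation, and the payload, column names, and dtype below are invented. It parses the header line, selects columns by name, converts the time column with the default ciso8601 parser, and returns a structured array sorted by time.

```python
# Hypothetical sketch; the payload and dtype are invented for illustration.
from omicron.dal.influx.serialize import NumpyDeserializer

payload = (
    "_time,code,close\n"
    "2019-01-02T09:31:00Z,000002.XSHE,10.2\n"
    "2019-01-01T09:31:00Z,000001.XSHE,9.8\n"
)

des = NumpyDeserializer(
    dtype=[("frame", "datetime64[s]"), ("code", "U11"), ("close", "f4")],
    use_cols=["_time", "code", "close"],   # selected (and ordered) by header name
    parse_date="_time",                    # parsed with ciso8601 into datetime
    sort_values="frame",                   # sort the structured array by this field
    skip_rows=1,                           # np.loadtxt must still skip the header line itself
    header_line=1,                         # the header is on the first line
)

arr = des(payload)
print(arr["frame"], arr["close"])
```

Note that `header_line` and `skip_rows` are separate knobs: `header_line` tells the deserializer where to read column names from, while `skip_rows` is passed on to np.loadtxt so the header line is not parsed as data.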
Source code in omicron/dal/influx/serialize.py
````python
def __init__(
    self,
    dtype: List[tuple] = "float",
    sort_values: Union[str, List[str]] = None,
    use_cols: Union[List[str], List[int]] = None,
    parse_date: Union[int, str] = "_time",
    sep: str = ",",
    encoding: str = "utf-8",
    skip_rows: Union[int, List[int]] = 1,
    header_line: int = 1,
    comments: str = "#",
    converters: Mapping[int, Callable] = None,
):
    """construct a deserializer, which will convert a csv like multiline string/bytes array to a numpy array

    the data to be deserialized will be first split into array of fields, then use use_cols to select which fields to use, and re-order them by the order of use_cols. After that, the fields will be converted to numpy array and converted into dtype.

    by default dtype is float, which means the data will be converted to float. If you need to convert to a numpy structured array, then you can specify the dtype as a list of tuples, e.g.

    ```
    dtype = [('col_1', 'datetime64[s]'), ('col_2', '<U12'), ('col_3', '<U4')]
    ```

    by default, the deserializer will try to convert every line from the very first line, if the very first lines contains comments and headers, these lines should be skipped by deserializer, you should set skip_rows to number of lines to skip.

    for more information, please refer to [numpy.loadtxt](https://numpy.org/doc/stable/reference/generated/numpy.loadtxt.html)

    Args:
        dtype: dtype of the output numpy array.
        sort_values: sort the output numpy array by the specified columns. If it's a string, then it's the name of the column, if it's a list of strings, then it's the names of the columns.
        use_cols: use only the specified columns. If it's a list of strings, then it's the names of the columns (presented in raw data header line), if it's a list of integers, then it's the column index.
        parse_date: by default we'll convert "_time" column into python datetime.datetime. Set it to None to turn off the conversion. ciso8601 is default parser. If you need to parse date but just don't like ciso8601, then you can turn off default parser (by set parse_date to None), and specify your own parser in converters.
        sep: separator of each field
        encoding: if the input is bytes, then encoding is used to decode the bytes to string.
        skip_rows: required by np.loadtxt, skip the first n lines
        header_line: which line contains header, started from 1. If you specify use_cols by list of string, then header line must be specified.
        comments: required by np.loadtxt, skip the lines starting with this string
        converters: required by np.loadtxt, a dict of column name to converter function.
    """
    self.dtype = dtype
    self.use_cols = use_cols
    self.sep = sep
    self.encoding = encoding
    self.skip_rows = skip_rows
    self.comments = comments
    self.converters = converters or {}
    self.sort_values = sort_values
    self.parse_date = parse_date
    self.header_line = header_line

    if header_line is None:
        assert parse_date is None or isinstance(
            parse_date, int
        ), "parse_date must be an integer if data contains no header"

        assert use_cols is None or isinstance(
            use_cols[0], int
        ), "use_cols must be a list of integers if data contains no header"

        if len(self.converters) > 1:
            assert all(
                [isinstance(x, int) for x in self.converters.keys()]
            ), "converters must be a dict of column index to converter function, if there's no header"

    self._parsed_headers = None
````
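The assertions at the end of this constructor govern the headerless case. A minimal sketch of that path follows; the payload is invented for illustration. With header_line=None there is no header to look names up in, so columns must be addressed by index, and since (in the source shown above) the parse_date converter is only installed while the header is being parsed, date parsing is wired up explicitly through converters instead.

```python
# Hypothetical sketch of the headerless path; the payload is invented for illustration.
import ciso8601
from omicron.dal.influx.serialize import NumpyDeserializer

payload = (
    "2019-01-01T09:31:00Z,000001.XSHE,9.8\n"
    "2019-01-02T09:31:00Z,000002.XSHE,10.2\n"
)

des = NumpyDeserializer(
    dtype=[("frame", "datetime64[s]"), ("code", "U11"), ("close", "f4")],
    use_cols=[0, 1, 2],                                  # indices only: there is no header to resolve names against
    parse_date=None,                                     # name-based date parsing needs a header, so turn it off
    converters={0: ciso8601.parse_datetime_as_naive},    # register the date converter by column index instead
    skip_rows=0,                                         # nothing to skip: the first line is already data
    header_line=None,
)

arr = des(payload)
print(arr["frame"])
```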