Skip to content

Serialize

Serializer and Deserializer

DataframeDeserializer

Source code in omicron/dal/influx/serialize.py
class DataframeDeserializer(Serializer):
    def __init__(
        self,
        sort_values: Union[str, List[str]] = None,
        encoding: str = "utf-8",
        names: List[str] = None,
        usecols: Union[List[int], List[str]] = None,
        dtype: dict = None,
        time_col: Union[int, str] = None,
        sep: str = ",",
        header: Union[int, List[int], str] = "infer",
        engine: str = None,
        infer_datetime_format=True,
        lineterminator: str = None,
        converters: dict = None,
        skipfooter=0,
        index_col: Union[int, str, List[int], List[str], bool] = None,
        skiprows: Union[int, List[int], Callable] = None,
        **kwargs,
    ):
        """Construct a deserializer that turns csv-like str/bytes into a pandas.DataFrame.

        Most arguments mirror pandas.read_csv; for details see the official doc:
        [pandas.read_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)

        For performance, prefer `engine='c'` (or `'pyarrow'`, which is the fastest and
        multi-thread capable but may be error-prone — use it only after thorough
        testing) and specify `dtype` when possible.

        Use `usecols` to select which columns to read and `names` to rename them;
        otherwise column names are inferred from the first line. When `names` is
        specified it must match the actual number of columns in the data, and after
        renaming you should always address columns by the names given in `names`
        (not those in `usecols`).

        Examples:
            >>> data = ",result,table,_time,code,name\\r\\n,_result,0,2019-01-01T09:31:00Z,000002.XSHE,国联证券"
            >>> des = DataframeDeserializer(names=["_", "result", "table", "frame", "code", "name"], usecols=["frame", "code", "name"])
            >>> des(data)
                              frame         code  name
            0  2019-01-01T09:31:00Z  000002.XSHE  国联证券

        Args:
            sort_values: column name(s) to sort the resulting dataframe by
            encoding: used to decode bytes input, since pandas.read_csv only handles text
            sep: field separator/delimiter
            header: row number of the header, default is 'infer'
            names: column names for the dataframe
            index_col: column number/name to use as the index
            usecols: names (or positions) of the columns to keep
            dtype: dtype mapping for the columns
            engine: csv parsing engine, default is None
            converters: per-column converter functions
            skiprows: row number(s) to skip at the start
            skipfooter: number of rows to skip at the end of the file
            time_col: the column to parse as a datetime
            infer_datetime_format: whether to infer the datetime format
            lineterminator: line terminator, only honored by the 'c' engine
            kwargs: extra arguments forwarded to pandas.read_csv
        """
        self.sort_values = sort_values
        self.encoding = encoding
        self.sep = sep
        self.names = names
        self.index_col = index_col
        self.usecols = usecols
        self.dtype = dtype
        self.engine = engine
        self.skiprows = skiprows
        self.skipfooter = skipfooter
        self.infer_datetime_format = infer_datetime_format
        self.lineterminator = lineterminator
        self.kwargs = kwargs

        # explicit `names` means the first row is treated as header row 0
        self.header = 0 if names is not None else header

        # keep the caller's dict (when given) so converters stay shared, as before
        self.converters = converters if converters else {}
        if time_col is not None:
            # parse the time column with ciso8601 (produces naive datetimes)
            self.converters[time_col] = ciso8601.parse_datetime_as_naive

    def __call__(self, data: Union[str, bytes]) -> pd.DataFrame:
        """Deserialize `data` (str or bytes) into a pandas.DataFrame."""
        text = data if isinstance(data, str) else data.decode(self.encoding)

        frame = pd.read_csv(
            io.StringIO(text),
            sep=self.sep,
            header=self.header,
            names=self.names,
            index_col=self.index_col,
            usecols=self.usecols,
            dtype=self.dtype,
            engine=self.engine,
            converters=self.converters,
            skiprows=self.skiprows,
            skipfooter=self.skipfooter,
            infer_datetime_format=self.infer_datetime_format,
            lineterminator=self.lineterminator,
            **self.kwargs,
        )

        # re-select to enforce the column order given in `usecols`
        if self.usecols:
            frame = frame[list(self.usecols)]

        return frame if self.sort_values is None else frame.sort_values(self.sort_values)

__init__(self, sort_values=None, encoding='utf-8', names=None, usecols=None, dtype=None, time_col=None, sep=',', header='infer', engine=None, infer_datetime_format=True, lineterminator=None, converters=None, skipfooter=0, index_col=None, skiprows=None, **kwargs) special

construct a deserializer which converts a csv-like bytes array to a pandas.DataFrame

the args are the same as pandas.read_csv. for details, please refer to the official doc: pandas.read_csv

for performance consideration, please specify the following args:

- engine = 'c' or 'pyarrow' when possible. Be noticed that 'pyarrow' is the fastest (multi-threaded supported) but may be error-prone. Only use it when you have thoroughly tested.

- specify dtype when possible

use usecols to specify the columns to read, and names to specify the column names (i.e., rename the columns), otherwise, the column names will be inferred from the first line.

when names is specified, it has to be as same length as actual columns of the data. If this causes column renaming, then you should always use column name specified in names to access the data (instead of which in usecols).

Examples:

>>> data = ",result,table,_time,code,name\r\n,_result,0,2019-01-01T09:31:00Z,000002.XSHE,国联证券"
>>> des = DataframeDeserializer(names=["_", "result", "table", "frame", "code", "name"], usecols=["frame", "code", "name"])
>>> des(data)
                  frame         code  name
0  2019-01-01T09:31:00Z  000002.XSHE  国联证券

Parameters:

Name Type Description Default
sort_values Union[str, List[str]]

sort the dataframe by the specified columns

None
encoding str

if the data is bytes, then encoding is required, due to pandas.read_csv only handle string array

'utf-8'
sep str

the separator/delimiter of each fields

','
header Union[int, List[int], str]

the row number of the header, default is 'infer'

'infer'
names List[str]

the column names of the dataframe

None
index_col Union[int, str, List[int], List[str], bool]

the column number or name of the index column

None
usecols Union[List[int], List[str]]

the column name of the columns to use

None
dtype dict

the dtype of the columns

None
engine str

the engine of the csv file, default is None

None
converters dict

specify converter for columns.

None
skiprows Union[int, List[int], Callable]

the row number to skip

None
skipfooter

the row number to skip at the end of the file

0
time_col Union[int, str]

the columns to parse as dates

None
infer_datetime_format

whether to infer the datetime format

True
lineterminator str

the line terminator of the csv file, only valid when engine is 'c'

None
kwargs

other arguments

{}
Source code in omicron/dal/influx/serialize.py
def __init__(
    self,
    sort_values: Union[str, List[str]] = None,
    encoding: str = "utf-8",
    names: List[str] = None,
    usecols: Union[List[int], List[str]] = None,
    dtype: dict = None,
    time_col: Union[int, str] = None,
    sep: str = ",",
    header: Union[int, List[int], str] = "infer",
    engine: str = None,
    infer_datetime_format=True,
    lineterminator: str = None,
    converters: dict = None,
    skipfooter=0,
    index_col: Union[int, str, List[int], List[str], bool] = None,
    skiprows: Union[int, List[int], Callable] = None,
    **kwargs,
):
    """Build a deserializer that converts csv-like str/bytes into a pandas.DataFrame.

    Most arguments mirror pandas.read_csv; for details see the official doc:
    [pandas.read_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)

    For performance, prefer `engine='c'` (or `'pyarrow'`, the fastest and
    multi-thread capable but possibly error-prone — use it only after thorough
    testing) and specify `dtype` when possible.

    Use `usecols` to select which columns to read and `names` to rename them;
    otherwise column names are inferred from the first line. When `names` is
    specified it must match the actual number of columns, and renamed columns
    should afterwards be addressed by the names in `names`, not those in
    `usecols`.

    Examples:
        >>> data = ",result,table,_time,code,name\\r\\n,_result,0,2019-01-01T09:31:00Z,000002.XSHE,国联证券"
        >>> des = DataframeDeserializer(names=["_", "result", "table", "frame", "code", "name"], usecols=["frame", "code", "name"])
        >>> des(data)
                          frame         code  name
        0  2019-01-01T09:31:00Z  000002.XSHE  国联证券

    Args:
        sort_values: column name(s) to sort the resulting dataframe by
        encoding: used to decode bytes input, since pandas.read_csv only handles text
        sep: field separator/delimiter
        header: row number of the header, default is 'infer'
        names: column names for the dataframe
        index_col: column number/name to use as the index
        usecols: names (or positions) of the columns to keep
        dtype: dtype mapping for the columns
        engine: csv parsing engine, default is None
        converters: per-column converter functions
        skiprows: row number(s) to skip at the start
        skipfooter: number of rows to skip at the end of the file
        time_col: the column to parse as a datetime
        infer_datetime_format: whether to infer the datetime format
        lineterminator: line terminator, only honored by the 'c' engine
        kwargs: extra arguments forwarded to pandas.read_csv
    """
    self.sort_values = sort_values
    self.encoding = encoding
    self.sep = sep
    self.names = names
    self.index_col = index_col
    self.usecols = usecols
    self.dtype = dtype
    self.engine = engine
    self.skiprows = skiprows
    self.skipfooter = skipfooter
    self.infer_datetime_format = infer_datetime_format
    self.lineterminator = lineterminator
    self.kwargs = kwargs

    # explicit `names` means the first row is treated as header row 0
    self.header = 0 if names is not None else header

    # reuse the caller-supplied dict (if any) so it stays the shared object
    self.converters = converters if converters else {}
    if time_col is not None:
        # attach an ISO-8601 parser (naive datetimes) for the time column
        self.converters[time_col] = lambda value: ciso8601.parse_datetime_as_naive(value)

NumpyDeserializer

Source code in omicron/dal/influx/serialize.py
class NumpyDeserializer(Serializer):
    def __init__(
        self,
        dtype: List[tuple] = "float",
        sort_values: Union[str, List[str]] = None,
        use_cols: Union[List[str], List[int]] = None,
        parse_date: Union[int, str] = "_time",
        sep: str = ",",
        encoding: str = "utf-8",
        skip_rows: Union[int, List[int]] = 1,
        header_line: int = 1,
        comments: str = "#",
        converters: Mapping[int, Callable] = None,
    ):
        """construct a deserializer, which will convert a csv-like multiline string/bytes array to a numpy array

        the data to be deserialized will first be split into an array of fields, then `use_cols` selects which fields to use (re-ordered by the order of `use_cols`). After that, the fields are converted to a numpy array of `dtype`.

        by default dtype is float, which means the data will be converted to float. If you need a numpy structured array, specify dtype as a list of tuples, e.g.

        ```
        dtype = [('col_1', 'datetime64[s]'), ('col_2', '<U12'), ('col_3', '<U4')]

        ```

        by default, the deserializer will try to convert every line from the very first line; if the first lines contain comments or headers, set skip_rows to the number of lines to skip.

        for more information, please refer to [numpy.loadtxt](https://numpy.org/doc/stable/reference/generated/numpy.loadtxt.html)

        Args:
            dtype: dtype of the output numpy array.
            sort_values: sort the output numpy array by the specified columns. If it's a string, then it's the name of the column, if it's a list of strings, then it's the names of the columns.
            use_cols: use only the specified columns. If it's a list of strings, then it's the names of the columns (presented in raw data header line), if it's a list of integers, then it's the column index.
            parse_date: by default we'll convert "_time" column into python datetime.datetime. Set it to None to turn off the conversion. ciso8601 is default parser. If you need to parse date but just don't like ciso8601, then you can turn off default parser (by set parse_date to None), and specify your own parser in converters.
            sep: separator of each field
            encoding: if the input is bytes, then encoding is used to decode the bytes to string.
            skip_rows: required by np.loadtxt, skip the first n lines
            header_line: which line contains header, started from 1. If you specify use_cols by list of string, then header line must be specified.
            comments: required by np.loadtxt, skip the lines starting with this string
            converters: required by np.loadtxt, a dict of column name to converter function.

        """
        self.dtype = dtype
        self.use_cols = use_cols
        self.sep = sep
        self.encoding = encoding
        self.skip_rows = skip_rows
        self.comments = comments
        self.converters = converters or {}
        self.sort_values = sort_values
        self.parse_date = parse_date
        self.header_line = header_line

        if header_line is None:
            # without a header, there is no way to resolve string column names
            assert parse_date is None or isinstance(
                parse_date, int
            ), "parse_date must be an integer if data contains no header"

            assert use_cols is None or isinstance(
                use_cols[0], int
            ), "use_cols must be a list of integers if data contains no header"

            # fix: was `> 1`, which let a single string-keyed converter slip
            # through validation unchecked
            if len(self.converters) > 0:
                assert all(
                    isinstance(x, int) for x in self.converters.keys()
                ), "converters must be a dict of column index to converter function, if there's no header"

        # set after the header is parsed once; guards re-parsing on later calls
        self._parsed_headers = None

    def _parse_header_once(self, stream):
        """parse the header line once: resolve string `use_cols` and converter keys
        to column indices, and install the default date parser for `parse_date`.

        Args:
            stream: a seekable text stream positioned at the start of the data

        Raises:
            SerializationError: when the header line is empty or malformed
            EmptyResult: when the stream contains no data at all
        """
        if self.header_line is None or self._parsed_headers is not None:
            return

        # fix: initialize so the except handler never references an unbound name
        # when the stream is empty
        line = ""
        try:
            # fix: read exactly `header_line` lines. The previous
            # `stream.readlines(self.header_line)` used the argument as a byte
            # hint rather than a line count, so for header_line > 1 it could
            # return the wrong line.
            for _ in range(self.header_line):
                line = stream.readline()
            cols = line.strip().split(self.sep)
            self._parsed_headers = cols

            use_cols = self.use_cols
            if use_cols is not None and isinstance(use_cols[0], str):
                self.use_cols = [cols.index(col) for col in self.use_cols]

            # convert keys of converters to int
            converters = {cols.index(k): v for k, v in self.converters.items()}

            self.converters = converters

            if isinstance(self.parse_date, str):
                parse_date = cols.index(self.parse_date)
                if parse_date in self.converters.keys():
                    logger.debug(
                        "specify duplicated converter in both parse_date and converters for col %s, use converters.",
                        self.parse_date,
                    )
                else:  # add parse_date to converters
                    self.converters[
                        parse_date
                    ] = lambda x: ciso8601.parse_datetime_as_naive(x)

            # rewind so np.loadtxt sees the full input (skip_rows handles the header)
            stream.seek(0)
        except (IndexError, ValueError):
            if line.strip() == "":
                content = "".join(stream.readlines()).strip()
                if len(content) > 0:
                    # fix: typo "heder" -> "header" in the error message
                    raise SerializationError(
                        f"specified header line {self.header_line} is empty"
                    )
                else:
                    raise EmptyResult()
            else:
                raise SerializationError(f"bad header[{self.header_line}]: {line}")

    def __call__(self, data: bytes) -> np.ndarray:
        """Deserialize `data` (str or bytes) into a numpy array per the constructor options."""
        if self.encoding and isinstance(data, bytes):
            stream = io.StringIO(data.decode(self.encoding))
        else:
            stream = io.StringIO(data)

        try:
            self._parse_header_once(stream)
        except EmptyResult:
            return np.empty((0,), dtype=self.dtype)

        arr = np.loadtxt(
            stream.readlines(),
            delimiter=self.sep,
            skiprows=self.skip_rows,
            dtype=self.dtype,
            usecols=self.use_cols,
            converters=self.converters,
            encoding=self.encoding,
        )

        # when only one record is returned, shape can sometimes be ()
        if arr.shape == tuple():
            arr = arr.reshape((-1,))
        if self.sort_values is not None and arr.size > 1:
            return np.sort(arr, order=self.sort_values)
        else:
            return arr

__init__(self, dtype='float', sort_values=None, use_cols=None, parse_date='_time', sep=',', encoding='utf-8', skip_rows=1, header_line=1, comments='#', converters=None) special

construct a deserializer, which will convert a csv like multiline string/bytes array to a numpy array

the data to be deserialized will be first split into array of fields, then use use_cols to select which fields to use, and re-order them by the order of use_cols. After that, the fields will be converted to numpy array and converted into dtype.

by default dtype is float, which means the data will be converted to float. If you need to convert to a numpy structured array, then you can specify the dtype as a list of tuples, e.g.

dtype = [('col_1', 'datetime64[s]'), ('col_2', '<U12'), ('col_3', '<U4')]

by default, the deserializer will try to convert every line starting from the very first line. If the first lines contain comments or headers, they should be skipped by the deserializer: set skip_rows to the number of lines to skip.

for more information, please refer to numpy.loadtxt

Parameters:

Name Type Description Default
dtype List[tuple]

dtype of the output numpy array.

'float'
sort_values Union[str, List[str]]

sort the output numpy array by the specified columns. If it's a string, then it's the name of the column, if it's a list of strings, then it's the names of the columns.

None
use_cols Union[List[str], List[int]]

use only the specified columns. If it's a list of strings, then it's the names of the columns (presented in raw data header line), if it's a list of integers, then it's the column index.

None
parse_date Union[int, str]

by default we'll convert "_time" column into python datetime.datetime. Set it to None to turn off the conversion. ciso8601 is default parser. If you need to parse date but just don't like ciso8601, then you can turn off default parser (by set parse_date to None), and specify your own parser in converters.

'_time'
sep str

separator of each field

','
encoding str

if the input is bytes, then encoding is used to decode the bytes to string.

'utf-8'
skip_rows Union[int, List[int]]

required by np.loadtxt, skip the first n lines

1
header_line int

which line contains header, started from 1. If you specify use_cols by list of string, then header line must be specified.

1
comments str

required by np.loadtxt, skip the lines starting with this string

'#'
converters Mapping[int, Callable]

required by np.loadtxt, a dict of column name to converter function.

None
Source code in omicron/dal/influx/serialize.py
def __init__(
    self,
    dtype: List[tuple] = "float",
    sort_values: Union[str, List[str]] = None,
    use_cols: Union[List[str], List[int]] = None,
    parse_date: Union[int, str] = "_time",
    sep: str = ",",
    encoding: str = "utf-8",
    skip_rows: Union[int, List[int]] = 1,
    header_line: int = 1,
    comments: str = "#",
    converters: Mapping[int, Callable] = None,
):
    """construct a deserializer, which will convert a csv-like multiline string/bytes array to a numpy array

    the data to be deserialized will first be split into an array of fields, then `use_cols` selects which fields to use (re-ordered by the order of `use_cols`). After that, the fields are converted to a numpy array of `dtype`.

    by default dtype is float, which means the data will be converted to float. If you need a numpy structured array, specify dtype as a list of tuples, e.g.

    ```
    dtype = [('col_1', 'datetime64[s]'), ('col_2', '<U12'), ('col_3', '<U4')]

    ```

    by default, the deserializer will try to convert every line from the very first line; if the first lines contain comments or headers, set skip_rows to the number of lines to skip.

    for more information, please refer to [numpy.loadtxt](https://numpy.org/doc/stable/reference/generated/numpy.loadtxt.html)

    Args:
        dtype: dtype of the output numpy array.
        sort_values: sort the output numpy array by the specified columns. If it's a string, then it's the name of the column, if it's a list of strings, then it's the names of the columns.
        use_cols: use only the specified columns. If it's a list of strings, then it's the names of the columns (presented in raw data header line), if it's a list of integers, then it's the column index.
        parse_date: by default we'll convert "_time" column into python datetime.datetime. Set it to None to turn off the conversion. ciso8601 is default parser. If you need to parse date but just don't like ciso8601, then you can turn off default parser (by set parse_date to None), and specify your own parser in converters.
        sep: separator of each field
        encoding: if the input is bytes, then encoding is used to decode the bytes to string.
        skip_rows: required by np.loadtxt, skip the first n lines
        header_line: which line contains header, started from 1. If you specify use_cols by list of string, then header line must be specified.
        comments: required by np.loadtxt, skip the lines starting with this string
        converters: required by np.loadtxt, a dict of column name to converter function.

    """
    self.dtype = dtype
    self.use_cols = use_cols
    self.sep = sep
    self.encoding = encoding
    self.skip_rows = skip_rows
    self.comments = comments
    self.converters = converters or {}
    self.sort_values = sort_values
    self.parse_date = parse_date
    self.header_line = header_line

    if header_line is None:
        # without a header, there is no way to resolve string column names
        assert parse_date is None or isinstance(
            parse_date, int
        ), "parse_date must be an integer if data contains no header"

        assert use_cols is None or isinstance(
            use_cols[0], int
        ), "use_cols must be a list of integers if data contains no header"

        # fix: was `> 1`, which let a single string-keyed converter slip
        # through validation unchecked
        if len(self.converters) > 0:
            assert all(
                isinstance(x, int) for x in self.converters.keys()
            ), "converters must be a dict of column index to converter function, if there's no header"

    # set after the header is parsed once; guards re-parsing on later calls
    self._parsed_headers = None