Преобразование данных запаса OHLC в другой таймфрейм с помощью python и pandas
Может ли кто-нибудь указать мне в правильном направлении относительно преобразования таймфрейма данных OHLC с помощью Pandas? То, что я пытаюсь сделать, это построить Dataframe с данными для более высоких таймфреймов, учитывая данные с более низким таймфреймом.
Например, если у меня есть следующие одноминутные (M1) данные:
Open High Low Close Volume
Date
1999-01-04 10:22:00 1.1801 1.1819 1.1801 1.1817 4
1999-01-04 10:23:00 1.1817 1.1818 1.1804 1.1814 18
1999-01-04 10:24:00 1.1817 1.1817 1.1802 1.1806 12
1999-01-04 10:25:00 1.1807 1.1815 1.1795 1.1808 26
1999-01-04 10:26:00 1.1803 1.1806 1.1790 1.1806 4
1999-01-04 10:27:00 1.1801 1.1801 1.1779 1.1786 23
1999-01-04 10:28:00 1.1795 1.1801 1.1776 1.1788 28
1999-01-04 10:29:00 1.1793 1.1795 1.1782 1.1789 10
1999-01-04 10:31:00 1.1780 1.1792 1.1776 1.1792 12
1999-01-04 10:32:00 1.1788 1.1792 1.1788 1.1791 4
который имеет Open, High, Low, Close (OHLC) и значения громкости за каждую минуту, я хотел бы построить набор 5-минутных показаний (M5), который будет выглядеть так:
Open High Low Close Volume
Date
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789 91
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791 16
Итак, рабочий процесс таков:
- Open - это открытие первой строки в timewindow
- Высокий - самый высокий максимум в timewindow
- Низкий низкий
- Закрыть - последнее Закрыть
- Объем - это просто сумма томов
Есть несколько проблем:
- данные имеют пробелы (обратите внимание, что нет строки 10:30:00).
- 5-минутные интервалы должны начинаться в круглое время, например. M5 начинается в 10:25:00, а не 10:22:00.
- во-первых, неполный набор может быть опущен, как в этом примере, или включен (так что мы могли бы иметь 10:20:00 5-минутную запись)
Документация Pandas для восходящей выборки дает пример, но они используют среднее значение как значение строки с повторением выборки, который здесь не будет работать. Я пробовал использовать groupby
и agg
, но безрезультатно. Для одного получить высокий Высокий и низкий Низкий может быть не так сложно, но я понятия не имею, как получить сначала Open и last Close.
То, что я пробовал, это что-то вроде:
grouped = slice.groupby( dr5minute.asof ).agg(
{ 'Low': lambda x : x.min()[ 'Low' ], 'High': lambda x : x.max()[ 'High' ] }
)
но это приводит к следующей ошибке, которую я не понимаю:
In [27]: grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <module>()
----> 1 grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )
/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in agg(self, func, *args, **kwargs)
242 See docstring for aggregate
243 """
--> 244 return self.aggregate(func, *args, **kwargs)
245
246 def _iterate_slices(self):
/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, arg, *args, **kwargs)
1153 colg = SeriesGroupBy(obj[col], column=col,
1154 grouper=self.grouper)
-> 1155 result[col] = colg.aggregate(func)
1156
1157 result = DataFrame(result)
/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, func_or_funcs, *args, **kwargs)
906 return self._python_agg_general(func_or_funcs, *args, **kwargs)
907 except Exception:
--> 908 result = self._aggregate_named(func_or_funcs, *args, **kwargs)
909
910 index = Index(sorted(result), name=self.grouper.names[0])
/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in _aggregate_named(self, func, *args, **kwargs)
976 grp = self.get_group(name)
977 grp.name = name
--> 978 output = func(grp, *args, **kwargs)
979 if isinstance(output, np.ndarray):
980 raise Exception('Must produce aggregated value')
/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <lambda>(x)
----> 1 grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )
IndexError: invalid index to scalar variable.
Так что любая помощь в этом была бы весьма признательна. Если путь, который я выбрал, не будет работать, предложите другой относительно эффективный подход (у меня есть миллионы строк). Некоторые ресурсы на использование Pandas для финансовой обработки также будут приятными.
Ответы
Ответ 1
Ваш подход звучит, но терпит неудачу, потому что каждая функция в dict-of-functions применяется к agg()
получает объект Series, отражающий столбец, соответствующий значению ключа. Поэтому нет необходимости
снова наклейте на метку столбца. При этом и при условии, что groupby сохраняет порядок,
вы можете нарезать серию для извлечения первого/последнего элемента Open/Close
столбцы (примечание: документация groupby не требует сохранения порядка исходных данных
серии, но, кажется, на практике.)
In [50]: df.groupby(dr5minute.asof).agg({'Low': lambda s: s.min(),
'High': lambda s: s.max(),
'Open': lambda s: s[0],
'Close': lambda s: s[-1],
'Volume': lambda s: s.sum()})
Out[50]:
Close High Low Open Volume
key_0
1999-01-04 10:20:00 1.1806 1.1819 1.1801 1.1801 34
1999-01-04 10:25:00 1.1789 1.1815 1.1776 1.1807 91
1999-01-04 10:30:00 1.1791 1.1792 1.1776 1.1780 16
Для справки, вот таблица, в которой суммируются ожидаемые
входных и выходных типов функции агрегации на основе типа объекта groupby и того, как функция (аг) агрегации передана/передается в agg().
agg() method agg func agg func agg()
input type accepts returns result
GroupBy Object
SeriesGroupBy function Series value Series
dict-of-funcs Series value DataFrame, columns match dict keys
list-of-funcs Series value DataFrame, columns match func names
DataFrameGroupBy function DataFrame Series/dict/ary DataFrame, columns match original DataFrame
dict-of-funcs Series value DataFrame, columns match dict keys, where dict keys must be columns in original DataFrame
list-of-funcs Series value DataFrame, MultiIndex columns (original cols x func names)
Из приведенной выше таблицы, если для агрегации требуется доступ к нескольким
столбец, единственный вариант - передать одну функцию в
Объект DataFrameGroupBy. Поэтому альтернативный способ выполнения первоначальной задачи - определить
функция, подобная следующей:
def ohlcsum(df):
df = df.sort()
return {
'Open': df['Open'][0],
'High': df['High'].max(),
'Low': df['Low'].min(),
'Close': df['Close'][-1],
'Volume': df['Volume'].sum()
}
и примените к нему agg():
In [30]: df.groupby(dr5minute.asof).agg(ohlcsum)
Out[30]:
Open High Low Close Volume
key_0
1999-01-04 10:20:00 1.1801 1.1819 1.1801 1.1806 34
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789 91
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791 16
Хотя pandas может предложить более чистую встроенную магию в будущем, надеюсь, это объясняет, как работать с сегодняшними возможностями agg().
Ответ 2
просто для того, чтобы быть полезным другим пользователям с более поздней версией Pandas, метод resample очень быстрый и полезный для выполнения той же задачи:
ohlc_dict = {
'Open':'first',
'High':'max',
'Low':'min',
'Close': 'last',
'Volume': 'sum'
}
df.resample('5T', how=ohlc_dict, closed='left', label='left')
Ответ 3
В моей функции main() я получаю потоковые данные о ставках/запросах. Затем я делаю следующее:
df = pd.DataFrame([])
for msg_type, msg in response.parts():
if msg_type == "pricing.Price":
sd = StreamingData(datetime.now(),instrument_string(msg),
mid_string(msg),account_api,account_id,
's','5min',balance)
df = df.append(sd.df())
sd.resample(df)
Я создал класс StreamingData(), который принимает предоставленный ввод (также создал некоторые функции, чтобы разбить данные ставки/запроса на отдельные компоненты (ставка, запрос, середина, инструмент и т.д.),.
Красота этого - все, что вам нужно сделать, это изменить 's и ' 5min ' на любые временные рамки. Установите его в "m" и "D", чтобы получить ежедневные цены за минуту.
Вот что выглядит моя StreamingData():
class StreamingData(object):
def __init__(self, time, instrument, mid, api, _id, xsec, xmin, balance):
self.time = time
self.instrument = instrument
self.mid = mid
self.api = api
self._id = _id
self.xsec = xsec
self.xmin = xmin
self.balance = balance
self.data = self.resample(self.df())
def df(self):
df1 = pd.DataFrame({'Time':[self.time]})
df2 = pd.DataFrame({'Mid':[float(self.mid)]})
df3 = pd.concat([df1,df2],axis=1,join='inner')
df = df3.set_index(['Time'])
df.index = pd.to_datetime(df.index,unit='s')
return df
def resample(self, df):
xx = df.to_period(freq=self.xsec)
openCol = xx.resample(self.xmin).first()
highCol = xx.resample(self.xmin).max()
lowCol = xx.resample(self.xmin).min()
closeCol = xx.resample(self.xmin).last()
self.data = pd.concat([openCol,highCol,lowCol,closeCol],
axis=1,join='inner')
self.data['Open'] = openCol.round(5)
self.data['High'] = highCol.round(5)
self.data['Low'] = lowCol.round(5)
self.data['Close'] = closeCol.round(5)
return self.data
Таким образом, он принимает данные из StreamingData(), создает индексированный по времени блок данных в df(), добавляет его, а затем передает через resample ( ). Цены, которые я рассчитываю, основаны на: mid = (bid + ask)/2
Ответ 4
bitfinex_klines_pd = bitfinex_klines_pd.resample('4h').agg({
'open': lambda s: s[0],
'high': lambda df: df.max(),
'low': lambda df: df.min(),
'close': lambda df: df[-1],
'volume': lambda df: df.sum()
})