Warning!!!
Using this snippet can introduce significant lag in your entry order placement, because it has to loop through all pairs: the more pairs there are in the whitelist, the greater the lag.
For this example, I will only trade the 5 pairs with the lowest RSI. The code is simple and I have added a few comments, so no further explanation should be needed. This code should work in backtesting as well.
import logging
import math
from datetime import datetime
from typing import Optional

import talib.abstract as ta
from freqtrade.strategy import IStrategy
from pandas import DataFrame
logger = logging.getLogger(__name__)
# Binance USDT Futures BTC only
class test_rank(IStrategy):
    """Trade only the whitelist pairs whose latest RSI ranks most extreme.

    Every candle carries a long entry signal; ``confirm_trade_entry`` then
    accepts an order only when the pair is among the ``max_ranked_pairs``
    pairs with the lowest latest RSI (long side) or the highest latest RSI
    (short side) across the current whitelist.
    """

    INTERFACE_VERSION = 3

    minimal_roi = {"0": 100}
    stoploss = -0.99
    timeframe = "5m"
    process_only_new_candles = True
    startup_candle_count = 999

    # Number of pairs kept per side when ranking the whitelist by RSI.
    max_ranked_pairs = 5

    # Ranking cache shaped as
    # {candle date: {"long": {pair: rsi}, "short": {pair: rsi}}}.
    # confirm_trade_entry rebinds it on the instance whenever a new candle
    # date is seen, so this class-level dict itself is never mutated.
    rank_data: dict = {}

    def version(self) -> str:
        """Return the version label of this strategy."""
        return "test-rank"

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Add a 14-period RSI of the close price as the ``rsi`` column."""
        dataframe["rsi"] = ta.RSI(dataframe["close"], timeperiod=14)
        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Flag every candle as a long entry tagged ``test_rank``.

        The actual pair selection happens later in ``confirm_trade_entry``.
        """
        dataframe["enter_long"] = 1
        dataframe["enter_tag"] = "test_rank"
        return dataframe

    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Set an empty ``exit_tag`` on every candle; no exit column is set."""
        dataframe["exit_tag"] = ""
        return dataframe

    def confirm_trade_entry(
        self,
        pair: str,
        order_type: str,
        amount: float,
        rate: float,
        time_in_force: str,
        current_time: datetime,
        entry_tag: Optional[str],
        side: str,
        **kwargs,
    ) -> bool:
        """Accept the entry only if ``pair`` is in the RSI ranking for ``side``.

        The ranking is computed once per candle date and cached in
        ``self.rank_data``; later calls for the same candle reuse it.

        :param pair: Pair about to be entered.
        :param side: Key used to select the ranking; the cache holds
            ``"long"`` (lowest RSI) and ``"short"`` (highest RSI).
        :return: True when ``pair`` is among the ranked pairs for ``side``.
            False when it is not, or when ``pair`` has no analyzed candles.
        :raises KeyError: If ``side`` is neither ``"long"`` nor ``"short"``.

        The remaining parameters are accepted for interface compatibility
        and are not used.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if dataframe.empty:
            # Without any candle there is no date to key the cache on and no
            # RSI to rank, so reject instead of raising IndexError.
            return False

        last_datestamp = dataframe["date"].iloc[-1]
        if last_datestamp not in self.rank_data:
            # New candle: drop the previous ranking and build a fresh one.
            self.rank_data = {
                last_datestamp: self._rank_whitelist(pair, dataframe)
            }

        return pair in self.rank_data[last_datestamp][side]

    @staticmethod
    def _latest_rsi(dataframe: DataFrame) -> Optional[float]:
        """Return the last ``rsi`` value, or None if it is missing or NaN."""
        if dataframe.empty:
            return None
        rsi = dataframe["rsi"].iloc[-1]
        return None if math.isnan(rsi) else rsi

    def _rank_whitelist(self, pair: str, dataframe: DataFrame) -> dict:
        """Build the ``{"long": ..., "short": ...}`` RSI ranking.

        :param pair: Pair being confirmed; its already-fetched ``dataframe``
            is reused instead of being requested again.
        :param dataframe: Analyzed dataframe of ``pair``.
        :return: Dict with the ``max_ranked_pairs`` lowest-RSI pairs under
            ``"long"`` and the highest-RSI pairs under ``"short"``, each as
            a ``{pair: rsi}`` mapping.
        """
        rsi_by_pair = {}

        current_rsi = self._latest_rsi(dataframe)
        if current_rsi is not None:
            rsi_by_pair[pair] = current_rsi

        for cur_pair in self.dp.current_whitelist():
            if cur_pair == pair or cur_pair in rsi_by_pair:
                continue
            cur_dataframe, _ = self.dp.get_analyzed_dataframe(
                cur_pair, self.timeframe
            )
            cur_rsi = self._latest_rsi(cur_dataframe)
            # Pairs without a usable RSI are left out: NaN has no defined
            # sort position and could otherwise take a ranking slot.
            if cur_rsi is not None:
                rsi_by_pair[cur_pair] = cur_rsi

        limit = max(int(self.max_ranked_pairs), 0)
        lowest_first = sorted(rsi_by_pair.items(), key=lambda item: item[1])
        highest_first = sorted(
            rsi_by_pair.items(), key=lambda item: item[1], reverse=True
        )
        return {
            "long": dict(lowest_first[:limit]),
            "short": dict(highest_first[:limit]),
        }
[…] How to trade x amount of pairs based on overall indicator rank […]