How to trade only x amount of pairs with lowest/highest Y indicator value

For this example, I will only trade 5 pairs with lowest RSI. The code is simple and I have added few comments, so no more explanation needed. This code should be working in backtest as well.

import talib.abstract as ta
from freqtrade.strategy import IStrategy
from pandas import DataFrame
from typing import Optional
from datetime import datetime
import logging


logger = logging.getLogger(__name__)


# Binance USDT Futures BTC only
class test_rank(IStrategy):
    def version(self) -> str:
        return "test-rank"

    INTERFACE_VERSION = 3

    minimal_roi = {"0": 100}

    stoploss = -0.99

    timeframe = "5m"

    process_only_new_candles = True
    startup_candle_count = 999

    rank_data = dict()

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        dataframe["rsi"] = ta.RSI(dataframe["close"], timeperiod=14)

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        dataframe["enter_long"] = 1
        dataframe["enter_tag"] = "test_rank"

        return dataframe

    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        dataframe["exit_tag"] = ""

        return dataframe

    def confirm_trade_entry(
        self,
        pair: str,
        order_type: str,
        amount: float,
        rate: float,
        time_in_force: str,
        current_time: datetime,
        entry_tag: Optional[str],
        side: str,
        **kwargs,
    ) -> bool:
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)

        last_candle = dataframe.iloc[-1].squeeze()
        last_datestamp = last_candle["date"]

        if last_datestamp not in self.rank_data:
            # Reset the dict object
            self.rank_data = dict()
            self.rank_data[last_datestamp] = dict()

            # create empty dict
            rank_data = dict()

            # store RSI data of current pair
            rank_data[pair] = last_candle["rsi"]

            # get list of all pairs in whitelist
            pairs = self.dp.current_whitelist()

            # loop through all pairs to get their RSI values
            for cur_pair in pairs:
                if cur_pair not in rank_data:
                    cur_dataframe, _ = self.dp.get_analyzed_dataframe(
                        cur_pair, self.timeframe
                    )
                    last_candle = cur_dataframe.iloc[-1].squeeze()
                    rank_data[cur_pair] = last_candle["rsi"]

            # After all data fetched, sort it ascending for long position
            sorted_pairs = sorted(rank_data.items(), key=lambda x: x[1])
            # Sort it in descending order for short position
            sorted_pairs_desc = sorted(rank_data.items(), key=lambda x:x[1], reverse=True)

            # Trim the list to get top 5 pairs with lowest/highest RSI
            trimmed_list = sorted_pairs[0:5]
            trimmed_list_desc = sorted_pairs_desc[0:5]

            # Change the list back to dict
            sorted_dict = dict(trimmed_list)
            sorted_dict_desc = dict(trimmed_list_desc)

            # save the new dict
            self.rank_data[last_datestamp]["long"] = sorted_dict
            self.rank_data[last_datestamp]["short"] = sorted_dict_desc

        return pair in self.rank_data[last_datestamp][side]

One comment

Leave a Reply

Your email address will not be published. Required fields are marked *