IDX-Chronos / utils.py
omniverse1's picture
Update utils.py
7f84e9b verified
import yfinance as yf
import pandas as pd
import numpy as np
import torch
from datetime import datetime, timedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import spaces
def get_indonesian_stocks():
return {
"BBCA.JK": "Bank Central Asia",
"BBRI.JK": "Bank BRI",
"BBNI.JK": "Bank BNI",
"BMRI.JK": "Bank Mandiri",
"TLKM.JK": "Telkom Indonesia",
"UNVR.JK": "Unilever Indonesia",
"ASII.JK": "Astra International",
"INDF.JK": "Indofood Sukses Makmur",
"KLBF.JK": "Kalbe Farma",
"HMSP.JK": "HM Sampoerna",
"GGRM.JK": "Gudang Garam",
"ADRO.JK": "Adaro Energy",
"PGAS.JK": "Perusahaan Gas Negara",
"JSMR.JK": "Jasa Marga",
"WIKA.JK": "Wijaya Karya",
"PTBA.JK": "Tambang Batubara Bukit Asam",
"ANTM.JK": "Aneka Tambang",
"SMGR.JK": "Semen Indonesia",
"INTP.JK": "Indocement Tunggal Prakasa",
"ITMG.JK": "Indo Tambangraya Megah"
}
def calculate_technical_indicators(data):
indicators = {}
def calculate_rsi(prices, period=14):
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
indicators['rsi'] = {'current': calculate_rsi(data['Close']).iloc[-1], 'values': calculate_rsi(data['Close'])}
def calculate_macd(prices, fast=12, slow=26, signal=9):
exp1 = prices.ewm(span=fast).mean()
exp2 = prices.ewm(span=slow).mean()
macd = exp1 - exp2
signal_line = macd.ewm(span=signal).mean()
histogram = macd - signal_line
return macd, signal_line, histogram
macd, signal_line, histogram = calculate_macd(data['Close'])
indicators['macd'] = {'macd': macd.iloc[-1], 'signal': signal_line.iloc[-1], 'histogram': histogram.iloc[-1], 'signal_text': 'BUY' if histogram.iloc[-1] > 0 else 'SELL', 'macd_values': macd, 'signal_values': signal_line}
def calculate_bollinger_bands(prices, period=20, std_dev=2):
sma = prices.rolling(window=period).mean()
std = prices.rolling(window=period).std()
upper_band = sma + (std * std_dev)
lower_band = sma - (std * std_dev)
return upper_band, sma, lower_band
upper, middle, lower = calculate_bollinger_bands(data['Close'])
current_price = data['Close'].iloc[-1]
bb_position = (current_price - lower.iloc[-1]) / (upper.iloc[-1] - lower.iloc[-1])
indicators['bollinger'] = {
'upper': upper.iloc[-1],
'middle': middle.iloc[-1],
'lower': lower.iloc[-1],
'upper_values': upper,
'middle_values': middle,
'lower_values': lower,
'position': 'UPPER' if bb_position > 0.8 else 'LOWER' if bb_position < 0.2 else 'MIDDLE'
}
sma_20_series = data['Close'].rolling(20).mean()
sma_50_series = data['Close'].rolling(50).mean()
indicators['moving_averages'] = {'sma_20': sma_20_series.iloc[-1], 'sma_50': sma_50_series.iloc[-1], 'sma_200': data['Close'].rolling(200).mean().iloc[-1], 'ema_12': data['Close'].ewm(span=12).mean().iloc[-1], 'ema_26': data['Close'].ewm(span=26).mean().iloc[-1], 'sma_20_values': sma_20_series, 'sma_50_values': sma_50_series}
indicators['volume'] = {'current': data['Volume'].iloc[-1], 'avg_20': data['Volume'].rolling(20).mean().iloc[-1], 'ratio': data['Volume'].iloc[-1] / data['Volume'].rolling(20).mean().iloc[-1]}
return indicators
def generate_trading_signals(data, indicators):
signals = {}
current_price = data['Close'].iloc[-1]
buy_signals = 0
sell_signals = 0
signal_details = []
rsi = indicators['rsi']['current']
if rsi < 30:
buy_signals += 1
signal_details.append(f"✅ RSI ({rsi:.1f}) - Oversold - BUY signal")
elif rsi > 70:
sell_signals += 1
signal_details.append(f"❌ RSI ({rsi:.1f}) - Overbought - SELL signal")
else:
signal_details.append(f"⚪ RSI ({rsi:.1f}) - Neutral")
macd_hist = indicators['macd']['histogram']
if macd_hist > 0:
buy_signals += 1
signal_details.append(f"✅ MACD Histogram ({macd_hist:.4f}) - Positive - BUY signal")
else:
sell_signals += 1
signal_details.append(f"❌ MACD Histogram ({macd_hist:.4f}) - Negative - SELL signal")
bb_position = indicators['bollinger']['position']
if bb_position == 'LOWER':
buy_signals += 1
signal_details.append(f"✅ Bollinger Bands - Near lower band - BUY signal")
elif bb_position == 'UPPER':
sell_signals += 1
signal_details.append(f"❌ Bollinger Bands - Near upper band - SELL signal")
else:
signal_details.append("⚪ Bollinger Bands - Middle position")
sma_20 = indicators['moving_averages']['sma_20']
sma_50 = indicators['moving_averages']['sma_50']
if current_price > sma_20 > sma_50:
buy_signals += 1
signal_details.append(f"✅ Price above MA(20,50) - Bullish - BUY signal")
elif current_price < sma_20 < sma_50:
sell_signals += 1
signal_details.append(f"❌ Price below MA(20,50) - Bearish - SELL signal")
else:
signal_details.append("⚪ Moving Averages - Mixed signals")
volume_ratio = indicators['volume']['ratio']
if volume_ratio > 1.5:
buy_signals += 0.5
signal_details.append(f"✅ High volume ({volume_ratio:.1f}x avg) - Strengthens BUY signal")
elif volume_ratio < 0.5:
sell_signals += 0.5
signal_details.append(f"❌ Low volume ({volume_ratio:.1f}x avg) - Weakens SELL signal")
else:
signal_details.append(f"⚪ Normal volume ({volume_ratio:.1f}x avg)")
total_signals = buy_signals + sell_signals
signal_strength = (buy_signals / max(total_signals, 1)) * 100
overall_signal = "BUY" if buy_signals > sell_signals else "SELL" if sell_signals > buy_signals else "HOLD"
recent_high = data['High'].tail(20).max()
recent_low = data['Low'].tail(20).min()
signals = {'overall': overall_signal, 'strength': signal_strength, 'details': '\n'.join(signal_details), 'support': recent_low, 'resistance': recent_high, 'stop_loss': recent_low * 0.95 if overall_signal == "BUY" else recent_high * 1.05}
return signals
def get_fundamental_data(stock):
try:
info = stock.info
history = stock.history(period="1d")
fundamental_info = {'name': info.get('longName', 'N/A'), 'current_price': history['Close'].iloc[-1] if not history.empty else 0, 'market_cap': info.get('marketCap', 0), 'pe_ratio': info.get('forwardPE', 0), 'dividend_yield': info.get('dividendYield', 0) * 100 if info.get('dividendYield') else 0, 'volume': history['Volume'].iloc[-1] if not history.empty else 0, 'info': f"Sector: {info.get('sector', 'N/A')}\nIndustry: {info.get('industry', 'N/A')}\nMarket Cap: {info.get('marketCap', 0)}\n52 Week High: {info.get('fiftyTwoWeekHigh', 'N/A')}\n52 Week Low: {info.get('fiftyTwoWeekLow', 'N/A')}\nBeta: {info.get('beta', 'N/A')}\nEPS: {info.get('forwardEps', 'N/A')}\nBook Value: {info.get('bookValue', 'N/A')}\nPrice to Book: {info.get('priceToBook', 'N/A')}"}
return fundamental_info
except:
return {'name': 'N/A', 'current_price': 0, 'market_cap': 0, 'pe_ratio': 0, 'dividend_yield': 0, 'volume': 0, 'info': 'Unable to fetch fundamental data'}
def format_large_number(num):
if num >= 1e12:
return f"{num/1e12:.2f}T"
elif num >= 1e9:
return f"{num/1e9:.2f}B"
elif num >= 1e6:
return f"{num/1e6:.2f}M"
elif num >= 1e3:
return f"{num/1e3:.2f}K"
else:
return f"{num:.2f}"
@spaces.GPU(duration=120)
def predict_prices(data, model=None, tokenizer=None, prediction_days=30):
try:
prices = data['Close'].values.astype(np.float32)
from chronos import BaseChronosPipeline
pipeline = BaseChronosPipeline.from_pretrained("amazon/chronos-bolt-base", device_map="auto")
with torch.no_grad():
forecast = pipeline.predict(context=torch.tensor(prices), prediction_length=prediction_days)
forecast_np = forecast.squeeze().cpu().numpy() if isinstance(forecast, torch.Tensor) else np.array(forecast)
if forecast_np.ndim > 1:
mean_forecast = forecast_np.mean(axis=tuple(range(forecast_np.ndim - 1)))
else:
mean_forecast = forecast_np
last_price = prices[-1]
predicted_high = float(np.max(mean_forecast))
predicted_low = float(np.min(mean_forecast))
predicted_mean = float(np.mean(mean_forecast))
change_pct = ((predicted_mean - last_price) / last_price) * 100 if last_price != 0 else 0
return {'values': mean_forecast, 'dates': pd.date_range(start=data.index[-1] + timedelta(days=1), periods=len(mean_forecast), freq='D'), 'high_30d': predicted_high, 'low_30d': predicted_low, 'mean_30d': predicted_mean, 'change_pct': change_pct, 'summary': f"AI Model: Amazon Chronos-Bolt (Base)\nPredicted High: {predicted_high:.2f}\nPredicted Low: {predicted_low:.2f}\nExpected Change: {change_pct:.2f}%"}
except Exception as e:
print(f"Error in prediction: {e}")
return {'values': [], 'dates': [], 'high_30d': 0, 'low_30d': 0, 'mean_30d': 0, 'change_pct': 0, 'summary': f'Model error: {e}'}
def create_prediction_chart(data, predictions):
if not len(predictions['values']):
return go.Figure()
fig = go.Figure()
fig.add_trace(go.Scatter(x=data.index[-60:], y=data['Close'].values[-60:], name='Historical Price', line=dict(color='blue', width=2)))
fig.add_trace(go.Scatter(x=predictions['dates'], y=predictions['values'], name='AI Prediction', line=dict(color='red', width=2, dash='dash')))
pred_std = np.std(predictions['values'])
upper_band = predictions['values'] + (pred_std * 1.96)
lower_band = predictions['values'] - (pred_std * 1.96)
fig.add_trace(go.Scatter(x=predictions['dates'], y=upper_band, name='Upper Band', line=dict(color='lightcoral', width=1)))
fig.add_trace(go.Scatter(x=predictions['dates'], y=lower_band, name='Lower Band', line=dict(color='lightcoral', width=1), fill='tonexty', fillcolor='rgba(255,182,193,0.2)'))
fig.update_layout(title=f'Price Prediction - Next {len(predictions["dates"])} Days', xaxis_title='Date', yaxis_title='Price (IDR)', hovermode='x unified', height=500)
return fig
def create_price_chart(data, indicators):
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.05)
fig.add_trace(go.Candlestick(x=data.index, open=data['Open'], high=data['High'], low=data['Low'], close=data['Close'], name='Price'), row=1, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_20_values'], name='SMA 20', line=dict(color='orange')), row=1, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_50_values'], name='SMA 50', line=dict(color='blue')), row=1, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['rsi']['values'], name='RSI', line=dict(color='purple')), row=2, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['macd']['macd_values'], name='MACD', line=dict(color='blue')), row=3, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['macd']['signal_values'], name='Signal', line=dict(color='red')), row=3, col=1)
fig.update_layout(title='Technical Analysis Dashboard', height=900, showlegend=True)
return fig
def create_technical_chart(data, indicators):
fig = make_subplots(rows=2, cols=2, subplot_titles=('Bollinger Bands', 'Volume', 'Price vs MA', 'RSI Analysis'))
fig.add_trace(go.Scatter(x=data.index, y=data['Close'], name='Price', line=dict(color='black')), row=1, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['bollinger']['upper_values'], name='Upper Band', line=dict(color='red')), row=1, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['bollinger']['lower_values'], name='Lower Band', line=dict(color='green'), fill='tonexty', fillcolor='rgba(0,255,0,0.1)'), row=1, col=1)
fig.add_trace(go.Bar(x=data.index, y=data['Volume'], name='Volume', marker_color='lightblue'), row=1, col=2)
fig.add_trace(go.Scatter(x=data.index, y=data['Close'], name='Price', line=dict(color='gray')), row=2, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_20_values'], name='SMA 20', line=dict(color='orange', dash='dash')), row=2, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_50_values'], name='SMA 50', line=dict(color='blue', dash='dash')), row=2, col=1)
fig.add_trace(go.Scatter(x=data.index, y=indicators['rsi']['values'], name='RSI', line=dict(color='purple')), row=2, col=2)
fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=2)
fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=2)
fig.update_layout(title='Technical Indicators Overview', height=800, showlegend=False, hovermode='x unified')
return fig