Dmitry Beresnev committed on
Commit fdbda30 · 1 Parent(s): 91b6409

add news pooling

.env.example CHANGED
@@ -5,4 +5,6 @@ GEMINI_API_TOKEN=
 OPENROUTER_API_TOKEN=
 GOOGLE_APPS_SCRIPT_URL=
 WEBHOOK_SECRET=
-SPACE_URL=
+SPACE_URL=
+HF_TOKEN=
+HF_DATASET_REPO=
requirements.txt CHANGED
@@ -6,4 +6,7 @@ httpx>=0.25.0
 python-dotenv==1.0.0
 pydantic==2.5.0
 typing-extensions==4.8.0
-pytz==2025.2
+pytz==2025.2
+datasets
+huggingface_hub
+pandas
src/services/__init__.py ADDED
File without changes
src/services/news_pooling_service.py ADDED
@@ -0,0 +1,317 @@
+"""
+News polling service for fetching and processing Finnhub news
+
+THIS MODULE SHOULD BE REFACTORED
+"""
+import asyncio
+import httpx
+from datetime import datetime, timezone, timedelta
+from typing import Any
+import logging
+from contextlib import asynccontextmanager
+
+import os
+import hashlib
+from datasets import Dataset, load_dataset
+from huggingface_hub import login
+import pandas as pd
+
+from src.telegram_bot.config import Config
+from src.telegram_bot.logger import main_logger as logger
+
+
+class NewsPollingService:
+    def __init__(self, chat_id: int, finnhub_api_key: str | None = None, hf_token: str | None = None, hf_repo: str | None = None):
+        self._api_key = Config.FINNHUB_API_KEY or finnhub_api_key
+        self._base_url = "https://finnhub.io/api/v1"
+        self._http_client: httpx.AsyncClient | None = None
+        self._polling_task: asyncio.Task | None = None
+        self._is_running = False
+        self._poll_interval = 300  # 5 minutes
+        self._subscribers = []  # List of callbacks for new news
+        self._hf_token = Config.HF_TOKEN or hf_token
+        self._hf_repo = Config.HF_DATASET_REPO or hf_repo
+        self._chat_id = chat_id
+
+    async def initialize(self):
+        """Initialize the polling service"""
+        self._http_client = httpx.AsyncClient(
+            timeout=httpx.Timeout(30.0),
+            limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
+        )
+        logger.info("News polling service initialized")
+
+    async def cleanup(self):
+        """Cleanup resources"""
+        await self.stop_polling()
+        if self._http_client:
+            await self._http_client.aclose()
+        logger.info("News polling service cleaned up")
+
+    def subscribe_to_news(self, callback):
+        """Subscribe to new news notifications"""
+        self._subscribers.append(callback)
+
+    def unsubscribe_from_news(self, callback):
+        """Unsubscribe from news notifications"""
+        if callback in self._subscribers:
+            self._subscribers.remove(callback)
+
+    async def _notify_subscribers(self, new_articles: list[dict[str, Any]]):
+        """Notify all subscribers about new articles"""
+        if not new_articles or not self._subscribers:
+            return
+
+        for callback in self._subscribers[:]:  # Copy list to avoid modification during iteration
+            try:
+                if asyncio.iscoroutinefunction(callback):
+                    await callback(self._chat_id, new_articles)
+                else:
+                    callback(self._chat_id, new_articles)
+            except Exception as e:
+                logger.error(f"Error notifying subscriber: {e}")
+
+    async def fetch_general_news(self, category: str = "general", min_id: str = "0") -> list[dict[str, Any]]:
+        """Fetch general news from Finnhub"""
+        try:
+            url = f"{self._base_url}/news"
+            params = {
+                "category": category,
+                "token": self._api_key,
+                "minId": min_id
+            }
+
+            logger.info(f"Fetching news: category={category}, minId={min_id}")
+            response = await self._http_client.get(url, params=params)
+            response.raise_for_status()
+
+            articles = response.json()
+            logger.info(f"Fetched {len(articles)} articles from Finnhub")
+            return articles
+
+        except Exception as e:
+            logger.error(f"Error fetching general news: {e}")
+            return []
+
+    async def fetch_company_news(self, symbol: str, from_date: str, to_date: str) -> list[dict[str, Any]]:
+        """Fetch company-specific news from Finnhub"""
+        try:
+            url = f"{self._base_url}/company-news"
+            params = {
+                "symbol": symbol,
+                "from": from_date,
+                "to": to_date,
+                "token": self._api_key
+            }
+
+            logger.info(f"Fetching company news: symbol={symbol}, from={from_date}, to={to_date}")
+            response = await self._http_client.get(url, params=params)
+            response.raise_for_status()
+
+            articles = response.json()
+            logger.info(f"Fetched {len(articles)} company articles for {symbol}")
+            return articles
+
+        except Exception as e:
+            logger.error(f"Error fetching company news for {symbol}: {e}")
+            return []
+
+    async def fetch_market_news(self, category: str = "forex") -> list[dict[str, Any]]:
+        """Fetch market news (forex, crypto, merger)"""
+        try:
+            url = f"{self._base_url}/news"
+            params = {
+                "category": category,
+                "token": self._api_key
+            }
+
+            response = await self._http_client.get(url, params=params)
+            response.raise_for_status()
+
+            articles = response.json()
+            logger.info(f"Fetched {len(articles)} {category} articles")
+            return articles
+
+        except Exception as e:
+            logger.error(f"Error fetching {category} news: {e}")
+            return []
+
+    async def load_existing_articles(self) -> pd.DataFrame:
+        """Load existing dataset from HF"""
+        if not self._hf_token or not self._hf_repo:
+            logger.warning("Hugging Face config not set.")
+            return pd.DataFrame()
+        try:
+            login(self._hf_token)
+            existing = load_dataset(self._hf_repo, split="train")
+            df = existing.to_pandas()
+            return df
+        except Exception as e:
+            logger.warning(f"Could not load HF dataset: {e}")
+            return pd.DataFrame()
+
+    async def save_articles_to_hf(self, articles: list[dict[str, Any]]):
+        """Save new articles to Hugging Face Dataset"""
+        if not self._hf_token or not self._hf_repo:
+            logger.warning("HF_TOKEN or HF_DATASET_REPO not set.")
+            return
+
+        login(self._hf_token)
+
+        for article in articles:
+            uid = f"{article.get('datetime', '')}_{article.get('url', '')}"
+            article['id'] = hashlib.sha256(uid.encode()).hexdigest()
+            article['persisted_at'] = datetime.now(timezone.utc).isoformat()  # timezone-aware; utcnow() is deprecated
+
+        df_new = pd.DataFrame(articles)
+        df_existing = await self.load_existing_articles()
+
+        df_combined = pd.concat([df_existing, df_new], ignore_index=True)
+        df_combined.drop_duplicates(subset=["id"], inplace=True)
+
+        dataset = Dataset.from_pandas(df_combined)
+        dataset.push_to_hub(self._hf_repo, private=True)
+
+        logger.info(f"Pushed {len(df_new)} new articles to HF dataset: {self._hf_repo}")
+
+    async def get_last_processed_timestamp(self) -> int:
+        """Get latest timestamp from HF Dataset"""
+        df = await self.load_existing_articles()
+        if df.empty or "datetime" not in df.columns:
+            return 0
+        return int(df["datetime"].max())
+
+    async def process_new_articles(self, tickers: list[str]) -> int:
+        """Process and persist new articles to Hugging Face Datasets"""
+        try:
+            last_timestamp = await self.get_last_processed_timestamp()
+            logger.info(f"Processing articles since timestamp: {last_timestamp}")
+
+            now = datetime.now(timezone.utc)
+            from_date = datetime.fromtimestamp(last_timestamp, timezone.utc)
+            from_str = from_date.strftime("%Y-%m-%d")
+            to_str = now.strftime("%Y-%m-%d")
+
+            all_new_articles = []
+
+            '''
+            # 1. General news
+            general_news = await self.fetch_general_news("general")
+            new_general = [a for a in general_news if a.get('datetime', 0) > last_timestamp]
+            all_new_articles.extend(new_general)
+
+
+            # 2. Market news
+            for category in ["forex", "crypto", "merger"]:
+                try:
+                    cat_news = await self.fetch_market_news(category)
+                    new_cat = [
+                        {**a, 'category': category}
+                        for a in cat_news
+                        if a.get('datetime', 0) > last_timestamp
+                    ]
+                    all_new_articles.extend(new_cat)
+                    await asyncio.sleep(1)
+                except Exception as e:
+                    logger.error(f"Error fetching {category} news: {e}")
+            '''
+
+            # 3. Company news
+            for ticker in tickers:
+                try:
+                    company_news = await self.fetch_company_news(ticker, from_str, to_str)
+                    new_company = [
+                        {**a, 'category': f'company_{ticker.lower()}', 'related': [ticker]}
+                        for a in company_news
+                        if a.get('datetime', 0) > last_timestamp
+                    ]
+                    all_new_articles.extend(new_company)
+                    await asyncio.sleep(1)
+                except Exception as e:
+                    logger.error(f"Error fetching news for {ticker}: {e}")
+
+            # Deduplication
+            seen = set()
+            unique_articles = []
+            for a in all_new_articles:
+                key = (a.get('url', ''), a.get('headline', ''))
+                if key not in seen and key != ('', ''):
+                    seen.add(key)
+                    unique_articles.append(a)
+
+            if unique_articles:
+                unique_articles.sort(key=lambda x: x.get('datetime', 0))
+                await self.save_articles_to_hf(unique_articles)
+                await self._notify_subscribers(unique_articles)
+                logger.info(f"Processed and saved {len(unique_articles)} articles.")
+                return len(unique_articles)
+
+            else:
+                logger.info("No new articles found.")
+                return 0
+
+        except Exception as e:
+            logger.error(f"Error processing new articles: {e}")
+            return 0
+
+    async def _polling_loop(self, tickers: list[str]):
+        """Main polling loop"""
+        logger.info("Starting news polling loop")
+
+        while self._is_running:
+            try:
+                start_time = datetime.now()
+                new_count = await self.process_new_articles(tickers)
+
+                processing_time = (datetime.now() - start_time).total_seconds()
+                logger.info(f"Polling completed in {processing_time:.2f}s, found {new_count} new articles")
+
+                # Wait for next poll
+                await asyncio.sleep(self._poll_interval)
+
+            except asyncio.CancelledError:
+                logger.info("Polling loop cancelled")
+                break
+            except Exception as e:
+                logger.error(f"Error in polling loop: {e}")
+                # Wait a bit before retrying
+                await asyncio.sleep(min(60, self._poll_interval))
+
+    async def start_polling(self, tickers: list[str], interval: int = 300):
+        """Start the polling process"""
+        if self._is_running:
+            logger.warning("Polling is already running")
+            return
+
+        self._poll_interval = interval
+        self._is_running = True
+        self._polling_task = asyncio.create_task(self._polling_loop(tickers))
+        logger.info(f"Started news polling with {interval}s interval")
+
+    async def stop_polling(self):
+        """Stop the polling process"""
+        if not self._is_running:
+            return
+
+        self._is_running = False
+        if self._polling_task and not self._polling_task.done():
+            self._polling_task.cancel()
+            try:
+                await self._polling_task
+            except asyncio.CancelledError:
+                pass
+        logger.info("Stopped news polling")
+
+    async def force_poll_now(self, tickers: list[str]) -> int:
+        """Force an immediate poll"""
+        logger.info("Forcing immediate news poll")
+        return await self.process_new_articles(tickers)
+
+    async def get_polling_status(self) -> dict[str, Any]:
+        """Get current polling status from in-memory state (this class has no database)"""
+        return {
+            "is_running": self._is_running,
+            "poll_interval": self._poll_interval,
+            "subscribers_count": len(self._subscribers),
+            "task_status": "running" if self._polling_task and not self._polling_task.done() else "stopped"
+        }
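
For orientation, a minimal usage sketch of the service added above (assuming FINNHUB_API_TOKEN, HF_TOKEN and HF_DATASET_REPO are set in the environment; the chat id and tickers are placeholders):

    import asyncio

    from src.services.news_pooling_service import NewsPollingService

    async def main():
        service = NewsPollingService(chat_id=123456789)  # placeholder Telegram chat id
        await service.initialize()

        # Subscribers receive (chat_id, articles); plain functions and coroutines both work
        def on_news(chat_id, articles):
            for a in articles:
                print(chat_id, a.get("headline"))

        service.subscribe_to_news(on_news)
        await service.start_polling(tickers=["AAPL", "MSFT"], interval=300)
        await asyncio.sleep(900)   # let a few 5-minute polls run
        await service.cleanup()    # stops polling and closes the HTTP client

    asyncio.run(main())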
src/telegram_bot/config.py CHANGED
@@ -12,6 +12,8 @@ class Config:
     PORT: int = int(os.getenv("PORT", 7860))
     FINNHUB_API_KEY = os.getenv('FINNHUB_API_TOKEN')
     OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_TOKEN', '')
+    HF_TOKEN = os.getenv('HF_TOKEN', '')
+    HF_DATASET_REPO = os.getenv('HF_DATASET_REPO', '')
 
     @classmethod
     def validate(cls) -> bool:
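
If the project loads .env via python-dotenv (it is in requirements.txt), the new settings can be supplied locally with illustrative placeholder values; HF_DATASET_REPO uses the <user>/<dataset> form that push_to_hub expects:

    HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx
    HF_DATASET_REPO=your-username/finnhub-news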
src/telegram_bot/telegram_bot_service.py CHANGED
@@ -13,6 +13,7 @@ from src.telegram_bot.tg_models import TelegramUpdate, TelegramMessage
 from src.api.finnhub.financial_news_requester import fetch_comp_financial_news
 from src.api.openrouter.openrouter_client import OpenRouterClient
 from src.api.openrouter.prompt_generator import PromptGenerator
+from src.services.news_pooling_service import NewsPollingService
 
 
 class TelegramBotService:
@@ -239,3 +240,44 @@ class TelegramBotService:
         except Exception as e:
             main_logger.error(f"Error in news_feed_analysing_by_ticker: {e}")
             await self.send_message_via_proxy(chat_id, f"Sorry, there was an error fetching news: {str(e)}")
+
+    async def news_analysing_by_ticker(self, chat_id: int, news_list: list[dict[str, Any]]) -> None:
+        """
+        Analyze news articles with the OpenRouter LLM and send each result to the chat.
+        Args:
+            chat_id: Telegram chat to deliver the analysis to.
+            news_list: News articles to analyze.
+        """
+        client = OpenRouterClient(api_key=Config.OPENROUTER_API_KEY)
+        for news in news_list:
+            start_time = time.perf_counter()
+            prompt = PromptGenerator.format_prompt(news)
+            main_logger.info(f"LLM Prompt: {prompt}")
+            response = client.chat(prompt)
+            main_logger.info(f"LLM Response: {response}")
+            elapsed = time.perf_counter() - start_time
+            main_logger.info(f"Processing time: {elapsed:.2f} seconds")
+            await self.send_message_via_proxy(chat_id, f"LLM Response for {news.get('headline', 'No headline')}: {response}")
+
+    async def news_feed_pooling_by_ticker(
+        self, ticker: str, chat_id: int, text: str | None, user_name: str
+    ) -> None:
+        await self.send_message_via_proxy(chat_id, f"Fetching latest financial news for ticker {ticker} ...")
+        try:
+            eastern = timezone("US/Eastern")
+            now_eastern = datetime.now(eastern)
+            if now_eastern.hour < 9 or (now_eastern.hour == 9 and now_eastern.minute < 30) or now_eastern.hour >= 16:
+                # Before 9:30 AM or after 4:00 PM Eastern, use yesterday
+                current_date = (now_eastern - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+            else:
+                current_date = now_eastern.replace(hour=0, minute=0, second=0, microsecond=0)
+            date_from = current_date.strftime('%Y-%m-%d')
+            date_to = current_date.strftime('%Y-%m-%d')
+            newspollingservice = NewsPollingService(chat_id)
+            await newspollingservice.initialize()  # create the HTTP client before polling
+            newspollingservice.subscribe_to_news(self.news_analysing_by_ticker)
+            feed = await newspollingservice.process_new_articles(tickers=[ticker])
+            #main_logger.info(f"Processed: {len(feed)} news items for ticker {ticker}")
+        except Exception as e:
+            main_logger.error(f"Error in news_feed_pooling_by_ticker: {e}")
+            await self.send_message_via_proxy(chat_id, f"Sorry, there was an error fetching news: {str(e)}")
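
Note on the wiring above: process_new_articles persists new articles and then _notify_subscribers invokes each subscriber as callback(chat_id, articles), so news_analysing_by_ticker's (chat_id, news_list) signature matches the contract. A minimal custom subscriber would look like this (illustrative sketch, not part of the commit):

    async def my_subscriber(chat_id: int, articles: list[dict]) -> None:
        # Called by NewsPollingService._notify_subscribers with the service's chat id
        for article in articles:
            print(f"[{chat_id}] {article.get('headline')}")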