You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

52 lines
1.9 KiB

import time
import requests
from typing import Any, Optional, Dict, List
import diskcache as dc
from threading import Lock
class CacheManager:
    """Disk-backed key/value cache with a throttled, cached URL fetcher.

    Values are stored in a ``diskcache.Cache`` rooted at ``cache_dir``.
    ``fetch_url`` layers an HTTP GET on top of the cache: responses are
    stored under the key ``"url_<url>"`` together with the time they were
    fetched, and network requests are serialized through a single lock.

    The instance can be used as a context manager; the underlying cache is
    closed on exit.
    """

    # Pause (seconds) after every HTTP request so consecutive fetches are
    # rate-limited and the remote host is less likely to block us.
    THROTTLE_SECONDS = 0.25

    # Sentinel distinguishing "key absent" from a stored ``None`` value.
    _MISSING = object()

    def __init__(self, cache_dir='cache', max_connections: int = 10):
        """Open (or create) the disk cache in ``cache_dir``.

        Args:
            cache_dir: Directory passed to ``diskcache.Cache``.
            max_connections: Accepted for backward compatibility; it is not
                used by this class.
        """
        self.cache = dc.Cache(cache_dir)
        # Serializes network fetches so the throttle applies across threads.
        self.fetch_url_lock = Lock()

    def __enter__(self) -> "CacheManager":
        """Return ``self`` so the manager can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the underlying cache when leaving the ``with`` block."""
        self.close()

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key``, or ``None`` if absent."""
        return self.cache.get(key)

    def set(self, key: str, value: Any, expire: Optional[int] = None):
        """Store ``value`` under ``key``.

        Args:
            key: Cache key.
            value: Value to store.
            expire: Optional lifetime in seconds, forwarded to the cache's
                ``set``; ``None`` means no expiry is requested.
        """
        self.cache.set(key, value, expire=expire)

    def purge(self, key: str):
        """Delete ``key`` from the cache."""
        self.cache.delete(key)

    def bulk_get(self, keys: List[str]) -> Dict[str, Any]:
        """Return a mapping of every key in ``keys`` that is present.

        Keys missing from the cache are omitted. Each key is looked up
        exactly once, so an entry cannot disappear between a membership
        test and the read.
        """
        found: Dict[str, Any] = {}
        for key in keys:
            value = self.cache.get(key, self._MISSING)
            if value is not self._MISSING:
                found[key] = value
        return found

    def _fresh_content(self, cache_key: str, expiry: int) -> Optional[str]:
        """Return cached content for ``cache_key`` if younger than ``expiry``."""
        cached_data = self.get(cache_key)
        if not cached_data:
            return None
        if time.time() - cached_data['timestamp'] < expiry:
            return cached_data['content']
        return None

    def fetch_url(self, url: str, force_refresh: bool = False,
                  expiry: int = 86400*30,
                  timeout: float = 30.0) -> Optional[str]:
        """Return the body of ``url``, using the cache when possible.

        Args:
            url: URL to fetch with an HTTP GET.
            force_refresh: When true, skip the cache and always hit the
                network.
            expiry: Maximum age in seconds of a cached response; also used
                as the expiry of the newly stored entry.
            timeout: Seconds to wait for the server before ``requests``
                raises a timeout error. Without a timeout a stalled server
                would block forever while holding the fetch lock.

        Returns:
            The response text on HTTP 200, otherwise ``None``. Non-200
            responses are not cached.

        Raises:
            requests.RequestException: Propagated from ``requests.get`` on
                connection errors or timeouts.
        """
        cache_key = f"url_{url}"

        if not force_refresh:
            content = self._fresh_content(cache_key, expiry)
            if content is not None:
                return content

        with self.fetch_url_lock:
            if not force_refresh:
                # Another thread may have fetched this URL while we were
                # waiting for the lock; avoid a duplicate request.
                content = self._fresh_content(cache_key, expiry)
                if content is not None:
                    return content

            print(f"Fetching URL: {url}")
            response = requests.get(url, timeout=timeout)

            content = None
            if response.status_code == 200:
                content = response.text
                self.set(cache_key, {
                    'content': content,
                    'timestamp': time.time()
                }, expire=expiry)

            # Throttle after every request (including failures) while still
            # holding the lock, so the next fetch cannot start immediately.
            time.sleep(self.THROTTLE_SECONDS)
            return content

    def close(self):
        """Close the underlying disk cache."""
        self.cache.close()