# load-balancer / utils.py
# ChandimaPrabath's picture
# 0.0.0.3 Alpha
# fb99609
import re
import aiofiles
import json
import logging
# Compiled once at import time; the pattern never changes between calls.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'  # http://, https://, ftp:// or ftps://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
    r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
    r'(?::\d+)?'  # optional port
    # \Z instead of $: "$" also matches just before a final "\n", which would
    # let a URL with a trailing newline pass validation.
    r'(?:/?|[/?]\S+)\Z',
    re.IGNORECASE,
)


def is_valid_url(url):
    """
    Validates the URL.

    The URL must start with an http, https, ftp or ftps scheme, followed by a
    domain name, 'localhost', an IPv4-looking address or an IPv6-looking
    address, then an optional port and an optional path/query without
    whitespace. Matching is case-insensitive.

    Args:
        url (str): The URL to validate.
    Returns:
        bool: True if the URL is valid, False otherwise. Non-string input
        (for example None) is reported as invalid rather than raising.
    """
    if not isinstance(url, str):
        return False
    return _URL_PATTERN.match(url) is not None
def convert_to_gb(space_str):
    """
    Parses the leading number of a space string such as '50 GB' or '3.33 GB'.

    Only the first whitespace-separated token is read; whatever follows it
    (the unit) is not inspected.

    Args:
        space_str (str): The space string to convert.
    Returns:
        float: The numeric value of the first token.
    """
    tokens = space_str.split()
    amount = tokens[0]
    return float(amount)
def bytes_to_human_readable(num, suffix="B"):
    """
    Formats a byte count using 1024-based unit prefixes.

    The value is divided by 1024 until its magnitude drops below 1024, and is
    then rendered with one decimal place followed by the matching prefix and
    the suffix. Values too large for the 'Z' prefix are reported with 'Y'.

    Args:
        num (int): The number of bytes.
        suffix (str): The suffix appended after the prefix (default is 'B').
    Returns:
        str: The human-readable string, e.g. '1.5 KB'.
    """
    prefixes = ("", "K", "M", "G", "T", "P", "E", "Z")
    scaled = num
    for prefix in prefixes:
        if abs(scaled) < 1024.0:
            return "{:3.1f} {}{}".format(scaled, prefix, suffix)
        scaled = scaled / 1024.0
    # Fell through every prefix: the remaining value is expressed in 'Y' units.
    return "{:.1f} Y{}".format(scaled, suffix)
def encode_episodeid(title, season, episode):
    """
    Builds an episode ID by joining title, season and episode with underscores.

    Args:
        title (str): The title of the TV show.
        season (str): The season of the TV show.
        episode (str): The episode number.
    Returns:
        str: The encoded episode ID in the form '<title>_<season>_<episode>'.
    """
    return "{}_{}_{}".format(title, season, episode)
async def read_json_file(file_path: str):
    """Read a JSON file asynchronously and return the parsed data.

    Args:
        file_path (str): Path of the JSON file to read.
    Returns:
        The Python object produced by ``json.loads`` for the file's content.
    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Explicit UTF-8 so decoding does not depend on the platform's default
    # encoding. Errors propagate unchanged to the caller.
    async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
        data = await f.read()
    return json.loads(data)
async def save_to_json(data, path):
    """Save data to a JSON file asynchronously.

    The data is written as UTF-8 JSON with a 4-space indent and non-ASCII
    characters left unescaped. I/O failures are logged and not re-raised.

    Args:
        data: A JSON-serializable object.
        path (str): Destination file path; an existing file is overwritten.
    Raises:
        TypeError: If ``data`` is not JSON-serializable. This is raised before
            the destination file is opened, so existing content is preserved.
    """
    # Serialize first: opening with 'w' truncates the file, so a serialization
    # failure must happen before that to avoid wiping existing content.
    payload = json.dumps(data, indent=4, ensure_ascii=False)
    try:
        async with aiofiles.open(path, 'w', encoding='utf-8') as f:
            await f.write(payload)
        logging.info(f"Data saved to {path}")
    except IOError as e:
        # Best-effort save: report the failure but do not propagate it.
        logging.error(f"Error saving data to {path}: {e}")