#!/usr/bin/python3
import argparse
import json
import logging
import time
from datetime import datetime
from http import HTTPStatus
from typing import Callable, Dict
import requests
def retry(fn, **kwargs):
    """Call ``fn(**kwargs)`` and repeat the call while the service is unavailable.

    The first call is made immediately. If its response has status 503
    (Service Unavailable), the call is repeated up to 10 more times, sleeping
    ``0.1 * attempt`` seconds before each repeat, until a response with a
    different status arrives.

    Args:
        fn: Callable accepting the keyword arguments in ``kwargs`` and
            returning an object with a ``status_code`` attribute.
        **kwargs: Keyword arguments forwarded unchanged to every call. The
            ``url`` entry, when present, is used only for the log message.

    Returns:
        The last response received; this is still a 503 response when every
        attempt failed.
    """
    response = fn(**kwargs)
    if response.status_code != HTTPStatus.SERVICE_UNAVAILABLE:
        return response

    # ``url`` is only needed for logging; a missing key must not turn a
    # retryable 503 into a KeyError.
    target = kwargs.get("url", "<unknown url>")
    for attempt in range(1, 11):
        # Linearly growing back-off: 0.1s, 0.2s, ... 1.0s.
        time.sleep(0.1 * attempt)
        logging.info(f"Trying to execute request to {target} attempt {attempt}")
        response = fn(**kwargs)
        if response.status_code != HTTPStatus.SERVICE_UNAVAILABLE:
            break
    return response
def make_json_error(code: int, text: str):
    """Build the dict used throughout this script to represent a failed request.

    Args:
        code: Numeric error code, stored under ``"error_code"``.
        text: Error description, stored under ``"error"``.

    Returns:
        A new dict with the keys ``"error_code"`` and ``"error"``.
    """
    payload = dict(error_code=code, error=text)
    return payload
def execute_request(request: Callable, path, output=False, token=None, **kwargs):
    """Send one HTTP request through ``retry`` and normalise the outcome.

    Args:
        request: Callable performing the request (called with ``url``,
            ``cookies``, ``headers`` and the extra ``kwargs``).
        path: Full URL to request.
        output: When true, the response body is parsed as JSON and returned.
        token: Optional session token; when set it is sent both as the
            ``ses6`` cookie and as the ``x-xsrf-token`` header.
        **kwargs: Extra keyword arguments forwarded to ``request``.

    Returns:
        * an error dict from ``make_json_error`` when the status is neither
          200 nor 201, or when ``output`` is true and the body is not valid
          JSON;
        * the parsed JSON body when ``output`` is true;
        * ``None`` otherwise.
    """
    cookies = {"ses6": token} if token else {}
    headers = {"x-xsrf-token": token} if token else {}

    logging.info(f"Executing request to {path}")
    response = retry(request, url=path, cookies=cookies, headers=headers, **kwargs)
    content = response.content.decode("utf-8")

    succeeded = response.status_code in (HTTPStatus.OK, HTTPStatus.CREATED)
    if not succeeded:
        logging.error(f"Return code {response.status_code}: {content}")
        return make_json_error(response.status_code, content)

    # Callers that do not need the body get None on success.
    if not output:
        return None

    try:
        return json.loads(content)
    except json.JSONDecodeError as e:
        logging.error(f"Error while parsing response: {content}")
        return make_json_error(HTTPStatus.INTERNAL_SERVER_ERROR.value, e.msg)
class API:
    """Small client for the platform HTTP API, usable as a context manager.

    Entering the context requests a session token with the stored
    credentials; leaving it deletes that token again.
    """

    def __init__(self, url, email, password):
        """Store the connection settings; no request is made here.

        Args:
            url: Base URL that every request path is appended to.
            email: Account email used for authentication.
            password: Account password used for authentication.
        """
        self.url = url
        self.email = email
        self.password = password
        # Session token; stays None until authentication succeeds.
        self.token = None

    def get(self, path, params=None):
        """GET ``path`` and return the parsed JSON body or an error dict."""
        return execute_request(requests.get, f"{self.url}{path}", output=True, token=self.token, params=params)

    def post(self, path, data=None):
        """POST ``data`` as JSON to ``path`` and return the parsed body or an error dict.

        ``data`` defaults to an empty JSON object; ``None`` is used as the
        default so that no dict instance is shared between calls.
        """
        payload = {} if data is None else data
        return execute_request(requests.post, f"{self.url}{path}", output=True, token=self.token, json=payload)

    def delete(self, path):
        """DELETE ``path``; returns None on success or an error dict on failure."""
        return execute_request(requests.delete, f"{self.url}{path}", token=self.token)

    def __auth(self):
        """Request a session token and store it in ``self.token``.

        Raises:
            RuntimeError: If the response reports an error, is not a JSON
                object, or does not contain a ``token`` entry.
        """
        result = self.post("/auth/v4/public/token", {"email": self.email, "password": self.password})
        if not isinstance(result, dict):
            raise RuntimeError(f"Unexpected authentication response: {result!r}")
        # An error dict may carry an empty "error" text (empty response body),
        # so the absence of a token is treated as a failure as well.
        if result.get("error") or "token" not in result:
            raise RuntimeError(
                result.get("error")
                or f"Authentication failed: no token in response (code {result.get('error_code')})"
            )
        self.token = result["token"]

    def __enter__(self):
        """Authenticate and return this client."""
        self.__auth()
        return self

    def __exit__(self, e_type, e_value, e_traceback):
        """Delete the session token, if one was obtained.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        if self.token:
            self.delete(f"/auth/v4/token/{self.token}")
def parse_args(argv=None) -> argparse.Namespace:
    """Parse the command-line options of the bandwidth check.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads the process arguments, which keeps
            the original zero-argument call working.

    Returns:
        Namespace with ``url``, ``email``, ``password``, ``vmid``,
        ``threshold_gib`` (default 1024) and ``limit_mbitps`` (default 10).
    """
    parser = argparse.ArgumentParser(description="Check VM bandwidth")
    parser.add_argument("--url", type=str, metavar="url", help="Platform url in format \"https://<your_domain>\"", required=True)
    parser.add_argument("--email", type=str, metavar="email", help="Platform administrator email", required=True)
    parser.add_argument("--password", type=str, metavar="password", help="Platform administrator password", required=True)
    parser.add_argument("--vmid", type=int, metavar="vmid", help="Id of the VM", required=True)
    parser.add_argument("--threshold-gib", type=int, metavar="threshold_gib", help="VM bandwidth threshold, in GiB", default=1024)
    parser.add_argument("--limit-mbitps", type=int, metavar="limit_mbitps", help="VM bandwidth limit after reaching threshold, in Mbit/s", default=10)
    return parser.parse_args(argv)
def _fetch_traffic_total(api, vmid, target, start, end):
    """Return the summed datapoint values of one metrics target for a VM.

    Args:
        api: Authenticated client exposing ``get(path, params=...)``.
        vmid: Id of the VM whose metrics are requested.
        target: Metrics target name, e.g. ``"net_rx_summary"``.
        start: Start of the period, already formatted for the API.
        end: End of the period, already formatted for the API.

    Returns:
        Sum of the first element of every datapoint in the first returned
        series; empty/zero values are skipped. ``0`` for an empty response.

    Raises:
        RuntimeError: If the request failed or returned an unexpected object.
    """
    response = api.get(f"/vm/v3/host/{vmid}/metrics", params={
        "target": target,
        "from": start,
        "until": end,
        "output": "single",
        "interval": "1month",
    })
    logging.debug(response)
    if isinstance(response, dict):
        # Error dicts may have an empty "error" text (empty response body),
        # so the error code is checked as well.
        if response.get("error") or response.get("error_code"):
            raise RuntimeError(
                response.get("error")
                or f"Request for {target} failed with code {response.get('error_code')}"
            )
        if response:
            # Data is expected as a list of series; a non-empty dict cannot be indexed by [0].
            raise RuntimeError(f"Unexpected response for {target}: {response!r}")
    if not response:
        return 0
    datapoints = response[0].get("datapoints") or []
    return sum(point[0] for point in datapoints if point and point[0])


def main():
    """Check a VM's traffic since the start of the month and throttle it if needed.

    Sums the ``net_rx_summary`` and ``net_tx_summary`` metrics from the first
    day of the current month until now (values are presumably bytes — the
    total is divided by 2**30 and compared with the GiB threshold). When the
    threshold is reached, the VM's inbound and outbound limits are set to
    ``--limit-mbitps``.

    Raises:
        RuntimeError: If authentication or a metrics request fails.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    args = parse_args()
    # Echo the parsed options without leaking the administrator password.
    print(argparse.Namespace(**{**vars(args), "password": "***"}))

    # Read the clock once so both ends of the period belong to the same month.
    current = datetime.now()
    now = current.strftime('%H:%M_%Y%m%d')
    first_day = current.replace(day=1, hour=0, minute=0).strftime('%H:%M_%Y%m%d')
    logging.info(f"Checking bandwidth from {first_day} to {now} for VM {args.vmid}")

    with API(args.url, args.email, args.password) as api:
        summary_bytes = sum(
            _fetch_traffic_total(api, args.vmid, target, first_day, now)
            for target in ("net_rx_summary", "net_tx_summary")
        )
        if summary_bytes / 2**30 >= args.threshold_gib:
            logging.info(f"Threshold {args.threshold_gib} GiB reached, limiting speed to {args.limit_mbitps}")
            result = api.post(f"/vm/v3/host/{args.vmid}/resource", data={
                "net_in_mbitps": args.limit_mbitps,
                "net_out_mbitps": args.limit_mbitps,
            })
            logging.info(result)
# Run the bandwidth check only when this file is executed as a script.
if __name__ == "__main__":
    main()