#!/opt/cloudlinux/venv/bin/python3 -bb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
# isolatectl — unified end-user utility for per-domain resource limits
# and website isolation management. JSON-only output.
#
# Subcommand groups:
# isolatectl limits {list,set,apply} — per-domain (LVD) resource limits
# isolatectl site-isolation {enable,disable,list} — website isolation (CageFS)
import argparse
import json
import logging
import os
import subprocess
import sys
from clcommon.clpwd import ClPwd
from websiteisolation.commands import cmd_apply, cmd_list, cmd_set
from websiteisolation.exceptions import LvdError
# Tool version; emitted as a JSON object by ``isolatectl --version``.
VERSION = '1.0.0'
# Absolute path of the helper executable that the site-isolation
# subcommands delegate to.
CAGEFSCTL_USER_BIN = '/usr/sbin/cagefsctl-user'
def _setup_logging():
    """Attach a DEBUG-level file handler to the ``websiteisolation`` logger.

    Records go to ``<pw_dir>/.lve/isolatectl.log``, where ``<pw_dir>`` comes
    from the passwd entry ClPwd returns for the real UID of the process. The
    ``.lve`` directory is created (requested mode 0o700) when it is missing.
    """
    passwd_entry = ClPwd(min_uid=0).get_pw_by_uid(os.getuid())[0]
    lve_dir = os.path.join(passwd_entry.pw_dir, '.lve')
    log_file = os.path.join(lve_dir, 'isolatectl.log')

    pkg_logger = logging.getLogger('websiteisolation')
    pkg_logger.setLevel(logging.DEBUG)
    # Do not hand these records on to ancestor loggers.
    pkg_logger.propagate = False

    record_format = logging.Formatter(
        '[%(levelname)s | %(asctime)s]: %(message)s')
    os.makedirs(lve_dir, mode=0o700, exist_ok=True)

    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(record_format)
    pkg_logger.addHandler(file_handler)
def _output(data):
    """Serialize *data* as 2-space-indented JSON and print it to stdout."""
    rendered = json.dumps(data, indent=2)
    print(rendered)
def _error(message, exit_code=1):
    """Emit a JSON error object on stdout, then terminate the process.

    The payload is ``{"result": "error", "message": str(message)}``; the
    process then exits with *exit_code* through ``sys.exit``.
    """
    payload = {'result': 'error', 'message': str(message)}
    _output(payload)
    sys.exit(exit_code)
# ── limits helpers ────────────────────────────────────────────────
def _add_domain_arg(parser, required=True):
    """Register the ``--domain NAME`` option on *parser*.

    *required* is forwarded to argparse and decides whether the option is
    mandatory.
    """
    domain_options = {
        'required': required,
        'metavar': 'NAME',
        'help': 'domain name',
    }
    parser.add_argument('--domain', **domain_options)
def _add_limit_args(parser):
    """Register every per-domain limit option on *parser*.

    Each option is optional, takes an integer value and uses the ``VAL``
    metavar. Options are registered in the order listed below.
    """
    limit_options = (
        ('--cpu', 'CPU limit (hundredths of percent, 2500 = 25%%)'),
        ('--pmem', 'physical memory limit (bytes)'),
        ('--io', 'I/O limit (KB/s)'),
        ('--nproc', 'max processes'),
        ('--iops', 'I/O operations per second'),
        ('--ep', 'max entry processes (concurrent connections)'),
        ('--vmem', 'virtual memory limit (bytes)'),
    )
    for flag, description in limit_options:
        parser.add_argument(flag, type=int, metavar='VAL', help=description)
def _collect_limits(args):
    """Return a dict of the limit values that are set on *args*.

    Fields that are missing from *args* or equal to ``None`` are left out;
    other falsy values such as ``0`` are kept.
    """
    limit_names = ('cpu', 'pmem', 'io', 'nproc', 'iops', 'ep', 'vmem')
    candidates = ((name, getattr(args, name, None)) for name in limit_names)
    return {name: value for name, value in candidates if value is not None}
def handle_limits_list(args):
    """Handle ``limits list``: return ``cmd_list``'s result.

    The effective UID is passed as ``lve_id`` and ``args.domain`` is
    forwarded unchanged.
    """
    effective_uid = os.geteuid()
    return cmd_list(lve_id=effective_uid, domain=args.domain)
def handle_limits_set(args):
    """Handle ``limits set``: forward the requested limits to ``cmd_set``.

    Exits through ``_error`` when no limit option was supplied; otherwise
    returns ``cmd_set``'s result for the effective UID and ``args.domain``.
    """
    requested = _collect_limits(args)
    if not requested:
        _error("at least one limit (--cpu, --pmem, --io, --nproc, --iops, --ep, --vmem) required")
    effective_uid = os.geteuid()
    return cmd_set(effective_uid, args.domain, requested)
def handle_limits_apply(args):
    """Handle ``limits apply``: return ``cmd_apply``'s result.

    ``cmd_apply`` receives the effective UID and ``args.domain``.
    """
    effective_uid = os.geteuid()
    return cmd_apply(effective_uid, args.domain)
# ── site-isolation helpers ────────────────────────────────────────
def _run_cagefsctl_user(subcmd, extra_args=None):
    """Run ``cagefsctl-user site-isolation-<subcmd>`` and return its JSON.

    Args:
        subcmd: Suffix appended to ``site-isolation-`` to form the helper's
            subcommand.
        extra_args: Optional sequence of additional argv items for the helper.

    Returns:
        The value decoded from the helper's stdout when it exits with
        status 0 and prints usable JSON.

    Exits (``SystemExit``, directly or through ``_error``) when:
        * the helper cannot be started (``OSError``);
        * the helper fails: if its stdout is JSON, that JSON is printed as-is
          and the helper's exit status is reused; otherwise a JSON error is
          printed that carries stderr, or the raw stdout when stderr is empty;
        * the helper succeeds but prints nothing, or prints something that
          does not decode to a usable (non-null) JSON value.
    """
    argv = [CAGEFSCTL_USER_BIN, 'site-isolation-' + subcmd]
    if extra_args:
        argv.extend(extra_args)

    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, check=False,
        )
    except OSError as e:
        _error(f"failed to run {CAGEFSCTL_USER_BIN}: {e}")

    stdout = proc.stdout.strip()
    parsed = None
    if stdout:
        try:
            parsed = json.loads(stdout)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass. Unparseable
            # output is not fatal here; it is reported explicitly below.
            parsed = None

    if proc.returncode != 0:
        if parsed is not None:
            # The helper produced its own JSON error: pass it through.
            _output(parsed)
            sys.exit(proc.returncode)
        # Fall back to stdout so diagnostics are not lost when the helper
        # wrote its (non-JSON) complaint there instead of to stderr.
        details = proc.stderr.strip() or stdout
        _error(f"cagefsctl-user site-isolation-{subcmd} failed "
               f"(exit {proc.returncode}): {details}")

    if parsed is not None:
        return parsed
    if stdout:
        # There WAS output, just not usable JSON; saying "no output" here
        # would send whoever debugs this in the wrong direction.
        _error(f"unexpected output from cagefsctl-user: {stdout}")
    _error('no output from cagefsctl-user')
def handle_site_isolation_enable(args):
    """Handle ``site-isolation enable`` by delegating to cagefsctl-user.

    ``args.domain`` is forwarded to the helper as its ``--domain`` value.
    """
    helper_args = ['--domain', args.domain]
    return _run_cagefsctl_user('enable', helper_args)
def handle_site_isolation_disable(args):
    """Handle ``site-isolation disable`` by delegating to cagefsctl-user.

    ``args.domain`` is forwarded to the helper as its ``--domain`` value.
    """
    helper_args = ['--domain', args.domain]
    return _run_cagefsctl_user('disable', helper_args)
def handle_site_isolation_list(args):
    """Handle ``site-isolation list`` by delegating to cagefsctl-user.

    *args* is accepted for handler-signature uniformity and is not used.
    """
    return _run_cagefsctl_user('list', extra_args=None)
# ── parser ────────────────────────────────────────────────────────
def build_parser():
    """Build and return the ``isolatectl`` argument parser.

    The parser has two command groups, ``limits`` and ``site-isolation``,
    each with its own subcommands. Every subcommand stores its handler in the
    ``func`` default, so a parsed namespace without ``func`` means that no
    subcommand was selected.
    """
    parser = argparse.ArgumentParser(
        prog='isolatectl',
        description='Unified utility for per-domain resource limits and '
                    'website isolation management. JSON-only output.\n\n'
                    'Note: domain limits require site isolation to be '
                    'enabled first (isolatectl site-isolation enable).',
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--version', action='version',
                        version=json.dumps({'version': VERSION}))
    groups = parser.add_subparsers(dest='group', metavar='command')

    # ── limits ────────────────────────────────────────────────────
    limits_parser = groups.add_parser(
        'limits',
        help='manage per-domain (LVD) resource limits')
    limits_sub = limits_parser.add_subparsers(dest='subcommand', metavar='subcommand')

    p = limits_sub.add_parser('list', help='show limits for all domains or a specific domain')
    _add_domain_arg(p, required=False)
    p.set_defaults(func=handle_limits_list)

    # NOTE: unlike argument ``help=`` strings, argparse %-formats an epilog
    # only when it contains ``%(prog)``, so a percent sign must NOT be
    # doubled here or the help would show a literal "50%%".
    p = limits_sub.add_parser(
        'set', help='store per-domain limits and apply them to kernel',
        epilog='Example:\n'
               ' isolatectl limits set --domain example.com --cpu 5000 --pmem 268435456 --io 2048 --nproc 30 --iops 500 --ep 20 --vmem 536870912\n'
               '\n'
               'This sets: CPU 50%, 256 MB PMEM, 2048 KB/s IO, 30 procs, 500 IOPS, 20 entry procs, 512 MB VMEM.\n'
               'Use PYLVE_DEBUG=1 for verbose helper output.',
        formatter_class=argparse.RawDescriptionHelpFormatter)
    _add_domain_arg(p)
    _add_limit_args(p)
    p.set_defaults(func=handle_limits_set)

    p = limits_sub.add_parser('apply', help="push one domain's limits from config to kernel")
    _add_domain_arg(p)
    p.set_defaults(func=handle_limits_apply)

    # ── site-isolation ────────────────────────────────────────────
    si_parser = groups.add_parser(
        'site-isolation',
        help='manage website isolation (CageFS)')
    si_sub = si_parser.add_subparsers(dest='subcommand', metavar='subcommand')

    p = si_sub.add_parser('enable', help='enable site isolation for domain(s)')
    p.add_argument('--domain', required=True,
                   help='domain name(s) to enable site isolation for (comma-separated)')
    p.set_defaults(func=handle_site_isolation_enable)

    p = si_sub.add_parser('disable', help='disable site isolation for domain(s)')
    p.add_argument('--domain', required=True,
                   help='domain name(s) to disable site isolation for (comma-separated)')
    p.set_defaults(func=handle_site_isolation_disable)

    p = si_sub.add_parser('list', help='list domains with site isolation enabled')
    p.set_defaults(func=handle_site_isolation_list)

    return parser
def main():
    """Entry point: parse the command line, run the handler, print JSON.

    The process exits with status 1 when it is run as root, when logging
    cannot be initialised, when no command group or no subcommand is given
    (the relevant help text is printed first), and when the selected handler
    raises. On success the handler's return value is printed as JSON.
    """
    if os.getuid() == 0:
        _error("isolatectl must not be run as root; run as a regular user")

    try:
        _setup_logging()
    except Exception as e:
        # Keep the output JSON-only even when the log file cannot be set up
        # (unwritable home directory, quota exceeded, missing passwd entry).
        _error(f"failed to initialize logging: {e}")

    # __name__ is '__main__' when this file runs as a script, which is not
    # under the 'websiteisolation' logger configured by _setup_logging();
    # use an explicit child name so these records reach the log file.
    log = logging.getLogger('websiteisolation.isolatectl')
    log.debug('Executing "%s"', ' '.join(sys.argv))

    try:
        is_debug = int(os.environ.get('PYLVE_DEBUG', 0))
    except ValueError:
        # A non-integer value (e.g. "" or "true") must not crash the tool;
        # treat it as "debug off".
        is_debug = 0
    if is_debug:
        print(f"DEBUG [isolatectl]: uid={os.getuid()} argv={sys.argv}",
              file=sys.stderr)

    parser = build_parser()
    args = parser.parse_args()

    if not args.group:
        parser.print_help()
        sys.exit(1)

    if not hasattr(args, 'func'):
        # Group given but no subcommand: show that group's help. argparse's
        # help action terminates with status 0, so intercept its SystemExit
        # in order to report the missing subcommand with status 1.
        try:
            parser.parse_args([args.group, '--help'])
        except SystemExit:
            pass
        sys.exit(1)

    try:
        result = args.func(args)
        _output(result)
    except LvdError as e:
        log.error('%s', e)
        _error(str(e))
    except Exception as e:
        # Top-level boundary: record the traceback, report a JSON error.
        log.exception('unexpected error')
        _error(f"unexpected error: {e}")
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()