Official Python client library for the RaidFrame API.
pip install raidframe
import os

from raidframe import RaidFrame

# Build the client. The API token is read from the RAIDFRAME_TOKEN
# environment variable; os.environ[...] raises KeyError if it is not set.
rf = RaidFrame(
    token=os.environ["RAIDFRAME_TOKEN"],
    project="my-saas",  # optional
)
# Projects: list all, fetch one by name, create a new one, delete one.
projects = rf.projects.list()
project = rf.projects.get("my-saas")
new_project = rf.projects.create(name="new-app", region="us-east-1")
# NOTE(review): presumably irreversible - confirm before running against a real project.
rf.projects.delete("old-app")
# Services: list all and fetch the one named "api".
services = rf.services.list()
api = rf.services.get("api")
# Set the scaling bounds for "api" (min=4, max=20), then restart it.
rf.services.scale("api", min=4, max=20)
rf.services.restart("api")
# Create a service of type "worker" that runs the given command.
rf.services.create(name="worker", type="worker", command="python worker.py")
# Trigger a deployment. NOTE(review): watch=True presumably follows the
# deployment until it finishes - confirm against the API reference.
deployment = rf.deploy(watch=True)
print(f"Deployed: {deployment.id} ({deployment.version})")
# Fetch up to 10 deployments.
deployments = rf.deployments.list(limit=10)
# Roll back. NOTE(review): no target is passed; presumably this reverts to
# the previous deployment - confirm.
rf.deployments.rollback()
# Create a Postgres database named "main" on the "pro" plan.
rf.databases.create(engine="postgres", name="main", plan="pro")
# Run a SQL query against "main"; the result exposes rows that are indexed by column name.
result = rf.databases.query("main", "SELECT count(*) FROM users")
print(result.rows[0]["count"])
# Take a named backup of "main", then create a branch called "feature-x" from it.
rf.databases.backup("main", name="before-migration")
rf.databases.branch("main", name="feature-x")
# Environment variables: each keyword argument to set() is one variable.
rf.env.set(STRIPE_KEY="sk_live_xxx", API_SECRET="secret")
# Read one variable, list all variables, delete one.
value = rf.env.get("STRIPE_KEY")
variables = rf.env.list()
rf.env.delete("OLD_KEY")
# Query logs for the "api" service, filtered by the arguments below.
logs = rf.logs.query(
    service="api",
    since="1h",
    level="error",
    search="timeout",
    limit=50,
)
for entry in logs:
    print(f"[{entry.service}] {entry.message}")

# Stream logs
# NOTE(review): presumably this loop runs until the stream is closed or the
# process is interrupted - confirm against the API reference.
for entry in rf.logs.stream(service="api"):
    print(f"[{entry.service}] {entry.message}")
# Fetch the requested metrics for the "api" service over a "24h" period.
metrics = rf.metrics.get(
    service="api",
    metrics=["cpu_percent", "response_time_p99"],
    period="24h",
)

# Each requested metric is available as an attribute exposing an `avg` value.
print(f"CPU: {metrics.cpu_percent.avg}%")
print(f"P99: {metrics.response_time_p99.avg}ms")
from raidframe import Queue, Worker

# Producer side: publish a job payload onto the "tasks" queue.
queue = Queue("tasks")
queue.publish({"type": "send-email", "to": "[email protected]"})


def process(job):
    """Handle one job from the queue.

    Reads the recipient from ``job.data["to"]``. ``send_email`` is not
    defined in this example; supply your own implementation.
    """
    send_email(job.data["to"])


# Consumer side: attach `process` as the handler for the "tasks" queue
# with concurrency=10, then start the worker.
worker = Worker("tasks", handler=process, concurrency=10)
worker.start()
from raidframe import Email

email = Email()

# Send a message using the "welcome" template; `data` carries the values
# passed along with the template.
email.send(
    from_addr="[email protected]",
    to="[email protected]",
    subject="Welcome",
    template="welcome",
    data={"name": "Alice"},
)
The SDK supports asyncio:
import asyncio
import os

from raidframe import AsyncRaidFrame


async def main():
    """List services and trigger a deployment using the async client.

    The API token is read from the RAIDFRAME_TOKEN environment variable;
    a KeyError is raised if it is not set.
    """
    rf = AsyncRaidFrame(token=os.environ["RAIDFRAME_TOKEN"])
    # Every client call is awaited in the async variant.
    services = await rf.services.list()
    deployment = await rf.deploy()
    print(f"Deployed: {deployment.version}")


asyncio.run(main())
from raidframe.exceptions import NotFoundError, RateLimitError

# `rf` is a RaidFrame client instance created beforehand.
try:
    rf.services.get("nonexistent")
except NotFoundError:
    print("Service not found")
except RateLimitError as e:
    # The exception exposes `retry_after`, printed here as a number of seconds.
    print(f"Rate limited. Retry after {e.retry_after}s")