SDK Reference
The SDK is a REST client for cpod-backend. It does not talk to the sidecar directly — the sidecar is an internal implementation detail of the backend. Configure three environment variables and call the API.
The SDK auto-manages OAuth tokens: it fetches a client_credentials JWT on first use, caches it, and refreshes at 80% of the TTL. You never call the token endpoint manually.
Initialization#
Zero credentials. cPod apps hold no API key or client secret. The user logs in through the IdP (Keycloak) via the backend; your app receives a one-time code, swaps it for the token with
exchangeCode, and builds a client withfromToken. The browser never carries the token — only the code. See Authentication for the full flow.fromEnv()is for local dev and headless service accounts only.
import { CpodClient } from '@cpod/sdk'
// 1. send the user to login (backend → IdP)
const { loginUrl } = CpodClient.beginLogin({ backendUrl, returnTo: appUrl + '/auth/callback' })
// 2. on /auth/callback?code=… — swap the code for the token, build the client
const { accessToken, refreshToken } = await CpodClient.exchangeCode({ backendUrl, code })
const client = CpodClient.fromToken(accessToken, { refreshToken }) // silent refresh on 401
// Local dev / headless service account only:
const dev = CpodClient.fromEnv()from cpod import CpodClient
login_url = CpodClient.begin_login(backend_url=backend, return_to=app_url + "/auth/callback")
bundle = await CpodClient.exchange_code(backend_url=backend, code=code)
async with CpodClient.from_token(
bundle["access_token"], refresh_token=bundle["refresh_token"], api_url=backend
) as client:
...
# Local dev / headless service account only:
dev = CpodClient.from_env()loginURL := cpod.BeginLogin(backendURL, appURL+"/auth/callback")
access, refresh, _ := cpod.ExchangeCode(ctx, backendURL, code)
client := cpod.NewFromToken(access, cpod.WithRefreshToken(refresh), cpod.WithBaseURL(backendURL))
// Local dev / headless service account only:
dev, _ := cpod.NewFromEnv()var loginUrl = CpodClient.BeginLogin(backendUrl, appUrl + "/auth/callback");
var (access, refresh) = await CpodClient.ExchangeCodeAsync(backendUrl, code);
var client = CpodClient.FromToken(access, refresh, new CpodOptions { BaseUrl = backendUrl });
// Local dev / headless service account only:
var dev = CpodClient.FromEnv();Auth#
Validates a Bearer token and checks policy. Internally cpod-backend calls AuthService.Authorize on the sidecar — the SDK calls the backend REST endpoint.
authorize(token, action, resource)#
Validates the JWT against your IdP and evaluates Rego policy. Returns an AuthDecision.
decision = await sdk.authorize(
"Bearer eyJhbGci...",
action="read",
resource="/api/orders",
)
print(decision.allowed) # bool
print(decision.claims) # dict — decoded JWT claims
print(decision.reason) # str — why allowed/deniedconst decision = await sdk.authorize(
'Bearer eyJhbGci...',
'/api/orders',
'read'
)
console.log(decision.allowed) // boolean
console.log(decision.claims) // Record<string, unknown>
console.log(decision.reason) // string — why allowed/denieddecision, err := sdk.Authorize(ctx, "Bearer eyJhbGci...")
fmt.Println(decision.Allowed) // bool
fmt.Println(decision.Claims) // map[string]interface{}
fmt.Println(decision.Reason) // stringvalidate_token(token) / revoke_token(token)#
# Validate without policy evaluation
claims = await sdk.validate_token("Bearer eyJhbGci...")
# Revoke a token (adds to deny-list in sidecar)
await sdk.revoke_token("Bearer eyJhbGci...")// Validate without policy evaluation
const claims = await sdk.auth.validateToken('Bearer eyJhbGci...')
// Revoke
await sdk.auth.revokeToken('Bearer eyJhbGci...')claims, err := sdk.Auth.ValidateToken(ctx, "Bearer eyJhbGci...")
err = sdk.Auth.RevokeToken(ctx, "Bearer eyJhbGci...")JobService#
Maps to the JobService gRPC service. Key methods: SubmitJob, GetJob, ListJobs, CancelJob, WatchJob (server-stream), GetJobLogs (server-stream), GetJobOutput.
The Job service runs arbitrary containerized workloads asynchronously. Submit a job with an image and command, stream progress events, then retrieve output files via presigned URLs.
run_job() / jobs.submit() + jobs.watch()#
result = await sdk.run_job(
kind="hello",
image="busybox:1.36",
command=["sh", "-lc"],
args=['echo "hello"'],
env={"GREETING": "howdy"},
inline_files={"prompt.txt": b"hello from the caller"},
user_id="alice@example.com",
timeout_seconds=30,
on_progress=lambda ev: print(f" • {ev.kind} {ev.stage}"),
)
print(result.status, result.exit_code)
for f in result.output.files:
print(f.key, f.presigned_url)With secrets injection — inject a secret bundle as environment variables:
result = await sdk.run_job(
kind="hello",
image="busybox:1.36",
command=["sh", "-lc"],
args=['echo "hello"'],
user_id="alice@example.com",
timeout_seconds=30,
on_progress=lambda ev: print(f" • {ev.kind} {ev.stage}"),
secret_bundles=["demo"], # secrets from the "demo" bundle injected as env vars
)const job = await sdk.jobs.submit({
kind: 'hello',
image: 'busybox:1.36',
command: ['sh', '-lc'],
args: ['echo "hello"'],
env: { GREETING: 'howdy' },
userId: 'alice@example.com',
timeoutSeconds: 30,
})
// Stream progress events (server-streaming gRPC)
for await (const event of sdk.jobs.watch(job.id)) {
console.log(event.kind, event.stage)
}
// Retrieve output files with presigned URLs
const output = await sdk.jobs.getOutput(job.id)
for (const f of output.files) {
console.log(f.key, f.presignedUrl)
}job, err := sdk.Jobs.Submit(ctx, &cpod.JobRequest{
Kind: "hello",
Image: "busybox:1.36",
Command: []string{"sh", "-lc"},
Args: []string{`echo "hello"`},
Env: map[string]string{"GREETING": "howdy"},
UserID: "alice@example.com",
})
// Stream progress events (server-streaming gRPC)
stream, err := sdk.Jobs.Watch(ctx, job.ID)
for event := range stream {
fmt.Println(event.Kind, event.Stage)
}
// Retrieve output
output, err := sdk.Jobs.GetOutput(ctx, job.ID)
for _, f := range output.Files {
fmt.Println(f.Key, f.PresignedURL)
}WatchJob and GetJobLogs are server-streaming gRPC calls — the sidecar pushes events to the SDK as the job progresses. on_progress in Python wraps this stream behind a callback for convenience.
PolicyService#
Maps to the PolicyService gRPC service. Key methods: Evaluate, DryRun, WatchPolicyUpdates (server-stream).
evaluate_policy(path, input)#
Evaluate a specific Rego rule directly. Useful for custom policy checks beyond the standard auth flow.
result = await sdk.evaluate_policy(
path="authz/feature_enabled",
input={
"user_id": "u_123",
"feature": "dark_mode",
"roles": ["viewer"],
},
)
print(result.result) # Any — the value returned by the Rego rule
print(result.allowed) # bool — True if result is truthyconst result = await sdk.policy.evaluate('authz/feature_enabled', {
userId: 'u_123',
feature: 'dark_mode',
roles: ['viewer'],
})
console.log(result.result) // unknown — Rego rule output
console.log(result.allowed) // booleanresult, err := sdk.Policy.Evaluate(ctx, "authz/feature_enabled", map[string]interface{}{
"user_id": "u_123",
"feature": "dark_mode",
"roles": []string{"viewer"},
})
fmt.Println(result.Result) // interface{} — Rego rule output
fmt.Println(result.Allowed) // boolSecretsService#
Maps to the SecretsService gRPC service. Key method: ResolveSecret.
resolve_secret(name)#
Resolve a secret by name from the Vault-backed secrets store. Used at startup or in job setup — prefer secret_bundles in run_job() for injecting secrets into containerized workloads.
secret = await sdk.resolve_secret("db/password")
print(secret.value) # str — the resolved secret valueconst secret = await sdk.secrets.resolve('db/password')
console.log(secret.value)secret, err := sdk.Secrets.Resolve(ctx, "db/password")
fmt.Println(secret.Value)MaskingService#
Maps to the MaskingService gRPC service. Key methods: Mask, MaskString.
mask() / mask_string()#
Mask PII and sensitive fields before logging, storing, or returning data to clients.
# Mask a structured object
masked = await sdk.mask({
"name": "Alice Smith",
"email": "alice@example.com",
"ssn": "123-45-6789",
})
# masked["email"] -> "a***@example.com"
# masked["ssn"] -> "***-**-****"
# Mask a raw string
safe = await sdk.mask_string("Call Alice at 555-867-5309")
# safe -> "Call [NAME] at [PHONE]"// Mask a structured object
const masked = await sdk.masking.mask({
name: 'Alice Smith',
email: 'alice@example.com',
ssn: '123-45-6789',
})
// Mask a raw string
const safe = await sdk.masking.maskString('Call Alice at 555-867-5309')masked, err := sdk.Masking.Mask(ctx, map[string]interface{}{
"name": "Alice Smith",
"email": "alice@example.com",
"ssn": "123-45-6789",
})
safe, err := sdk.Masking.MaskString(ctx, "Call Alice at 555-867-5309")AuditService#
Maps to the AuditService gRPC service. Key methods: EmitAuditEvent, QueryAudit.
emit_audit_event(event)#
Emit a structured audit event. The sidecar persists and forwards it to your configured audit sink.
await sdk.emit_audit_event({
"actor": "alice@example.com",
"action": "delete",
"resource": "/api/documents/doc_123",
"outcome": "allow",
"metadata": {"doc_id": "doc_123", "ip": "10.0.0.1"},
})await sdk.audit.emit({
actor: 'alice@example.com',
action: 'delete',
resource: '/api/documents/doc_123',
outcome: 'allow',
metadata: { docId: 'doc_123', ip: '10.0.0.1' },
})err = sdk.Audit.Emit(ctx, &cpod.AuditEvent{
Actor: "alice@example.com",
Action: "delete",
Resource: "/api/documents/doc_123",
Outcome: "allow",
})FlagService#
Maps to the FlagService gRPC service. Key method: EvaluateFlag.
evaluate_flag(flag, context)#
Evaluate a feature flag against the given context. Returns a boolean decision from the sidecar.
enabled = await sdk.evaluate_flag("dark_mode", {
"user_id": "alice@example.com",
"plan": "pro",
})
if enabled:
return render_dark_theme()const enabled = await sdk.flags.evaluate('dark_mode', {
userId: 'alice@example.com',
plan: 'pro',
})
if (enabled) {
return renderDarkTheme()
}enabled, err := sdk.Flags.Evaluate(ctx, "dark_mode", map[string]interface{}{
"user_id": "alice@example.com",
"plan": "pro",
})RateLimitService#
Maps to the RateLimitService gRPC service. Key methods: CheckRateLimit, ResetRateLimit.
check_rate_limit(key, limit, window)#
Check a token-bucket rate limit for the given key. Returns whether the request is allowed and the remaining quota.
result = await sdk.check_rate_limit(
key=f"user:{user_id}:api",
limit=100,
window_seconds=60,
)
if not result.allowed:
raise HTTPException(status_code=429, detail="Rate limit exceeded")
print(result.remaining) # int — remaining tokens in this windowconst result = await sdk.rateLimit.check({
key: `user:${userId}:api`,
limit: 100,
windowSeconds: 60,
})
if (!result.allowed) {
throw new Error('429 Rate limit exceeded')
}
console.log(result.remaining)result, err := sdk.RateLimit.Check(ctx, &cpod.RateLimitRequest{
Key: fmt.Sprintf("user:%s:api", userID),
Limit: 100,
WindowSeconds: 60,
})
if !result.Allowed {
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}Environment Variables#
These are the env vars your application sets to configure the SDK (i.e., where to find cpod-backend):
| Variable | Default | Description |
|---|---|---|
CPOD_API_URL | https://api.cyberpod.app | cpod-backend REST API base URL |
CPOD_CLIENT_ID | — | OAuth client_id for your app |
CPOD_CLIENT_SECRET | — | OAuth client_secret (store in secrets manager, never in code) |
CPOD_TENANT_ID | default | Tenant identifier |
The following are internal cpod-backend config — set by ops, not by SDK consumers:
| Variable | Description |
|---|---|
CORESDK_SIDECAR_ADDR | gRPC address of the sidecar (cpod-backend → sidecar, loopback: [::1]:50051) |
CORESDK_ENV | Set to development to disable JWT validation (never in production) |
CORESDK_JWKS_URI | IdP JWKS endpoint for production JWT validation |
CORESDK_CONTROL_PLANE_URL | Control plane REST API — internal admin only |
Error Types#
from cpod.errors import (
CpodError, # Base error class
NotFoundError, # 404 — entity not found
AuthError, # JWT invalid or expired
ValidationError, # 422 — invalid input fields
RateLimitError, # 429 — rate limit exceeded
)
try:
vuln = await sdk.risk.get_vulnerability("vuln-nonexistent")
except NotFoundError as e:
print(f"Not found: {e.message}")
except AuthError as e:
print(f"Auth error ({e.status}): {e.message}")
except RateLimitError as e:
print(f"Rate limited. Retry in {e.retry_after}s")
except CpodError as e:
print(f"API error {e.status} [{e.code}]: {e.message}")import {
CpodError,
ApiError,
AuthenticationError,
NotFoundError,
RateLimitError,
TimeoutError,
} from '@cpod/sdk'
try {
const person = await client.people.get('per-nonexistent')
} catch (err) {
if (err instanceof NotFoundError) {
console.error('Person not found:', err.message)
} else if (err instanceof AuthenticationError) {
console.error('Auth failed — check your CPOD_API_KEY:', err.message)
} else if (err instanceof RateLimitError) {
console.error(`Rate limited. Retry after ${err.retryAfter}s`)
} else if (err instanceof TimeoutError) {
console.error('Request timed out')
} else if (err instanceof ApiError) {
console.error(`API error ${err.statusCode}: ${err.message}`)
}
}import cpodErrors "github.com/cpod-ai/cpod-sdk-go/errors"
vuln, err := sdk.Risk.GetVulnerability(ctx, "vuln-nonexistent")
if err != nil {
switch {
case cpodErrors.IsNotFound(err):
log.Println("vulnerability not found:", err)
case cpodErrors.IsAuth(err):
log.Println("auth error:", err)
case cpodErrors.IsRateLimit(err):
log.Println("rate limited:", err)
default:
log.Println("API error:", err)
}
}PolicyDeniedError is only raised when the sidecar returns a hard denial. If AuthDecision.allowed is false without an error, the policy evaluated successfully and returned deny — inspect decision.reason for details.
Choosing a capability service: apps vs mcp vs registry#
These three are easy to confuse — two of them touch the same app‑MCP‑tools endpoints, and the third is a separate system. Pick with this guide:
| Service | Use it to… | Endpoints |
|---|---|---|
client.apps | Manage app identity + an app's MCP tools by explicit appId | /api/v1/apps/register, /api/v1/apps/me, /api/v1/apps/{appId}/mcp/tools |
client.mcp | Manage your own app's MCP tools (pre‑bound to your appId) — register / list / unregister / proxy / call | /api/v1/apps/{yourAppId}/mcp/* |
client.registry | The cross‑app capability registry — register a manifest, heartbeat, discover & call tools across apps | /api/v1/registry/{register,heartbeat,tools,call} |
apps and mcp overlap on purpose. client.mcp is a convenience wrapper pre‑bound to your app's appId; client.apps.registerTools(appId, …) does the same with an explicit id. client.registry is a separate system — the live, cross‑app capability catalog. Rule of thumb: apps = identity + your tools (you pass the id), mcp = the same pre‑bound to your app, registry = discover/call tools provided by other apps.
When registering, base_url is optional — required only when a tool is private_proxy (the backend proxies calls to that URL).
Accessing the EDM#
Each entity is reached as client.<domain>.<entity> — e.g. Person:
TypeScript
await client.people.persons.list() // GET /api/v1/people/personsPython
await client.people.list() # GET /api/v1/people/personsGo
client.People.List(ctx, nil) // GET /api/v1/people/persons.NET
await client.People.ListAsync(); // GET /api/v1/people/personsWhat's out of SDK scope#
The SDK is a typed REST client for cpod-backend. It does not bundle analytics or search query engines — use the platform's analytics surface directly for those.