---
title: 'PIP Management - Policy Developer Guide'
description: 'Technical guide for developers writing policies and integrating with Control Core PIP system'
---
Policy Information Point (PIP) Management - Developer Guide
Introduction
This guide provides technical details for developers writing policies, integrating with the PIP API, and building custom connectors for Control Core's Policy Information Point system.
Architecture Deep Dive
System Components
Backend Services:
- pip_service.py - Core PIP management, caching, and synchronization
- secrets_service.py - AES-256 credential encryption and secrets management
- oauth_service.py - OAuth 2.0 authorization flows and token lifecycle
- schema_introspection_service.py - Real-time schema discovery from data sources
- erp_discovery_service.py - ERP pattern discovery, watched-table orchestration, and semantic vector indexing
- erp_classification_worker_service.py - Background LLM classification worker for watched tables
- scca_llm_settings_service.py - Shared tenant-level LLM resolver (reads Smart CC settings)
- semantic_embedding_service.py - Embedding generation using the SCCA-configured provider, with deterministic continuity fallback
- policy-bridge_publisher.py - Policy Bridge data publishing and webhook handling
- policy-bridge_config_generator.py - Policy Bridge data source configuration generation
- sync_scheduler.py - APScheduler-based automated synchronization
- audit_logger.py - Comprehensive audit trail logging
Data Connectors:
- iam_connector.py - IAM/IDP connectors (Okta, Azure AD, Auth0, LDAP)
- database_connector.py - Database connectors with schema introspection
- openapi_parser.py - OpenAPI/Swagger specification parsing
Shared LLM Contract (SCCA + PIP Discovery)
Deep-schema discovery services must use the same LLM tenant configuration as Smart Control Core Agent:
- Source of truth: smart_cc_settings (tenant_id, llm_provider, custom_api_key, custom_api_url)
- Used by:
  - watched-table LLM classification (erp_classification_worker_service.py)
  - semantic embedding generation (semantic_embedding_service.py)
  - metadata search query embedding (semantic_metadata_search_service.py)
- Tenant context is resolved from the authenticated actor (request.state.user_id) and propagated through discovery/search paths.
Design intent:
- one model governance path
- one credential rotation path
- one audit/compliance boundary for AI calls
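The single-resolver rule can be sketched as follows. This is a minimal illustration, assuming settings rows shaped like the smart_cc_settings columns listed above; LLMSettings and resolve_llm_settings are hypothetical names, not the actual scca_llm_settings_service.py API.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class LLMSettings:
    """Hypothetical in-memory view of one smart_cc_settings row."""
    tenant_id: str
    llm_provider: str
    custom_api_key: Optional[str] = None
    custom_api_url: Optional[str] = None


def resolve_llm_settings(settings_by_tenant: Mapping[str, LLMSettings], tenant_id: str) -> LLMSettings:
    """Single lookup shared by classification, embedding, and metadata search."""
    settings = settings_by_tenant.get(tenant_id)
    if settings is None:
        # Fail loudly rather than silently using a different model
        raise LookupError(f"no smart_cc_settings row for tenant {tenant_id!r}")
    return settings


store = {"acme": LLMSettings("acme", "openai", custom_api_url="https://llm.example.internal/v1")}
print(resolve_llm_settings(store, "acme").llm_provider)  # openai
```

Raising on a missing row is a design choice of this sketch: it surfaces the "Deep discovery ignores configured model" case from the troubleshooting table instead of hiding it.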
Database Models:
- PIPConnection - Connection configuration and metadata
- OAuthToken - OAuth token storage with expiration tracking
- WebhookEvent - Webhook event tracking and replay
- PIPSyncLog - Sync operation history and audit trail
- AttributeMapping - Field mapping configuration
- MetadataCache - Semantic tags, confidence, source type, friendly names, and embeddings
- PIPWatchedTable - Explicit watched-table list for deep classification
- PIPSchemaClassificationJob - Durable async queue for watched-table LLM classification
Data Flow Architecture
┌──────────────────────────────────────────────────────┐
│ 1. Admin Configures Connection (PAP UI)              │
│    POST /api/pip/connections                         │
│    {provider: "okta", auth: "oauth", ...}            │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 2. Connection Test & Schema Discovery                │
│    POST /api/pip/connections/test                    │
│    → Makes real API call to Okta                     │
│    → Discovers user schema (GET /api/v1/meta/...)    │
│    → Returns: {fields: [{name, type, ...}]}          │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 3. Attribute Mapping (UI)                            │
│    User maps: user.department → profile.department   │
│    Stored in database: AttributeMapping table        │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 4. Scheduled Sync (APScheduler)                      │
│    - Cron job runs every hour                        │
│    - Fetches users: GET /api/v1/users?limit=1000     │
│    - Applies mappings: profile.department → dept     │
│    - Transforms to standard format                   │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 5. Publish to Policy Bridge                          │
│    POST http://policy-bridge:7002/data/config        │
│    Topic: "policy_data:okta:identity"                │
│    Data: {users: {...}, groups: {...}}               │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 6. Policy Bridge distributes to ALL PEPs             │
│    - PubSub to all connected clients                 │
│    - PEPs update local policy data                   │
│    - Available in: data.policy_data.okta.identity    │
└──────────────────────────────────────────────────────┘
API Reference
Connection Management
Create Connection
POST /api/pip/connections
Content-Type: application/json
{
"name": "Production Okta",
"description": "Main identity provider for production",
"connection_type": "identity",
"provider": "okta",
"configuration": {
"domain": "company.okta.com",
"authorization_server": "default",
"auth_url": "https://company.okta.com/oauth2/default/v1/authorize",
"token_url": "https://company.okta.com/oauth2/default/v1/token"
},
"credentials": {
"oauth_token": {
"client_id": "0oa1234567890abcdef",
"client_secret": "secret_1234567890abcdef",
"scopes": "openid profile email groups"
}
},
"sync_enabled": true,
"sync_frequency": "hourly",
"health_check_url": "https://company.okta.com/api/v1/users/me"
}
Response:
{
"id": 1,
"name": "Production Okta",
"connection_type": "identity",
"provider": "okta",
"status": "active",
"last_sync": null,
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:30:00Z"
}
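A minimal Python sketch of calling this endpoint with only the standard library. It builds the request without sending it; the base URL matches the local examples in this guide, the function name is hypothetical, and the bearer-token header is an assumption, so substitute whatever authentication your deployment uses.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local Control Plane


def build_create_connection_request(payload: dict, token: str) -> urllib.request.Request:
    """Build (but do not send) POST /api/pip/connections."""
    return urllib.request.Request(
        f"{BASE_URL}/api/pip/connections",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumption: bearer-token auth; adjust to your deployment
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


req = build_create_connection_request(
    {"name": "Production Okta", "connection_type": "identity", "provider": "okta",
     "sync_enabled": True, "sync_frequency": "hourly"},
    token="REPLACE_ME",
)
# urllib.request.urlopen(req) would send it and return the created connection as JSON
```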
Test Connection
POST /api/pip/connections/test
Content-Type: application/json
{
"connection_type": "identity",
"provider": "okta",
"configuration": {
"domain": "company.okta.com"
},
"credentials": {
"oauth_token": {
"client_id": "...",
"client_secret": "..."
}
}
}
Response:
{
"success": true,
"status": "connected",
"response_time": 0.234,
"details": {
"provider": "okta",
"schema_discovered": true,
"fields": [
{
"name": "id",
"type": "string",
"description": "User ID",
"required": true,
"nullable": false,
"sensitivity": "internal",
"example": "00u1234567890abcdef"
},
{
"name": "email",
"type": "string",
"description": "Email address",
"required": true,
"sensitivity": "internal",
"example": "user@company.com"
},
{
"name": "profile.department",
"type": "string",
"description": "User department",
"required": false,
"sensitivity": "internal",
"example": "Engineering"
}
// ... all real fields from Okta
],
"sample_data": {
"user_count": 1250,
"group_count": 45
}
}
}
OAuth Integration
Initiate OAuth Flow
GET /api/pip/oauth/authorize/okta?connection_id=1
Response:
{
"authorization_url": "https://company.okta.com/oauth2/default/v1/authorize?client_id=...&response_type=code&scope=openid+profile+email+groups&redirect_uri=http://localhost:8000/pip/oauth/callback/okta&state=random-secure-token",
"state": "random-secure-token",
"provider": "okta",
"connection_id": 1
}
Handle OAuth Callback
POST /api/pip/oauth/callback/okta
Content-Type: application/json
{
"code": "authorization-code-from-provider",
"state": "random-secure-token",
"connection_id": 1
}
Response:
{
"success": true,
"provider": "okta",
"connection_id": 1,
"user_info": {
"id": "00u1234567890abcdef",
"email": "admin@company.com",
"name": "Admin User"
},
"token_expires_at": "2024-01-15T11:30:00Z"
}
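The state value protects the callback against cross-site request forgery: it is generated when the flow starts, stored server-side, and compared when the callback arrives. A minimal standard-library sketch of both halves; the function names are hypothetical and this is not Control Core's oauth_service.py code.

```python
import hmac
import secrets
from typing import Tuple
from urllib.parse import parse_qs, urlencode, urlparse


def build_authorization_url(auth_url: str, client_id: str, redirect_uri: str,
                            scopes: str) -> Tuple[str, str]:
    """Return (authorization_url, state) for an authorization-code flow."""
    state = secrets.token_urlsafe(32)  # unguessable, single-use value
    query = urlencode({
        "client_id": client_id,
        "response_type": "code",
        "scope": scopes,  # urlencode turns spaces into "+"
        "redirect_uri": redirect_uri,
        "state": state,
    })
    return f"{auth_url}?{query}", state


def state_matches(expected: str, received: str) -> bool:
    """Compare the stored and returned state values in constant time."""
    return hmac.compare_digest(expected.encode(), received.encode())


url, state = build_authorization_url(
    "https://company.okta.com/oauth2/default/v1/authorize",
    "0oa1234567890abcdef",
    "http://localhost:8000/pip/oauth/callback/okta",
    "openid profile email groups",
)
print(parse_qs(urlparse(url).query)["scope"])  # ['openid profile email groups']
```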
Policy Bridge Integration
Publish to Policy Bridge
POST /api/pip/connections/1/publish-to-policy-bridge?force_full_sync=false
Response:
{
"success": true,
"message": "Data published successfully",
"response_time": 0.456,
"policy-bridge_response": {
"topics": ["policy_data:okta", "policy_data:identity", "connection:1"],
"records_published": 1250,
"status": "success"
}
}
Get Data Snapshot (What the Policy Bridge Receives)
GET /api/pip/connections/1/data-snapshot
Response:
{
"connection_id": 1,
"provider": "okta",
"connection_type": "identity",
"data": {
"users": {
"00u1234567890abcdef": {
"id": "00u1234567890abcdef",
"email": "john.doe@company.com",
"name": "John Doe",
"groups": ["Engineering", "Developers", "TeamLeads"],
"roles": ["developer", "team-lead"],
"attributes": {
"department": "Engineering",
"title": "Senior Engineer",
"manager": "00u0987654321fedcba",
"clearance_level": "3",
"mfa_enabled": true,
"hire_date": "2020-03-15",
"location": "San Francisco"
},
"last_updated": "2024-01-15T10:30:00Z"
}
// ... all users
},
"groups": {
"00g1234567890abcdef": {
"id": "00g1234567890abcdef",
"name": "Engineering",
"description": "Engineering team",
"members": ["00u1234567890abcdef", "00u2345678901bcdefg"],
"last_updated": "2024-01-15T10:30:00Z"
}
// ... all groups
},
"metadata": {
"provider": "okta",
"connection_id": 1,
"last_updated": "2024-01-15T10:30:00Z",
"user_count": 1250,
"group_count": 45
}
},
"timestamp": "2024-01-15T10:30:00Z",
"version": "1_1705318200"
}
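The snapshot keys users and groups by ID so that a policy lookup such as users[input.user.id] is a direct key access rather than a search. A minimal sketch of producing that shape from provider records, assuming each record already carries an id; build_snapshot is hypothetical, and the version field is omitted because its format is not documented here.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def build_snapshot(connection_id: int, provider: str, connection_type: str,
                   users: List[Dict[str, Any]], groups: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Key users and groups by id and attach metadata, mirroring the snapshot above."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    users_by_id = {user["id"]: user for user in users}
    groups_by_id = {group["id"]: group for group in groups}
    return {
        "connection_id": connection_id,
        "provider": provider,
        "connection_type": connection_type,
        "data": {
            "users": users_by_id,
            "groups": groups_by_id,
            "metadata": {
                "provider": provider,
                "connection_id": connection_id,
                "last_updated": now,
                # Count after de-duplication by id
                "user_count": len(users_by_id),
                "group_count": len(groups_by_id),
            },
        },
        "timestamp": now,
    }


snap = build_snapshot(
    1, "okta", "identity",
    users=[{"id": "00u1", "email": "john.doe@company.com", "groups": ["Engineering"]}],
    groups=[{"id": "00g1", "name": "Engineering", "members": ["00u1"]}],
)
print(snap["data"]["users"]["00u1"]["email"])  # john.doe@company.com
```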
Sync Scheduler
Schedule Automated Sync
POST /api/pip/connections/1/schedule-sync
Response:
{
"success": true,
"message": "Connection 1 scheduled for sync",
"job_id": "sync_1_okta",
"sync_frequency": "hourly"
}
Get Sync Status
GET /api/pip/connections/1/sync-status
Response:
{
"connection_id": 1,
"sync_enabled": true,
"sync_frequency": "hourly",
"job_status": {
"job_id": "sync_1_okta",
"status": "success",
"next_run": "2024-01-15T11:00:00Z",
"last_run": "2024-01-15T10:00:00Z",
"run_count": 24,
"error_count": 0,
"last_error": null
}
}
Trigger Manual Sync
POST /api/pip/connections/1/trigger-sync
Using PIP Data in Policies
Data Structure in the Policy Cache
When the integration publishes data to the Policy Bridge, it becomes available to the policy engine at:
data.policy_data.<provider>.<connection_type>
Example for Okta:
data.policy_data.okta.identity.users
data.policy_data.okta.identity.groups
data.policy_data.okta.identity.metadata
Example for PostgreSQL:
data.policy_data.postgresql.database.tables.users
data.policy_data.postgresql.database.tables.resources
data.policy_data.postgresql.database.metadata
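To inspect a data snapshot offline, the same path can be walked in Python. This sketch is illustrative only: lookup returns None where Rego would leave the reference undefined, and both helper names are hypothetical.

```python
from typing import Any, List, Optional


def policy_data_path(provider: str, connection_type: str) -> List[str]:
    """Segments of data.policy_data.<provider>.<connection_type>."""
    return ["policy_data", provider, connection_type]


def lookup(document: Any, path: List[str]) -> Optional[Any]:
    """Walk nested dicts; return None if any segment is missing."""
    node = document
    for segment in path:
        if not isinstance(node, dict) or segment not in node:
            return None
        node = node[segment]
    return node


data = {"policy_data": {"okta": {"identity": {"users": {"00u1": {"email": "john.doe@company.com"}}}}}}
users = lookup(data, policy_data_path("okta", "identity") + ["users"])
print(users["00u1"]["email"])  # john.doe@company.com
```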
Accessing User Data
Basic User Lookup:
package app.authorization
import future.keywords.if
import future.keywords.in
# Get user from Okta
user := data.policy_data.okta.identity.users[input.user.id]
# Check user attributes
allow if {
user != null
user.email == input.user.email
user.attributes.department == "Engineering"
}
With Null Safety:
# Safe user lookup with default
get_user(user_id) := user if {
user := data.policy_data.okta.identity.users[user_id]
} else := {
"id": user_id,
"email": "unknown",
"attributes": {}
}
allow if {
user := get_user(input.user.id)
user.attributes.department == "Engineering"
}
Multi-Source Policies
Combining Data from Multiple PIPs:
package app.authorization
import future.keywords.if
import future.keywords.in
# Policy using 3 different data sources
allow if {
# 1. User identity from Okta
okta_user := data.policy_data.okta.identity.users[input.user.id]
okta_user != null
"Engineering" in okta_user.groups
# 2. HR context from Workday
hr_data := data.policy_data.workday.hr.employees[input.user.id]
hr_data.employment_status == "active"
hr_data.clearance_level >= 3
# 3. Resource metadata from PostgreSQL
resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
resource.owner_id == input.user.id
resource.sensitivity_level in ["public", "internal"]
resource.environment == input.resource.environment
}
Real vs Sample Metadata
Understanding the Metadata Display:
Before Connection Test:
// Sample fields (examples only)
getSampleFields() returns:
[
{ name: "user.email", type: "string" },
{ name: "user.department", type: "string" },
// Generic examples
]
After Successful Connection Test:
// Real fields from Okta API
connectionTestResult.details.fields:
[
{
name: "id",
type: "string",
description: "User ID",
required: true,
sensitivity: "internal",
example: "00u1234567890abcdef"
},
{
name: "profile.department",
type: "string",
description: "Department from Okta profile",
required: false,
sensitivity: "internal",
example: "Engineering"
},
{
name: "profile.customAttribute1",
type: "string",
description: "Your custom Okta attribute",
required: false,
sensitivity: "internal"
}
// ... ALL real fields from your Okta instance
]
How Schema Discovery Works:
For IAM sources (Okta):
# Backend makes real API call
GET https://company.okta.com/api/v1/meta/schemas/user/default
Authorization: SSWS {api_token}
# Okta returns actual schema
{
"definitions": {
"base": {
"properties": {
"login": {"type": "string"},
"email": {"type": "string"},
"firstName": {"type": "string"}
}
},
"custom": {
"properties": {
"clearanceLevel": {"type": "string"},
"department": {"type": "string"},
"manager": {"type": "string"}
}
}
}
}
# Transformed to standard format for frontend
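The transformation step can be sketched as flattening the base and custom properties into one field list. This assumes the schema shape shown above; okta_schema_to_fields is hypothetical. It prefixes every attribute with profile. because Okta user objects expose both base and custom attributes under profile; the real service may normalize base fields such as email differently, as the test-connection response earlier suggests.

```python
from typing import Dict, List


def okta_schema_to_fields(schema: Dict) -> List[Dict]:
    """Flatten base and custom properties of an Okta user schema into a field list."""
    fields = []
    for section in ("base", "custom"):
        properties = schema.get("definitions", {}).get(section, {}).get("properties", {})
        for name, spec in properties.items():
            fields.append({
                # Okta user objects hold both base and custom attributes under "profile"
                "name": f"profile.{name}",
                "type": spec.get("type", "string"),
                "custom": section == "custom",
            })
    return fields


schema = {"definitions": {
    "base": {"properties": {"login": {"type": "string"}, "email": {"type": "string"}}},
    "custom": {"properties": {"clearanceLevel": {"type": "string"}}},
}}
print([f["name"] for f in okta_schema_to_fields(schema)])
# ['profile.login', 'profile.email', 'profile.clearanceLevel']
```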
For Databases (PostgreSQL):
-- Backend executes schema introspection
SELECT
column_name,
data_type,
is_nullable,
column_default,
character_maximum_length
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'users';
-- Returns actual database columns
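With a driver such as psycopg, cursor.fetchall() on the query above yields tuples in the selected column order. A minimal sketch of turning those rows into mapping fields; columns_to_fields is hypothetical, the table-prefixed naming is an assumption, and treating NOT NULL columns without a default as required is a heuristic.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Tuple[str, str, str, Optional[str], Optional[int]]


def columns_to_fields(table: str, rows: List[Row]) -> List[Dict[str, Any]]:
    """Convert information_schema.columns rows into mapping fields."""
    fields = []
    for column_name, data_type, is_nullable, column_default, max_length in rows:
        fields.append({
            "name": f"{table}.{column_name}",
            "type": data_type,
            # information_schema reports nullability as the strings 'YES' / 'NO'
            "nullable": is_nullable == "YES",
            # Heuristic: NOT NULL with no default means the caller must supply it
            "required": is_nullable == "NO" and column_default is None,
            "max_length": max_length,
        })
    return fields


rows = [
    ("id", "uuid", "NO", "gen_random_uuid()", None),
    ("email", "character varying", "NO", None, 255),
    ("department", "character varying", "YES", None, 100),
]
for field in columns_to_fields("users", rows):
    print(field["name"], field["required"])
```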
For APIs (OpenAPI):
// Backend fetches and parses the OpenAPI spec
const spec = await (await fetch('https://api.example.com/openapi.json')).json();
// Extracts endpoints and models
{
paths: {
"/users/{id}": {
get: {
operationId: "getUser",
parameters: [...],
responses: {
200: {
content: {
"application/json": {
schema: { $ref: "#/components/schemas/User" }
}
}
}
}
}
}
},
components: {
schemas: {
User: {
properties: {
id: {type: "string"},
email: {type: "string", format: "email"},
department: {type: "string"}
},
required: ["id", "email"]
}
}
}
}
// Converted to field list for mapping
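The conversion step can be sketched in Python: follow a local $ref, then list the schema's properties with their required flags. This is a minimal sketch, not openapi_parser.py; the helper names are hypothetical and only plain local references (no ~0/~1 escapes, no external files) are handled.

```python
from typing import Any, Dict, List


def resolve_ref(spec: Dict[str, Any], ref: str) -> Dict[str, Any]:
    """Follow a local reference such as '#/components/schemas/User'."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local refs are supported: {ref}")
    node = spec
    for part in ref[2:].split("/"):
        node = node[part]
    return node


def schema_fields(schema: Dict[str, Any]) -> List[Dict[str, Any]]:
    """List a schema's properties as mapping fields."""
    required = set(schema.get("required", []))
    return [
        {"name": name, "type": prop.get("type", "object"),
         "format": prop.get("format"), "required": name in required}
        for name, prop in schema.get("properties", {}).items()
    ]


spec = {"components": {"schemas": {"User": {
    "properties": {"id": {"type": "string"},
                   "email": {"type": "string", "format": "email"},
                   "department": {"type": "string"}},
    "required": ["id", "email"],
}}}}
fields = schema_fields(resolve_ref(spec, "#/components/schemas/User"))
print([f["name"] for f in fields if f["required"]])  # ['id', 'email']
```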
Attribute Mapping Strategy
Standard Attribute Taxonomy
Control Core recommends this attribute naming convention:
User Attributes (identity context):
user.id - Unique identifier
user.email - Email address
user.username - Username/login
user.name - Full name
user.first_name - First name
user.last_name - Last name
user.department - Department name
user.title - Job title
user.manager - Manager ID
user.roles - Role assignments (array)
user.groups - Group memberships (array)
user.permissions - Direct permissions (array)
user.mfa_enabled - MFA status (boolean)
user.clearance_level - Security clearance (string/number)
user.employment_status - Employment status
user.hire_date - Start date
user.location - Work location
user.cost_center - Cost center
user.certifications - Certifications (array)
Resource Attributes (data/asset context):
resource.id - Resource identifier
resource.name - Resource name
resource.type - Resource type
resource.owner_id - Owner user ID
resource.owner_email - Owner email
resource.sensitivity - Data classification
resource.classification - Security classification
resource.environment - Environment (dev/staging/prod)
resource.region - Geographic region
resource.compliance_tags - Compliance tags (array)
resource.status - Status (active/archived/deleted)
resource.created_at - Creation timestamp
resource.updated_at - Last update timestamp
resource.metadata - Additional metadata (object)
API Attributes (endpoint context):
api.path - Endpoint path
api.method - HTTP method
api.operation_id - Operation identifier
api.security - Required security schemes
api.parameters - Required parameters (array)
api.rate_limit - Rate limit tier
api.deprecated - Deprecation status (boolean)
Mapping Best Practices
1. Map Core Identity Attributes First:
// Essential mappings for any identity provider
{
"user.id": "id", // or "sub", "user_id"
"user.email": "email", // or "mail", "emailAddress"
"user.name": "displayName", // or "name", "cn"
"user.roles": "groups", // or "roles", "memberOf"
}
2. Map Provider-Specific Attributes:
For Okta:
{
"user.department": "profile.department",
"user.manager": "profile.manager",
"user.title": "profile.title",
"user.clearance_level": "profile.clearanceLevel"
}
For Azure AD:
{
"user.department": "department",
"user.manager": "manager",
"user.title": "jobTitle",
"user.clearance_level": "extensionAttribute1"
}
3. Handle Nested/Complex Fields:
// Okta groups array
"user.groups": "groups" // Backend flattens to array of group names
// Azure AD nested manager
"user.manager": "manager.id" // Backend extracts nested value
// Custom object
"user.metadata": "profile.customAttributes" // Backend preserves object
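Dotted source paths like profile.department or manager.id can be resolved with a small helper. A minimal sketch of applying a mapping to one source record; get_path and apply_mapping are hypothetical, and stripping the user. prefix to produce the attribute key is an assumption about the published shape.

```python
from typing import Any, Dict

_MISSING = object()


def get_path(record: Dict[str, Any], dotted: str, default: Any = None) -> Any:
    """Read a dotted path such as 'profile.department' from nested dicts."""
    node: Any = record
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


def apply_mapping(record: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
    """Build {attribute: value} from {'user.department': 'profile.department', ...}."""
    result = {}
    for standard_name, source_path in mapping.items():
        value = get_path(record, source_path, _MISSING)
        if value is not _MISSING:  # skip absent fields instead of publishing nulls
            result[standard_name.split(".", 1)[-1]] = value
    return result


okta_user = {"id": "00u1", "profile": {"department": "Engineering", "title": "Senior Engineer"}}
mapping = {
    "user.id": "id",
    "user.department": "profile.department",
    "user.title": "profile.title",
    "user.manager": "profile.manager",  # absent in this record
}
print(apply_mapping(okta_user, mapping))
# {'id': '00u1', 'department': 'Engineering', 'title': 'Senior Engineer'}
```

Skipping absent fields means a policy sees the attribute as undefined rather than null, which matches how Rego treats a missing key.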
Writing Policies with PIP Data
Pattern 1: Simple Attribute Check
package app.authorization
import future.keywords.if
allow if {
# Get user from PIP
user := data.policy_data.okta.identity.users[input.user.id]
# Check attribute
user.attributes.department == "Engineering"
}
Pattern 2: Group Membership
package app.authorization
import future.keywords.if
import future.keywords.in
allow if {
user := data.policy_data.okta.identity.users[input.user.id]
# Check if user is in group
"Admins" in user.groups
}
# Alternative: Check group members
allow if {
group := data.policy_data.okta.identity.groups["00g_admin_group_id"]
input.user.id in group.members
}
Pattern 3: Resource Ownership
package app.authorization
import future.keywords.if
allow if {
# Get resource from database PIP
resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
# Check ownership
resource.owner_id == input.user.id
}
# Or with user lookup
allow if {
resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
user := data.policy_data.okta.identity.users[input.user.id]
resource.owner_id == user.id
}
Pattern 4: Multi-Source Authorization
package app.authorization
import future.keywords.if
import future.keywords.in
# Combines Okta, Workday, and PostgreSQL
allow if {
# Identity check (Okta)
user := data.policy_data.okta.identity.users[input.user.id]
user != null
"Engineering" in user.groups
# Employment check (Workday)
employee := data.policy_data.workday.hr.employees[input.user.id]
employee.employment_status == "active"
employee.clearance_level >= 3
employee.certifications[_] == "PII-Handler"
# Resource check (PostgreSQL)
resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
resource.environment == "production"
resource.sensitivity_level in ["internal", "confidential"]
# Business logic
input.action == "read"
}
Pattern 5: Dynamic Context with Time
package app.authorization
import future.keywords.if
allow if {
user := data.policy_data.okta.identity.users[input.user.id]
# Check user location
user.attributes.location == "Headquarters"
# Check time (business hours only; time.clock returns [hour, minute, second] in UTC)
[hour, _, _] := time.clock(time.now_ns())
hour >= 9
hour < 17
# Check MFA
user.attributes.mfa_enabled == true
}
Pattern 6: Hierarchical Authorization
package app.authorization
import future.keywords.if
# Manager can access their team's resources
allow if {
user := data.policy_data.okta.identity.users[input.user.id]
resource_owner := data.policy_data.okta.identity.users[input.resource.owner_id]
# Check if user is resource owner's manager
resource_owner.attributes.manager == user.id
}
# Or check the entire reporting chain. Rego rejects recursive rules, so build a
# user -> manager graph and let the graph.reachable built-in walk it (cycle-safe)
manager_graph[user_id] := [manager] if {
user := data.policy_data.okta.identity.users[user_id]
manager := user.attributes.manager
}
is_in_reporting_chain(user_id, manager_id) if {
direct_manager := data.policy_data.okta.identity.users[user_id].attributes.manager
chain := graph.reachable(manager_graph, {direct_manager})
chain[manager_id]
}
allow if {
is_in_reporting_chain(input.resource.owner_id, input.user.id)
}
Webhook Integration
Configuring Webhooks for Real-Time Updates
Webhooks enable instant policy context updates when source data changes.
Webhook Flow:
User updated in Okta
↓
Okta sends webhook
POST /api/pip/webhooks/okta/1
↓
PIP processes update
↓
Publishes to Policy Bridge (incremental)
↓
Policy Bridge pushes to PEPs
↓
Policy decisions use fresh data
(Total latency: under 1 second)
Webhook Endpoint Format:
POST /api/pip/webhooks/{provider}/{connection_id}
Example Webhook Payload (from Okta):
{
"eventType": "user.lifecycle.update",
"eventId": "event-123",
"published": "2024-01-15T10:35:00Z",
"data": {
"events": [{
"uuid": "00u1234567890abcdef",
"published": "2024-01-15T10:35:00Z",
"eventType": "user.lifecycle.update",
"version": "1.0",
"displayMessage": "User profile updated",
"target": [{
"id": "00u1234567890abcdef",
"type": "User",
"alternateId": "user@company.com",
"displayName": "John Doe"
}]
}]
}
}
PIP Processes and Publishes:
{
"topic": "policy_data:okta:identity:incremental",
"data": {
"users": {
"00u1234567890abcdef": {
"id": "00u1234567890abcdef",
"email": "user@company.com",
"attributes": {
"department": "Sales" // Updated value
},
"last_updated": "2024-01-15T10:35:00Z"
}
}
}
}
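Because the event carries only identifiers, a handler typically re-reads the affected users before publishing. A minimal sketch of the first step, collecting user IDs across the events array of a payload like the one above (one delivery can list several events); changed_user_ids is hypothetical, and re-fetching and publishing are left out.

```python
from typing import Dict, Set


def changed_user_ids(delivery: Dict) -> Set[str]:
    """Collect the IDs of User targets across every event in one delivery."""
    ids = set()
    for event in delivery.get("data", {}).get("events", []):
        for target in event.get("target") or []:
            if target.get("type") == "User" and "id" in target:
                ids.add(target["id"])
    return ids


delivery = {"data": {"events": [
    {"eventType": "user.lifecycle.update",
     "target": [{"id": "00u1234567890abcdef", "type": "User", "alternateId": "user@company.com"}]},
    {"eventType": "group.user_membership.add",
     "target": [{"id": "00u2345678901bcdefg", "type": "User"},
                {"id": "00g1234567890abcdef", "type": "UserGroup"}]},
]}}
print(sorted(changed_user_ids(delivery)))
# ['00u1234567890abcdef', '00u2345678901bcdefg']
```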
Setting Up Webhooks
In Okta:
- Admin Console → Workflow → Event Hooks
- Click "Create Event Hook"
- Name: "Control Core PIP Webhook"
- URL: https://your-pap-domain.com/api/pip/webhooks/okta/1
- Events to subscribe:
  - user.lifecycle.create
  - user.lifecycle.update
  - user.lifecycle.deactivate
  - user.lifecycle.activate
  - group.user_membership.add
  - group.user_membership.remove
- Click "Save & Verify"
In Salesforce:
- Setup → Platform Events → Event Subscriptions
- Create event subscription
- Endpoint: https://your-pap-domain.com/api/pip/webhooks/salesforce/2
- Events: User__e, Account__e, Contact__e, Custom__e
- Enable webhook
In ServiceNow:
- System Web Services → Outbound → REST Message
- Create REST message
- Endpoint: https://your-pap-domain.com/api/pip/webhooks/servicenow/3
- HTTP Method: POST
- Create a business rule that triggers on table updates
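Each provider authenticates deliveries in its own way (Okta event hooks, for instance, let you configure an authentication header name and secret value), so check the provider's documentation. A minimal sketch of the generic shared-secret check only, assuming the secret arrives verbatim in a configurable header; is_authorized_webhook is hypothetical.

```python
import hmac
from typing import Mapping


def is_authorized_webhook(headers: Mapping[str, str], expected_secret: str,
                          header_name: str = "Authorization") -> bool:
    """Check a shared-secret header in constant time before processing a delivery."""
    # HTTP header names are case-insensitive, so normalise before the lookup
    normalized = {name.lower(): value for name, value in headers.items()}
    received = normalized.get(header_name.lower())
    if received is None:
        return False
    return hmac.compare_digest(received.encode(), expected_secret.encode())


print(is_authorized_webhook({"authorization": "s3cret-value"}, "s3cret-value"))  # True
```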
Custom Connector Development
Creating a Custom Connector
If your data source isn't supported, you can build a custom connector:
1. Create Connector Class:
# app/connectors/custom_system_connector.py
from typing import Any, Dict, List

import aiohttp

from .base_connector import BaseConnector


class CustomSystemConnector(BaseConnector):
    """Connector for CustomSystem integration"""

    def __init__(self, connection_id: int, config: Dict[str, Any], credentials: Dict[str, Any]):
        super().__init__(connection_id, config, credentials)
        self.api_url = config.get("endpoint", "")
        self.api_key = credentials.get("api_key", "")

    async def test_connection(self) -> Dict[str, Any]:
        """Test connection to CustomSystem"""
        try:
            async with aiohttp.ClientSession() as session:
                headers = {"Authorization": f"Bearer {self.api_key}"}
                async with session.get(f"{self.api_url}/health", headers=headers) as response:
                    if response.status == 200:
                        return {"success": True, "status": "connected"}
                    return {"success": False, "error": f"HTTP {response.status}"}
        except Exception as e:
            # Report any network, TLS, or timeout failure as a failed test
            return {"success": False, "error": str(e)}

    async def get_schema(self) -> Dict[str, List]:
        """Discover available fields"""
        # Implement schema discovery; this static list is a placeholder
        return {
            "users": [
                {"name": "id", "type": "string", "required": True},
                {"name": "email", "type": "string", "required": True},
                {"name": "department", "type": "string", "required": False},
            ]
        }

    async def get_users(self, limit: int = 1000, offset: int = 0) -> List[Dict[str, Any]]:
        """Fetch one page of users from CustomSystem"""
        headers = {"Authorization": f"Bearer {self.api_key}"}
        async with aiohttp.ClientSession() as session:
            # "offset" paging is an assumption about CustomSystem's API
            async with session.get(
                f"{self.api_url}/users",
                params={"limit": limit, "offset": offset},
                headers=headers,
            ) as response:
                # Raises aiohttp.ClientResponseError on non-2xx responses
                response.raise_for_status()
                return await response.json()

    async def get_groups(self, limit: int = 1000) -> List[Dict[str, Any]]:
        """Fetch groups from CustomSystem"""
        # Implement group fetching
        raise NotImplementedError("CustomSystemConnector.get_groups is not implemented")
2. Register Connector:
# app/connectors/__init__.py
from .okta_connector import OktaConnector
from .azure_ad_connector import AzureADConnector  # assumed module; import from wherever AzureADConnector is defined
from .custom_system_connector import CustomSystemConnector

CONNECTOR_REGISTRY = {
    "okta": OktaConnector,
    "azure_ad": AzureADConnector,
    "custom_system": CustomSystemConnector,  # Add here
}
3. Add Integration Template:
# app/data/integration_templates.py
CUSTOM_SYSTEM_TEMPLATE = IntegrationTemplate(
name="Custom System",
description="Custom identity provider integration",
connection_type="identity",
provider="custom_system",
auth_methods=["api_key", "oauth", "basic"],
required_fields=["endpoint", "api_version"],
optional_fields=["timeout", "batch_size"],
default_config={
"api_version": "v1",
"timeout": 30,
"batch_size": 100
},
default_sync_frequency="hourly",
supports_incremental_sync=True,
supports_webhooks=False
)
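The template's provider value has to match a key in CONNECTOR_REGISTRY for a connection to get a connector. A minimal sketch of that lookup with a clear error for unregistered providers; create_connector, UnknownProviderError, and StubConnector are hypothetical, and StubConnector only mirrors CustomSystemConnector's constructor signature so the sketch runs without aiohttp.

```python
from typing import Any, Callable, Dict


class UnknownProviderError(ValueError):
    """Raised when no connector is registered for a provider key."""


def create_connector(registry: Dict[str, Callable[..., Any]], provider: str, connection_id: int,
                     config: Dict[str, Any], credentials: Dict[str, Any]) -> Any:
    """Instantiate the connector registered under the template's provider key."""
    try:
        connector_cls = registry[provider]
    except KeyError:
        known = ", ".join(sorted(registry))
        raise UnknownProviderError(f"no connector for {provider!r}; registered: {known}") from None
    return connector_cls(connection_id, config, credentials)


class StubConnector:
    """Stand-in with the same constructor signature as CustomSystemConnector."""

    def __init__(self, connection_id: int, config: Dict[str, Any], credentials: Dict[str, Any]):
        self.connection_id = connection_id
        self.api_url = config.get("endpoint", "")


registry = {"custom_system": StubConnector}
connector = create_connector(registry, "custom_system", 7, {"endpoint": "https://custom.example.com"}, {})
print(connector.api_url)  # https://custom.example.com
```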
Performance Optimization
Caching Strategy
PIP-Level Caching (Redis):
# Sensitivity-based TTL
CACHE_TTL = {
"public": 3600, # 1 hour
"internal": 1800, # 30 minutes
"confidential": 300, # 5 minutes
"restricted": 60 # 1 minute
}
Policy Bridge Level:
- Data refresh based on sync frequency
- Real-time for webhook updates
- Pub/sub for instant distribution
PEP-Level (Bouncer):
- Policy cache: 5 minutes
- Decision cache: 1 minute
- Policy data: Updated by Policy Bridge push
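The sensitivity-based TTL table for the PIP-level cache can be exercised with an in-memory stand-in for Redis. A minimal sketch, assuming the CACHE_TTL values listed for the PIP level; the class is hypothetical, and with Redis the equivalent is writing each key with an expiry (for example SET with the EX option). The injectable clock exists only to make expiry testable.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

CACHE_TTL = {"public": 3600, "internal": 1800, "confidential": 300, "restricted": 60}


class SensitivityTTLCache:
    """In-memory cache whose TTL is chosen by the data's sensitivity label."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any, sensitivity: str) -> None:
        # Unknown labels get the shortest TTL, the conservative choice
        ttl = CACHE_TTL.get(sensitivity, min(CACHE_TTL.values()))
        self._entries[key] = (self._clock() + ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: evict and report a miss
            return None
        return value


now = [0.0]
cache = SensitivityTTLCache(clock=lambda: now[0])
cache.set("user:00u1", {"department": "Engineering"}, "restricted")
now[0] = 30.0
print(cache.get("user:00u1"))  # {'department': 'Engineering'}
now[0] = 61.0
print(cache.get("user:00u1"))  # None
```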
Batch Processing
For large datasets:
import asyncio


# Fetch in batches to avoid timeouts
async def fetch_users_batch(connector, batch_size=100):
    offset = 0
    all_users = []
    while True:
        # The connector's get_users must accept an offset for paging
        batch = await connector.get_users(limit=batch_size, offset=offset)
        if not batch:
            break
        all_users.extend(batch)
        offset += batch_size
        # Respect rate limits
        await asyncio.sleep(0.1)
    return all_users
Incremental Sync Implementation
# Get last sync timestamp (None before the first sync)
last_sync = connection.last_sync_at
# Query only changed records; fall back to a full fetch on the first run
if connection.supports_incremental_sync and last_sync is not None:
    query = f"updated_at > '{last_sync}'"
    changed_records = await connector.fetch_with_filter(query)
else:
    changed_records = await connector.fetch_all()
# Publish only changes to the Policy Bridge
await policy_bridge_publisher.publish_incremental_update(connection, changed_records)
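On the receiving side, an incremental update has to be folded into the data already held. A minimal sketch, assuming records carry id and last_updated fields as in the snapshot and incremental examples; the helper names are hypothetical, the merge keeps attributes the update does not mention, and deletions are not handled.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def changed_since(records: List[Dict], last_sync: Optional[datetime]) -> List[Dict]:
    """Keep records whose last_updated is after last_sync; None (first run) keeps all."""
    if last_sync is None:
        return list(records)

    def updated_at(record: Dict) -> datetime:
        # fromisoformat() before Python 3.11 rejects a trailing "Z", so spell out UTC
        return datetime.fromisoformat(record["last_updated"].replace("Z", "+00:00"))

    return [r for r in records if updated_at(r) > last_sync]


def deep_merge(base: Dict, update: Dict) -> Dict:
    """Return a new dict: nested dicts are merged, other values are overwritten."""
    merged = dict(base)
    for key, value in update.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def merge_incremental(users: Dict[str, Dict], changed: List[Dict]) -> Dict[str, Dict]:
    """Upsert changed user records by id without mutating the current map."""
    merged = dict(users)
    for record in changed:
        merged[record["id"]] = deep_merge(merged.get(record["id"], {}), record)
    return merged


users = {"00u1": {"id": "00u1", "email": "user@company.com",
                  "attributes": {"department": "Engineering", "title": "Senior Engineer"}}}
update = [{"id": "00u1", "attributes": {"department": "Sales"}, "last_updated": "2024-01-15T10:35:00Z"}]
last_sync = datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc)
merged = merge_incremental(users, changed_since(update, last_sync))
print(merged["00u1"]["attributes"])  # {'department': 'Sales', 'title': 'Senior Engineer'}
```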
Testing & Debugging
Local Testing
Test Connection Locally:
curl -X POST http://localhost:8000/api/pip/connections/test \
-H "Content-Type: application/json" \
-d @test_connection.json
Test Data Snapshot:
curl http://localhost:8000/api/pip/connections/1/data-snapshot | jq .
Trigger Manual Sync:
curl -X POST http://localhost:8000/api/pip/connections/1/trigger-sync
Debugging Data Flow
1. Check PIP Fetched Data:
GET /api/pip/connections/1/data-snapshot
2. Check Policy Bridge Received Data:
GET http://localhost:7002/data/policy_data/okta
3. Check PEP Has Data:
GET http://localhost:8080/api/v1/opa/data
4. Test Policy with Data:
curl -X POST http://localhost:8080/v1/data/app/authorization/allow \
-H "Content-Type: application/json" \
-d '{
"input": {
"user": {"id": "00u1234567890abcdef"},
"resource": {"id": "resource-123"},
"action": "read"
}
}'
Common Issues
Issue: Policies can't access PIP data
Debug:
# In your policy, add debugging
package app.authorization
# The default keeps user_count defined (0) even when the PIP data is missing entirely
default user_count := 0
user_count := count(data.policy_data.okta.identity.users)
# This will show in policy decision details
decision := {
"user_count": user_count,
"user_ids_available": [user_id | data.policy_data.okta.identity.users[user_id]]
}
Issue: Stale data in policies
Solutions:
- Check last sync time
- Trigger manual sync
- Verify the Policy Bridge received the update
- Clear PEP cache
- Check sync job status
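The first check, last sync time, can be automated against the last_run value returned by GET /api/pip/connections/{id}/sync-status. A minimal sketch; is_stale, the interval table (this guide only shows "hourly"), and the ten-minute grace period are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed frequency labels; only "hourly" appears in this guide
SYNC_INTERVALS = {"hourly": timedelta(hours=1), "daily": timedelta(days=1)}


def is_stale(last_run: Optional[datetime], sync_frequency: str, now: datetime,
             grace: timedelta = timedelta(minutes=10)) -> bool:
    """True if the last sync is older than one interval plus a grace period."""
    if last_run is None:
        return True  # never synced
    return now - last_run > SYNC_INTERVALS[sync_frequency] + grace


last_run = datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc)  # job_status.last_run
print(is_stale(last_run, "hourly", now=datetime(2024, 1, 15, 11, 5, tzinfo=timezone.utc)))   # False
print(is_stale(last_run, "hourly", now=datetime(2024, 1, 15, 11, 15, tzinfo=timezone.utc)))  # True
```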
Advanced Patterns
Pattern: Just-in-Time Attribute Fetching
For highly sensitive or rapidly changing data:
# Instead of batch sync, fetch on-demand
async def fetch_user_clearance_realtime(user_id):
"""Fetch clearance level in real-time during policy evaluation"""
# Called by PEP during authorization
# Not cached - always fresh
response = await connector.get_user_attribute(user_id, "clearance_level")
return response["clearance_level"]
Pattern: Attribute Transformation
Transform source data before publishing:
# Transform Okta groups array to role hierarchy
def transform_groups_to_roles(groups):
    role_hierarchy = {
        "Admin-Group": {"role": "admin", "level": 5},
        "Manager-Group": {"role": "manager", "level": 4},
        "Developer-Group": {"role": "developer", "level": 3},
    }
    return [role_hierarchy.get(group, {"role": "user", "level": 1}) for group in groups]
Pattern: Data Enrichment
Enrich source data with computed attributes:
# Add computed attributes
user_data["attributes"]["access_level"] = compute_access_level(
user_data["groups"],
user_data["attributes"]["clearance_level"]
)
user_data["attributes"]["risk_score"] = compute_risk_score(
user_data["last_login"],
user_data["mfa_enabled"],
user_data["login_count"]
)
Migration & Updates
Switching Data Sources
Scenario: Migrating from Okta to Azure AD
Steps:
- Add Azure AD connection (don't delete Okta yet)
- Map attributes to same standard names
- Test with both sources active
- Update policies to use data.policy_data.azure_ad instead of data.policy_data.okta
- Verify all policies work
- Disable Okta sync
- Delete Okta connection (after grace period)
Benefit of Standard Attributes: if both connections map their source fields to the same standard attribute names, policies only need their data paths changed.
Schema Changes
Handling Provider Schema Updates:
- Provider adds new field → Appears in next schema discovery
- Provider renames field → Update mapping
- Provider removes field → Lookups on that field become undefined, so policies must handle the missing data
- Provider changes field type → May need policy updates
Best Practice: Use a get-value-or-default pattern in policies, such as object.get(user.attributes, "department", "unknown") or a default rule
API Complete Reference
All PIP Endpoints
Connection CRUD:
- POST /api/pip/connections - Create
- GET /api/pip/connections - List all
- GET /api/pip/connections/{id} - Get one
- PUT /api/pip/connections/{id} - Update
- DELETE /api/pip/connections/{id} - Delete
- POST /api/pip/connections/test - Test without saving
- GET /api/pip/connections/{id}/health - Health check
Deep-Schema Discovery & Semantic Search:
- POST /api/pip/connections/{id}/discover - Full discovery pipeline (schema + ERP pattern tags + watched-table queue + indexing)
- GET /api/pip/connections/{id}/watched-tables - Retrieve watched tables
- PUT /api/pip/connections/{id}/watched-tables - Replace watched tables
- POST /api/pip/search-metadata - Natural language semantic metadata search
- POST /api/pip/search_metadata - Alias endpoint for compatibility
OAuth:
- GET /api/pip/oauth/authorize/{provider} - Start OAuth flow
- POST /api/pip/oauth/callback/{provider} - Handle callback
- POST /api/pip/oauth/refresh/{connection_id} - Refresh token
Policy Bridge:
- POST /api/pip/connections/{id}/publish-to-policy-bridge - Publish data
- GET /api/pip/connections/{id}/policy-bridge-status - Get status
- GET /api/pip/connections/{id}/data-snapshot - Get snapshot
- GET /api/pip/policy-bridge/config - Get Policy Bridge config
- GET /api/pip/connections/{id}/policy-bridge-config - Get connection config
Sync:
- POST /api/pip/connections/{id}/schedule-sync - Schedule
- DELETE /api/pip/connections/{id}/unschedule-sync - Unschedule
- GET /api/pip/connections/{id}/sync-status - Get status
- POST /api/pip/connections/{id}/trigger-sync - Manual trigger
- GET /api/pip/sync/scheduler-stats - Stats
- GET /api/pip/sync/jobs - All jobs
Webhooks:
- POST /api/pip/webhooks/{provider}/{connection_id} - Receive webhook
Templates:
- GET /api/pip/integration-templates - List templates
- GET /api/pip/integration-templates/{id} - Get template
Troubleshooting
| Issue | What to check |
|---|---|
| PIP API returns 401 or 403 | Verify API key or token and scope; ensure the caller has access to the PIP/data source. |
| PIP data missing in policy evaluation | Confirm attribute mappings and that the data source sync has run; check that the policy bundle includes PIP data. |
| Webhook not firing or payload rejected | Validate webhook URL, signature, and payload format; check Control Plane logs for webhook errors. |
| Custom connector fails | Test connection and credentials; ensure TLS and network access from Control Plane to external service. |
| Deep discovery ignores configured model | Verify smart_cc_settings exists for the same tenant/user invoking discovery; check llm_provider/custom_api_url/custom_api_key. |
| Watched-table jobs stay on fallback classification | Confirm the scheduler is running and the classifier endpoint authenticates successfully; validate Smart CC /v1/smart-cc/turn first. |
For more, see the Troubleshooting Guide.
Support & Resources
Documentation
- PIP Getting Started - For new users
- PIP Admin Guide - For administrators
- Rego Guidelines - For policy authors
- API Reference - Complete API docs
Community
- Community: Use Control Core support channels
- Repository access: Available via your Control Core account team
- Support: support@controlcore.io
You're now equipped to build advanced policy integrations with Control Core's PIP system!