title: 'PIP Management - Policy Developer Guide'
description: 'Technical guide for developers writing policies and integrating with Control Core PIP system'

🛡️ Policy Information Point (PIP) Management - Developer Guide

📌 Introduction

This guide provides technical details for developers writing policies, integrating with the PIP API, and building custom connectors for Control Core's Policy Information Point system.

🏗️ Architecture Deep Dive

System Components

Backend Services:

  • pip_service.py - Core PIP management, caching, and synchronization
  • secrets_service.py - AES-256 credential encryption and secrets management
  • oauth_service.py - OAuth 2.0 authorization flows and token lifecycle
  • schema_introspection_service.py - Real-time schema discovery from data sources
  • erp_discovery_service.py - ERP pattern discovery, watched-table orchestration, and semantic vector indexing
  • erp_classification_worker_service.py - Background LLM classification worker for watched tables
  • scca_llm_settings_service.py - Shared tenant-level LLM resolver (reads Smart CC settings)
  • semantic_embedding_service.py - Embedding generation using SCCA-configured provider, with deterministic continuity fallback
  • policy-bridge_publisher.py - Policy Bridge data publishing and webhook handling
  • policy-bridge_config_generator.py - Policy Bridge data source configuration generation
  • sync_scheduler.py - APScheduler-based automated synchronization
  • audit_logger.py - Comprehensive audit trail logging

Data Connectors:

  • iam_connector.py - IAM/IDP connectors (Okta, Azure AD, Auth0, LDAP)
  • database_connector.py - Database connectors with schema introspection
  • openapi_parser.py - OpenAPI/Swagger specification parsing

Shared LLM Contract (SCCA + PIP Discovery)

Deep-schema discovery services must use the same LLM tenant configuration as Smart Control Core Agent:

  • Source of truth: smart_cc_settings (tenant_id, llm_provider, custom_api_key, custom_api_url)
  • Used by:
    • watched-table LLM classification (erp_classification_worker_service.py)
    • semantic embedding generation (semantic_embedding_service.py)
    • metadata search query embedding (semantic_metadata_search_service.py)
  • Tenant context is resolved from the authenticated actor (request.state.user_id) and propagated through discovery/search paths.

Design intent:

  • one model governance path
  • one credential rotation path
  • one audit/compliance boundary for AI calls

Database Models:

  • PIPConnection - Connection configuration and metadata
  • OAuthToken - OAuth token storage with expiration tracking
  • WebhookEvent - Webhook event tracking and replay
  • PIPSyncLog - Sync operation history and audit trail
  • AttributeMapping - Field mapping configuration
  • MetadataCache - semantic tags, confidence, source type, friendly names, and embeddings
  • PIPWatchedTable - explicit watched-table list for deep classification
  • PIPSchemaClassificationJob - durable async queue for watched-table LLM classification

Data Flow Architecture

┌──────────────────────────────────────────────────────┐
│ 1. Admin Configures Connection (PAP UI)              │
│    POST /api/pip/connections                         │
│    {provider: "okta", auth: "oauth", ...}            │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 2. Connection Test & Schema Discovery                │
│    POST /api/pip/connections/test                    │
│    → Makes real API call to Okta                     │
│    → Discovers user schema (GET /api/v1/meta/...)    │
│    → Returns: {fields: [{name, type, ...}]}          │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 3. Attribute Mapping (UI)                            │
│    User maps: user.department → profile.department   │
│    Stored in database: AttributeMapping table        │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 4. Scheduled Sync (APScheduler)                      │
│    - Cron job runs every hour                        │
│    - Fetches users: GET /api/v1/users?limit=1000     │
│    - Applies mappings: profile.department → dept     │
│    - Transforms to standard format                   │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 5. Publish to Policy Bridge                          │
│    POST http://policy-bridge:7002/data/config        │
│    Topic: "policy_data:okta:identity"                │
│    Data: {users: {...}, groups: {...}}               │
└────────────────────┬─────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────┐
│ 6. Policy Bridge distributes to ALL PEPs             │
│    - PubSub to all connected clients                 │
│    - PEPs update local policy data                   │
│    - Available in: data.policy_data.okta.identity    │
└──────────────────────────────────────────────────────┘

🔧 API Reference

Connection Management

Create Connection

POST /api/pip/connections
Content-Type: application/json

{
  "name": "Production Okta",
  "description": "Main identity provider for production",
  "connection_type": "identity",
  "provider": "okta",
  "configuration": {
    "domain": "company.okta.com",
    "authorization_server": "default",
    "auth_url": "https://company.okta.com/oauth2/default/v1/authorize",
    "token_url": "https://company.okta.com/oauth2/default/v1/token"
  },
  "credentials": {
    "oauth_token": {
      "client_id": "0oa1234567890abcdef",
      "client_secret": "secret_1234567890abcdef",
      "scopes": "openid profile email groups"
    }
  },
  "sync_enabled": true,
  "sync_frequency": "hourly",
  "health_check_url": "https://company.okta.com/api/v1/users/me"
}

Response:

{
  "id": 1,
  "name": "Production Okta",
  "connection_type": "identity",
  "provider": "okta",
  "status": "active",
  "last_sync": null,
  "created_at": "2024-01-15T10:30:00Z",
  "updated_at": "2024-01-15T10:30:00Z"
}
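
For scripted setups, the request above can be assembled in Python. The following is a minimal sketch using only the standard library. The helper name `build_create_connection_request`, the `base_url` default, and the decision to treat the example's top-level keys as mandatory are all assumptions of this sketch, not documented behavior. Any authentication the PAP API requires is not covered here, and the request is only built, not sent.

```python
import json
import urllib.request

# Top-level keys taken from the example request; treating all of them as
# mandatory is an assumption of this sketch.
EXPECTED_KEYS = ("name", "connection_type", "provider", "configuration", "credentials")


def build_create_connection_request(
    payload: dict,
    base_url: str = "http://localhost:8000",  # assumed local PAP address
) -> urllib.request.Request:
    """Build (without sending) a POST /api/pip/connections request."""
    missing = [key for key in EXPECTED_KEYS if key not in payload]
    if missing:
        raise ValueError(f"payload is missing: {', '.join(missing)}")
    return urllib.request.Request(
        url=f"{base_url}/api/pip/connections",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "name": "Production Okta",
    "connection_type": "identity",
    "provider": "okta",
    "configuration": {"domain": "company.okta.com"},
    "credentials": {"oauth_token": {"client_id": "...", "client_secret": "..."}},
    "sync_enabled": True,
    "sync_frequency": "hourly",
}
request = build_create_connection_request(payload)
print(request.get_method(), request.full_url)
# Send with urllib.request.urlopen(request) once the PAP is reachable.
```

Validating the payload before sending keeps malformed requests out of the audit trail and gives a clearer error than a generic HTTP 4xx response.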

Test Connection

POST /api/pip/connections/test
Content-Type: application/json

{
  "connection_type": "identity",
  "provider": "okta",
  "configuration": {
    "domain": "company.okta.com"
  },
  "credentials": {
    "oauth_token": {
      "client_id": "...",
      "client_secret": "..."
    }
  }
}

Response:

{
  "success": true,
  "status": "connected",
  "response_time": 0.234,
  "details": {
    "provider": "okta",
    "schema_discovered": true,
    "fields": [
      {
        "name": "id",
        "type": "string",
        "description": "User ID",
        "required": true,
        "nullable": false,
        "sensitivity": "internal",
        "example": "00u1234567890abcdef"
      },
      {
        "name": "email",
        "type": "string",
        "description": "Email address",
        "required": true,
        "sensitivity": "internal",
        "example": "user@company.com"
      },
      {
        "name": "profile.department",
        "type": "string",
        "description": "User department",
        "required": false,
        "sensitivity": "internal",
        "example": "Engineering"
      }
      // ... all real fields from Okta
    ],
    "sample_data": {
      "user_count": 1250,
      "group_count": 45
    }
  }
}

OAuth Integration

Initiate OAuth Flow

GET /api/pip/oauth/authorize/okta?connection_id=1

Response:

{
  "authorization_url": "https://company.okta.com/oauth2/default/v1/authorize?client_id=...&response_type=code&scope=openid+profile+email+groups&redirect_uri=http://localhost:8000/pip/oauth/callback/okta&state=random-secure-token",
  "state": "random-secure-token",
  "provider": "okta",
  "connection_id": 1
}

Handle OAuth Callback

POST /api/pip/oauth/callback/okta
Content-Type: application/json

{
  "code": "authorization-code-from-provider",
  "state": "random-secure-token",
  "connection_id": 1
}

Response:

{
  "success": true,
  "provider": "okta",
  "connection_id": 1,
  "user_info": {
    "id": "00u1234567890abcdef",
    "email": "admin@company.com",
    "name": "Admin User"
  },
  "token_expires_at": "2024-01-15T11:30:00Z"
}
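
The `state` value ties the callback to the authorization request that started it. The sketch below shows one way to generate it and to check it when the callback arrives. How `oauth_service.py` actually does this is not shown in this guide, so `build_authorization_url` and `state_matches` are hypothetical helpers.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlparse


def build_authorization_url(auth_url: str, client_id: str, redirect_uri: str, scopes: str):
    """Return (url, state) for the first leg of the authorization-code flow."""
    state = secrets.token_urlsafe(32)  # unguessable, single-use CSRF token
    query = urlencode({
        "client_id": client_id,
        "response_type": "code",
        "scope": scopes,
        "redirect_uri": redirect_uri,
        "state": state,
    })
    return f"{auth_url}?{query}", state


def state_matches(expected: str, received: str) -> bool:
    """Compare the stored state with the callback's state in constant time."""
    return hmac.compare_digest(expected.encode(), received.encode())


url, state = build_authorization_url(
    "https://company.okta.com/oauth2/default/v1/authorize",
    "0oa1234567890abcdef",
    "http://localhost:8000/pip/oauth/callback/okta",
    "openid profile email groups",
)
print(url)
print(state_matches(state, state))
```

The state has to be stored server-side (for example alongside the connection) between the two requests, and a callback whose state does not match should be rejected before the code is exchanged for a token.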

Policy Bridge Integration

Publish to Policy Bridge

POST /api/pip/connections/1/publish-to-policy-bridge?force_full_sync=false

Response:

{
  "success": true,
  "message": "Data published successfully",
  "response_time": 0.456,
  "policy-bridge_response": {
    "topics": ["policy_data:okta", "policy_data:identity", "connection:1"],
    "records_published": 1250,
    "status": "success"
  }
}

Get Data Snapshot (What the Policy Bridge Receives)

GET /api/pip/connections/1/data-snapshot

Response:

{
  "connection_id": 1,
  "provider": "okta",
  "connection_type": "identity",
  "data": {
    "users": {
      "00u1234567890abcdef": {
        "id": "00u1234567890abcdef",
        "email": "john.doe@company.com",
        "name": "John Doe",
        "groups": ["Engineering", "Developers", "TeamLeads"],
        "roles": ["developer", "team-lead"],
        "attributes": {
          "department": "Engineering",
          "title": "Senior Engineer",
          "manager": "00u0987654321fedcba",
          "clearance_level": "3",
          "mfa_enabled": true,
          "hire_date": "2020-03-15",
          "location": "San Francisco"
        },
        "last_updated": "2024-01-15T10:30:00Z"
      }
      // ... all users
    },
    "groups": {
      "00g1234567890abcdef": {
        "id": "00g1234567890abcdef",
        "name": "Engineering",
        "description": "Engineering team",
        "members": ["00u1234567890abcdef", "00u2345678901bcdefg"],
        "last_updated": "2024-01-15T10:30:00Z"
      }
      // ... all groups
    },
    "metadata": {
      "provider": "okta",
      "connection_id": 1,
      "last_updated": "2024-01-15T10:30:00Z",
      "user_count": 1250,
      "group_count": 45
    }
  },
  "timestamp": "2024-01-15T10:30:00Z",
  "version": "1_1705318200"
}

Sync Scheduler

Schedule Automated Sync

POST /api/pip/connections/1/schedule-sync

Response:

{
  "success": true,
  "message": "Connection 1 scheduled for sync",
  "job_id": "sync_1_okta",
  "sync_frequency": "hourly"
}

Get Sync Status

GET /api/pip/connections/1/sync-status

Response:

{
  "connection_id": 1,
  "sync_enabled": true,
  "sync_frequency": "hourly",
  "job_status": {
    "job_id": "sync_1_okta",
    "status": "success",
    "next_run": "2024-01-15T11:00:00Z",
    "last_run": "2024-01-15T10:00:00Z",
    "run_count": 24,
    "error_count": 0,
    "last_error": null
  }
}
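
A monitoring script can use this response to flag connections whose sync has fallen behind. The sketch below is one possible check. The `FREQUENCY_INTERVALS` table and the five-minute grace period are assumptions (only `"hourly"` appears in this guide), and `is_sync_overdue` is a hypothetical helper.

```python
from datetime import datetime, timedelta

# Assumed mapping from sync_frequency values to intervals; only "hourly"
# appears in this guide, the "daily" entry is illustrative.
FREQUENCY_INTERVALS = {
    "hourly": timedelta(hours=1),
    "daily": timedelta(days=1),
}


def parse_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def is_sync_overdue(
    status: dict,
    now: datetime,
    grace: timedelta = timedelta(minutes=5),
) -> bool:
    """True if sync is enabled but the last run is older than one interval plus grace."""
    if not status["sync_enabled"]:
        return False
    last_run = status["job_status"]["last_run"]
    if last_run is None:
        return True  # enabled but has never run
    interval = FREQUENCY_INTERVALS[status["sync_frequency"]]
    return now - parse_utc(last_run) > interval + grace


status = {
    "sync_enabled": True,
    "sync_frequency": "hourly",
    "job_status": {"last_run": "2024-01-15T10:00:00Z"},
}
print(is_sync_overdue(status, parse_utc("2024-01-15T11:30:00Z")))
```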

Trigger Manual Sync

POST /api/pip/connections/1/trigger-sync

📌 Using PIP Data in Policies

Data Structure in the Policy Cache

When the integration publishes data to the Policy Bridge, it becomes available to the policy engine at:

data.policy_data.<provider>.<connection_type>

Example for Okta:

data.policy_data.okta.identity.users
data.policy_data.okta.identity.groups
data.policy_data.okta.identity.metadata

Example for PostgreSQL:

data.policy_data.postgresql.database.tables.users
data.policy_data.postgresql.database.tables.resources
data.policy_data.postgresql.database.metadata

Accessing User Data

Basic User Lookup:

package app.authorization

import future.keywords.if
import future.keywords.in

# Get user from Okta
user := data.policy_data.okta.identity.users[input.user.id]

# Check user attributes
allow if {
  user != null
  user.email == input.user.email
  user.attributes.department == "Engineering"
}

With Null Safety:

# Safe user lookup with default
get_user(user_id) := user if {
  user := data.policy_data.okta.identity.users[user_id]
} else := {
  "id": user_id,
  "email": "unknown",
  "attributes": {}
}

allow if {
  user := get_user(input.user.id)
  user.attributes.department == "Engineering"
}

Multi-Source Policies

Combining Data from Multiple PIPs:

package app.authorization

import future.keywords.if
import future.keywords.in

# Policy using 3 different data sources
allow if {
  # 1. User identity from Okta
  okta_user := data.policy_data.okta.identity.users[input.user.id]
  okta_user != null
  "Engineering" in okta_user.groups
  
  # 2. HR context from Workday
  hr_data := data.policy_data.workday.hr.employees[input.user.id]
  hr_data.employment_status == "active"
  hr_data.clearance_level >= 3
  
  # 3. Resource metadata from PostgreSQL
  resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
  resource.owner_id == input.user.id
  resource.sensitivity_level in ["public", "internal"]
  resource.environment == input.resource.environment
}

Real vs Sample Metadata

Understanding the Metadata Display:

Before Connection Test:

// Sample fields (examples only)
getSampleFields() returns:
[
  { name: "user.email", type: "string" },
  { name: "user.department", type: "string" },
  // Generic examples
]

After Successful Connection Test:

// Real fields from Okta API
connectionTestResult.details.fields:
[
  {
    name: "id",
    type: "string",
    description: "User ID",
    required: true,
    sensitivity: "internal",
    example: "00u1234567890abcdef"
  },
  {
    name: "profile.department",
    type: "string",
    description: "Department from Okta profile",
    required: false,
    sensitivity: "internal",
    example: "Engineering"
  },
  {
    name: "profile.customAttribute1",
    type: "string",
    description: "Your custom Okta attribute",
    required: false,
    sensitivity: "internal"
  }
  // ... ALL real fields from your Okta instance
]

How Schema Discovery Works:

For IAM sources (Okta):

# Backend makes real API call
GET https://company.okta.com/api/v1/meta/schemas/user/default
Authorization: SSWS {api_token}

# Okta returns actual schema
{
  "definitions": {
    "base": {
      "properties": {
        "login": {"type": "string"},
        "email": {"type": "string"},
        "firstName": {"type": "string"}
      }
    },
    "custom": {
      "properties": {
        "clearanceLevel": {"type": "string"},
        "department": {"type": "string"},
        "manager": {"type": "string"}
      }
    }
  }
}

# Transformed to standard format for frontend

For Databases (PostgreSQL):

-- Backend executes schema introspection
SELECT 
  column_name,
  data_type,
  is_nullable,
  column_default,
  character_maximum_length
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'users';

-- Returns actual database columns

For APIs (OpenAPI):

// Backend fetches and parses the OpenAPI spec
const response = await fetch('https://api.example.com/openapi.json');
const spec = await response.json();

// Extracts endpoints and models
{
  paths: {
    "/users/{id}": {
      get: {
        operationId: "getUser",
        parameters: [...],
        responses: {
          200: {
            schema: { $ref: "#/components/schemas/User" }
          }
        }
      }
    }
  },
  components: {
    schemas: {
      User: {
        properties: {
          id: {type: "string"},
          email: {type: "string", format: "email"},
          department: {type: "string"}
        },
        required: ["id", "email"]
      }
    }
  }
}

// Converted to field list for mapping

📌 Attribute Mapping Strategy

Standard Attribute Taxonomy

Control Core recommends this attribute naming convention:

User Attributes (identity context):

user.id                  - Unique identifier
user.email               - Email address
user.username            - Username/login
user.name                - Full name
user.first_name          - First name
user.last_name           - Last name
user.department          - Department name
user.title               - Job title
user.manager             - Manager ID
user.roles               - Role assignments (array)
user.groups              - Group memberships (array)
user.permissions         - Direct permissions (array)
user.mfa_enabled         - MFA status (boolean)
user.clearance_level     - Security clearance (string/number)
user.employment_status   - Employment status
user.hire_date           - Start date
user.location            - Work location
user.cost_center         - Cost center
user.certifications      - Certifications (array)

Resource Attributes (data/asset context):

resource.id              - Resource identifier
resource.name            - Resource name
resource.type            - Resource type
resource.owner_id        - Owner user ID
resource.owner_email     - Owner email
resource.sensitivity     - Data classification
resource.classification  - Security classification
resource.environment     - Environment (dev/staging/prod)
resource.region          - Geographic region
resource.compliance_tags - Compliance tags (array)
resource.status          - Status (active/archived/deleted)
resource.created_at      - Creation timestamp
resource.updated_at      - Last update timestamp
resource.metadata        - Additional metadata (object)

API Attributes (endpoint context):

api.path                 - Endpoint path
api.method               - HTTP method
api.operation_id         - Operation identifier
api.security             - Required security schemes
api.parameters           - Required parameters (array)
api.rate_limit           - Rate limit tier
api.deprecated           - Deprecation status (boolean)

Mapping Best Practices

1. Map Core Identity Attributes First:

// Essential mappings for any identity provider
{
  "user.id": "id",              // or "sub", "user_id"
  "user.email": "email",        // or "mail", "emailAddress"
  "user.name": "displayName",   // or "name", "cn"
  "user.roles": "groups",       // or "roles", "memberOf"
}

2. Map Provider-Specific Attributes:

For Okta:

{
  "user.department": "profile.department",
  "user.manager": "profile.manager",
  "user.title": "profile.title",
  "user.clearance_level": "profile.clearanceLevel"
}

For Azure AD:

{
  "user.department": "department",
  "user.manager": "manager",
  "user.title": "jobTitle",
  "user.clearance_level": "extensionAttribute1"
}

3. Handle Nested/Complex Fields:

// Okta groups array
"user.groups": "groups"  // Backend flattens to array of group names

// Azure AD nested manager
"user.manager": "manager.id"  // Backend extracts nested value

// Custom object
"user.metadata": "profile.customAttributes"  // Backend preserves object

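Each mapping pairs a standard attribute name with a dotted path into the provider record. How the backend resolves these paths is not shown in this guide. The following is a minimal sketch of one way to do it, with `get_path` and `apply_mappings` as hypothetical helper names.

```python
from typing import Any, Dict


def get_path(record: Dict[str, Any], dotted: str) -> Any:
    """Return the value at a dotted source path, or None if a segment is missing."""
    node: Any = record
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def apply_mappings(record: Dict[str, Any], mappings: Dict[str, str]) -> Dict[str, Any]:
    """Translate a provider record into standard attribute names."""
    result: Dict[str, Any] = {}
    for target, source in mappings.items():
        value = get_path(record, source)
        if value is not None:  # skip attributes the provider did not return
            result[target] = value
    return result


okta_record = {
    "id": "00u1234567890abcdef",
    "groups": ["Engineering", "Developers"],
    "profile": {"department": "Engineering"},
}
mappings = {
    "user.id": "id",
    "user.groups": "groups",
    "user.department": "profile.department",
    "user.manager": "profile.manager",  # absent in this record
}
print(apply_mappings(okta_record, mappings))
```

Skipping missing values rather than emitting `None` keeps the attribute undefined downstream, so a policy can tell "not provided" apart from an explicit empty value. Source field names that themselves contain dots would need a different path syntax.
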
📌 Writing Policies with PIP Data

Pattern 1: Simple Attribute Check

package app.authorization

import future.keywords.if

allow if {
  # Get user from PIP
  user := data.policy_data.okta.identity.users[input.user.id]
  
  # Check attribute
  user.attributes.department == "Engineering"
}

Pattern 2: Group Membership

package app.authorization

import future.keywords.if
import future.keywords.in

allow if {
  user := data.policy_data.okta.identity.users[input.user.id]
  
  # Check if user is in group
  "Admins" in user.groups
}

# Alternative: Check group members
allow if {
  group := data.policy_data.okta.identity.groups["00g_admin_group_id"]
  input.user.id in group.members
}

Pattern 3: Resource Ownership

package app.authorization

import future.keywords.if

allow if {
  # Get resource from database PIP
  resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
  
  # Check ownership
  resource.owner_id == input.user.id
}

# Or with user lookup
allow if {
  resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
  user := data.policy_data.okta.identity.users[input.user.id]
  
  resource.owner_id == user.id
}

Pattern 4: Multi-Source Authorization

package app.authorization

import future.keywords.if
import future.keywords.in

# Combines Okta, Workday, and PostgreSQL
allow if {
  # Identity check (Okta)
  user := data.policy_data.okta.identity.users[input.user.id]
  user != null
  "Engineering" in user.groups
  
  # Employment check (Workday)
  employee := data.policy_data.workday.hr.employees[input.user.id]
  employee.employment_status == "active"
  employee.clearance_level >= 3
  employee.certifications[_] == "PII-Handler"
  
  # Resource check (PostgreSQL)
  resource := data.policy_data.postgresql.database.tables.resources[input.resource.id]
  resource.environment == "production"
  resource.sensitivity_level in ["internal", "confidential"]
  
  # Business logic
  input.action == "read"
}

Pattern 5: Dynamic Context with Time

package app.authorization

import future.keywords.if

allow if {
  user := data.policy_data.okta.identity.users[input.user.id]
  
  # Check user location
  user.attributes.location == "Headquarters"
  
  # Check time (business hours only; time.clock returns UTC by default)
  hour := time.clock(time.now_ns())[0]
  hour >= 9
  hour < 17
  
  # Check MFA
  user.attributes.mfa_enabled == true
}

Pattern 6: Hierarchical Authorization

package app.authorization

import future.keywords.if

# Manager can access their team's resources
allow if {
  user := data.policy_data.okta.identity.users[input.user.id]
  resource_owner := data.policy_data.okta.identity.users[input.resource.owner_id]
  
  # Check if user is resource owner's manager
  resource_owner.attributes.manager == user.id
}

# Or check the entire reporting chain. Rego does not allow recursive rules,
# so walk the chain with graph.reachable over a user -> [manager] graph.
manager_graph[user_id] := [user.attributes.manager] if {
  user := data.policy_data.okta.identity.users[user_id]
}

is_in_reporting_chain(user_id, manager_id) if {
  user_id != manager_id
  graph.reachable(manager_graph, {user_id})[manager_id]
}

allow if {
  is_in_reporting_chain(input.resource.owner_id, input.user.id)
}

🔌 Webhook Integration

Configuring Webhooks for Real-Time Updates

Webhooks enable instant policy context updates when source data changes.

Webhook Flow:

User updated in Okta
      ↓
Okta sends webhook
POST /api/pip/webhooks/okta/1
      ↓
PIP processes update
      ↓
Publishes to Policy Bridge (incremental)
      ↓
Policy Bridge pushes to PEPs
      ↓
Policy decisions use fresh data
(Typical end-to-end latency: under 1 second)

Webhook Endpoint Format:

POST /api/pip/webhooks/{provider}/{connection_id}

Example Webhook Payload (from Okta):

{
  "eventType": "user.lifecycle.update",
  "eventId": "event-123",
  "published": "2024-01-15T10:35:00Z",
  "data": {
    "events": [{
      "uuid": "00u1234567890abcdef",
      "published": "2024-01-15T10:35:00Z",
      "eventType": "user.lifecycle.update",
      "version": "1.0",
      "displayMessage": "User profile updated",
      "target": [{
        "id": "00u1234567890abcdef",
        "type": "User",
        "alternateId": "user@company.com",
        "displayName": "John Doe"
      }]
    }]
  }
}

PIP Processes and Publishes:

{
  "topic": "policy_data:okta:identity:incremental",
  "data": {
    "users": {
      "00u1234567890abcdef": {
        "id": "00u1234567890abcdef",
        "email": "user@company.com",
        "attributes": {
          "department": "Sales"  // Updated value
        },
        "last_updated": "2024-01-15T10:35:00Z"
      }
    }
  }
}
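
The step from the incoming event to the published update can be sketched as follows. This is an illustration only: `changed_user_ids` and `build_incremental_update` are hypothetical helpers, and the re-fetch of each changed user from the provider is left as a placeholder.

```python
from typing import Dict, Set


def changed_user_ids(payload: dict) -> Set[str]:
    """Collect the IDs of every User target mentioned in an event hook payload."""
    ids: Set[str] = set()
    for event in payload.get("data", {}).get("events", []):
        for target in event.get("target", []):
            if target.get("type") == "User":
                ids.add(target["id"])
    return ids


def build_incremental_update(provider: str, connection_type: str, users: Dict[str, dict]) -> dict:
    """Wrap changed user records in the incremental topic envelope."""
    return {
        "topic": f"policy_data:{provider}:{connection_type}:incremental",
        "data": {"users": users},
    }


payload = {
    "eventType": "user.lifecycle.update",
    "data": {"events": [{"target": [
        {"id": "00u1234567890abcdef", "type": "User", "alternateId": "user@company.com"}
    ]}]},
}
user_ids = changed_user_ids(payload)
# A real handler would re-fetch each changed user from the provider here;
# the placeholder record below only carries the ID.
update = build_incremental_update("okta", "identity", {uid: {"id": uid} for uid in user_ids})
print(update["topic"])
```

Collecting IDs into a set means a payload that mentions the same user in several events triggers only one re-fetch.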

Setting Up Webhooks

In Okta:

  1. Admin Console → Workflow → Event Hooks
  2. Click "Create Event Hook"
  3. Name: "Control Core PIP Webhook"
  4. URL: https://your-pap-domain.com/api/pip/webhooks/okta/1
  5. Events to subscribe:
    • user.lifecycle.create
    • user.lifecycle.update
    • user.lifecycle.deactivate
    • user.lifecycle.activate
    • group.user_membership.add
    • group.user_membership.remove
  6. Click "Save & Verify"
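
When an event hook is verified, Okta sends a one-time GET request to the hook URL with an `x-okta-verification-challenge` header and expects the value echoed back as JSON under the key `verification`. Whether the PIP webhook endpoint already handles this exchange is not stated in this guide. The sketch below only illustrates the exchange, and `okta_verification_response` is a hypothetical, framework-agnostic helper.

```python
import json
from typing import Mapping, Tuple


def okta_verification_response(headers: Mapping[str, str]) -> Tuple[int, str]:
    """Answer Okta's one-time verification GET by echoing the challenge header."""
    # HTTP header names are case-insensitive, so normalise before the lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    challenge = lowered.get("x-okta-verification-challenge")
    if challenge is None:
        return 400, json.dumps({"error": "missing verification challenge"})
    return 200, json.dumps({"verification": challenge})


status_code, body = okta_verification_response(
    {"X-Okta-Verification-Challenge": "example-challenge-value"}
)
print(status_code, body)
```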

In Salesforce:

  1. Setup → Platform Events → Event Subscriptions
  2. Create event subscription
  3. Endpoint: https://your-pap-domain.com/api/pip/webhooks/salesforce/2
  4. Events: User__e, Account__e, Contact__e, Custom__e
  5. Enable webhook

In ServiceNow:

  1. System Web Services → Outbound → REST Message
  2. Create REST message
  3. Endpoint: https://your-pap-domain.com/api/pip/webhooks/servicenow/3
  4. HTTP Method: POST
  5. Create business rule to trigger on table updates

📌 Custom Connector Development

Creating a Custom Connector

If your data source isn't supported, you can build a custom connector:

1. Create Connector Class:

# app/connectors/custom_system_connector.py

import aiohttp
from typing import Dict, List, Any
from .base_connector import BaseConnector

class CustomSystemConnector(BaseConnector):
    """Connector for CustomSystem integration"""
    
    def __init__(self, connection_id: int, config: Dict[str, Any], credentials: Dict[str, Any]):
        super().__init__(connection_id, config, credentials)
        self.api_url = config.get("endpoint", "")
        self.api_key = credentials.get("api_key", "")
    
    async def test_connection(self) -> Dict[str, Any]:
        """Test connection to CustomSystem"""
        try:
            async with aiohttp.ClientSession() as session:
                headers = {"Authorization": f"Bearer {self.api_key}"}
                async with session.get(f"{self.api_url}/health", headers=headers) as response:
                    if response.status == 200:
                        return {"success": True, "status": "connected"}
                    else:
                        return {"success": False, "error": f"HTTP {response.status}"}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def get_schema(self) -> Dict[str, List]:
        """Discover available fields"""
        # Implement schema discovery
        return {
            "users": [
                {"name": "id", "type": "string", "required": True},
                {"name": "email", "type": "string", "required": True},
                {"name": "department", "type": "string", "required": False}
            ]
        }
    
    async def get_users(self, limit: int = 1000) -> List[Dict[str, Any]]:
        """Fetch users from CustomSystem"""
        try:
            async with aiohttp.ClientSession() as session:
                headers = {"Authorization": f"Bearer {self.api_key}"}
                async with session.get(
                    f"{self.api_url}/users?limit={limit}", 
                    headers=headers
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        raise Exception(f"Failed to fetch users: HTTP {response.status}")
        except aiohttp.ClientError as e:
            # Wrap only transport errors; the HTTP status error above propagates as-is
            raise Exception(f"Error fetching users: {e}") from e
    
    async def get_groups(self, limit: int = 1000) -> List[Dict[str, Any]]:
        """Fetch groups from CustomSystem"""
        # Implement group fetching
        raise NotImplementedError("Group fetching is not implemented yet")

2. Register Connector:

# app/connectors/__init__.py

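from .okta_connector import OktaConnector
from .azure_ad_connector import AzureADConnector  # module path assumed; adjust to where the class is defined
from .custom_system_connector import CustomSystemConnector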

CONNECTOR_REGISTRY = {
    "okta": OktaConnector,
    "azure_ad": AzureADConnector,
    "custom_system": CustomSystemConnector  # Add here
}

3. Add Integration Template:

# app/data/integration_templates.py

CUSTOM_SYSTEM_TEMPLATE = IntegrationTemplate(
    name="Custom System",
    description="Custom identity provider integration",
    connection_type="identity",
    provider="custom_system",
    auth_methods=["api_key", "oauth", "basic"],
    required_fields=["endpoint", "api_version"],
    optional_fields=["timeout", "batch_size"],
    default_config={
        "api_version": "v1",
        "timeout": 30,
        "batch_size": 100
    },
    default_sync_frequency="hourly",
    supports_incremental_sync=True,
    supports_webhooks=False
)
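
A template's `required_fields` and `default_config` suggest a validation step when a connection is created from it. The guide does not show that step, so the following is a sketch under that assumption. `resolve_config` is a hypothetical helper that works on plain values rather than on the `IntegrationTemplate` class.

```python
from typing import Any, Dict, List


def resolve_config(
    required_fields: List[str],
    default_config: Dict[str, Any],
    user_config: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge user settings over template defaults and check required fields."""
    merged = {**default_config, **user_config}  # user values win over defaults
    missing = [name for name in required_fields if merged.get(name) in (None, "")]
    if missing:
        raise ValueError(f"missing required configuration: {', '.join(missing)}")
    return merged


config = resolve_config(
    required_fields=["endpoint", "api_version"],
    default_config={"api_version": "v1", "timeout": 30, "batch_size": 100},
    user_config={"endpoint": "https://custom.example.com/api", "timeout": 10},
)
print(config)
```

Because defaults are merged first, a required field such as `api_version` can be satisfied by the template itself, and only `endpoint` has to come from the user in this example.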

⚔ Performance Optimization

Caching Strategy

PIP-Level Caching (Redis):

# Sensitivity-based TTL
CACHE_TTL = {
    "public": 3600,      # 1 hour
    "internal": 1800,    # 30 minutes
    "confidential": 300, # 5 minutes
    "restricted": 60     # 1 minute
}
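
A cached record usually mixes fields of different sensitivity. One plausible rule, assumed here rather than taken from `pip_service.py`, is to cache the whole record for the shortest TTL among its fields and to treat unknown labels as the most restrictive. `ttl_for_fields` is a hypothetical helper, and the field dictionaries follow the shape returned by the connection test.

```python
from typing import Dict, Iterable

CACHE_TTL = {
    "public": 3600,       # 1 hour
    "internal": 1800,     # 30 minutes
    "confidential": 300,  # 5 minutes
    "restricted": 60,     # 1 minute
}


def ttl_for_fields(fields: Iterable[Dict[str, str]]) -> int:
    """Pick the shortest TTL among the sensitivities present in a record's fields."""
    strictest = CACHE_TTL["restricted"]
    # Unknown or missing sensitivity labels fall back to the strictest TTL.
    ttls = [CACHE_TTL.get(field.get("sensitivity"), strictest) for field in fields]
    return min(ttls, default=strictest)


fields = [
    {"name": "email", "sensitivity": "internal"},
    {"name": "profile.clearanceLevel", "sensitivity": "confidential"},
]
print(ttl_for_fields(fields))
```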

Policy Bridge Level:

  • Data refresh based on sync frequency
  • Real-time for webhook updates
  • Pub/sub for instant distribution

PEP-Level (Bouncer):

  • Policy cache: 5 minutes
  • Decision cache: 1 minute
  • Policy data: Updated by Policy Bridge push

Batch Processing

For large datasets:

import asyncio

# Fetch in batches to avoid timeouts.
# Assumes connector.get_users() also accepts an offset parameter.
async def fetch_users_batch(connector, batch_size=100):
    offset = 0
    all_users = []
    
    while True:
        batch = await connector.get_users(limit=batch_size, offset=offset)
        if not batch:
            break
        
        all_users.extend(batch)
        offset += batch_size
        
        # Respect rate limits
        await asyncio.sleep(0.1)
    
    return all_users

Incremental Sync Implementation

# Get last sync timestamp
last_sync = connection.last_sync_at

# Query only changed records
if connection.supports_incremental_sync:
    query = f"updated_at > '{last_sync}'"
    changed_records = await connector.fetch_with_filter(query)
else:
    changed_records = await connector.fetch_all()

# Publish only changes to the Policy Bridge
await policy_bridge_publisher.publish_incremental_update(connection, changed_records)
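
The filter in the snippet above is built by string interpolation. For SQL-backed sources, binding the timestamp as a query parameter avoids quoting and injection problems. The sketch below uses an in-memory SQLite database as a stand-in for a real source. The table, its columns, and `fetch_changed_rows` are hypothetical.

```python
import sqlite3


def fetch_changed_rows(conn: sqlite3.Connection, last_sync: str) -> list:
    """Return rows modified after last_sync, binding the timestamp as a parameter."""
    cursor = conn.execute(
        "SELECT id, department, updated_at FROM users "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_sync,),
    )
    return cursor.fetchall()


# In-memory database standing in for a real source; the schema is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, department TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [
        ("u1", "Engineering", "2024-01-15T09:00:00Z"),
        ("u2", "Sales", "2024-01-15T10:35:00Z"),
    ],
)
changed = fetch_changed_rows(conn, "2024-01-15T10:30:00Z")
print(changed)
```

The comparison works on text here because all timestamps share the same ISO 8601 format and time zone. A source with a native timestamp column would compare typed values instead.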

🛠️ Testing & Debugging

Local Testing

Test Connection Locally:

curl -X POST http://localhost:8000/api/pip/connections/test \
  -H "Content-Type: application/json" \
  -d @test_connection.json

Test Data Snapshot:

curl http://localhost:8000/api/pip/connections/1/data-snapshot | jq .

Trigger Manual Sync:

curl -X POST http://localhost:8000/api/pip/connections/1/trigger-sync

Debugging Data Flow

1. Check PIP Fetched Data:

GET /api/pip/connections/1/data-snapshot

2. Check Policy Bridge Received Data:

GET http://localhost:7002/data/policy_data/okta

3. Check PEP Has Data:

GET http://localhost:8080/api/v1/opa/data

4. Test Policy with Data:

curl -X POST http://localhost:8080/v1/data/app/authorization/allow \
  -d '{
    "input": {
      "user": {"id": "00u1234567890abcdef"},
      "resource": {"id": "resource-123"},
      "action": "read"
    }
  }'

Common Issues

Issue: Policies can't access PIP data

Debug:

# In your policy, add debugging
package app.authorization

# Check what data exists
users := data.policy_data.okta.identity.users
user_count := count(users)

# This will show in policy decision details
decision := {
  "user_count": user_count,
  "users_available": [user | user := users[_]]
}

Issue: Stale data in policies

Solutions:

  1. Check last sync time
  2. Trigger manual sync
  3. Verify the Policy Bridge received the update
  4. Clear PEP cache
  5. Check sync job status

📌 Advanced Patterns

Pattern: Just-in-Time Attribute Fetching

For highly sensitive or rapidly changing data:

# Instead of batch sync, fetch on-demand
async def fetch_user_clearance_realtime(user_id):
    """Fetch clearance level in real-time during policy evaluation"""
    # Called by PEP during authorization
    # Not cached - always fresh
    response = await connector.get_user_attribute(user_id, "clearance_level")
    return response["clearance_level"]
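
Because an on-demand fetch sits on the authorization path, it is worth bounding its latency and deciding what happens when the source is slow or unavailable. The following sketch wraps the same `get_user_attribute` call in a timeout and falls back to a deny-by-default value. `SlowConnector`, `DEFAULT_CLEARANCE`, and `fetch_clearance_or_default` are hypothetical, and the fallback value is an assumption you should adapt to your own clearance model.

```python
import asyncio

DEFAULT_CLEARANCE = "none"  # assumed lowest level, used when the fetch fails


class SlowConnector:
    """Hypothetical connector that simulates a slow source system."""

    def __init__(self, delay: float, value: str):
        self.delay = delay
        self.value = value

    async def get_user_attribute(self, user_id: str, name: str) -> dict:
        await asyncio.sleep(self.delay)
        return {name: self.value}


async def fetch_clearance_or_default(connector, user_id: str, timeout: float = 0.5) -> str:
    """Fetch clearance on demand; fall back to the lowest level on timeout or error."""
    try:
        response = await asyncio.wait_for(
            connector.get_user_attribute(user_id, "clearance_level"), timeout
        )
        return response["clearance_level"]
    except (asyncio.TimeoutError, KeyError, OSError):
        return DEFAULT_CLEARANCE
```

Failing closed keeps a source outage from silently granting access, at the cost of denying some legitimate requests while the source is down.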

Pattern: Attribute Transformation

Transform source data before publishing:

# Transform Okta groups array to role hierarchy
def transform_groups_to_roles(groups):
    role_hierarchy = {
        "Admin-Group": {"role": "admin", "level": 5},
        "Manager-Group": {"role": "manager", "level": 4},
        "Developer-Group": {"role": "developer", "level": 3}
    }
    
    return [role_hierarchy.get(group, {"role": "user", "level": 1}) for group in groups]
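
Policies often need a single effective role rather than a list. As a usage sketch built on the same mapping, the hypothetical `highest_role` helper below picks the entry with the highest level and returns the default role for users with no groups.

```python
ROLE_HIERARCHY = {
    "Admin-Group": {"role": "admin", "level": 5},
    "Manager-Group": {"role": "manager", "level": 4},
    "Developer-Group": {"role": "developer", "level": 3},
}
DEFAULT_ROLE = {"role": "user", "level": 1}


def transform_groups_to_roles(groups):
    """Map each group name to its role entry; unknown groups become the default role."""
    return [ROLE_HIERARCHY.get(group, DEFAULT_ROLE) for group in groups]


def highest_role(groups):
    """Return the role entry with the highest level, or the default for no groups."""
    roles = transform_groups_to_roles(groups)
    return max(roles, key=lambda role: role["level"], default=DEFAULT_ROLE)
```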

Pattern: Data Enrichment

Enrich source data with computed attributes:

# Add computed attributes
user_data["attributes"]["access_level"] = compute_access_level(
    user_data["groups"],
    user_data["attributes"]["clearance_level"]
)

user_data["attributes"]["risk_score"] = compute_risk_score(
    user_data["last_login"],
    user_data["mfa_enabled"],
    user_data["login_count"]
)
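
The guide does not define `compute_risk_score`, so the following is only a sketch of what such a function might look like. The weights, the 90-day and 5-login thresholds, and the 0 to 100 scale are arbitrary placeholders chosen for illustration, not Control Core behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_risk_score(last_login: Optional[datetime], mfa_enabled: bool,
                       login_count: int, now: Optional[datetime] = None) -> int:
    """Illustrative 0-100 risk score; higher means riskier. Weights are placeholders."""
    now = now or datetime.now(timezone.utc)
    score = 0
    if not mfa_enabled:
        score += 50  # no second factor
    if last_login is None or now - last_login > timedelta(days=90):
        score += 30  # dormant or never-used account
    if login_count < 5:
        score += 20  # little login history to judge by
    return score
```

Keeping the function pure (all inputs passed in, including `now`) makes the computed attribute reproducible and easy to test before it is published.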

📌 Migration & Updates

Switching Data Sources

Scenario: Migrating from Okta to Azure AD

Steps:

  1. Add Azure AD connection (don't delete Okta yet)
  2. Map attributes to same standard names
  3. Test with both sources active
  4. Update policies to use data.policy_data.azure_ad instead of data.policy_data.okta
  5. Verify all policies work
  6. Disable Okta sync
  7. Delete Okta connection (after grace period)

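Benefit of Standard Attributes: If both sources are mapped to the same standard attribute names, policies typically need only the data path changed (step 4); the attribute references themselves stay the same.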
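
While both sources are active (step 3), it helps to compare the mapped attributes side by side before switching policies over. The sketch below assumes each source's users have been loaded into a dict keyed by a shared identifier such as email; `compare_sources` and the sample attribute names are hypothetical.

```python
def compare_sources(old_users: dict, new_users: dict, attributes: list) -> dict:
    """Report users missing from the new source and attribute values that differ."""
    report = {"missing_in_new": [], "mismatched": {}}
    for key, old in old_users.items():
        new = new_users.get(key)
        if new is None:
            report["missing_in_new"].append(key)
            continue
        # Keep only attributes whose values differ, as (old, new) pairs.
        diffs = {
            attr: (old.get(attr), new.get(attr))
            for attr in attributes
            if old.get(attr) != new.get(attr)
        }
        if diffs:
            report["mismatched"][key] = diffs
    return report
```

An empty report for the standard attributes your policies read is a reasonable precondition for moving on to step 4.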

Schema Changes

Handling Provider Schema Updates:

  1. Provider adds new field → Appears in next schema discovery
  2. Provider renames field → Update mapping
  3. Provider removes field → Policies must handle the missing data gracefully
  4. Provider changes field type → May need policy updates

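Best Practice: Use a get_value_or_default pattern in policies (for example, Rego's built-in object.get(obj, key, default)) so that a removed or missing field falls back to a safe default instead of leaving the rule undefined.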
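
The four cases above can be detected automatically by diffing two schema discovery results. This is a minimal sketch assuming each schema is available as a dict of field name to type name; the actual discovery output format may differ, and `diff_schema` is a hypothetical helper.

```python
def diff_schema(old: dict, new: dict) -> dict:
    """Classify field-level changes between two {field_name: type_name} schemas."""
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "type_changed": sorted(
            name for name in old.keys() & new.keys() if old[name] != new[name]
        ),
    }
```

A rename shows up as one removed field plus one added field, so a non-empty `removed` list is the signal to review attribute mappings.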

🔧 API Complete Reference

All PIP Endpoints

Connection CRUD:

  • POST /api/pip/connections - Create
  • GET /api/pip/connections - List all
  • GET /api/pip/connections/{id} - Get one
  • PUT /api/pip/connections/{id} - Update
  • DELETE /api/pip/connections/{id} - Delete
  • POST /api/pip/connections/test - Test without saving
  • GET /api/pip/connections/{id}/health - Health check

Deep-Schema Discovery & Semantic Search:

  • POST /api/pip/connections/{id}/discover - Full discovery pipeline (schema + ERP pattern tags + watched-table queue + indexing)
  • GET /api/pip/connections/{id}/watched-tables - Retrieve watched tables
  • PUT /api/pip/connections/{id}/watched-tables - Replace watched tables
  • POST /api/pip/search-metadata - Natural language semantic metadata search
  • POST /api/pip/search_metadata - Alias endpoint for compatibility

OAuth:

  • GET /api/pip/oauth/authorize/{provider} - Start OAuth flow
  • POST /api/pip/oauth/callback/{provider} - Handle callback
  • POST /api/pip/oauth/refresh/{connection_id} - Refresh token

Policy Bridge:

  • POST /api/pip/connections/{id}/publish-to-policy-bridge - Publish data
  • GET /api/pip/connections/{id}/policy-bridge-status - Get status
  • GET /api/pip/connections/{id}/data-snapshot - Get snapshot
  • GET /api/pip/policy-bridge/config - Get Policy Bridge config
  • GET /api/pip/connections/{id}/policy-bridge-config - Get connection config

Sync:

  • POST /api/pip/connections/{id}/schedule-sync - Schedule
  • DELETE /api/pip/connections/{id}/unschedule-sync - Unschedule
  • GET /api/pip/connections/{id}/sync-status - Get status
  • POST /api/pip/connections/{id}/trigger-sync - Manual trigger
  • GET /api/pip/sync/scheduler-stats - Stats
  • GET /api/pip/sync/jobs - All jobs

Webhooks:

  • POST /api/pip/webhooks/{provider}/{connection_id} - Receive webhook
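
When testing a webhook sender against this endpoint, a rejected payload is often a signature mismatch. The signature scheme is provider-specific and not described here, so the sketch below only illustrates the common HMAC-SHA256 approach over the raw request body; the hex encoding and the `verify_signature` name are assumptions.

```python
import hashlib
import hmac


def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Check a hex-encoded HMAC-SHA256 signature of the raw body in constant time."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_hex)
```

The signature must be computed over the exact bytes received; re-serializing parsed JSON before verifying is a frequent cause of mismatches.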

Templates:

  • GET /api/pip/integration-templates - List templates
  • GET /api/pip/integration-templates/{id} - Get template

🛠️ Troubleshooting

| Issue | What to check |
| --- | --- |
| PIP API returns 401 or 403 | Verify API key or token and scope; ensure the caller has access to the PIP/data source. |
| PIP data missing in policy evaluation | Confirm attribute mappings and that the data source sync has run; check policy bundle includes PIP data. |
| Webhook not firing or payload rejected | Validate webhook URL, signature, and payload format; check Control Plane logs for webhook errors. |
| Custom connector fails | Test connection and credentials; ensure TLS and network access from Control Plane to external service. |
| Deep discovery ignores configured model | Verify smart_cc_settings exists for the same tenant/user invoking discovery; check llm_provider/custom_api_url/custom_api_key. |
| Watched-table jobs remain fallback | Confirm scheduler is active and classifier endpoint auth succeeds by validating Smart CC /v1/smart-cc/turn first. |

For more, see the Troubleshooting Guide.

📞 Support & Resources

Documentation

Community

  • Community: Use Control Core support channels
  • Repository access: Available via your Control Core account team
  • Support: support@controlcore.io

You're now equipped to build advanced policy integrations with Control Core's PIP system!