🛡️ Rego Policy Language Guidelines

This comprehensive guide covers the Rego policy language as used in Control Core, including language basics, Control Core-specific patterns, real-world examples, and best practices.

🛡️ Introduction to Rego

Rego is a declarative policy language designed for expressing authorization rules. It's used by Open Policy Agent (OPA) and forms the foundation of Control Core's policy engine.

Why Rego?

  • Declarative: Express what should happen, not how
  • Expressive: Handle complex authorization logic elegantly
  • Testable: Built-in support for unit testing
  • Fast: Evaluation is typically sub-millisecond for simple policies, and decisions can be cached
  • Safe: No side effects, no infinite loops

Key Concepts

Rules: Define authorization decisions

allow if {
    input.user.role == "admin"
}

Queries: Ask questions about data

admin_users contains user if {
    user := data.users[_]
    user.role == "admin"
}

Assignments: Compute intermediate values

is_admin := input.user.role == "admin"

🛡️ Rego Basics

Package Declaration

Every policy starts with a package declaration:

package controlcore.policy

# Your rules here

Best Practice: Use hierarchical naming:

  • controlcore.policy.api - API policies
  • controlcore.policy.data - Data access policies
  • controlcore.policy.ai - AI agent policies

Imports

Imports opt in to Rego v1 syntax or, on older OPA versions, to individual future keywords. Use one style or the other; OPA rejects a module that combines them:

import rego.v1           # Rego v1 syntax; implies if, in, contains, and every

# Older alternative (do not combine with rego.v1):
# import future.keywords

Recommended imports for Control Core:

package controlcore.policy

import rego.v1   # Already enables if, in, contains, and every

Data Types

Strings:

name := "John Doe"
role := "admin"

Numbers:

age := 30
clearance_level := 3
rate_limit := 1000

Booleans:

is_active := true
mfa_enabled := false

Arrays:

roles := ["admin", "developer", "manager"]
permissions := ["read", "write", "delete"]

Objects:

user := {
    "id": "user-123",
    "name": "John Doe",
    "roles": ["admin"],
    "attributes": {
        "department": "Engineering"
    }
}

Sets:

allowed_roles := {"admin", "manager", "developer"}

Null:

value := null

Variables and Assignment

Simple assignment:

user_id := input.user.id
role := input.user.roles[0]

Binding nested values:

user := input.user
action := input.action.name

Iteration:

# Iterate over array
role := input.user.roles[_]

# Iterate over object
value := input.resource.attributes[key]

# Iterate with index
role := input.user.roles[i]
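
The iteration forms above can also be written with `some ... in`, which many teams find easier to read. The following is a minimal sketch; the package name and the rule names `elevated_roles` and `attribute_names` are illustrative assumptions, not part of Control Core.

```rego
package controlcore.examples.iteration

import rego.v1

# Collect every role on the request that grants elevated access
elevated_roles contains role if {
    some role in input.user.roles
    role in {"admin", "manager"}
}

# Iterate over an object's keys (the value is ignored with _)
attribute_names contains key if {
    some key, _ in input.resource.attributes
}
```

Both rules produce sets, so an empty or missing `roles` array simply yields an empty set rather than an error.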

Operators

Comparison:

x == y   # Equal
x != y   # Not equal
x < y    # Less than
x <= y   # Less than or equal
x > y    # Greater than
x >= y   # Greater than or equal

Logical:

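x; y     # AND (both x and y), written on one line
x
y        # AND (both x and y), written on separate lines
not x    # NOT
# OR has no operator: define the same rule more than once (see Pattern 3 below)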
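
In Rego, expressions inside one rule body are joined with AND, and OR is expressed by giving the same rule more than one body. A minimal sketch follows; the package name, the rule name `can_read`, and the role values are assumptions made for illustration.

```rego
package controlcore.examples.logic

import rego.v1

default can_read := false

# AND: every expression in a single body must hold
can_read if {
    input.action.name == "read"
    "developer" in input.user.roles
}

# OR: a second body for the same rule; either body may succeed
can_read if {
    "admin" in input.user.roles
}

test_and_branch if {
    can_read with input as {"action": {"name": "read"}, "user": {"roles": ["developer"]}}
}

test_or_branch if {
    can_read with input as {"action": {"name": "delete"}, "user": {"roles": ["admin"]}}
}

test_denied if {
    not can_read with input as {"action": {"name": "delete"}, "user": {"roles": ["developer"]}}
}
```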

Arithmetic:

x + y    # Addition
x - y    # Subtraction
x * y    # Multiplication
x / y    # Division
x % y    # Modulo

Set operations:

x | y    # Union
x & y    # Intersection
x - y    # Difference

String operations:

contains(x, y)        # String contains
startswith(x, y)      # Starts with
endswith(x, y)        # Ends with

Array operations:

x in array           # Element in array
count(array)         # Array length

📌 Control Core Input Schema

All Control Core policies receive this input structure:

package controlcore.policy

import rego.v1

# Input is provided by Control Core
# {
#   "user": {
#     "id": "user-123",
#     "email": "user@company.com",
#     "roles": ["developer", "team-lead"],
#     "department": "Engineering",
#     "attributes": {
#       "clearance_level": 3,
#       "manager": "user-456",
#       "mfa_enabled": true
#     }
#   },
#   "resource": {
#     "id": "resource-789",
#     "type": "api",
#     "path": "/api/v1/users",
#     "attributes": {
#       "owner": "user-123",
#       "sensitivity": "confidential",
#       "environment": "production"
#     }
#   },
#   "action": {
#     "name": "read",
#     "method": "GET"
#   },
#   "context": {
#     "ip_address": "192.168.1.100",
#     "time": "2025-01-25T10:30:00Z",
#     "request_id": "req-123"
#   },
#   "workload": {                    // Optional - when Work ID is enabled
#     "spiffe_id": "spiffe://trust-domain/bouncer/bouncer-id"
#   }
# }

# Access input fields
user_id := input.user.id
user_roles := input.user.roles
resource_type := input.resource.type
action_name := input.action.name

# Workload identity (when Work ID enabled and mTLS present)
spiffe_id := input.workload.spiffe_id  # e.g. "spiffe://trust-domain/bouncer/bouncer-id"

Workload Identity: When Work ID is enabled, input.workload.spiffe_id contains the workload identity from the client certificate (when mTLS is used). Use it for workload-based authorization. See Work ID Configuration.
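
A workload-based rule might look like the sketch below. The package name and the `trusted_prefix` value are hypothetical; substitute the trust domain and path that your deployment actually issues.

```rego
package controlcore.examples.workload

import rego.v1

default allow := false

# Hypothetical SPIFFE ID prefix for trusted workloads
trusted_prefix := "spiffe://trust-domain/bouncer/"

# Allow reads only when a workload identity is present and matches the prefix.
# If input.workload is absent, the rule is undefined and the default applies.
allow if {
    startswith(input.workload.spiffe_id, trusted_prefix)
    input.action.name == "read"
}
```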

🛡️ Basic Policy Patterns

Pattern 1: Simple Allow/Deny

package controlcore.policy

import rego.v1

# Default deny (fail secure)
default allow := false

# Allow admin users
allow if {
    input.user.roles[_] == "admin"
}

Pattern 2: Multiple Conditions (AND)

package controlcore.policy

import rego.v1

default allow := false

# All conditions must be true
allow if {
    input.user.roles[_] == "developer"
    input.resource.type == "api"
    input.action.name == "read"
    input.resource.attributes.environment == "development"
}

Pattern 3: Alternative Conditions (OR)

package controlcore.policy

import rego.v1

default allow := false

# Method 1: Multiple rules (implicit OR)
allow if {
    input.user.roles[_] == "admin"
}

allow if {
    input.user.roles[_] == "manager"
}

# Method 2: Using 'in'
allow if {
    input.user.roles[_] in ["admin", "manager", "owner"]
}

Pattern 4: Negative Conditions

package controlcore.policy

import rego.v1

default allow := false

allow if {
    # User has developer role
    input.user.roles[_] == "developer"
    
    # BUT not on suspended list
    not user_suspended
}

user_suspended if {
    input.user.id in data.suspended_users
}

Pattern 5: Helper Functions

package controlcore.policy

import rego.v1

default allow := false

# Helper function
is_admin if {
    input.user.roles[_] == "admin"
}

is_manager if {
    input.user.roles[_] == "manager"
}

has_elevated_access if {
    is_admin
}

has_elevated_access if {
    is_manager
}

# Use in main rule
allow if {
    has_elevated_access
    input.action.name == "write"
}

📌 Advanced Patterns

Pattern 1: Set Operations

package controlcore.policy

import rego.v1

default allow := false

required_roles := {"admin", "security-officer"}
user_roles := {role | role := input.user.roles[_]}

# User has all required roles
allow if {
    required_roles & user_roles == required_roles
}

# Alternative: using every
allow if {
    every role in required_roles {
        role in user_roles
    }
}

Pattern 2: Comprehensions

package controlcore.policy

import rego.v1

# Array comprehension
admin_users := [user |
    user := data.users[_]
    user.roles[_] == "admin"
]

# Object comprehension
user_map := {user.id: user.name |
    user := data.users[_]
}

# Set comprehension
admin_emails := {user.email |
    user := data.users[_]
    user.roles[_] == "admin"
}

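Pattern 3: Hierarchy Traversal (Recursion Alternative)

Rego rejects recursive rules at compile time, so walk the reporting chain with the built-in graph.reachable instead:

package controlcore.policy

import rego.v1

# Edges point from each user ID to the set containing that user's manager.
# Users without a manager get an empty set so they remain nodes in the graph.
reports_to[id] := managers if {
    some id, user in data.users
    managers := {m | m := user.manager}
}

# True if user_id appears anywhere above employee_id in the reporting chain
is_manager_of(user_id, employee_id) if {
    user_id != employee_id
    user_id in graph.reachable(reports_to, {employee_id})
}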

# Allow managers to access their reports' data
allow if {
    is_manager_of(input.user.id, input.resource.attributes.owner)
}

Pattern 4: Dynamic Rules

package controlcore.policy

import rego.v1

# Generate rules dynamically based on data
allow if {
    policy := data.dynamic_policies[input.resource.type]
    evaluate_policy(policy)
}

evaluate_policy(policy) if {
    required_role := policy.required_role
    input.user.roles[_] == required_role
}

📌 Control Core Specific Patterns

Pattern 1: Role-Based Access Control (RBAC)

package controlcore.policy

import rego.v1

default allow := false

# Role hierarchy
role_hierarchy := {
    "admin": 4,
    "manager": 3,
    "developer": 2,
    "viewer": 1
}

# Get user's highest role level
user_level := max([level |
    role := input.user.roles[_]
    level := role_hierarchy[role]
])

# Required level for action
required_levels := {
    "delete": 4,
    "write": 3,
    "read": 2,
    "view": 1
}

# Allow if user level >= required level
allow if {
    required := required_levels[input.action.name]
    user_level >= required
}

Pattern 2: Attribute-Based Access Control (ABAC)

package controlcore.policy

import rego.v1

default allow := false

allow if {
    # User attributes
    input.user.department == input.resource.attributes.department
    input.user.attributes.clearance_level >= input.resource.attributes.required_clearance
    
    # Resource attributes
    input.resource.attributes.environment in ["development", "staging"]
    
    # Action attributes
    input.action.name in ["read", "list"]
    
    # Context attributes
    is_business_hours
}

is_business_hours if {
    hour := time.clock([time.now_ns()])[0]
    hour >= 9
    hour < 17
}

Pattern 3: Resource Ownership

package controlcore.policy

import rego.v1

default allow := false

# Users can access their own resources
allow if {
    input.resource.attributes.owner == input.user.id
}

# Users can access resources in their department
allow if {
    input.resource.attributes.department == input.user.department
    input.action.name in ["read", "list"]
}

# Managers can access team resources
allow if {
    input.user.roles[_] == "manager"
    employee_ids := {id | 
        employee := data.users[_]
        employee.manager == input.user.id
        id := employee.id
    }
    input.resource.attributes.owner in employee_ids
}

Pattern 4: Data Filtering (Row-Level Security)

package controlcore.policy

import rego.v1

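# Filter visible rows based on user permissions
visible_rows contains row if {
    some row in input.query.result
    row.owner_id == input.user.id
}

visible_rows contains row if {
    some row in input.query.result
    input.user.roles[_] == "manager"
    row.department == input.user.department
}

visible_rows contains row if {
    some row in input.query.result
    input.user.roles[_] == "admin"
}

# Mask sensitive columns: non-sensitive fields pass through unchanged
masked_data[key] := value if {
    some key, value in input.data
    not key in sensitive_fields
}

# Sensitive fields pass through only for users with sufficient clearance
masked_data[key] := value if {
    some key, value in input.data
    key in sensitive_fields
    user_can_view_sensitive
}

# Otherwise sensitive fields are redacted
masked_data[key] := "***REDACTED***" if {
    some key, _ in input.data
    key in sensitive_fields
    not user_can_view_sensitive
}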

sensitive_fields := {"ssn", "salary", "password", "api_key"}

user_can_view_sensitive if {
    input.user.attributes.clearance_level >= 3
}

🤖 AI/LLM Policy Patterns

Pattern 1: AI Agent Access Control

package controlcore.policy.ai

import rego.v1

default allow := false

# Allow AI agent with safety checks
allow if {
    # User authorized for AI
    input.user.roles[_] in ["developer", "data-scientist", "ai-engineer"]
    
    # Prompt safety validated
    prompt_safe
    
    # Usage limits not exceeded
    within_usage_limits
    
    # Content policy compliant
    content_compliant
}

# Validate prompt safety
prompt_safe if {
    prompt := input.context.prompt
    
    # Check length
    count(prompt) < 4000
    
    # No sensitive keywords
    not contains_sensitive_keywords(prompt)
    
    # Safety score acceptable
    input.context.safety_score > 0.8
}

contains_sensitive_keywords(prompt) if {
    sensitive := ["password", "api-key", "secret", "private-key", "token"]
    lower_prompt := lower(prompt)
    some keyword in sensitive
    contains(lower_prompt, keyword)
}

# Check usage limits
within_usage_limits if {
    user_stats := data.usage_stats[input.user.id]
    user_stats.requests_today < 1000
    user_stats.tokens_today < 100000
}

# Content policy compliance
content_compliant if {
    prompt := input.context.prompt
    not contains(lower(prompt), "malicious")
    not contains(lower(prompt), "hack")
    not contains(lower(prompt), "exploit")
}

Pattern 2: LLM Context-Aware Policy

package controlcore.policy.llm

import rego.v1

default allow := false

# Allow LLM access with context checks
allow if {
    user_has_llm_access
    llm_context_valid
    prompt_safety_validated
}

user_has_llm_access if {
    input.user.roles[_] in ["admin", "developer", "analyst"]
    input.user.attributes.llm_access_level in ["full", "limited"]
}

llm_context_valid if {
    context := input.context
    
    # Prompt context available
    context.prompt_context.available == true
    
    # Model capabilities match
    context.model_context.capabilities[_] in ["text_generation", "completion"]
    
    # Performance acceptable
    context.model_context.performance.score > 0.8
}

prompt_safety_validated if {
    prompt := input.context.prompt
    
    # Length check
    prompt.length < 4000
    
    # No sensitive data
    not contains_pii(prompt.content)
    
    # Follows safety guidelines
    follows_safety_guidelines(prompt.content)
}

contains_pii(text) if {
    pii_patterns := ["ssn", "credit card", "password", "api key"]
    lower_text := lower(text)
    some pattern in pii_patterns
    contains(lower_text, pattern)
}

follows_safety_guidelines(text) if {
    prohibited := ["harmful", "dangerous", "illegal", "malicious"]
    lower_text := lower(text)
    every term in prohibited {
        not contains(lower_text, term)
    }
}

Pattern 3: RAG System Authorization

package controlcore.policy.rag

import rego.v1

default allow := false

# Allow RAG system access
allow if {
    user_authorized_for_rag
    query_safe
    retrieval_context_valid
}

user_authorized_for_rag if {
    input.user.roles[_] in ["researcher", "analyst", "developer"]
    input.user.attributes.rag_access == true
}

query_safe if {
    query := input.context.query
    
    # Query not malicious
    not injection_attempt(query)
    
    # Query scope appropriate
    query_scope_valid(query)
}

injection_attempt(query) if {
    injection_patterns := ["'; DROP TABLE", "OR 1=1", "../", "<script>"]
    some pattern in injection_patterns
    contains(query, pattern)
}

query_scope_valid(query) if {
    # User can only query their department's data
    query_scope := input.context.query_scope
    query_scope.department == input.user.department
}

query_scope_valid(query) if {
    # Admins can query all data
    input.user.roles[_] == "admin"
}

retrieval_context_valid if {
    context := input.context.retrieval
    
    # Source documents accessible
    every doc_id in context.source_documents {
        user_can_access_document(doc_id)
    }
    
    # Retrieval count reasonable
    count(context.source_documents) <= 10
}

user_can_access_document(doc_id) if {
    doc := data.documents[doc_id]
    doc.owner == input.user.id
}

user_can_access_document(doc_id) if {
    doc := data.documents[doc_id]
    doc.classification == "public"
}

Pattern 4: Content Injection Rules

package controlcore.policy.content

import rego.v1

# Pre-prompt content injection
pre_prompt_injection := {
    "type": "context_enrichment",
    "content": enriched_context
} if {
    input.context.injection_enabled == true
}

enriched_context := concat("\n", [
    sprintf("User: %s", [input.user.email]),
    sprintf("Department: %s", [input.user.department]),
    sprintf("Clearance Level: %d", [input.user.attributes.clearance_level]),
    "Guidelines: Follow company security policies",
    "Restrictions: Do not share sensitive information"
])

# Post-response filtering
filtered_response := response if {
    response := mask_sensitive_data(input.response)
}

mask_sensitive_data(response) := masked if {
    # Replace SSN-like patterns; regex.replace is needed because replace() matches literal text only
    masked := regex.replace(response, ssn_pattern, "***-**-****")
}

ssn_pattern := `\d{3}-\d{2}-\d{4}`

📌 Built-in Functions

String Functions

# Concatenation
result := concat("-", ["hello", "world"])  # "hello-world"

# Formatting
message := sprintf("User %s has role %s", [user, role])

# Case conversion
lower_text := lower("HELLO")  # "hello"
upper_text := upper("hello")  # "HELLO"

# String operations
contains("hello world", "world")      # true
startswith("hello", "hel")            # true
endswith("world", "ld")               # true
trim("  hello  ", " ")                # "hello"
replace("hello", "l", "r")            # "herro"

Array Functions

# Count
count([1, 2, 3])  # 3

# Concatenation
concat_array := array.concat([1, 2], [3, 4])  # [1, 2, 3, 4]

# Slice
array.slice([1, 2, 3, 4], 1, 3)  # [2, 3]

Set Functions

# Union
{"a", "b"} | {"b", "c"}  # {"a", "b", "c"}

# Intersection
{"a", "b"} & {"b", "c"}  # {"b"}

# Difference
{"a", "b"} - {"b", "c"}  # {"a"}

Aggregation Functions

# Sum
sum([1, 2, 3, 4])  # 10

# Product
product([2, 3, 4])  # 24

# Max/Min
max([1, 5, 3])  # 5
min([1, 5, 3])  # 1

# Count
count([1, 2, 3])  # 3

Type Functions

# Type checking
is_string("hello")     # true
is_number(42)          # true
is_boolean(true)       # true
is_array([1, 2])       # true
is_object({"a": 1})    # true
is_set({1, 2})         # true
is_null(null)          # true

Time Functions

# Current time (nanoseconds since epoch)
now := time.now_ns()

# Parse time
parsed := time.parse_rfc3339_ns("2025-01-25T10:30:00Z")

# Clock components [hour, minute, second]
[hour, minute, second] := time.clock([now])

# Date components [year, month, day]
[year, month, day] := time.date([now])

# Day of week as a string, e.g. "Saturday"
weekday := time.weekday([now])
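
These functions combine naturally into a working-hours check. The sketch below assumes UTC and a Monday to Friday, 09:00 to 17:00 window; the package name and `weekend_days` are illustrative.

```rego
package controlcore.examples.time

import rego.v1

# time.weekday returns the day name as a string
weekend_days := {"Saturday", "Sunday"}

# True on weekdays between 09:00 and 17:00 UTC
is_business_hours if {
    now := time.now_ns()
    not time.weekday(now) in weekend_days
    [hour, _, _] := time.clock(now)
    hour >= 9
    hour < 17
}
```

For a local time zone, both functions also accept a two-element array of the form `[now, "America/Toronto"]`.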

JSON Functions

# Parse JSON string (a raw backtick string avoids escaping the inner quotes)
parsed := json.unmarshal(`{"name": "John", "age": 30}`)

# Marshal to JSON
json_str := json.marshal({"name": "John"})

Encoding Functions

# Base64
encoded := base64.encode("hello")
decoded := base64.decode("aGVsbG8=")

# URL encoding
url_encoded := urlquery.encode("hello world")
url_decoded := urlquery.decode("hello+world")

Crypto Functions

# ⚠️  MD5 and SHA-1 are cryptographically weak. Prefer SHA-256 in production code:
sha256_hash := crypto.sha256("hello")

# Keyed hashes (HMAC) are also available, including SHA-512
hmac_sha512 := crypto.hmac.sha512("hello", "secret-key")

# MD5 (weak - do not use in production)
# md5_hash := crypto.md5("hello")

# SHA-1 (weak - do not use in production)
# sha1_hash := crypto.sha1("hello")

Regex Functions

# Match
regex.match("^[a-z]+$", "hello")  # true

# Find all matches
matches := regex.find_n("[0-9]+", "abc123def456", -1)  # ["123", "456"]

# Replace
result := regex.replace("hello", "l+", "L")  # "heLo"

📌 Testing Policies

Unit Testing

package controlcore.policy

import rego.v1

# Policy under test
default allow := false

allow if {
    input.user.roles[_] == "admin"
}

# Test cases
test_admin_allowed if {
    allow with input as {
        "user": {"roles": ["admin"]},
        "resource": {"type": "api"},
        "action": {"name": "read"}
    }
}

test_developer_denied if {
    not allow with input as {
        "user": {"roles": ["developer"]},
        "resource": {"type": "api"},
        "action": {"name": "read"}
    }
}

test_no_role_denied if {
    not allow with input as {
        "user": {"roles": []},
        "resource": {"type": "api"},
        "action": {"name": "read"}
    }
}

Testing with Mock Data

package controlcore.policy

import rego.v1

# Policy under test: resolve the caller's roles from data.users
default allow := false

allow if {
    "admin" in data.users[input.user.id].roles
}

# Mock data
mock_users := {
    "user-1": {"id": "user-1", "roles": ["admin"]},
    "user-2": {"id": "user-2", "roles": ["developer"]},
}

test_with_mock_data if {
    allow with input as {
        "user": {"id": "user-1"},
        "resource": {"type": "api"},
        "action": {"name": "write"}
    } with data.users as mock_users
}

📌 Best Practices

1. Always Default Deny

# Good: Explicit default deny
default allow := false

allow if {
    # conditions
}

# Bad: No default (implicit undefined)
allow if {
    # conditions
}

2. Use Helper Functions

# Good: Reusable helper functions
is_admin if {
    input.user.roles[_] == "admin"
}

is_resource_owner if {
    input.resource.attributes.owner == input.user.id
}

allow if {
    is_admin
}

allow if {
    is_resource_owner
}

# Bad: Duplicate logic
allow if {
    input.user.roles[_] == "admin"
}

allow if {
    input.user.roles[_] == "admin"
    input.resource.type == "api"
}

3. Document Your Policies

package controlcore.policy

import rego.v1

# Policy: API Access Control
# Owner: Security Team
# Last Updated: 2025-01-25
#
# This policy controls access to production APIs based on:
# - User roles and permissions
# - Resource sensitivity levels
# - Time-based restrictions
# - MFA requirements for sensitive operations
#
# Related Documentation: https://wiki.company.com/api-security

default allow := false

# Allow admin users full access
# Admins are defined in the identity provider
allow if {
    input.user.roles[_] == "admin"
}

4. Keep Policies Simple

# Good: Simple and clear
allow if {
    input.user.roles[_] == "developer"
    input.action.name == "read"
}

# Bad: Overly complex
allow if {
    count([r | r := input.user.roles[_]; r in ["developer", "engineer", "coder"]]) > 0
    [a | a := ["read", "get", "list"]][_] == input.action.name
}

5. Use Type Checking

# Good: Check types
allow if {
    is_string(input.user.id)
    is_array(input.user.roles)
    count(input.user.roles) > 0
    input.user.roles[_] == "admin"
}

# Bad: Assume types
allow if {
    input.user.roles[_] == "admin"  # Silently undefined if roles is missing; also matches values of an object
}

⚡ Performance Optimization

1. Use Early Returns

# Good: Check cheap conditions first
allow if {
    input.action.name == "read"  # Quick check
    input.user.roles[_] == "developer"  # Quick check
    expensive_database_lookup  # Expensive check last
}

# Bad: Expensive check first
allow if {
    expensive_database_lookup  # Slow
    input.action.name == "read"  # Fast but checked last
}

2. Cache-Friendly Policies

# Good: Deterministic (cacheable)
allow if {
    input.user.roles[_] == "admin"
    input.resource.type == "api"
}

# Bad: Non-deterministic (not cacheable)
allow if {
    time.now_ns() < input.session.expiry  # Changes every nanosecond!
}

# Better: Use reasonable granularity
allow if {
    current_hour := time.clock([time.now_ns()])[0]
    current_hour < 17  # Changes hourly, more cacheable
}

3. Avoid Expensive Operations

# Good: Simple operations
allow if {
    input.user.department == "Engineering"
}

# Bad: Complex iteration
allow if {
    count([u | u := data.users[_]; u.department == "Engineering"]) > 100
}

🔒 Regulatory Compliance Patterns

FINTRAC Compliance (Canada)

Large Cash Transaction Reporting (LCTR):

package fintrac.lctr

import rego.v1

default allow := false
default requires_lctr := false

# FINTRAC: Large Cash Transaction Report required for CAD $10,000+
requires_lctr if {
    input.transaction.amount >= 10000
    input.transaction.currency == "CAD"
    input.transaction.type in ["cash", "cash_equivalent"]
}

# Allow with LCTR requirement
allow if {
    # User must be FINTRAC-trained
    input.user.attributes.fintrac_certified == true
    input.user.roles[_] in ["teller", "financial-advisor", "compliance-officer"]
    
    # Required transaction details present
    input.transaction.customer_id != null
    input.transaction.conductor_id != null
    input.transaction.purpose != null
    input.transaction.source_of_funds != null
    
    # Create LCTR if required
    requires_lctr
}

# Suspicious Transaction Report (STR) Detection
str_indicators contains reason if {
    # Structuring: Multiple transactions just under reporting threshold
    recent_txns := data.transactions_last_30_days[input.transaction.customer_id]
    near_threshold := [t | t := recent_txns[_]; t.amount >= 9000; t.amount < 10000]
    count(near_threshold) >= 3
    reason := "possible_structuring"
}

str_indicators contains reason if {
    # Unusual pattern: Amount significantly higher than normal
    customer := data.customers[input.transaction.customer_id]
    input.transaction.amount > (customer.average_transaction * 10)
    reason := "unusual_amount"
}

str_indicators contains reason if {
    # Rapid movement of funds
    input.transaction.type == "wire_transfer"
    input.transaction.received_within_hours < 24
    input.transaction.immediate_withdrawal == true
    reason := "rapid_movement_of_funds"
}

# Allow routine processing only when no STR indicators are present
allow if {
    count(str_indicators) == 0
    input.user.attributes.aml_cleared == true
}

# Compliance officer can process flagged transactions
allow if {
    count(str_indicators) > 0
    input.user.roles[_] == "compliance-officer"
    input.transaction.compliance_review_completed == true
}

Third Party Determination:

package fintrac.third_party

import rego.v1

# FINTRAC: Identify if conducting on behalf of third party
third_party_determination_required if {
    input.transaction.amount >= 10000
    input.transaction.customer_id != input.transaction.conductor_id
}

allow if {
    third_party_determination_required
    
    # Third party information collected
    input.transaction.third_party.name != null
    input.transaction.third_party.address != null
    input.transaction.third_party.relationship != null
    
    # User verified third party information
    input.user.roles[_] in ["teller", "compliance-officer"]
}

OSFI Guidelines (Canada)

Guideline B-10: Outsourcing of Business Activities:

package osfi.outsourcing

import rego.v1

default allow := false

# OSFI B-10: Third-party access requires controls
allow if {
    input.user.type == "third_party_vendor"
    
    # Vendor must be approved
    vendor := data.approved_vendors[input.user.vendor_id]
    vendor.approval_status == "active"
    vendor.due_diligence_completed == true
    
    # Access limited to contracted services only
    input.resource.type in vendor.contracted_services
    
    # Monitoring and audit enabled
    input.context.audit_enabled == true
    input.context.vendor_activity_logged == true
}

# OSFI: Segregation of Duties
allow if {
    not has_conflicting_duties(input.user.roles)
    duty_appropriate_for_action(input.user.roles, input.action.name)
}

has_conflicting_duties(roles) if {
    "maker" in roles
    "checker" in roles
}

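has_conflicting_duties(roles) if {
    "initiator" in roles
    "approver" in roles
}

# Assumption: data.duty_matrix (hypothetical) maps each action name to the roles permitted to perform it
duty_appropriate_for_action(roles, action) if {
    some role in roles
    role in data.duty_matrix[action]
}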

Guideline E-21: Technology and Cyber Security:

package osfi.cyber_security

import rego.v1

default allow := false

# OSFI E-21: Multi-Factor Authentication for sensitive operations
allow if {
    input.resource.attributes.sensitivity in ["confidential", "restricted"]
    
    # MFA required
    input.user.attributes.mfa_verified == true
    input.context.mfa_timestamp_age_seconds < 300  # MFA within last 5 minutes
    
    # Access from approved location (assumes data.approved_ip_ranges holds CIDR strings)
    some cidr in data.approved_ip_ranges
    net.cidr_contains(cidr, input.context.ip_address)
}

# OSFI E-21: Privileged Access Management
allow if {
    input.user.type == "privileged_user"
    
    # Privileged sessions must be recorded
    input.context.session_recording_enabled == true
    
    # Just-in-time access (time-limited)
    session_timestamp := time.parse_rfc3339_ns(input.context.session_start)
    session_age_seconds := (time.now_ns() - session_timestamp) / 1000000000
    session_age_seconds < 3600  # 1 hour max
    
    # Approval required
    input.context.privileged_access_approved == true
}

AML Regulations (US & Canada)

Transaction Monitoring:

package aml.monitoring

import rego.v1

default allow := false

# AML: Currency Transaction Report (CTR) for USD $10,000+
requires_ctr if {
    input.transaction.amount >= 10000
    input.transaction.currency == "USD"
}

# AML: Suspicious Activity Report (SAR) triggers
sar_triggers contains trigger if {
    # Trigger 1: Transactions with no apparent business purpose
    input.transaction.business_purpose == "unknown"
    input.transaction.amount >= 5000
    trigger := "no_business_purpose"
}

sar_triggers contains trigger if {
    # Trigger 2: Customer refuses to provide information
    customer := data.customers[input.transaction.customer_id]
    customer.info_refusal_count > 0
    trigger := "customer_refused_information"
}

sar_triggers contains trigger if {
    # Trigger 3: Funds transfers to high-risk jurisdictions
    input.transaction.type == "wire_transfer"
    input.transaction.destination_country in data.high_risk_countries
    trigger := "high_risk_jurisdiction"
}

# Allow with SAR filing requirement
allow if {
    count(sar_triggers) > 0
    input.user.roles[_] == "compliance-officer"
    input.transaction.sar_filed == true
}

allow if {
    count(sar_triggers) == 0
    requires_ctr
    input.transaction.ctr_filed == true
}

Customer Due Diligence (CDD):

package aml.cdd

import rego.v1

default allow := false

# AML: Customer Due Diligence requirements
allow if {
    customer := data.customers[input.resource.attributes.customer_id]
    
    # Standard CDD (all customers)
    customer.identity_verified == true
    customer.name != null
    customer.address != null
    customer.date_of_birth != null
    customer.identification_number != null
    
    # Beneficial ownership (for entities)
    beneficial_ownership_satisfied(customer)
    
    # Risk assessment completed
    customer.risk_assessment_date != null
    risk_assessment_current(customer.risk_assessment_date)
}

beneficial_ownership_satisfied(customer) if {
    customer.type == "individual"
}

beneficial_ownership_satisfied(customer) if {
    customer.type == "entity"
    count(customer.beneficial_owners) > 0
    every owner in customer.beneficial_owners {
        owner.ownership_percentage >= 25
        owner.identity_verified == true
    }
}

risk_assessment_current(assessment_date) if {
    days_since := (time.now_ns() - time.parse_rfc3339_ns(assessment_date)) / (24 * 60 * 60 * 1000000000)
    days_since < 365  # Annual review required
}
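
The day-count arithmetic above treats a year as exactly 365 days. If a calendar-year window is preferred, the built-in `time.add_date` can compute the expiry directly. This is a sketch of that alternative, not the approach the policy above uses; the package name is an assumption.

```rego
package controlcore.examples.review

import rego.v1

# Current if the assessment date plus one calendar year is still in the future
risk_assessment_current(assessment_date) if {
    assessed_ns := time.parse_rfc3339_ns(assessment_date)
    expires_ns := time.add_date(assessed_ns, 1, 0, 0)  # years, months, days
    time.now_ns() < expires_ns
}
```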

Office of Foreign Assets Control (OFAC) Screening:

package aml.ofac

import rego.v1

default allow := false

# OFAC: Sanctions screening required
allow if {
    customer := data.customers[input.transaction.customer_id]
    
    # Not on OFAC SDN (Specially Designated Nationals) list
    not customer.id in data.ofac_sdn_list
    not customer.name in data.ofac_sdn_names
    
    # Not transacting with sanctioned countries
    not input.transaction.destination_country in data.ofac_sanctioned_countries
    not input.transaction.origination_country in data.ofac_sanctioned_countries
    
    # Screening completed recently
    screening_age := (time.now_ns() - time.parse_rfc3339_ns(customer.last_ofac_screening)) / (24 * 60 * 60 * 1000000000)
    screening_age < 1  # Daily screening
}

# OFAC: 50% Rule (entities owned by sanctioned persons)
allow if {
    entity := data.entities[input.transaction.entity_id]
    
    # Check all owners
    sanctioned_ownership := sum([owner.ownership_percentage |
        owner := entity.owners[_]
        owner.id in data.ofac_sdn_list
    ])
    
    # Total sanctioned ownership must be < 50%
    sanctioned_ownership < 50
}

FinCEN Regulations (US)

Bank Secrecy Act (BSA) Compliance:

package fincen.bsa

import rego.v1

default allow := false

# BSA: Currency Transaction Report (CTR) for $10,000+
requires_ctr if {
    input.transaction.amount >= 10000
    input.transaction.currency == "USD"
    input.transaction.type in ["cash", "deposit", "withdrawal"]
}

# BSA: Suspicious Activity Report (SAR)
requires_sar if {
    count(suspicious_indicators) >= 2
}

suspicious_indicators[indicator] if {
    # Multiple transactions just under $10,000
    recent := data.transactions_last_7_days[input.transaction.customer_id]
    count([t | t := recent[_]; t.amount >= 9000; t.amount < 10000]) >= 2
    indicator := "structuring"
}

suspicious_indicators[indicator] if {
    # Unusual transaction for customer profile
    customer := data.customers[input.transaction.customer_id]
    input.transaction.amount > (customer.average_monthly_transactions * 5)
    indicator := "unusual_activity"
}

suspicious_indicators contains indicator if {
    # Transactions with known high-risk indicators
    input.transaction.destination_country in data.high_risk_jurisdictions
    input.transaction.amount >= 5000
    indicator := "high_risk_jurisdiction"
}

allow if {
    requires_ctr
    input.transaction.ctr_filed == true
    input.user.roles[_] in ["teller", "compliance-officer"]
}

allow if {
    requires_sar
    input.transaction.sar_filed == true
    input.user.roles[_] == "compliance-officer"
    input.transaction.senior_management_notified == true
}

Customer Identification Program (CIP):

package fincen.cip

import rego.v1

default allow := false

# FinCEN: CIP requirements for account opening
allow if {
    input.action.name == "open_account"
    customer := input.request.customer_info
    
    # Four key pieces of information required
    customer.name != null
    customer.date_of_birth != null
    customer.address != null
    customer.identification_number != null  # SSN or ITIN
    
    # Identification document verified
    customer.id_verification.method in ["documentary", "non_documentary"]
    customer.id_verification.status == "verified"
    
    # User authorized to open accounts
    input.user.roles[_] in ["account-officer", "branch-manager"]
}

GDPR Compliance (EU)

Right to Access (Article 15):

package gdpr.access

import rego.v1

default allow := false

# GDPR: Data subject has right to access their personal data
allow if {
    input.action.name == "read"
    input.resource.type == "personal_data"
    
    # User accessing their own data
    input.resource.attributes.data_subject_id == input.user.id
}

# Allow data protection officer to access for compliance
allow if {
    input.user.roles[_] == "data-protection-officer"
    input.context.purpose == "compliance_review"
    input.context.audit_enabled == true
}

Right to Erasure (Article 17):

package gdpr.erasure

import rego.v1

default allow := false

# GDPR: Right to be forgotten
allow if {
    input.action.name == "delete"
    input.resource.type == "personal_data"
    
    # Verify erasure request
    erasure_request := data.erasure_requests[input.resource.attributes.data_subject_id]
    erasure_request.status == "approved"
    erasure_request.legal_grounds_verified == true
    
    # Only DPO can execute erasure
    input.user.roles[_] == "data-protection-officer"
    
    # No legal obligation to retain
    not legal_retention_required(input.resource)
}

legal_retention_required(resource) if {
    # Financial records must be kept for 7 years
    resource.attributes.type == "financial_record"
    record_age_days := (time.now_ns() - time.parse_rfc3339_ns(resource.attributes.created_at)) / (24 * 60 * 60 * 1000000000)
    record_age_days < 2555  # 7 years
}

Data Processing Purpose Limitation (Article 5):

package gdpr.purpose_limitation

import rego.v1

default allow := false

# GDPR: Data can only be used for specified, explicit purposes
allow if {
    consent := data.user_consents[input.resource.attributes.data_subject_id]
    
    # Purpose matches consent
    input.context.processing_purpose in consent.approved_purposes
    
    # Consent is still valid
    consent.status == "active"
    consent.withdrawal_date == null
}

HIPAA Compliance (US Healthcare)

Minimum Necessary Rule:

package hipaa.minimum_necessary

import rego.v1

default allow := false

# HIPAA: Access only minimum necessary PHI
allow if {
    input.user.roles[_] in ["physician", "nurse", "healthcare-provider"]
    
    # User is part of patient's care team
    patient_id := input.resource.attributes.patient_id
    care_team := data.patient_care_teams[patient_id]
    input.user.id in care_team.members
    
    # Purpose of use is valid
    input.context.purpose_of_use in ["treatment", "payment", "healthcare-operations"]
    
    # Access limited to necessary fields only
    requested_fields := input.query.fields
    every field in requested_fields {
        field in necessary_fields_for_purpose[input.context.purpose_of_use]
    }
}

necessary_fields_for_purpose := {
    "treatment": ["patient_id", "diagnosis", "medications", "allergies", "vital_signs"],
    "payment": ["patient_id", "insurance_info", "billing_codes", "charges"],
    "healthcare-operations": ["patient_id", "encounter_date", "provider_id"]
}
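
The field check above can be exercised on its own with a small unit test. The following is a minimal sketch, not part of the policy itself: the package name `hipaa.minimum_necessary_example` and the helper rule `fields_within_minimum` are hypothetical, and the field lists are copied from the mapping above.

```rego
package hipaa.minimum_necessary_example

import rego.v1

necessary_fields_for_purpose := {
    "treatment": ["patient_id", "diagnosis", "medications", "allergies", "vital_signs"],
    "payment": ["patient_id", "insurance_info", "billing_codes", "charges"],
}

# True only when every requested field is listed for the stated purpose
fields_within_minimum if {
    every field in input.query.fields {
        field in necessary_fields_for_purpose[input.context.purpose_of_use]
    }
}

# Billing fields requested for a payment purpose pass the check
test_payment_fields_ok if {
    fields_within_minimum with input as {
        "query": {"fields": ["patient_id", "charges"]},
        "context": {"purpose_of_use": "payment"},
    }
}

# Clinical fields are not necessary for payment, so the check fails
test_diagnosis_not_needed_for_payment if {
    not fields_within_minimum with input as {
        "query": {"fields": ["patient_id", "diagnosis"]},
        "context": {"purpose_of_use": "payment"},
    }
}
```

Note that `every` over an empty collection is true, so a request with an empty field list passes this check vacuously; a production policy may want to reject that case explicitly.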

Break-the-Glass Emergency Access:

package hipaa.emergency_access

import rego.v1

default allow := false

# HIPAA: Emergency access with logging
allow if {
    input.context.emergency == true
    input.user.roles[_] in ["physician", "emergency-physician", "nurse"]
    
    # Emergency access logged
    input.context.audit_enabled == true
    input.context.emergency_justification != null
    
    # Access reviewed post-emergency
    schedule_emergency_access_review(input.user.id, input.resource.id)
}

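schedule_emergency_access_review(user_id, resource_id) if {
    # Placeholder that always succeeds. Rego has no side effects, so the audit
    # record must be created by the system that consumes this decision.
    true
}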

PCI-DSS Compliance

Cardholder Data Access Control:

package pci.cardholder_data

import rego.v1

default allow := false

# PCI-DSS Requirement 7: Restrict access to cardholder data
allow if {
    input.resource.attributes.contains_cardholder_data == true
    
    # User has business need to know
    input.user.attributes.pci_access_justification != null
    input.user.roles[_] in data.pci_authorized_roles
    
    # Access from secured network only
    input.context.network_segment == "cardholder_data_environment"
    
    # Strong authentication
    input.user.attributes.mfa_verified == true
    
    # Automatic logout after 15 minutes
    session_age := (time.now_ns() - time.parse_rfc3339_ns(input.context.session_start)) / 1000000000
    session_age < 900

    # PCI-DSS Requirement 8: Unique ID for each user
    unique_user_id
}

# PCI-DSS Requirement 8: Unique ID for each user
# Kept as a helper rather than a second `allow` rule, which would grant
# access to any user with a non-shared ID regardless of the checks above.
unique_user_id if {
    input.user.id != null
    input.user.id != "shared"
    input.user.id != "generic"

    # No shared accounts
    not is_shared_account(input.user.id)
}

is_shared_account(user_id) if {
    user_id in ["admin", "root", "service", "shared"]
}

SOC 2 Compliance

Access Control (CC6.1, CC6.2):

package soc2.access_control

import rego.v1

default allow := false

# SOC 2 CC6.1: Logical access requires authentication
allow if {
    # User must be authenticated
    input.user.authenticated == true
    input.user.authentication_method in ["saml", "oauth", "mfa"]
    
    # Session must be valid
    session_valid(input.context.session_id)
    
    # Authorization based on role (role mapping loaded from the data document)
    input.user.roles[_] in data.authorized_roles_for_resource[input.resource.type]
}

session_valid(session_id) if {
    session := data.active_sessions[session_id]
    session.status == "active"
    
    # Session not expired
    expiry := time.parse_rfc3339_ns(session.expiry)
    time.now_ns() < expiry
}

# SOC 2 CC6.2: New users are registered and approved before credentials are issued
allow if {
    input.action.name == "create_user_account"
    
    # Approval workflow completed
    request := input.request.account_request
    request.manager_approval == true
    request.security_approval == true
    
    # User provisioning follows principle of least privilege
    every role in request.requested_roles {
        role in data.approved_roles
    }
}

Change Management (CC8.1):

package soc2.change_management

import rego.v1

default allow := false

# SOC 2 CC8.1: Changes to policies require approval
allow if {
    input.action.name in ["create_policy", "update_policy", "delete_policy"]
    
    # Change request submitted
    change_request := input.context.change_request
    change_request.id != null
    
    # Approvals obtained
    change_request.technical_approval == true
    change_request.business_approval == true
    
    # Testing completed
    change_request.testing_status == "passed"
    
    # User authorized to deploy
    input.user.roles[_] in ["policy-admin", "devops-engineer"]
}

🛠️ Troubleshooting

| Issue | What to check |
| --- | --- |
| Policy validation errors | Ensure syntax matches Rego rules (e.g. no missing `if`, balanced braces). Use the policy editor's validation or run tests locally. |
| Unexpected allow/deny | Trace rules with sample input; check that the input schema (user, resource, action, context) matches what the bouncer sends. |
| Tests pass locally but fail in Control Plane | Confirm the same Rego version and built-ins; check that bundled policies and dependencies (e.g. PIP data) are in sync. |
| Policy slow or timeout | Simplify rules and avoid heavy comprehensions on large collections; use indexing (see Performance Optimization). |

For more, see the Troubleshooting Guide.
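
One way to act on the "Unexpected allow/deny" row is to pin the exact input down in a unit test. The sketch below is illustrative only: the package name `troubleshooting.example`, the rule, and the sample inputs are hypothetical, and the input shape assumes the `user` and `action` fields used elsewhere in this guide.

```rego
package troubleshooting.example

import rego.v1

default allow := false

# Developers may read; nothing else is allowed
allow if {
    some role in input.user.roles
    role == "developer"
    input.action.name == "read"
}

# Expected allow: role and action both match
test_developer_can_read if {
    allow with input as {
        "user": {"roles": ["developer"]},
        "action": {"name": "read"},
    }
}

# Expected deny: a missing field leaves the rule body undefined,
# so the default value applies
test_missing_action_is_denied if {
    not allow with input as {"user": {"roles": ["developer"]}}
}
```

Assuming the OPA CLI is available locally, `opa test . -v` runs both tests and reports each result by name. A common cause of an unexpected deny is the second case: the bouncer sends a differently shaped input than the policy expects, and the undefined reference silently falls through to the default.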

🔧 Quick Reference

Common Patterns Cheat Sheet

# Basic access control
default allow := false
allow if { input.user.roles[_] == "admin" }

# Multiple conditions (AND)
allow if {
    input.user.roles[_] == "developer"
    input.action.name == "read"
}

# Alternative conditions (OR)
allow if { input.user.roles[_] == "admin" }
allow if { input.user.roles[_] == "manager" }

# Set membership
allow if { input.user.roles[_] in ["admin", "manager"] }

# Negation
allow if { not user_suspended }

# Iteration
user_ids := [user.id | user := data.users[_]]

# Helper rule
is_admin if { input.user.roles[_] == "admin" }

# Time-based (hour is in UTC)
is_business_hours if {
    hour := time.clock(time.now_ns())[0]
    hour >= 9; hour < 17
}

# Resource ownership
allow if { input.resource.owner == input.user.id }

# Data filtering
visible_rows contains row if {
    some row in input.query.result
    row.owner == input.user.id
}

🛡️ Advanced Rego Features

This section covers advanced Rego capabilities for building sophisticated policies that go beyond basic conditions.

Advanced Conditions & Logic

Nested Conditions with some/every

The some and every keywords enable powerful iteration and validation patterns:

Using some for iteration:

# Check if ANY user has admin role
has_admin_user if {
    some user in data.users
    user.role == "admin"
}

# Find admin user with specific permission
admin_with_permission(permission) if {
    some user in data.users
    user.role == "admin"
    permission in user.permissions
}

Using every for validation:

# ALL users must have MFA enabled
all_users_have_mfa if {
    every user in data.users {
        user.mfa_enabled == true
    }
}

# All transactions must be under limit
transactions_valid if {
    every txn in input.transactions {
        txn.amount < data.limits.max_transaction
    }
}

Complex boolean logic:

# Multi-level approval workflow
allow if {
    # Manager approval required
    some approval in input.approvals
    approval.role == "manager"
    approval.status == "approved"

    # AND either VP approval OR amount under threshold
    secondary_condition_met
}

# Rego has no inline "or": alternatives are written as multiple rule bodies
secondary_condition_met if {
    some vp_approval in input.approvals
    vp_approval.role == "vp"
    vp_approval.status == "approved"
}

secondary_condition_met if {
    input.transaction.amount < 10000
}

Nested Rule Structures

# Hierarchical permission checking
allow if {
    resource_accessible
    action_permitted
}

resource_accessible if {
    input.resource.type == "document"
    document_access_granted
}

document_access_granted if {
    input.resource.classification == "public"
}

document_access_granted if {
    input.resource.classification == "internal"
    employee_access
}

document_access_granted if {
    input.resource.classification == "confidential"
    manager_access
}

employee_access if {
    input.user.employee_id != ""
}

manager_access if {
    input.user.role in ["manager", "director", "vp"]
}

action_permitted if {
    input.action in allowed_actions
}

allowed_actions contains "read" if { employee_access }
allowed_actions contains "write" if { manager_access }
allowed_actions contains "delete" if { input.user.role == "admin" }

Comprehensions

Comprehensions are powerful tools for transforming and filtering data.

Array Comprehensions

Filtering:

# Get all admin users
admin_users := [user |
    some user in data.users
    user.role == "admin"
]

# Get user IDs with high clearance
high_clearance_ids := [user.id |
    some user in data.users
    user.clearance_level >= 5
]

# Complex filtering with multiple conditions
eligible_approvers := [user |
    some user in data.users
    user.role in ["manager", "director"]
    user.department == input.resource.department
    user.active == true
]

Transformation:

# Extract just the names
user_names := [user.name | some user in data.users]

# Create structured list
user_summaries := [{"id": user.id, "name": user.name} |
    some user in data.users
]

# Permission aggregation
all_user_permissions := [perm |
    some user in data.users
    some perm in user.permissions
]

Object Comprehensions

Creating mappings:

# Map user ID to role
user_roles := {user.id: user.role |
    some user in data.users
}

# Build permission matrix
permission_matrix := {user.id: user.permissions |
    some user in data.users
    user.active == true
}

# Department user count (departments are derived from the user records)
dept_counts := {dept: count(users) |
    some dept in {u.department | some u in data.users}
    users := [u | some u in data.users; u.department == dept]
}

Data transformation:

# Transform array to lookup object
resource_by_id := {resource.id: resource |
    some resource in input.resources
}

# Index users by email (emails must be unique, or evaluation fails with a key conflict)
users_by_email := {user.email: user |
    some user in data.users
}

Set Comprehensions

Creating unique sets:

# All unique roles in the system
all_roles := {user.role |
    some user in data.users
}

# All departments with active users
active_departments := {user.department |
    some user in data.users
    user.active == true
}

# Permissions across all admins
admin_permissions := {perm |
    some user in data.users
    user.role == "admin"
    some perm in user.permissions
}
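
To make the difference between the three comprehension forms concrete, here is a small self-contained sketch. The package name `comprehension.example` and the inline `users` list are hypothetical sample data, defined in the policy so the example does not depend on `data.users`.

```rego
package comprehension.example

import rego.v1

# Hypothetical sample data, defined inline so the example is self-contained
users := [
    {"id": "u1", "role": "admin", "department": "eng"},
    {"id": "u2", "role": "developer", "department": "eng"},
    {"id": "u3", "role": "admin", "department": "ops"},
]

# Array comprehension: keeps order and duplicates
roles_array := [u.role | some u in users]

# Set comprehension: duplicates collapse
roles_set := {u.role | some u in users}

# Object comprehension: each key maps to exactly one value
department_by_id := {u.id: u.department | some u in users}

test_comprehension_shapes if {
    roles_array == ["admin", "developer", "admin"]
    roles_set == {"admin", "developer"}
    department_by_id == {"u1": "eng", "u2": "eng", "u3": "ops"}
}
```

As a rule of thumb, use an array when counts or order matter, a set for membership checks, and an object when you need lookups by key.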

Functions and Helper Rules

Create reusable logic with functions and helper rules.

Helper Rules

# Reusable validation functions
is_admin if {
    input.user.role == "admin"
}

is_manager if {
    input.user.role == "manager"
}

is_employee if {
    input.user.employee_id != ""
}

# Composite helpers
has_elevated_access if { is_admin }
has_elevated_access if { is_manager }

# Parameterized helpers
user_has_role(role) if {
    input.user.roles[_] == role
}

user_has_permission(permission) if {
    input.user.permissions[_] == permission
}

Functions with Arguments

# Calculate risk score
risk_score(transaction) := score if {
    base_score := transaction.amount / 1000
    multiplier := transaction_type_multiplier(transaction.type)
    score := base_score * multiplier
}

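# A lookup table with a fallback avoids conflicting function definitions
# (a catch-all clause alongside specific clauses would yield two outputs)
transaction_type_multipliers := {"wire_transfer": 2.0, "ach": 1.0, "check": 0.5}

transaction_type_multiplier(txn_type) := object.get(transaction_type_multipliers, txn_type, 1.0)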

# Permission calculation
effective_permissions(user) := permissions if {
    # Direct permissions
    direct := user.permissions
    
    # Role-based permissions
    role_perms := [perm |
        some role in user.roles
        some perm in data.role_permissions[role]
    ]
    
    # Combine, then deduplicate by collecting into a set
    permissions := {perm | some perm in array.concat(direct, role_perms)}
}

Custom Validation Functions

# Validate email format
is_valid_email(email) if {
    regex.match(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`, email)
}

# Validate phone number
is_valid_phone(phone) if {
    regex.match(`^\+?[1-9]\d{1,14}$`, phone)
}

# Validate credit card (Luhn algorithm)
# Rego does not allow recursive functions, so the checksum is computed
# with a comprehension over digit positions instead.
is_valid_card(number) if {
    regex.match(`^[0-9]+$`, number)
    digits := [to_number(d) | some d in split(number, "")]
    n := count(digits)

    # Every second digit, counted from the right, is doubled
    total := sum([luhn_value(digits[i], ((n - 1) - i) % 2) |
        some i in numbers.range(0, n - 1)
    ])
    total % 10 == 0
}

# Parity 0: digit is used as-is. Parity 1: digit is doubled, minus 9 if above 9.
luhn_value(digit, 0) := digit
luhn_value(digit, 1) := (digit * 2) - 9 if digit > 4
luhn_value(digit, 1) := digit * 2 if digit <= 4

Data Lookups and External Data

Integrate external data sources into policy decisions.

Accessing the Data Document

# Direct data access
allow if {
    user_role := data.user_roles[input.user.id]
    user_role == "admin"
}

# Hierarchical data access
allow if {
    dept := data.departments[input.user.department]
    dept.classification == "secure"
    input.resource.classification <= dept.max_resource_level
}

PIP Integration Patterns

Policy Information Points (PIPs) provide real-time context:

# User data from Auth0 PIP
allow if {
    # PIP data available at data.pip.auth0
    user_profile := data.pip.auth0.users[input.user.id]
    user_profile.email_verified == true
    user_profile.mfa_enabled == true
}

# Location data from IP Geolocation PIP
allow if {
    # PIP data available at data.pip.ipgeo
    location := data.pip.ipgeo.lookup[input.request.ip]
    location.country in data.allowed_countries
}

# Real-time risk scoring from custom PIP
deny if {
    risk_data := data.pip.risk_engine.score[input.user.id]
    risk_data.score > data.thresholds.max_risk
}
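
Because PIP data arrives under `data.pip`, rules that depend on it can be tested without a live PIP by substituting the data with the `with` keyword. The following is a sketch under assumptions: the package name `example.pip_mock` and the `mock_users` snapshot are hypothetical, and the rule mirrors the Auth0 example above.

```rego
package example.pip_mock

import rego.v1

default allow := false

# Same shape as the Auth0 PIP rule above
allow if {
    user_profile := data.pip.auth0.users[input.user.id]
    user_profile.email_verified == true
    user_profile.mfa_enabled == true
}

# Hypothetical PIP snapshot used only by the tests
mock_users := {
    "alice": {"email_verified": true, "mfa_enabled": true},
    "bob": {"email_verified": true, "mfa_enabled": false},
}

test_verified_user_allowed if {
    allow with input as {"user": {"id": "alice"}}
        with data.pip.auth0.users as mock_users
}

test_user_without_mfa_denied if {
    not allow with input as {"user": {"id": "bob"}}
        with data.pip.auth0.users as mock_users
}

# A user missing from the PIP data is denied by the default
test_unknown_user_denied if {
    not allow with input as {"user": {"id": "carol"}}
        with data.pip.auth0.users as mock_users
}
```

The third test is worth keeping in any PIP-backed policy: it documents what happens when the PIP has not synced a user yet.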

Dynamic Permission Lookups

# Load permissions from external system
user_permissions contains permission if {
    # Fetch from PIP at policy evaluation time
    user_data := data.pip.permissions.users[input.user.id]
    some permission in user_data.permissions
}

# Resource-specific permissions
allow if {
    resource_acl := data.pip.acls.resources[input.resource.id]
    input.user.id in resource_acl.allowed_users
}

# Role hierarchy from LDAP
user_roles contains role if {
    ldap_user := data.pip.ldap.users[input.user.dn]
    some group in ldap_user.groups
    role := data.role_mappings[group]
}

Query Patterns

Advanced query patterns for complex scenarios.

Universal Quantification (every)

# All team members must approve
allow if {
    team := data.teams[input.resource.team_id]
    every member in team.members {
        some approval in input.approvals
        approval.user_id == member.id
        approval.status == "approved"
    }
}

# All documents must be classified
documents_valid if {
    every doc in input.documents {
        doc.classification in ["public", "internal", "confidential", "secret"]
    }
}

# All regions must be compliant
compliant if {
    every region in input.deployment.regions {
        region in data.compliant_regions
        data.region_config[region].encryption_enabled == true
    }
}

Existential Quantification (some)

# At least one admin must approve
allow if {
    some approval in input.approvals
    approval.role == "admin"
    approval.status == "approved"
}

# Any developer can read
allow if {
    input.action == "read"
    some role in input.user.roles
    role == "developer"
}

# Check if any violation exists
has_violations if {
    some check in data.security_checks
    not check.passed
}

Combined Patterns

# At least 2 managers must approve, and no director can reject
allow if {
    # Count manager approvals
    manager_approvals := count([a |
        some a in input.approvals
        a.role == "manager"
        a.status == "approved"
    ])
    manager_approvals >= 2
    
    # Check no director rejections
    not director_rejected
}

director_rejected if {
    some approval in input.approvals
    approval.role == "director"
    approval.status == "rejected"
}
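
The combined pattern above can be checked with two short tests. This is a minimal sketch: the package name `approvals.example` and the `two_managers` fixture are hypothetical, and the rules are repeated so the block is self-contained.

```rego
package approvals.example

import rego.v1

default allow := false

allow if {
    # Count manager approvals
    manager_approvals := count([a |
        some a in input.approvals
        a.role == "manager"
        a.status == "approved"
    ])
    manager_approvals >= 2

    not director_rejected
}

director_rejected if {
    some approval in input.approvals
    approval.role == "director"
    approval.status == "rejected"
}

# Hypothetical fixture: two approved manager records
two_managers := [
    {"role": "manager", "status": "approved"},
    {"role": "manager", "status": "approved"},
]

test_two_managers_allow if {
    allow with input as {"approvals": two_managers}
}

# A single director rejection overrides any number of manager approvals
test_director_rejection_blocks if {
    vetoed := array.concat(two_managers, [{"role": "director", "status": "rejected"}])
    not allow with input as {"approvals": vetoed}
}
```

The count uses an array comprehension on purpose: a set comprehension would collapse the two identical records into one. Real approval records would normally carry a distinguishing field such as a user ID.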

OPA v1.10.0+ Features

Take advantage of the latest OPA capabilities.

Enhanced Built-in Functions

import rego.v1

# String normalization
normalized_email := trim_space(lower(input.user.email))

# String formatting
message := sprintf("User %s attempted %s on %s", [
    input.user.name,
    input.action,
    input.resource.name
])

# Advanced JSON operations
filtered_data := json.remove(input.sensitive_data, ["/ssn", "/credit_card"])

# UUID generation for audit trails (the argument is a key: the same key
# returns the same UUID within a single query)
audit_id := uuid.rfc4122("audit_id")

Performance Optimizations

Partial evaluation (Enterprise feature):

# No annotation is needed: partial evaluation treats known data as constants
# and leaves only the input-dependent checks to evaluate per request
allow if {
    input.user.department in data.authorized_departments
}

Caching strategies:

# OPA caches a rule's result within a single query, so compute an expensive
# value once as a rule and reference that rule wherever it is needed
user_permissions := {perm |
    some role in input.user.roles
    some perm in data.role_permissions[role]
}

API Extensions

Custom built-in functions (requires an OPA build that registers them):

# Call custom built-ins registered in your OPA build (names are illustrative)
allow if {
    # Custom cryptographic verification
    custom.verify_signature(input.payload, input.signature, data.public_key)
}

# Custom geospatial functions
allow if {
    distance := custom.geo_distance(
        input.user.location,
        input.resource.location
    )
    distance < data.max_distance_km
}

📌 Real-World Advanced Examples

Financial Compliance (FINTRAC, OSFI)

package controlcore.compliance.fintrac

import rego.v1

# FINTRAC: Large cash transaction reporting
reportable_transaction if {
    input.transaction.type == "cash"
    input.transaction.amount >= 10000  # CAD
    input.transaction.currency == "CAD"
}

# FINTRAC: Suspicious transaction monitoring
suspicious_transaction if {
    # Multiple transactions just below reporting threshold
    recent_transactions := [t |
        some t in data.transactions[input.user.id]
        t.timestamp > time.now_ns() - (24 * 3600 * 1000000000)  # 24 hours
        t.amount >= 9000
        t.amount < 10000
    ]
    count(recent_transactions) >= 3
}

# OSFI: Know Your Client validation
allow if {
    input.action == "high_risk_transaction"
    kyc_complete(input.user.id)
    risk_assessment_current(input.user.id)
}

kyc_complete(user_id) if {
    kyc := data.kyc_records[user_id]
    kyc.identity_verified == true
    kyc.address_verified == true
    kyc.documents_valid == true
}

risk_assessment_current(user_id) if {
    assessment := data.risk_assessments[user_id]
    age_days := (time.now_ns() - assessment.timestamp) / (24 * 3600 * 1000000000)
    age_days <= 365  # Annual refresh required
}

Healthcare (HIPAA)

package controlcore.compliance.hipaa

import rego.v1

# HIPAA: Minimum necessary standard
allow if {
    input.action == "read"
    input.resource.type == "patient_record"
    
    # Only necessary fields for user's role
    requested_fields := input.query.fields
    necessary_fields := minimum_necessary_fields(input.user.role)
    
    every field in requested_fields {
        field in necessary_fields
    }
}

minimum_necessary_fields("doctor") := [
    "patient_id", "name", "dob", "diagnosis",
    "medications", "allergies", "treatments"
]

minimum_necessary_fields("nurse") := [
    "patient_id", "name", "medications", "vital_signs"
]

minimum_necessary_fields("billing") := [
    "patient_id", "name", "insurance", "procedures"
]

# Break-the-glass emergency access
allow if {
    input.emergency == true
    input.user.role in ["doctor", "nurse"]
    
    # Log emergency access for audit
    emergency_access_logged
}

emergency_access_logged if {
    # Rego cannot write logs itself. Require that the caller has auditing
    # enabled so the decision is recorded for post-emergency review.
    input.context.audit_enabled == true
}

AI Agent Control with Prompt Filtering

package controlcore.ai.prompt_filter

import rego.v1

# Block prompts trying to extract PII
deny if {
    prompt := lower(input.prompt)
    
    # Check for PII extraction attempts
    some pattern in pii_extraction_patterns
    contains(prompt, pattern)
}

pii_extraction_patterns := [
    "show me all email addresses",
    "list all phone numbers",
    "give me social security",
    "display credit card",
    "export user data"
]

# Block prompt injection attempts
deny if {
    prompt := input.prompt
    
    # Check for instruction override attempts
    some pattern in injection_patterns
    regex.match(pattern, prompt)
}

injection_patterns := [
    `(?i)ignore\s+previous\s+instructions`,
    `(?i)disregard\s+.*\s+above`,
    `(?i)system\s*:\s*you\s+are`,
    `(?i)act\s+as\s+if`,
]

# Content filtering for responses
filtered_response := response if {
    response := {
        "content": mask_sensitive_data(input.ai_response),
        "metadata": input.metadata
    }
}

mask_sensitive_data(text) := masked if {
    # Mask SSNs
    step1 := regex.replace(text, `\d{3}-\d{2}-\d{4}`, "***-**-****")
    
    # Mask credit cards
    step2 := regex.replace(step1, `\d{4}\s?\d{4}\s?\d{4}\s?\d{4}`, "**** **** **** ****")
    
    # Mask emails
    masked := regex.replace(step2, `[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`, "***@***.***")
}

Data Masking Patterns

package controlcore.data.masking

import rego.v1

# Field-level masking based on user clearance
masked_fields contains field if {
    some field in input.response.fields
    field_classification := data.field_classifications[field]
    field_classification > input.user.clearance_level
}

filtered_response := response if {
    response := {k: v |
        some k, v in input.response
        not k in masked_fields
    }
}

# Partial masking for different user types
mask_value(field, value, user_role) := masked if {
    field == "ssn"
    user_role == "customer_service"
    masked := concat("", ["***-**-", substring(value, 7, -1)])
} else := masked if {
    field == "credit_card"
    masked := concat("", ["**** **** **** ", substring(value, 15, -1)])
} else := value  # No masking needed
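
To show what `mask_value` returns for each branch, here is a small worked example. The package name `masking.example` and the sample values are hypothetical; the function body is repeated from above so the block runs on its own.

```rego
package masking.example

import rego.v1

mask_value(field, value, user_role) := masked if {
    field == "ssn"
    user_role == "customer_service"
    masked := concat("", ["***-**-", substring(value, 7, -1)])
} else := masked if {
    field == "credit_card"
    masked := concat("", ["**** **** **** ", substring(value, 15, -1)])
} else := value

# SSN keeps only the last four digits for customer service
test_ssn_masked if {
    mask_value("ssn", "123-45-6789", "customer_service") == "***-**-6789"
}

# Card numbers are masked for every role
test_card_masked if {
    mask_value("credit_card", "4111 1111 1111 1111", "auditor") == "**** **** **** 1111"
}

# Any other field falls through to the final else and is returned unchanged
test_other_field_unchanged if {
    mask_value("name", "Ada", "auditor") == "Ada"
}
```

The offsets 7 and 15 assume the exact formats `NNN-NN-NNNN` and `NNNN NNNN NNNN NNNN`. Note also that an SSN requested by any role other than `customer_service` reaches the final `else` and is returned unmasked, so that case should be handled separately if it can occur.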

Time-Based Access Control

package controlcore.temporal

import rego.v1

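# Business hours check (Monday to Friday, 09:00 to 17:00 New York time)
is_business_hours if {
    now := time.now_ns()
    [hour, _, _] := time.clock([now, "America/New_York"])

    # time.date returns [year, month, day]; use time.weekday for the day name
    weekday := time.weekday([now, "America/New_York"])
    weekday in {"Monday", "Tuesday", "Wednesday", "Thursday", "Friday"}

    hour >= 9
    hour < 17
}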

# Maintenance window check
is_maintenance_window if {
    now := time.now_ns()
    some window in data.maintenance_windows
    now >= window.start_time
    now <= window.end_time
}

# Time-limited permissions
allow if {
    input.action == "write"
    temp_access := data.temporary_access[input.user.id]
    time.now_ns() <= temp_access.expires_at
}
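
Time-based rules are easier to trust when the clock is pinned in tests. The sketch below replaces the `time.now_ns` built-in with a fixed value using `with`. The package name `temporal.example` and the chosen timestamps are assumptions for illustration, and the rule derives the weekday with `time.weekday`, which returns the day name as a string.

```rego
package temporal.example

import rego.v1

is_business_hours if {
    now := time.now_ns()
    [hour, _, _] := time.clock([now, "America/New_York"])
    time.weekday([now, "America/New_York"]) in {"Monday", "Tuesday", "Wednesday", "Thursday", "Friday"}
    hour >= 9
    hour < 17
}

# Monday 2024-01-08, 15:00 UTC is 10:00 in New York
test_monday_morning_is_business_hours if {
    ts := time.parse_rfc3339_ns("2024-01-08T15:00:00Z")
    is_business_hours with time.now_ns as ts
}

# Saturday 2024-01-13, same time of day
test_saturday_is_not_business_hours if {
    ts := time.parse_rfc3339_ns("2024-01-13T15:00:00Z")
    not is_business_hours with time.now_ns as ts
}
```

This assumes the host running OPA provides time zone data for `America/New_York`. Pinning the clock also keeps the tests from failing depending on when they are run.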

Location-Based Restrictions

package controlcore.geo

import rego.v1

# Allowed countries
allow if {
    input.action == "access"
    
    location := data.pip.ipgeo.lookup[input.request.ip]
    location.country in data.allowed_countries
}

# Distance-based access
allow if {
    user_location := input.user.location
    resource_location := data.resources[input.resource.id].location

    within_distance_km(user_location, resource_location, data.max_distance_km)
}

# Rego has no trigonometric or square-root built-ins, so the Haversine formula
# cannot be written directly. This check uses the equirectangular approximation
# (adequate for short distances) and compares squared distances to avoid sqrt.
within_distance_km(loc1, loc2, max_km) if {
    to_rad := 3.14159265359 / 180
    mean_lat := ((loc1.lat + loc2.lat) / 2) * to_rad
    x := ((loc2.lon - loc1.lon) * to_rad) * approx_cos(mean_lat)
    y := (loc2.lat - loc1.lat) * to_rad

    # 6371 is Earth's mean radius in km
    dist_sq := (6371 * 6371) * ((x * x) + (y * y))
    dist_sq <= max_km * max_km
}

# Taylor series for cos(x), accurate to about 0.001 for |x| <= pi/2
approx_cos(x) := result if {
    x2 := x * x
    result := ((1 - (x2 / 2)) + ((x2 * x2) / 24)) - (((x2 * x2) * x2) / 720)
}

You're now equipped to write powerful, secure, and efficient Rego policies! Start with simple patterns and gradually incorporate more advanced techniques as needed. The no-code policy builder is great for basic policies, but use the code editor when you need these advanced features.

For hands-on practice, use the Control Core policy editor with live testing.