Rego Policy Language Guidelines

This comprehensive guide covers the Rego policy language as used in Control Core, including language basics, Control Core-specific patterns, real-world examples, and best practices.

API-First References

Use these pages alongside this guide when implementing policy workflows in your IDE or CI pipeline:

Introduction to Rego

Rego is a declarative policy language designed for expressing authorization rules. It's used by Open Policy Agent (OPA) and forms the foundation of Control Core's policy engine.

Why Rego?

  • Declarative: Express what should happen, not how
  • Expressive: Handle complex authorization logic elegantly
  • Testable: Built-in support for unit testing
  • Fast: Sub-millisecond evaluation with caching
  • Safe: No side effects, no infinite loops

Key Concepts

Rules: Define authorization decisions

allow if {
    input.user.role == "admin"
}

Queries: Ask questions about data

admin_users[user] if {
    user := data.users[_]
    user.role == "admin"
}

Assignments: Compute intermediate values

is_admin := input.user.role == "admin"

Rego Basics

Package Declaration

Every policy starts with a package declaration:

package controlcore.policy

# Your rules here

Best Practice: Use hierarchical naming:

  • controlcore.policy.api - API policies
  • controlcore.policy.data - Data access policies
  • controlcore.policy.ai - AI agent policies

Imports

Import built-in functions and future keywords:

import rego.v1           # Use Rego v1 syntax
import future.keywords   # Use future keywords (if, contains, etc.)

Recommended imports for Control Core:

package controlcore.policy

import rego.v1
import future.keywords.if
import future.keywords.in
import future.keywords.contains
import future.keywords.every

Data Types

Strings:

name := "John Doe"
role := "admin"

Numbers:

age := 30
clearance_level := 3
rate_limit := 1000

Booleans:

is_active := true
mfa_enabled := false

Arrays:

roles := ["admin", "developer", "manager"]
permissions := ["read", "write", "delete"]

Objects:

user := {
    "id": "user-123",
    "name": "John Doe",
    "roles": ["admin"],
    "attributes": {
        "department": "Engineering"
    }
}

Sets:

allowed_roles := {"admin", "manager", "developer"}

Null:

value := null

Variables and Assignment

Simple assignment:

user_id := input.user.id
role := input.user.roles[0]

Pattern matching:

user := input.user
action := input.action.name

Iteration:

# Iterate over array
role := input.user.roles[_]

# Iterate over object
value := input.resource.attributes[key]

# Iterate with index
role := input.user.roles[i]

Operators

Comparison:

x == y   # Equal
x != y   # Not equal
x < y    # Less than
x <= y   # Less than or equal
x > y    # Greater than
x >= y   # Greater than or equal

Logical:

x; y     # OR (either x or y)
x
y        # AND (both x and y)
not x    # NOT

Arithmetic:

x + y    # Addition
x - y    # Subtraction
x * y    # Multiplication
x / y    # Division
x % y    # Modulo

Set operations:

x | y    # Union
x & y    # Intersection
x - y    # Difference

String operations:

contains(x, y)        # String contains
startswith(x, y)      # Starts with
endswith(x, y)        # Ends with

Array operations:

x in array           # Element in array
count(array)         # Array length

Control Core Input Schema

All Control Core policies receive this input structure:

package controlcore.policy

import rego.v1

# Input is provided by Control Core
# {
#   "user": {
#     "id": "user-123",
#     "email": "user@company.com",
#     "roles": ["developer", "team-lead"],
#     "department": "Engineering",
#     "attributes": {
#       "clearance_level": 3,
#       "manager": "user-456",
#       "mfa_enabled": true
#     }
#   },
#   "resource": {
#     "id": "resource-789",
#     "type": "api",
#     "path": "/api/v1/users",
#     "attributes": {
#       "owner": "user-123",
#       "sensitivity": "confidential",
#       "environment": "production"
#     }
#   },
#   "action": {
#     "name": "read",
#     "method": "GET"
#   },
#   "context": {
#     "ip_address": "192.168.1.100",
#     "time": "2025-01-25T10:30:00Z",
#     "request_id": "req-123"
#   },
#   "workload": {                    // Optional - when Work ID is enabled
#     "spiffe_id": "spiffe://trust-domain/bouncer/bouncer-id"
#   }
# }

# Access input fields
user_id := input.user.id
user_roles := input.user.roles
resource_type := input.resource.type
action_name := input.action.name

# Workload identity (when Work ID enabled and mTLS present)
spiffe_id := input.workload.spiffe_id  # e.g. "spiffe://trust-domain/bouncer/bouncer-id"

Workload Identity: When Work ID is enabled, input.workload.spiffe_id contains the workload identity from the client certificate (when mTLS is used). Use it for workload-based authorization. See Work ID Configuration.

Basic Policy Patterns

Pattern 1: Simple Allow/Deny

package controlcore.policy

import rego.v1

# Default deny (fail secure)
default allow := false

# Allow admin users
allow if {
    input.user.roles[_] == "admin"
}

Pattern 2: Multiple Conditions (AND)

package controlcore.policy

import rego.v1

default allow := false

# All conditions must be true
allow if {
    input.user.roles[_] == "developer"
    input.resource.type == "api"
    input.action.name == "read"
    input.resource.attributes.environment == "development"
}

Pattern 3: Alternative Conditions (OR)

package controlcore.policy

import rego.v1

default allow := false

# Method 1: Multiple rules (implicit OR)
allow if {
    input.user.roles[_] == "admin"
}

allow if {
    input.user.roles[_] == "manager"
}

# Method 2: Using 'in'
allow if {
    input.user.roles[_] in ["admin", "manager", "owner"]
}

Pattern 4: Negative Conditions

package controlcore.policy

import rego.v1

default allow := false

allow if {
    # User has developer role
    input.user.roles[_] == "developer"
    
    # BUT not on suspended list
    not user_suspended
}

user_suspended if {
    input.user.id in data.suspended_users
}

Pattern 5: Helper Functions

package controlcore.policy

import rego.v1

default allow := false

# Helper function
is_admin if {
    input.user.roles[_] == "admin"
}

is_manager if {
    input.user.roles[_] == "manager"
}

has_elevated_access if {
    is_admin
}

has_elevated_access if {
    is_manager
}

# Use in main rule
allow if {
    has_elevated_access
    input.action.name == "write"
}

Advanced Patterns

Pattern 1: Set Operations

package controlcore.policy

import rego.v1

default allow := false

required_roles := {"admin", "security-officer"}
user_roles := {role | role := input.user.roles[_]}

# User has all required roles
allow if {
    required_roles & user_roles == required_roles
}

# Alternative: using every
allow if {
    every role in required_roles {
        role in user_roles
    }
}

Pattern 2: Comprehensions

package controlcore.policy

import rego.v1

# Array comprehension
admin_users := [user |
    user := data.users[_]
    user.roles[_] == "admin"
]

# Object comprehension
user_map := {user.id: user.name |
    user := data.users[_]
}

# Set comprehension
admin_emails := {user.email |
    user := data.users[_]
    user.roles[_] == "admin"
}

Pattern 3: Recursive Rules

package controlcore.policy

import rego.v1

# Check if user is in reporting chain
is_manager_of(user_id, employee_id) if {
    employee := data.users[employee_id]
    employee.manager == user_id
}

# Recursive: Check entire chain
is_manager_of(user_id, employee_id) if {
    employee := data.users[employee_id]
    employee.manager != user_id
    is_manager_of(user_id, employee.manager)
}

# Allow managers to access their reports' data
allow if {
    is_manager_of(input.user.id, input.resource.attributes.owner)
}

Pattern 4: Dynamic Rules

package controlcore.policy

import rego.v1

# Generate rules dynamically based on data
allow if {
    policy := data.dynamic_policies[input.resource.type]
    evaluate_policy(policy)
}

evaluate_policy(policy) if {
    required_role := policy.required_role
    input.user.roles[_] == required_role
}

Control Core Specific Patterns

Pattern 1: Role-Based Access Control (RBAC)

package controlcore.policy

import rego.v1

default allow := false

# Role hierarchy
role_hierarchy := {
    "admin": 4,
    "manager": 3,
    "developer": 2,
    "viewer": 1
}

# Get user's highest role level
user_level := max([level |
    role := input.user.roles[_]
    level := role_hierarchy[role]
])

# Required level for action
required_levels := {
    "delete": 4,
    "write": 3,
    "read": 2,
    "view": 1
}

# Allow if user level >= required level
allow if {
    required := required_levels[input.action.name]
    user_level >= required
}

Pattern 2: Attribute-Based Access Control (ABAC)

package controlcore.policy

import rego.v1

default allow := false

allow if {
    # User attributes
    input.user.department == input.resource.attributes.department
    input.user.attributes.clearance_level >= input.resource.attributes.required_clearance
    
    # Resource attributes
    input.resource.attributes.environment in ["development", "staging"]
    
    # Action attributes
    input.action.name in ["read", "list"]
    
    # Context attributes
    is_business_hours
}

is_business_hours if {
    hour := time.clock([time.now_ns()])[0]
    hour >= 9
    hour < 17
}

Pattern 3: Resource Ownership

package controlcore.policy

import rego.v1

default allow := false

# Users can access their own resources
allow if {
    input.resource.attributes.owner == input.user.id
}

# Users can access resources in their department
allow if {
    input.resource.attributes.department == input.user.department
    input.action.name in ["read", "list"]
}

# Managers can access team resources
allow if {
    input.user.roles[_] == "manager"
    employee_ids := {id | 
        employee := data.users[_]
        employee.manager == input.user.id
        id := employee.id
    }
    input.resource.attributes.owner in employee_ids
}

Pattern 4: Data Filtering (Row-Level Security)

package controlcore.policy

import rego.v1

# Filter visible rows based on user permissions
visible_rows[row] if {
    some row in input.query.result
    row.owner_id == input.user.id
}

visible_rows[row] if {
    some row in input.query.result
    input.user.roles[_] == "manager"
    row.department == input.user.department
}

visible_rows[row] if {
    some row in input.query.result
    input.user.roles[_] == "admin"
}

# Mask sensitive columns
masked_data := {key: value |
    some key, value in input.data
    not key in sensitive_fields
}

masked_data[key] := "***REDACTED***" if {
    some key, _ in input.data
    key in sensitive_fields
    not user_can_view_sensitive
}

sensitive_fields := {"ssn", "salary", "password", "api_key"}

user_can_view_sensitive if {
    input.user.attributes.clearance_level >= 3
}

AI/LLM Policy Patterns

Pattern 1: AI Agent Access Control

package controlcore.policy.ai

import rego.v1

default allow := false

# Allow AI agent with safety checks
allow if {
    # User authorized for AI
    input.user.roles[_] in ["developer", "data-scientist", "ai-engineer"]
    
    # Prompt safety validated
    prompt_safe
    
    # Usage limits not exceeded
    within_usage_limits
    
    # Content policy compliant
    content_compliant
}

# Validate prompt safety
prompt_safe if {
    prompt := input.context.prompt
    
    # Check length
    count(prompt) < 4000
    
    # No sensitive keywords
    not contains_sensitive_keywords(prompt)
    
    # Safety score acceptable
    input.context.safety_score > 0.8
}

contains_sensitive_keywords(prompt) if {
    sensitive := ["password", "api-key", "secret", "private-key", "token"]
    lower_prompt := lower(prompt)
    some keyword in sensitive
    contains(lower_prompt, keyword)
}

# Check usage limits
within_usage_limits if {
    user_stats := data.usage_stats[input.user.id]
    user_stats.requests_today < 1000
    user_stats.tokens_today < 100000
}

# Content policy compliance
content_compliant if {
    prompt := input.context.prompt
    not contains(lower(prompt), "malicious")
    not contains(lower(prompt), "hack")
    not contains(lower(prompt), "exploit")
}

Pattern 2: LLM Context-Aware Policy

package controlcore.policy.llm

import rego.v1

default allow := false

# Allow LLM access with context checks
allow if {
    user_has_llm_access
    llm_context_valid
    prompt_safety_validated
}

user_has_llm_access if {
    input.user.roles[_] in ["admin", "developer", "analyst"]
    input.user.attributes.llm_access_level in ["full", "limited"]
}

llm_context_valid if {
    context := input.context
    
    # Prompt context available
    context.prompt_context.available == true
    
    # Model capabilities match
    context.model_context.capabilities[_] in ["text_generation", "completion"]
    
    # Performance acceptable
    context.model_context.performance.score > 0.8
}

prompt_safety_validated if {
    prompt := input.context.prompt
    
    # Length check
    prompt.length < 4000
    
    # No sensitive data
    not contains_pii(prompt.content)
    
    # Follows safety guidelines
    follows_safety_guidelines(prompt.content)
}

contains_pii(text) if {
    pii_patterns := ["ssn", "credit card", "password", "api key"]
    lower_text := lower(text)
    some pattern in pii_patterns
    contains(lower_text, pattern)
}

follows_safety_guidelines(text) if {
    prohibited := ["harmful", "dangerous", "illegal", "malicious"]
    lower_text := lower(text)
    every term in prohibited {
        not contains(lower_text, term)
    }
}

Pattern 3: RAG System Authorization

package controlcore.policy.rag

import rego.v1

default allow := false

# Allow RAG system access
allow if {
    user_authorized_for_rag
    query_safe
    retrieval_context_valid
}

user_authorized_for_rag if {
    input.user.roles[_] in ["researcher", "analyst", "developer"]
    input.user.attributes.rag_access == true
}

query_safe if {
    query := input.context.query
    
    # Query not malicious
    not injection_attempt(query)
    
    # Query scope appropriate
    query_scope_valid(query)
}

injection_attempt(query) if {
    injection_patterns := ["'; DROP TABLE", "OR 1=1", "../", "<script>"]
    some pattern in injection_patterns
    contains(query, pattern)
}

query_scope_valid(query) if {
    # User can only query their department's data
    query_scope := input.context.query_scope
    query_scope.department == input.user.department
}

query_scope_valid(query) if {
    # Admins can query all data
    input.user.roles[_] == "admin"
}

retrieval_context_valid if {
    context := input.context.retrieval
    
    # Source documents accessible
    every doc_id in context.source_documents {
        user_can_access_document(doc_id)
    }
    
    # Retrieval count reasonable
    count(context.source_documents) <= 10
}

user_can_access_document(doc_id) if {
    doc := data.documents[doc_id]
    doc.owner == input.user.id
}

user_can_access_document(doc_id) if {
    doc := data.documents[doc_id]
    doc.classification == "public"
}

Pattern 4: Content Injection Rules

package controlcore.policy.content

import rego.v1

# Pre-prompt content injection
pre_prompt_injection := {
    "type": "context_enrichment",
    "content": enriched_context
} if {
    input.context.injection_enabled == true
}

enriched_context := concat("\n", [
    sprintf("User: %s", [input.user.email]),
    sprintf("Department: %s", [input.user.department]),
    sprintf("Clearance Level: %d", [input.user.attributes.clearance_level]),
    "Guidelines: Follow company security policies",
    "Restrictions: Do not share sensitive information"
])

# Post-response filtering
filtered_response := response if {
    response := mask_sensitive_data(input.response)
}

mask_sensitive_data(response) := masked if {
    # Replace sensitive patterns
    masked := replace(response, ssn_pattern, "***-**-****")
}

ssn_pattern := `\d{3}-\d{2}-\d{4}`

Built-in Functions

String Functions

# Concatenation
result := concat("-", ["hello", "world"])  # "hello-world"

# Formatting
message := sprintf("User %s has role %s", [user, role])

# Case conversion
lower_text := lower("HELLO")  # "hello"
upper_text := upper("hello")  # "HELLO"

# String operations
contains("hello world", "world")      # true
startswith("hello", "hel")            # true
endswith("world", "ld")               # true
trim("  hello  ", " ")                # "hello"
replace("hello", "l", "r")            # "herro"

Array Functions

# Count
count([1, 2, 3])  # 3

# Concatenation
concat_array := array.concat([1, 2], [3, 4])  # [1, 2, 3, 4]

# Slice
array.slice([1, 2, 3, 4], 1, 3)  # [2, 3]

Set Functions

# Union
{"a", "b"} | {"b", "c"}  # {"a", "b", "c"}

# Intersection
{"a", "b"} & {"b", "c"}  # {"b"}

# Difference
{"a", "b"} - {"b", "c"}  # {"a"}

Aggregation Functions

# Sum
sum([1, 2, 3, 4])  # 10

# Product
product([2, 3, 4])  # 24

# Max/Min
max([1, 5, 3])  # 5
min([1, 5, 3])  # 1

# Count
count([1, 2, 3])  # 3

Type Functions

# Type checking
is_string("hello")     # true
is_number(42)          # true
is_boolean(true)       # true
is_array([1, 2])       # true
is_object({"a": 1})    # true
is_set({1, 2})         # true
is_null(null)          # true

Time Functions

# Current time (nanoseconds since epoch)
now := time.now_ns()

# Parse time
parsed := time.parse_rfc3339_ns("2025-01-25T10:30:00Z")

# Clock components [hour, minute, second]
[hour, minute, second] := time.clock([now])

# Date components [year, month, day]
[year, month, day] := time.date([now])

# Day of week (0=Sunday, 6=Saturday)
weekday := time.weekday([now])

JSON Functions

# Parse JSON string
data := json.unmarshal('{"name": "John", "age": 30}')

# Marshal to JSON
json_str := json.marshal({"name": "John"})

Encoding Functions

# Base64
encoded := base64.encode("hello")
decoded := base64.decode("aGVsbG8=")

# URL encoding
url_encoded := urlquery.encode("hello world")
url_decoded := urlquery.decode("hello+world")

Crypto Functions

# ⚠️  DEPRECATED: MD5 and SHA1 are deprecated for security reasons
# Use SHA-256 or SHA-512 instead in production code:
sha256_hash := crypto.sha256("hello")
sha512_hash := crypto.sha512("hello")

# MD5 (DEPRECATED - Do not use in production)
# md5_hash := crypto.md5("hello")

# SHA1 (DEPRECATED - Do not use in production)
# sha1_hash := crypto.sha1("hello")

Regex Functions

# Match
regex.match("^[a-z]+$", "hello")  # true

# Find all matches
matches := regex.find_n("[0-9]+", "abc123def456", -1)  # ["123", "456"]

# Replace
result := regex.replace("hello", "l+", "L")  # "heLo"

Testing Policies

Unit Testing

package controlcore.policy

import rego.v1

# Policy under test
default allow := false

allow if {
    input.user.roles[_] == "admin"
}

# Test cases
test_admin_allowed if {
    allow with input as {
        "user": {"roles": ["admin"]},
        "resource": {"type": "api"},
        "action": {"name": "read"}
    }
}

test_developer_denied if {
    not allow with input as {
        "user": {"roles": ["developer"]},
        "resource": {"type": "api"},
        "action": {"name": "read"}
    }
}

test_no_role_denied if {
    not allow with input as {
        "user": {"roles": []},
        "resource": {"type": "api"},
        "action": {"name": "read"}
    }
}

Testing with Mock Data

package controlcore.policy

import rego.v1

# Mock data
mock_users := {
    "user-1": {"id": "user-1", "roles": ["admin"]},
    "user-2": {"id": "user-2", "roles": ["developer"]},
}

test_with_mock_data if {
    allow with input as {
        "user": {"id": "user-1"},
        "resource": {"type": "api"},
        "action": {"name": "write"}
    }
    with data.users as mock_users
}

Best Practices

1. Always Default Deny

# Good: Explicit default deny
default allow := false

allow if {
    # conditions
}

# Bad: No default (implicit undefined)
allow if {
    # conditions
}

2. Use Helper Functions

# Good: Reusable helper functions
is_admin if {
    input.user.roles[_] == "admin"
}

is_resource_owner if {
    input.resource.owner == input.user.id
}

allow if {
    is_admin
}

allow if {
    is_resource_owner
}

# Bad: Duplicate logic
allow if {
    input.user.roles[_] == "admin"
}

allow if {
    input.user.roles[_] == "admin"
    input.resource.type == "api"
}

3. Document Your Policies

package controlcore.policy

import rego.v1

# Policy: API Access Control
# Owner: Security Team
# Last Updated: 2025-01-25
#
# This policy controls access to production APIs based on:
# - User roles and permissions
# - Resource sensitivity levels
# - Time-based restrictions
# - MFA requirements for sensitive operations
#
# Related Documentation: https://wiki.company.com/api-security

default allow := false

# Allow admin users full access
# Admins are defined in the identity provider
allow if {
    input.user.roles[_] == "admin"
}

4. Keep Policies Simple

# Good: Simple and clear
allow if {
    input.user.roles[_] == "developer"
    input.action.name == "read"
}

# Bad: Overly complex
allow if {
    count([r | r := input.user.roles[_]; r in ["developer", "engineer", "coder"]]) > 0
    [a | a := ["read", "get", "list"]][_] == input.action.name
}

5. Use Type Checking

# Good: Check types
allow if {
    is_string(input.user.id)
    is_array(input.user.roles)
    count(input.user.roles) > 0
    input.user.roles[_] == "admin"
}

# Bad: Assume types
allow if {
    input.user.roles[_] == "admin"  # Could fail if roles is not an array
}

Performance Optimization

1. Use Early Returns

# Good: Check cheap conditions first
allow if {
    input.action.name == "read"  # Quick check
    input.user.roles[_] == "developer"  # Quick check
    expensive_database_lookup  # Expensive check last
}

# Bad: Expensive check first
allow if {
    expensive_database_lookup  # Slow
    input.action.name == "read"  # Fast but checked last
}

2. Cache-Friendly Policies

# Good: Deterministic (cacheable)
allow if {
    input.user.roles[_] == "admin"
    input.resource.type == "api"
}

# Bad: Non-deterministic (not cacheable)
allow if {
    time.now_ns() < input.session.expiry  # Changes every nanosecond!
}

# Better: Use reasonable granularity
allow if {
    current_hour := time.clock([time.now_ns()])[0]
    current_hour < 17  # Changes hourly, more cacheable
}

3. Avoid Expensive Operations

# Good: Simple operations
allow if {
    input.user.department == "Engineering"
}

# Bad: Complex iteration
allow if {
    count([u | u := data.users[_]; u.department == "Engineering"]) > 100
}

Regulatory Compliance Patterns

FINTRAC Compliance (Canada)

Large Cash Transaction Reporting (LCTR):

package fintrac.lctr

import rego.v1

default allow := false
default requires_lctr := false

# FINTRAC: Large Cash Transaction Report required for CAD $10,000+
requires_lctr if {
    input.transaction.amount >= 10000
    input.transaction.currency == "CAD"
    input.transaction.type in ["cash", "cash_equivalent"]
}

# Allow with LCTR requirement
allow if {
    # User must be FINTRAC-trained
    input.user.attributes.fintrac_certified == true
    input.user.roles[_] in ["teller", "financial-advisor", "compliance-officer"]
    
    # Required transaction details present
    input.transaction.customer_id != null
    input.transaction.conductor_id != null
    input.transaction.purpose != null
    input.transaction.source_of_funds != null
    
    # Create LCTR if required
    requires_lctr
}

# Suspicious Transaction Report (STR) Detection
str_indicators[reason] if {
    # Structuring: Multiple transactions just under reporting threshold
    recent_txns := data.transactions_last_30_days[input.transaction.customer_id]
    near_threshold := [t | t := recent_txns[_]; t.amount >= 9000; t.amount < 10000]
    count(near_threshold) >= 3
    reason := "possible_structuring"
}

str_indicators[reason] if {
    # Unusual pattern: Amount significantly higher than normal
    customer := data.customers[input.transaction.customer_id]
    input.transaction.amount > (customer.average_transaction * 10)
    reason := "unusual_amount"
}

str_indicators[reason] if {
    # Rapid movement of funds
    input.transaction.type == "wire_transfer"
    input.transaction.received_within_hours < 24
    input.transaction.immediate_withdrawal == true
    reason := "rapid_movement_of_funds"
}

# Block suspicious transactions for compliance review
allow if {
    count(str_indicators) == 0
    input.user.attributes.aml_cleared == true
}

# Compliance officer can process flagged transactions
allow if {
    count(str_indicators) > 0
    input.user.roles[_] == "compliance-officer"
    input.transaction.compliance_review_completed == true
}

Third Party Determination:

package fintrac.third_party

import rego.v1

# FINTRAC: Identify if conducting on behalf of third party
third_party_determination_required if {
    input.transaction.amount >= 10000
    input.transaction.customer_id != input.transaction.conductor_id
}

allow if {
    third_party_determination_required
    
    # Third party information collected
    input.transaction.third_party.name != null
    input.transaction.third_party.address != null
    input.transaction.third_party.relationship != null
    
    # User verified third party information
    input.user.roles[_] in ["teller", "compliance-officer"]
}

OSFI Guidelines (Canada)

Guideline B-10: Outsourcing of Business Activities:

package osfi.outsourcing

import rego.v1

default allow := false

# OSFI B-10: Third-party access requires controls
allow if {
    input.user.type == "third_party_vendor"
    
    # Vendor must be approved
    vendor := data.approved_vendors[input.user.vendor_id]
    vendor.approval_status == "active"
    vendor.due_diligence_completed == true
    
    # Access limited to contracted services only
    input.resource.type in vendor.contracted_services
    
    # Monitoring and audit enabled
    input.context.audit_enabled == true
    input.context.vendor_activity_logged == true
}

# OSFI: Segregation of Duties
allow if {
    not has_conflicting_duties(input.user.roles)
    duty_appropriate_for_action(input.user.roles, input.action.name)
}

has_conflicting_duties(roles) if {
    "maker" in roles
    "checker" in roles
}

has_conflicting_duties(roles) if {
    "initiator" in roles
    "approver" in roles
}

Guideline E-21: Technology and Cyber Security:

package osfi.cyber_security

import rego.v1

default allow := false

# OSFI E-21: Multi-Factor Authentication for sensitive operations
allow if {
    input.resource.attributes.sensitivity in ["confidential", "restricted"]
    
    # MFA required
    input.user.attributes.mfa_verified == true
    input.context.mfa_timestamp_age_seconds < 300  # MFA within last 5 minutes
    
    # Access from approved location
    input.context.ip_address in data.approved_ip_ranges
}

# OSFI E-21: Privileged Access Management
allow if {
    input.user.type == "privileged_user"
    
    # Privileged sessions must be recorded
    input.context.session_recording_enabled == true
    
    # Just-in-time access (time-limited)
    session_timestamp := time.parse_rfc3339_ns(input.context.session_start)
    session_age_seconds := (time.now_ns() - session_timestamp) / 1000000000
    session_age_seconds < 3600  # 1 hour max
    
    # Approval required
    input.context.privileged_access_approved == true
}

AML Regulations (US & Canada)

Transaction Monitoring:

package aml.monitoring

import rego.v1

default allow := false

# AML: Currency Transaction Report (CTR) for USD $10,000+
requires_ctr if {
    input.transaction.amount >= 10000
    input.transaction.currency == "USD"
}

# AML: Suspicious Activity Report (SAR) triggers
sar_triggers[trigger] if {
    # Trigger 1: Transactions with no apparent business purpose
    input.transaction.business_purpose == "unknown"
    input.transaction.amount >= 5000
    trigger := "no_business_purpose"
}

sar_triggers[trigger] if {
    # Trigger 2: Customer refuses to provide information
    customer := data.customers[input.transaction.customer_id]
    customer.info_refusal_count > 0
    trigger := "customer_refused_information"
}

sar_triggers[trigger] if {
    # Trigger 3: Funds transfers to high-risk jurisdictions
    input.transaction.type == "wire_transfer"
    input.transaction.destination_country in data.high_risk_countries
    trigger := "high_risk_jurisdiction"
}

# Allow with SAR filing requirement
allow if {
    count(sar_triggers) > 0
    input.user.roles[_] == "compliance-officer"
    input.transaction.sar_filed == true
}

allow if {
    count(sar_triggers) == 0
    requires_ctr
    input.transaction.ctr_filed == true
}

Customer Due Diligence (CDD):

package aml.cdd

import rego.v1

default allow := false

# AML: Customer Due Diligence requirements
allow if {
    customer := data.customers[input.resource.attributes.customer_id]
    
    # Standard CDD (all customers)
    customer.identity_verified == true
    customer.name != null
    customer.address != null
    customer.date_of_birth != null
    customer.identification_number != null
    
    # Beneficial ownership (for entities)
    beneficial_ownership_satisfied(customer)
    
    # Risk assessment completed
    customer.risk_assessment_date != null
    risk_assessment_current(customer.risk_assessment_date)
}

beneficial_ownership_satisfied(customer) if {
    customer.type == "individual"
}

beneficial_ownership_satisfied(customer) if {
    customer.type == "entity"
    count(customer.beneficial_owners) > 0
    every owner in customer.beneficial_owners {
        owner.ownership_percentage >= 25
        owner.identity_verified == true
    }
}

risk_assessment_current(assessment_date) if {
    days_since := (time.now_ns() - time.parse_rfc3339_ns(assessment_date)) / (24 * 60 * 60 * 1000000000)
    days_since < 365  # Annual review required
}

Office of Foreign Assets Control (OFAC) Screening:

package aml.ofac

import rego.v1

default allow := false

# OFAC: Sanctions screening required
allow if {
    customer := data.customers[input.transaction.customer_id]
    
    # Not on OFAC SDN (Specially Designated Nationals) list
    not customer.id in data.ofac_sdn_list
    not customer.name in data.ofac_sdn_names
    
    # Not transacting with sanctioned countries
    not input.transaction.destination_country in data.ofac_sanctioned_countries
    not input.transaction.origination_country in data.ofac_sanctioned_countries
    
    # Screening completed recently
    screening_age := (time.now_ns() - time.parse_rfc3339_ns(customer.last_ofac_screening)) / (24 * 60 * 60 * 1000000000)
    screening_age < 1  # Daily screening
}

# OFAC: 50% Rule (entities owned by sanctioned persons)
allow if {
    entity := data.entities[input.transaction.entity_id]
    
    # Check all owners
    sanctioned_ownership := sum([owner.ownership_percentage |
        owner := entity.owners[_]
        owner.id in data.ofac_sdn_list
    ])
    
    # Total sanctioned ownership must be < 50%
    sanctioned_ownership < 50
}

FinCEN Regulations (US)

Bank Secrecy Act (BSA) Compliance:

package fincen.bsa

import rego.v1

default allow := false

# BSA: Currency Transaction Report (CTR) for $10,000+
requires_ctr if {
    input.transaction.amount >= 10000
    input.transaction.currency == "USD"
    input.transaction.type in ["cash", "deposit", "withdrawal"]
}

# BSA: Suspicious Activity Report (SAR)
requires_sar if {
    count(suspicious_indicators) >= 2
}

suspicious_indicators[indicator] if {
    # Multiple transactions just under $10,000
    recent := data.transactions_last_7_days[input.transaction.customer_id]
    count([t | t := recent[_]; t.amount >= 9000; t.amount < 10000]) >= 2
    indicator := "structuring"
}

suspicious_indicators[indicator] if {
    # Unusual transaction for customer profile
    customer := data.customers[input.transaction.customer_id]
    input.transaction.amount > (customer.average_monthly_transactions * 5)
    indicator := "unusual_activity"
}

suspicious_indicators[indicator] if {
    # Transactions with known high-risk indicators
    input.transaction.destination_country in data.high_risk_jurisdictions
    input.transaction.amount >= 5000
    indicator := "high_risk_jurisdiction"
}

allow if {
    requires_ctr
    input.transaction.ctr_filed == true
    input.user.roles[_] in ["teller", "compliance-officer"]
}

allow if {
    requires_sar
    input.transaction.sar_filed == true
    input.user.roles[_] == "compliance-officer"
    input.transaction.senior_management_notified == true
}

Customer Identification Program (CIP):

package fincen.cip

import rego.v1

default allow := false

# FinCEN: CIP requirements for account opening
allow if {
    input.action.name == "open_account"
    customer := input.request.customer_info
    
    # Four key pieces of information required
    customer.name != null
    customer.date_of_birth != null
    customer.address != null
    customer.identification_number != null  # SSN or ITIN
    
    # Identification document verified
    customer.id_verification.method in ["documentary", "non_documentary"]
    customer.id_verification.status == "verified"
    
    # User authorized to open accounts
    input.user.roles[_] in ["account-officer", "branch-manager"]
}

GDPR Compliance (EU)

Right to Access (Article 15):

package gdpr.access

import rego.v1

default allow := false

# GDPR: Data subject has right to access their personal data
allow if {
    input.action.name == "read"
    input.resource.type == "personal_data"
    
    # User accessing their own data
    input.resource.attributes.data_subject_id == input.user.id
}

# Allow data protection officer to access for compliance
allow if {
    input.user.roles[_] == "data-protection-officer"
    input.context.purpose == "compliance_review"
    input.context.audit_enabled == true
}

Right to Erasure (Article 17):

package gdpr.erasure

import rego.v1

default allow := false

# GDPR: Right to be forgotten
allow if {
    input.action.name == "delete"
    input.resource.type == "personal_data"
    
    # Verify erasure request
    erasure_request := data.erasure_requests[input.resource.attributes.data_subject_id]
    erasure_request.status == "approved"
    erasure_request.legal_grounds_verified == true
    
    # Only DPO can execute erasure
    input.user.roles[_] == "data-protection-officer"
    
    # No legal obligation to retain
    not legal_retention_required(input.resource)
}

legal_retention_required(resource) if {
    # Financial records must be kept for 7 years
    resource.attributes.type == "financial_record"
    record_age_days := (time.now_ns() - time.parse_rfc3339_ns(resource.attributes.created_at)) / (24 * 60 * 60 * 1000000000)
    record_age_days < 2555  # 7 years
}

Data Processing Purpose Limitation (Article 5):

package gdpr.purpose_limitation

import rego.v1

default allow := false

# GDPR: Data can only be used for specified, explicit purposes
allow if {
    consent := data.user_consents[input.resource.attributes.data_subject_id]
    
    # Purpose matches consent
    input.context.processing_purpose in consent.approved_purposes
    
    # Consent is still valid
    consent.status == "active"
    consent.withdrawal_date == null
}

HIPAA Compliance (US Healthcare)

Minimum Necessary Rule:

package hipaa.minimum_necessary

import rego.v1

default allow := false

# HIPAA: Access only minimum necessary PHI
allow if {
    input.user.roles[_] in ["physician", "nurse", "healthcare-provider"]
    
    # User is part of patient's care team
    patient_id := input.resource.attributes.patient_id
    care_team := data.patient_care_teams[patient_id]
    input.user.id in care_team.members
    
    # Purpose of use is valid
    input.context.purpose_of_use in ["treatment", "payment", "healthcare-operations"]
    
    # Access limited to necessary fields only
    requested_fields := input.query.fields
    every field in requested_fields {
        field in necessary_fields_for_purpose[input.context.purpose_of_use]
    }
}

necessary_fields_for_purpose := {
    "treatment": ["patient_id", "diagnosis", "medications", "allergies", "vital_signs"],
    "payment": ["patient_id", "insurance_info", "billing_codes", "charges"],
    "healthcare-operations": ["patient_id", "encounter_date", "provider_id"]
}

Break-the-Glass Emergency Access:

package hipaa.emergency_access

import rego.v1

default allow := false

# HIPAA: Emergency access with logging
allow if {
    input.context.emergency == true
    input.user.roles[_] in ["physician", "emergency-physician", "nurse"]
    
    # Emergency access logged
    input.context.audit_enabled == true
    input.context.emergency_justification != null
    
    # Access reviewed post-emergency
    schedule_emergency_access_review(input.user.id, input.resource.id)
}

schedule_emergency_access_review(user_id, resource_id) if {
    # Creates audit record for review
    true
}

PCI-DSS Compliance

Cardholder Data Access Control:

package pci.cardholder_data

import rego.v1

default allow := false

# PCI-DSS Requirement 7: Restrict access to cardholder data
allow if {
    input.resource.attributes.contains_cardholder_data == true
    
    # User has business need to know
    input.user.attributes.pci_access_justification != null
    input.user.roles[_] in data.pci_authorized_roles
    
    # Access from secured network only
    input.context.network_segment == "cardholder_data_environment"
    
    # Strong authentication
    input.user.attributes.mfa_verified == true
    
    # Automatic logout after 15 minutes
    session_age := (time.now_ns() - time.parse_rfc3339_ns(input.context.session_start)) / 1000000000
    session_age < 900
}

# PCI-DSS Requirement 8: Unique ID for each user
allow if {
    # Ensure unique user identification
    input.user.id != null
    input.user.id != "shared"
    input.user.id != "generic"
    
    # No shared accounts
    not is_shared_account(input.user.id)
}

is_shared_account(user_id) if {
    user_id in ["admin", "root", "service", "shared"]
}

SOC 2 Compliance

Access Control (CC6.1, CC6.2):

package soc2.access_control

import rego.v1

default allow := false

# SOC 2 CC6.1: Logical access requires authentication
allow if {
    # User must be authenticated
    input.user.authenticated == true
    input.user.authentication_method in ["saml", "oauth", "mfa"]
    
    # Session must be valid
    session_valid(input.context.session_id)
    
    # Authorization based on role
    input.user.roles[_] in authorized_roles_for_resource[input.resource.type]
}

session_valid(session_id) if {
    session := data.active_sessions[session_id]
    session.status == "active"
    
    # Session not expired
    expiry := time.parse_rfc3339_ns(session.expiry)
    time.now_ns() < expiry
}

# SOC 2 CC6.2: Prior to issuing credentials, registry approved
allow if {
    input.action.name == "create_user_account"
    
    # Approval workflow completed
    request := input.request.account_request
    request.manager_approval == true
    request.security_approval == true
    
    # User provisioning follows principle of least privilege
    every role in request.requested_roles {
        role in data.approved_roles
    }
}

Change Management (CC8.1):

package soc2.change_management

import rego.v1

default allow := false

# SOC 2 CC8.1: Changes to policies require approval
allow if {
    input.action.name in ["create_policy", "update_policy", "delete_policy"]
    
    # Change request submitted
    change_request := input.context.change_request
    change_request.id != null
    
    # Approvals obtained
    change_request.technical_approval == true
    change_request.business_approval == true
    
    # Testing completed
    change_request.testing_status == "passed"
    
    # User authorized to deploy
    input.user.roles[_] in ["policy-admin", "devops-engineer"]
}

Troubleshooting

IssueWhat to check
Policy validation errorsEnsure syntax matches Rego rules (e.g. no missing if, correct indentation). Use the policy editor's validation or run tests locally.
Unexpected allow/denyTrace rules with sample input; check that input schema (user, resource, action, context) matches what the bouncer sends.
Tests pass locally but fail in Control PlaneConfirm the same Rego version and built-ins; check that bundled policies and dependencies (e.g. PIP data) are in sync.
Policy slow or timeoutSimplify rules, avoid heavy comprehensions on large collections; use indexing and see Performance Optimization.

For more, see the Troubleshooting Guide.

External Resources

Official Documentation

Tutorials and Learning

Community Resources

Style Guides

Control Core Specific Resources

Quick Reference

Common Patterns Cheat Sheet

# Basic access control
default allow := false
allow if { input.user.roles[_] == "admin" }

# Multiple conditions (AND)
allow if {
    input.user.roles[_] == "developer"
    input.action.name == "read"
}

# Alternative conditions (OR)
allow if { input.user.roles[_] == "admin" }
allow if { input.user.roles[_] == "manager" }

# Set membership
allow if { input.user.roles[_] in ["admin", "manager"] }

# Negation
allow if { not user_suspended }

# Iteration
user_ids := [user.id | user := data.users[_]]

# Helper function
is_admin if { input.user.roles[_] == "admin" }

# Time-based
is_business_hours if {
    hour := time.clock([time.now_ns()])[0]
    hour >= 9; hour < 17
}

# Resource ownership
allow if { input.resource.owner == input.user.id }

# Data filtering
visible_rows[row] if {
    some row in input.query.result
    row.owner == input.user.id
}

Advanced Rego Features

This section covers advanced Rego capabilities for building sophisticated policies that go beyond basic conditions.

Advanced Conditions & Logic

Nested Conditions with some/every

The some and every keywords enable powerful iteration and validation patterns:

Using some for iteration:

# Check if ANY user has admin role
has_admin_user if {
    some user in data.users
    user.role == "admin"
}

# Find admin user with specific permission
admin_with_permission(permission) if {
    some user in data.users
    user.role == "admin"
    permission in user.permissions
}

Using every for validation:

# ALL users must have MFA enabled
all_users_have_mfa if {
    every user in data.users {
        user.mfa_enabled == true
    }
}

# All transactions must be under limit
transactions_valid if {
    every txn in input.transactions {
        txn.amount < data.limits.max_transaction
    }
}

Complex boolean logic:

# Multi-level approval workflow
allow if {
    # Manager approval required
    some approval in input.approvals
    approval.role == "manager"
    approval.status == "approved"
    
    # AND either VP approval OR amount under threshold
    {
        some vp_approval in input.approvals
        vp_approval.role == "vp"
        vp_approval.status == "approved"
    } or {
        input.transaction.amount < 10000
    }
}

Nested Rule Structures

# Hierarchical permission checking
allow if {
    resource_accessible
    action_permitted
}

resource_accessible if {
    input.resource.type == "document"
    document_access_granted
}

document_access_granted if {
    input.resource.classification == "public"
}

document_access_granted if {
    input.resource.classification == "internal"
    employee_access
}

document_access_granted if {
    input.resource.classification == "confidential"
    manager_access
}

employee_access if {
    input.user.employee_id != ""
}

manager_access if {
    input.user.role in ["manager", "director", "vp"]
}

action_permitted if {
    input.action in allowed_actions
}

allowed_actions contains "read" if { employee_access }
allowed_actions contains "write" if { manager_access }
allowed_actions contains "delete" if { input.user.role == "admin" }

Comprehensions

Comprehensions are powerful tools for transforming and filtering data.

Array Comprehensions

Filtering:

# Get all admin users
admin_users := [user |
    some user in data.users
    user.role == "admin"
]

# Get user IDs with high clearance
high_clearance_ids := [user.id |
    some user in data.users
    user.clearance_level >= 5
]

# Complex filtering with multiple conditions
eligible_approvers := [user |
    some user in data.users
    user.role in ["manager", "director"]
    user.department == input.resource.department
    user.active == true
]

Transformation:

# Extract just the names
user_names := [user.name | some user in data.users]

# Create structured list
user_summaries := [{"id": user.id, "name": user.name} |
    some user in data.users
]

# Permission aggregation
all_user_permissions := [perm |
    some user in data.users
    some perm in user.permissions
]

Object Comprehensions

Creating mappings:

# Map user ID to role
user_roles := {user.id: user.role |
    some user in data.users
}

# Build permission matrix
permission_matrix := {user.id: user.permissions |
    some user in data.users
    user.active == true
}

# Department user count
dept_counts := {dept: count(users) |
    some dept in department_list
    users := [u | some u in data.users; u.department == dept]
}

Data transformation:

# Transform array to lookup object
resource_by_id := {resource.id: resource |
    some resource in input.resources
}

# Index by multiple keys
users_by_email := {user.email: user |
    some user in data.users
}

Set Comprehensions

Creating unique sets:

# All unique roles in the system
all_roles := {user.role |
    some user in data.users
}

# All departments with active users
active_departments := {user.department |
    some user in data.users
    user.active == true
}

# Permissions across all admins
admin_permissions := {perm |
    some user in data.users
    user.role == "admin"
    some perm in user.permissions
}

Functions and Helper Rules

Create reusable logic with functions and helper rules.

Helper Rules

# Reusable validation functions
is_admin if {
    input.user.role == "admin"
}

is_manager if {
    input.user.role == "manager"
}

is_employee if {
    input.user.employee_id != ""
}

# Composite helpers
has_elevated_access if { is_admin }
has_elevated_access if { is_manager }

# Parameterized helpers
user_has_role(role) if {
    input.user.roles[_] == role
}

user_has_permission(permission) if {
    input.user.permissions[_] == permission
}

Functions with Arguments

# Calculate risk score
risk_score(transaction) := score if {
    base_score := transaction.amount / 1000
    multiplier := transaction_type_multiplier(transaction.type)
    score := base_score * multiplier
}

transaction_type_multiplier("wire_transfer") := 2.0
transaction_type_multiplier("ach") := 1.0
transaction_type_multiplier("check") := 0.5
transaction_type_multiplier(_) := 1.0  # default

# Permission calculation
effective_permissions(user) := permissions if {
    # Direct permissions
    direct := user.permissions
    
    # Role-based permissions
    role_perms := [perm |
        some role in user.roles
        some perm in data.role_permissions[role]
    ]
    
    # Combine and deduplicate
    permissions := array.concat(direct, role_perms)
}

Custom Validation Functions

# Validate email format
is_valid_email(email) if {
    regex.match(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`, email)
}

# Validate phone number
is_valid_phone(phone) if {
    regex.match(`^\+?[1-9]\d{1,14}$`, phone)
}

# Validate credit card (Luhn algorithm)
is_valid_card(number) if {
    # Implementation of Luhn algorithm
    digits := [to_number(d) | some d in split(number, "")]
    sum := luhn_sum(digits, count(digits) - 1, false)
    sum % 10 == 0
}

luhn_sum(digits, index, double) := sum if {
    index < 0
    sum := 0
} else := sum if {
    digit := digits[index]
    value := digit * (1 + to_number(double))
    adjusted := value - (9 * (value > 9))
    sum := adjusted + luhn_sum(digits, index - 1, not double)
}

Data Lookups and External Data

Integrate external data sources into policy decisions.

Accessing the Data Document

# Direct data access
allow if {
    user_role := data.user_roles[input.user.id]
    user_role == "admin"
}

# Hierarchical data access
allow if {
    dept := data.departments[input.user.department]
    dept.classification == "secure"
    input.resource.classification <= dept.max_resource_level
}

PIP Integration Patterns

Policy Information Points (PIPs) provide real-time context:

# User data from Auth0 PIP
allow if {
    # PIP data available at data.pip.auth0
    user_profile := data.pip.auth0.users[input.user.id]
    user_profile.email_verified == true
    user_profile.mfa_enabled == true
}

# Location data from IP Geolocation PIP
allow if {
    # PIP data available at data.pip.ipgeo
    location := data.pip.ipgeo.lookup[input.request.ip]
    location.country in data.allowed_countries
}

# Real-time risk scoring from custom PIP
deny if {
    risk_data := data.pip.risk_engine.score[input.user.id]
    risk_data.score > data.thresholds.max_risk
}

Dynamic Permission Lookups

# Load permissions from external system
user_permissions contains permission if {
    # Fetch from PIP at policy evaluation time
    user_data := data.pip.permissions.users[input.user.id]
    some permission in user_data.permissions
}

# Resource-specific permissions
allow if {
    resource_acl := data.pip.acls.resources[input.resource.id]
    input.user.id in resource_acl.allowed_users
}

# Role hierarchy from LDAP
user_roles contains role if {
    ldap_user := data.pip.ldap.users[input.user.dn]
    some group in ldap_user.groups
    role := data.role_mappings[group]
}

Query Patterns

Advanced query patterns for complex scenarios.

Universal Quantification (every)

# All team members must approve
allow if {
    team := data.teams[input.resource.team_id]
    every member in team.members {
        some approval in input.approvals
        approval.user_id == member.id
        approval.status == "approved"
    }
}

# All documents must be classified
documents_valid if {
    every doc in input.documents {
        doc.classification in ["public", "internal", "confidential", "secret"]
    }
}

# All regions must be compliant
compliant if {
    every region in input.deployment.regions {
        region in data.compliant_regions
        data.region_config[region].encryption_enabled == true
    }
}

Existential Quantification (some)

# At least one admin must approve
allow if {
    some approval in input.approvals
    approval.role == "admin"
    approval.status == "approved"
}

# Any developer can read
allow if {
    input.action == "read"
    some role in input.user.roles
    role == "developer"
}

# Check if any violation exists
has_violations if {
    some check in security_checks
    not check.passed
}

Combined Patterns

# At least 2 managers must approve, and no director can reject
allow if {
    # Count manager approvals
    manager_approvals := count([a |
        some a in input.approvals
        a.role == "manager"
        a.status == "approved"
    ])
    manager_approvals >= 2
    
    # Check no director rejections
    not director_rejected
}

director_rejected if {
    some approval in input.approvals
    approval.role == "director"
    approval.status == "rejected"
}

OPA v1.10.0+ Features

Take advantage of the latest OPA capabilities.

Enhanced Built-in Functions

import rego.v1

# New string functions (OPA 0.60+)
normalized_email := strings.trim_space(lower(input.user.email))

# Template rendering (OPA 0.58+)
message := sprintf("User %s attempted %s on %s", [
    input.user.name,
    input.action,
    input.resource.name
])

# Advanced JSON operations
filtered_data := json.remove(input.sensitive_data, ["/ssn", "/credit_card"])

# UUID generation for audit trails
audit_id := uuid.rfc4122()

Performance Optimizations

Partial evaluation (Enterprise feature):

# Mark rules for partial evaluation
allow if {
    # This will be partially evaluated at compile time
    input.user.department in data.authorized_departments
}

Caching strategies:

# Cache expensive computations
user_permissions := cached_permissions(input.user.id)

cached_permissions(user_id) := permissions if {
    # Complex permission calculation
    permissions := compute_effective_permissions(user_id)
}

API Extensions

Custom built-in functions (requires OPA bundle):

# Use custom functions from your bundle
allow if {
    # Custom cryptographic verification
    custom.verify_signature(input.payload, input.signature, data.public_key)
}

# Custom geospatial functions
allow if {
    distance := custom.geo_distance(
        input.user.location,
        input.resource.location
    )
    distance < data.max_distance_km
}

Real-World Advanced Examples

Financial Compliance (FINTRAC, OSFI)

package controlcore.compliance.fintrac

import rego.v1

# FINTRAC: Large cash transaction reporting
reportable_transaction if {
    input.transaction.type == "cash"
    input.transaction.amount >= 10000  # CAD
    input.transaction.currency == "CAD"
}

# FINTRAC: Suspicious transaction monitoring
suspicious_transaction if {
    # Multiple transactions just below reporting threshold
    recent_transactions := [t |
        some t in data.transactions[input.user.id]
        t.timestamp > time.now_ns() - (24 * 3600 * 1000000000)  # 24 hours
        t.amount >= 9000
        t.amount < 10000
    ]
    count(recent_transactions) >= 3
}

# OSFI: Know Your Client validation
allow if {
    input.action == "high_risk_transaction"
    kyc_complete(input.user.id)
    risk_assessment_current(input.user.id)
}

kyc_complete(user_id) if {
    kyc := data.kyc_records[user_id]
    kyc.identity_verified == true
    kyc.address_verified == true
    kyc.documents_valid == true
}

risk_assessment_current(user_id) if {
    assessment := data.risk_assessments[user_id]
    age_days := (time.now_ns() - assessment.timestamp) / (24 * 3600 * 1000000000)
    age_days <= 365  # Annual refresh required
}

Healthcare (HIPAA)

package controlcore.compliance.hipaa

import rego.v1

# HIPAA: Minimum necessary standard
allow if {
    input.action == "read"
    input.resource.type == "patient_record"
    
    # Only necessary fields for user's role
    requested_fields := input.query.fields
    necessary_fields := minimum_necessary_fields(input.user.role)
    
    every field in requested_fields {
        field in necessary_fields
    }
}

minimum_necessary_fields("doctor") := [
    "patient_id", "name", "dob", "diagnosis",
    "medications", "allergies", "treatments"
]

minimum_necessary_fields("nurse") := [
    "patient_id", "name", "medications", "vital_signs"
]

minimum_necessary_fields("billing") := [
    "patient_id", "name", "insurance", "procedures"
]

# Break-the-glass emergency access
allow if {
    input.emergency == true
    input.user.role in ["doctor", "nurse"]
    
    # Log emergency access for audit
    emergency_access_logged
}

emergency_access_logged if {
    # Would trigger external logging via PIP
    data.pip.audit.log_emergency_access(input)
}

AI Agent Control with Prompt Filtering

package controlcore.ai.prompt_filter

import rego.v1

# Block prompts trying to extract PII
deny if {
    prompt := lower(input.prompt)
    
    # Check for PII extraction attempts
    some pattern in pii_extraction_patterns
    contains(prompt, pattern)
}

pii_extraction_patterns := [
    "show me all email addresses",
    "list all phone numbers",
    "give me social security",
    "display credit card",
    "export user data"
]

# Block prompt injection attempts
deny if {
    prompt := input.prompt
    
    # Check for instruction override attempts
    some pattern in injection_patterns
    regex.match(pattern, prompt)
}

injection_patterns := [
    `(?i)ignore\s+previous\s+instructions`,
    `(?i)disregard\s+.*\s+above`,
    `(?i)system\s*:\s*you\s+are`,
    `(?i)act\s+as\s+if`,
]

# Content filtering for responses
filtered_response := response if {
    response := {
        "content": mask_sensitive_data(input.ai_response),
        "metadata": input.metadata
    }
}

mask_sensitive_data(text) := masked if {
    # Mask SSNs
    step1 := regex.replace(text, `\d{3}-\d{2}-\d{4}`, "***-**-****")
    
    # Mask credit cards
    step2 := regex.replace(step1, `\d{4}\s?\d{4}\s?\d{4}\s?\d{4}`, "**** **** **** ****")
    
    # Mask emails
    masked := regex.replace(step2, `[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`, "***@***.***")
}

Data Masking Patterns

package controlcore.data.masking

import rego.v1

# Field-level masking based on user clearance
masked_fields contains field if {
    some field in input.response.fields
    field_classification := data.field_classifications[field]
    field_classification > input.user.clearance_level
}

filtered_response := response if {
    response := {k: v |
        some k, v in input.response
        not k in masked_fields
    }
}

# Partial masking for different user types
mask_value(field, value, user_role) := masked if {
    field == "ssn"
    user_role == "customer_service"
    masked := concat("", ["***-**-", substring(value, 7, -1)])
} else := masked if {
    field == "credit_card"
    masked := concat("", ["**** **** **** ", substring(value, 15, -1)])
} else := value  # No masking needed

Time-Based Access Control

package controlcore.temporal

import rego.v1

# Business hours check
is_business_hours if {
    [hour, _, _] := time.clock([time.now_ns(), "America/New_York"])
    [_, _, weekday] := time.date([time.now_ns()])
    
    weekday >= 1  # Monday
    weekday <= 5  # Friday
    hour >= 9
    hour < 17
}

# Maintenance window check
is_maintenance_window if {
    now := time.now_ns()
    some window in data.maintenance_windows
    now >= window.start_time
    now <= window.end_time
}

# Time-limited permissions
allow if {
    input.action == "write"
    temp_access := data.temporary_access[input.user.id]
    time.now_ns() <= temp_access.expires_at
}

Location-Based Restrictions

package controlcore.geo

import rego.v1

# Allowed countries
allow if {
    input.action == "access"
    
    location := data.pip.ipgeo.lookup[input.request.ip]
    location.country in data.allowed_countries
}

# Distance-based access
allow if {
    user_location := input.user.location
    resource_location := data.resources[input.resource.id].location
    
    distance_km := geo_distance(user_location, resource_location)
    distance_km <= data.max_distance_km
}

# Calculate distance between coordinates
geo_distance(loc1, loc2) := distance_km if {
    # Haversine formula
    lat1 := loc1.lat * 3.14159265359 / 180
    lat2 := loc2.lat * 3.14159265359 / 180
    dlat := lat2 - lat1
    dlon := (loc2.lon - loc1.lon) * 3.14159265359 / 180
    
    a := (sin(dlat/2) * sin(dlat/2)) + 
         (cos(lat1) * cos(lat2) * sin(dlon/2) * sin(dlon/2))
    c := 2 * atan2(sqrt(a), sqrt(1-a))
    
    distance_km := 6371 * c  # Earth's radius in km
}

You're now equipped to write powerful, secure, and efficient Rego policies! Start with simple patterns and gradually incorporate more advanced techniques as needed. The no-code policy builder is great for basic policies, but use the code editor when you need these advanced features.

For hands-on practice, use the Control Core policy editor with live testing.