🛡️ Rego Policy Language Guidelines
This comprehensive guide covers the Rego policy language as used in Control Core, including language basics, Control Core-specific patterns, real-world examples, and best practices.
🔧 API-First References
Use these pages alongside this guide when implementing policy workflows in your IDE or CI pipeline:
🛡️ Introduction to Rego
Rego is a declarative policy language designed for expressing authorization rules. It's used by Open Policy Agent (OPA) and forms the foundation of Control Core's policy engine.
Why Rego?
- Declarative: Express what should happen, not how
- Expressive: Handle complex authorization logic elegantly
- Testable: Built-in support for unit testing
- Fast: Sub-millisecond evaluation with caching
- Safe: No side effects, no infinite loops
Key Concepts
Rules: Define authorization decisions
allow if {
input.user.role == "admin"
}
Queries: Ask questions about data
admin_users[user] if {
user := data.users[_]
user.role == "admin"
}
Assignments: Compute intermediate values
is_admin := input.user.role == "admin"
🛡️ Rego Basics
Package Declaration
Every policy starts with a package declaration:
package controlcore.policy
# Your rules here
Best Practice: Use hierarchical naming:
controlcore.policy.api- API policiescontrolcore.policy.data- Data access policiescontrolcore.policy.ai- AI agent policies
Imports
Import built-in functions and future keywords:
import rego.v1 # Use Rego v1 syntax
import future.keywords # Use future keywords (if, contains, etc.)
Recommended imports for Control Core:
package controlcore.policy
import rego.v1
import future.keywords.if
import future.keywords.in
import future.keywords.contains
import future.keywords.every
Data Types
Strings:
name := "John Doe"
role := "admin"
Numbers:
age := 30
clearance_level := 3
rate_limit := 1000
Booleans:
is_active := true
mfa_enabled := false
Arrays:
roles := ["admin", "developer", "manager"]
permissions := ["read", "write", "delete"]
Objects:
user := {
"id": "user-123",
"name": "John Doe",
"roles": ["admin"],
"attributes": {
"department": "Engineering"
}
}
Sets:
allowed_roles := {"admin", "manager", "developer"}
Null:
value := null
Variables and Assignment
Simple assignment:
user_id := input.user.id
role := input.user.roles[0]
Pattern matching:
user := input.user
action := input.action.name
Iteration:
# Iterate over array
role := input.user.roles[_]
# Iterate over object
value := input.resource.attributes[key]
# Iterate with index
role := input.user.roles[i]
Operators
Comparison:
x == y # Equal
x != y # Not equal
x < y # Less than
x <= y # Less than or equal
x > y # Greater than
x >= y # Greater than or equal
Logical:
x; y # OR (either x or y)
x
y # AND (both x and y)
not x # NOT
Arithmetic:
x + y # Addition
x - y # Subtraction
x * y # Multiplication
x / y # Division
x % y # Modulo
Set operations:
x | y # Union
x & y # Intersection
x - y # Difference
String operations:
contains(x, y) # String contains
startswith(x, y) # Starts with
endswith(x, y) # Ends with
Array operations:
x in array # Element in array
count(array) # Array length
📌 Control Core Input Schema
All Control Core policies receive this input structure:
package controlcore.policy
import rego.v1
# Input is provided by Control Core
# {
# "user": {
# "id": "user-123",
# "email": "user@company.com",
# "roles": ["developer", "team-lead"],
# "department": "Engineering",
# "attributes": {
# "clearance_level": 3,
# "manager": "user-456",
# "mfa_enabled": true
# }
# },
# "resource": {
# "id": "resource-789",
# "type": "api",
# "path": "/api/v1/users",
# "attributes": {
# "owner": "user-123",
# "sensitivity": "confidential",
# "environment": "production"
# }
# },
# "action": {
# "name": "read",
# "method": "GET"
# },
# "context": {
# "ip_address": "192.168.1.100",
# "time": "2025-01-25T10:30:00Z",
# "request_id": "req-123"
# },
# "workload": { // Optional - when Work ID is enabled
# "spiffe_id": "spiffe://trust-domain/bouncer/bouncer-id"
# }
# }
# Access input fields
user_id := input.user.id
user_roles := input.user.roles
resource_type := input.resource.type
action_name := input.action.name
# Workload identity (when Work ID enabled and mTLS present)
spiffe_id := input.workload.spiffe_id # e.g. "spiffe://trust-domain/bouncer/bouncer-id"
Workload Identity: When Work ID is enabled, input.workload.spiffe_id contains the workload identity from the client certificate (when mTLS is used). Use it for workload-based authorization. See Work ID Configuration.
🛡️ Basic Policy Patterns
Pattern 1: Simple Allow/Deny
package controlcore.policy
import rego.v1
# Default deny (fail secure)
default allow := false
# Allow admin users
allow if {
input.user.roles[_] == "admin"
}
Pattern 2: Multiple Conditions (AND)
package controlcore.policy
import rego.v1
default allow := false
# All conditions must be true
allow if {
input.user.roles[_] == "developer"
input.resource.type == "api"
input.action.name == "read"
input.resource.attributes.environment == "development"
}
Pattern 3: Alternative Conditions (OR)
package controlcore.policy
import rego.v1
default allow := false
# Method 1: Multiple rules (implicit OR)
allow if {
input.user.roles[_] == "admin"
}
allow if {
input.user.roles[_] == "manager"
}
# Method 2: Using 'in'
allow if {
input.user.roles[_] in ["admin", "manager", "owner"]
}
Pattern 4: Negative Conditions
package controlcore.policy
import rego.v1
default allow := false
allow if {
# User has developer role
input.user.roles[_] == "developer"
# BUT not on suspended list
not user_suspended
}
user_suspended if {
input.user.id in data.suspended_users
}
Pattern 5: Helper Functions
package controlcore.policy
import rego.v1
default allow := false
# Helper function
is_admin if {
input.user.roles[_] == "admin"
}
is_manager if {
input.user.roles[_] == "manager"
}
has_elevated_access if {
is_admin
}
has_elevated_access if {
is_manager
}
# Use in main rule
allow if {
has_elevated_access
input.action.name == "write"
}
📌 Advanced Patterns
Pattern 1: Set Operations
package controlcore.policy
import rego.v1
default allow := false
required_roles := {"admin", "security-officer"}
user_roles := {role | role := input.user.roles[_]}
# User has all required roles
allow if {
required_roles & user_roles == required_roles
}
# Alternative: using every
allow if {
every role in required_roles {
role in user_roles
}
}
Pattern 2: Comprehensions
package controlcore.policy
import rego.v1
# Array comprehension
admin_users := [user |
user := data.users[_]
user.roles[_] == "admin"
]
# Object comprehension
user_map := {user.id: user.name |
user := data.users[_]
}
# Set comprehension
admin_emails := {user.email |
user := data.users[_]
user.roles[_] == "admin"
}
Pattern 3: Recursive Rules
package controlcore.policy
import rego.v1
# Check if user is in reporting chain
is_manager_of(user_id, employee_id) if {
employee := data.users[employee_id]
employee.manager == user_id
}
# Recursive: Check entire chain
is_manager_of(user_id, employee_id) if {
employee := data.users[employee_id]
employee.manager != user_id
is_manager_of(user_id, employee.manager)
}
# Allow managers to access their reports' data
allow if {
is_manager_of(input.user.id, input.resource.attributes.owner)
}
Pattern 4: Dynamic Rules
package controlcore.policy
import rego.v1
# Generate rules dynamically based on data
allow if {
policy := data.dynamic_policies[input.resource.type]
evaluate_policy(policy)
}
evaluate_policy(policy) if {
required_role := policy.required_role
input.user.roles[_] == required_role
}
📌 Control Core Specific Patterns
Pattern 1: Role-Based Access Control (RBAC)
package controlcore.policy
import rego.v1
default allow := false
# Role hierarchy
role_hierarchy := {
"admin": 4,
"manager": 3,
"developer": 2,
"viewer": 1
}
# Get user's highest role level
user_level := max([level |
role := input.user.roles[_]
level := role_hierarchy[role]
])
# Required level for action
required_levels := {
"delete": 4,
"write": 3,
"read": 2,
"view": 1
}
# Allow if user level >= required level
allow if {
required := required_levels[input.action.name]
user_level >= required
}
Pattern 2: Attribute-Based Access Control (ABAC)
package controlcore.policy
import rego.v1
default allow := false
allow if {
# User attributes
input.user.department == input.resource.attributes.department
input.user.attributes.clearance_level >= input.resource.attributes.required_clearance
# Resource attributes
input.resource.attributes.environment in ["development", "staging"]
# Action attributes
input.action.name in ["read", "list"]
# Context attributes
is_business_hours
}
is_business_hours if {
hour := time.clock([time.now_ns()])[0]
hour >= 9
hour < 17
}
Pattern 3: Resource Ownership
package controlcore.policy
import rego.v1
default allow := false
# Users can access their own resources
allow if {
input.resource.attributes.owner == input.user.id
}
# Users can access resources in their department
allow if {
input.resource.attributes.department == input.user.department
input.action.name in ["read", "list"]
}
# Managers can access team resources
allow if {
input.user.roles[_] == "manager"
employee_ids := {id |
employee := data.users[_]
employee.manager == input.user.id
id := employee.id
}
input.resource.attributes.owner in employee_ids
}
Pattern 4: Data Filtering (Row-Level Security)
package controlcore.policy
import rego.v1
# Filter visible rows based on user permissions
visible_rows[row] if {
some row in input.query.result
row.owner_id == input.user.id
}
visible_rows[row] if {
some row in input.query.result
input.user.roles[_] == "manager"
row.department == input.user.department
}
visible_rows[row] if {
some row in input.query.result
input.user.roles[_] == "admin"
}
# Mask sensitive columns
masked_data := {key: value |
some key, value in input.data
not key in sensitive_fields
}
masked_data[key] := "***REDACTED***" if {
some key, _ in input.data
key in sensitive_fields
not user_can_view_sensitive
}
sensitive_fields := {"ssn", "salary", "password", "api_key"}
user_can_view_sensitive if {
input.user.attributes.clearance_level >= 3
}
🤖 AI/LLM Policy Patterns
Pattern 1: AI Agent Access Control
package controlcore.policy.ai
import rego.v1
default allow := false
# Allow AI agent with safety checks
allow if {
# User authorized for AI
input.user.roles[_] in ["developer", "data-scientist", "ai-engineer"]
# Prompt safety validated
prompt_safe
# Usage limits not exceeded
within_usage_limits
# Content policy compliant
content_compliant
}
# Validate prompt safety
prompt_safe if {
prompt := input.context.prompt
# Check length
count(prompt) < 4000
# No sensitive keywords
not contains_sensitive_keywords(prompt)
# Safety score acceptable
input.context.safety_score > 0.8
}
contains_sensitive_keywords(prompt) if {
sensitive := ["password", "api-key", "secret", "private-key", "token"]
lower_prompt := lower(prompt)
some keyword in sensitive
contains(lower_prompt, keyword)
}
# Check usage limits
within_usage_limits if {
user_stats := data.usage_stats[input.user.id]
user_stats.requests_today < 1000
user_stats.tokens_today < 100000
}
# Content policy compliance
content_compliant if {
prompt := input.context.prompt
not contains(lower(prompt), "malicious")
not contains(lower(prompt), "hack")
not contains(lower(prompt), "exploit")
}
Pattern 2: LLM Context-Aware Policy
package controlcore.policy.llm
import rego.v1
default allow := false
# Allow LLM access with context checks
allow if {
user_has_llm_access
llm_context_valid
prompt_safety_validated
}
user_has_llm_access if {
input.user.roles[_] in ["admin", "developer", "analyst"]
input.user.attributes.llm_access_level in ["full", "limited"]
}
llm_context_valid if {
context := input.context
# Prompt context available
context.prompt_context.available == true
# Model capabilities match
context.model_context.capabilities[_] in ["text_generation", "completion"]
# Performance acceptable
context.model_context.performance.score > 0.8
}
prompt_safety_validated if {
prompt := input.context.prompt
# Length check
prompt.length < 4000
# No sensitive data
not contains_pii(prompt.content)
# Follows safety guidelines
follows_safety_guidelines(prompt.content)
}
contains_pii(text) if {
pii_patterns := ["ssn", "credit card", "password", "api key"]
lower_text := lower(text)
some pattern in pii_patterns
contains(lower_text, pattern)
}
follows_safety_guidelines(text) if {
prohibited := ["harmful", "dangerous", "illegal", "malicious"]
lower_text := lower(text)
every term in prohibited {
not contains(lower_text, term)
}
}
Pattern 3: RAG System Authorization
package controlcore.policy.rag
import rego.v1
default allow := false
# Allow RAG system access
allow if {
user_authorized_for_rag
query_safe
retrieval_context_valid
}
user_authorized_for_rag if {
input.user.roles[_] in ["researcher", "analyst", "developer"]
input.user.attributes.rag_access == true
}
query_safe if {
query := input.context.query
# Query not malicious
not injection_attempt(query)
# Query scope appropriate
query_scope_valid(query)
}
injection_attempt(query) if {
injection_patterns := ["'; DROP TABLE", "OR 1=1", "../", "<script>"]
some pattern in injection_patterns
contains(query, pattern)
}
query_scope_valid(query) if {
# User can only query their department's data
query_scope := input.context.query_scope
query_scope.department == input.user.department
}
query_scope_valid(query) if {
# Admins can query all data
input.user.roles[_] == "admin"
}
retrieval_context_valid if {
context := input.context.retrieval
# Source documents accessible
every doc_id in context.source_documents {
user_can_access_document(doc_id)
}
# Retrieval count reasonable
count(context.source_documents) <= 10
}
user_can_access_document(doc_id) if {
doc := data.documents[doc_id]
doc.owner == input.user.id
}
user_can_access_document(doc_id) if {
doc := data.documents[doc_id]
doc.classification == "public"
}
Pattern 4: Content Injection Rules
package controlcore.policy.content
import rego.v1
# Pre-prompt content injection
pre_prompt_injection := {
"type": "context_enrichment",
"content": enriched_context
} if {
input.context.injection_enabled == true
}
enriched_context := concat("\n", [
sprintf("User: %s", [input.user.email]),
sprintf("Department: %s", [input.user.department]),
sprintf("Clearance Level: %d", [input.user.attributes.clearance_level]),
"Guidelines: Follow company security policies",
"Restrictions: Do not share sensitive information"
])
# Post-response filtering
filtered_response := response if {
response := mask_sensitive_data(input.response)
}
mask_sensitive_data(response) := masked if {
# Replace sensitive patterns
masked := replace(response, ssn_pattern, "***-**-****")
}
ssn_pattern := `\d{3}-\d{2}-\d{4}`
📌 Built-in Functions
String Functions
# Concatenation
result := concat("-", ["hello", "world"]) # "hello-world"
# Formatting
message := sprintf("User %s has role %s", [user, role])
# Case conversion
lower_text := lower("HELLO") # "hello"
upper_text := upper("hello") # "HELLO"
# String operations
contains("hello world", "world") # true
startswith("hello", "hel") # true
endswith("world", "ld") # true
trim(" hello ", " ") # "hello"
replace("hello", "l", "r") # "herro"
Array Functions
# Count
count([1, 2, 3]) # 3
# Concatenation
concat_array := array.concat([1, 2], [3, 4]) # [1, 2, 3, 4]
# Slice
array.slice([1, 2, 3, 4], 1, 3) # [2, 3]
Set Functions
# Union
{"a", "b"} | {"b", "c"} # {"a", "b", "c"}
# Intersection
{"a", "b"} & {"b", "c"} # {"b"}
# Difference
{"a", "b"} - {"b", "c"} # {"a"}
Aggregation Functions
# Sum
sum([1, 2, 3, 4]) # 10
# Product
product([2, 3, 4]) # 24
# Max/Min
max([1, 5, 3]) # 5
min([1, 5, 3]) # 1
# Count
count([1, 2, 3]) # 3
Type Functions
# Type checking
is_string("hello") # true
is_number(42) # true
is_boolean(true) # true
is_array([1, 2]) # true
is_object({"a": 1}) # true
is_set({1, 2}) # true
is_null(null) # true
Time Functions
# Current time (nanoseconds since epoch)
now := time.now_ns()
# Parse time
parsed := time.parse_rfc3339_ns("2025-01-25T10:30:00Z")
# Clock components [hour, minute, second]
[hour, minute, second] := time.clock([now])
# Date components [year, month, day]
[year, month, day] := time.date([now])
# Day of week (0=Sunday, 6=Saturday)
weekday := time.weekday([now])
JSON Functions
# Parse JSON string
data := json.unmarshal('{"name": "John", "age": 30}')
# Marshal to JSON
json_str := json.marshal({"name": "John"})
Encoding Functions
# Base64
encoded := base64.encode("hello")
decoded := base64.decode("aGVsbG8=")
# URL encoding
url_encoded := urlquery.encode("hello world")
url_decoded := urlquery.decode("hello+world")
Crypto Functions
# ⚠️ DEPRECATED: MD5 and SHA1 are deprecated for security reasons
# Use SHA-256 or SHA-512 instead in production code:
sha256_hash := crypto.sha256("hello")
sha512_hash := crypto.sha512("hello")
# MD5 (DEPRECATED - Do not use in production)
# md5_hash := crypto.md5("hello")
# SHA1 (DEPRECATED - Do not use in production)
# sha1_hash := crypto.sha1("hello")
Regex Functions
# Match
regex.match("^[a-z]+$", "hello") # true
# Find all matches
matches := regex.find_n("[0-9]+", "abc123def456", -1) # ["123", "456"]
# Replace
result := regex.replace("hello", "l+", "L") # "heLo"
📌 Testing Policies
Unit Testing
package controlcore.policy
import rego.v1
# Policy under test
default allow := false
allow if {
input.user.roles[_] == "admin"
}
# Test cases
test_admin_allowed if {
allow with input as {
"user": {"roles": ["admin"]},
"resource": {"type": "api"},
"action": {"name": "read"}
}
}
test_developer_denied if {
not allow with input as {
"user": {"roles": ["developer"]},
"resource": {"type": "api"},
"action": {"name": "read"}
}
}
test_no_role_denied if {
not allow with input as {
"user": {"roles": []},
"resource": {"type": "api"},
"action": {"name": "read"}
}
}
Testing with Mock Data
package controlcore.policy
import rego.v1
# Mock data
mock_users := {
"user-1": {"id": "user-1", "roles": ["admin"]},
"user-2": {"id": "user-2", "roles": ["developer"]},
}
test_with_mock_data if {
allow with input as {
"user": {"id": "user-1"},
"resource": {"type": "api"},
"action": {"name": "write"}
}
with data.users as mock_users
}
📌 Best Practices
1. Always Default Deny
# Good: Explicit default deny
default allow := false
allow if {
# conditions
}
# Bad: No default (implicit undefined)
allow if {
# conditions
}
2. Use Helper Functions
# Good: Reusable helper functions
is_admin if {
input.user.roles[_] == "admin"
}
is_resource_owner if {
input.resource.owner == input.user.id
}
allow if {
is_admin
}
allow if {
is_resource_owner
}
# Bad: Duplicate logic
allow if {
input.user.roles[_] == "admin"
}
allow if {
input.user.roles[_] == "admin"
input.resource.type == "api"
}
3. Document Your Policies
package controlcore.policy
import rego.v1
# Policy: API Access Control
# Owner: Security Team
# Last Updated: 2025-01-25
#
# This policy controls access to production APIs based on:
# - User roles and permissions
# - Resource sensitivity levels
# - Time-based restrictions
# - MFA requirements for sensitive operations
#
# Related Documentation: https://wiki.company.com/api-security
default allow := false
# Allow admin users full access
# Admins are defined in the identity provider
allow if {
input.user.roles[_] == "admin"
}
4. Keep Policies Simple
# Good: Simple and clear
allow if {
input.user.roles[_] == "developer"
input.action.name == "read"
}
# Bad: Overly complex
allow if {
count([r | r := input.user.roles[_]; r in ["developer", "engineer", "coder"]]) > 0
[a | a := ["read", "get", "list"]][_] == input.action.name
}
5. Use Type Checking
# Good: Check types
allow if {
is_string(input.user.id)
is_array(input.user.roles)
count(input.user.roles) > 0
input.user.roles[_] == "admin"
}
# Bad: Assume types
allow if {
input.user.roles[_] == "admin" # Could fail if roles is not an array
}
⚡ Performance Optimization
1. Use Early Returns
# Good: Check cheap conditions first
allow if {
input.action.name == "read" # Quick check
input.user.roles[_] == "developer" # Quick check
expensive_database_lookup # Expensive check last
}
# Bad: Expensive check first
allow if {
expensive_database_lookup # Slow
input.action.name == "read" # Fast but checked last
}
2. Cache-Friendly Policies
# Good: Deterministic (cacheable)
allow if {
input.user.roles[_] == "admin"
input.resource.type == "api"
}
# Bad: Non-deterministic (not cacheable)
allow if {
time.now_ns() < input.session.expiry # Changes every nanosecond!
}
# Better: Use reasonable granularity
allow if {
current_hour := time.clock([time.now_ns()])[0]
current_hour < 17 # Changes hourly, more cacheable
}
3. Avoid Expensive Operations
# Good: Simple operations
allow if {
input.user.department == "Engineering"
}
# Bad: Complex iteration
allow if {
count([u | u := data.users[_]; u.department == "Engineering"]) > 100
}
🔒 Regulatory Compliance Patterns
FINTRAC Compliance (Canada)
Large Cash Transaction Reporting (LCTR):
package fintrac.lctr
import rego.v1
default allow := false
default requires_lctr := false
# FINTRAC: Large Cash Transaction Report required for CAD $10,000+
requires_lctr if {
input.transaction.amount >= 10000
input.transaction.currency == "CAD"
input.transaction.type in ["cash", "cash_equivalent"]
}
# Allow with LCTR requirement
allow if {
# User must be FINTRAC-trained
input.user.attributes.fintrac_certified == true
input.user.roles[_] in ["teller", "financial-advisor", "compliance-officer"]
# Required transaction details present
input.transaction.customer_id != null
input.transaction.conductor_id != null
input.transaction.purpose != null
input.transaction.source_of_funds != null
# Create LCTR if required
requires_lctr
}
# Suspicious Transaction Report (STR) Detection
str_indicators[reason] if {
# Structuring: Multiple transactions just under reporting threshold
recent_txns := data.transactions_last_30_days[input.transaction.customer_id]
near_threshold := [t | t := recent_txns[_]; t.amount >= 9000; t.amount < 10000]
count(near_threshold) >= 3
reason := "possible_structuring"
}
str_indicators[reason] if {
# Unusual pattern: Amount significantly higher than normal
customer := data.customers[input.transaction.customer_id]
input.transaction.amount > (customer.average_transaction * 10)
reason := "unusual_amount"
}
str_indicators[reason] if {
# Rapid movement of funds
input.transaction.type == "wire_transfer"
input.transaction.received_within_hours < 24
input.transaction.immediate_withdrawal == true
reason := "rapid_movement_of_funds"
}
# Block suspicious transactions for compliance review
allow if {
count(str_indicators) == 0
input.user.attributes.aml_cleared == true
}
# Compliance officer can process flagged transactions
allow if {
count(str_indicators) > 0
input.user.roles[_] == "compliance-officer"
input.transaction.compliance_review_completed == true
}
Third Party Determination:
package fintrac.third_party
import rego.v1
# FINTRAC: Identify if conducting on behalf of third party
third_party_determination_required if {
input.transaction.amount >= 10000
input.transaction.customer_id != input.transaction.conductor_id
}
allow if {
third_party_determination_required
# Third party information collected
input.transaction.third_party.name != null
input.transaction.third_party.address != null
input.transaction.third_party.relationship != null
# User verified third party information
input.user.roles[_] in ["teller", "compliance-officer"]
}
OSFI Guidelines (Canada)
Guideline B-10: Outsourcing of Business Activities:
package osfi.outsourcing
import rego.v1
default allow := false
# OSFI B-10: Third-party access requires controls
allow if {
input.user.type == "third_party_vendor"
# Vendor must be approved
vendor := data.approved_vendors[input.user.vendor_id]
vendor.approval_status == "active"
vendor.due_diligence_completed == true
# Access limited to contracted services only
input.resource.type in vendor.contracted_services
# Monitoring and audit enabled
input.context.audit_enabled == true
input.context.vendor_activity_logged == true
}
# OSFI: Segregation of Duties
allow if {
not has_conflicting_duties(input.user.roles)
duty_appropriate_for_action(input.user.roles, input.action.name)
}
has_conflicting_duties(roles) if {
"maker" in roles
"checker" in roles
}
has_conflicting_duties(roles) if {
"initiator" in roles
"approver" in roles
}
Guideline E-21: Technology and Cyber Security:
package osfi.cyber_security
import rego.v1
default allow := false
# OSFI E-21: Multi-Factor Authentication for sensitive operations
allow if {
input.resource.attributes.sensitivity in ["confidential", "restricted"]
# MFA required
input.user.attributes.mfa_verified == true
input.context.mfa_timestamp_age_seconds < 300 # MFA within last 5 minutes
# Access from approved location
input.context.ip_address in data.approved_ip_ranges
}
# OSFI E-21: Privileged Access Management
allow if {
input.user.type == "privileged_user"
# Privileged sessions must be recorded
input.context.session_recording_enabled == true
# Just-in-time access (time-limited)
session_timestamp := time.parse_rfc3339_ns(input.context.session_start)
session_age_seconds := (time.now_ns() - session_timestamp) / 1000000000
session_age_seconds < 3600 # 1 hour max
# Approval required
input.context.privileged_access_approved == true
}
AML Regulations (US & Canada)
Transaction Monitoring:
package aml.monitoring
import rego.v1
default allow := false
# AML: Currency Transaction Report (CTR) for USD $10,000+
requires_ctr if {
input.transaction.amount >= 10000
input.transaction.currency == "USD"
}
# AML: Suspicious Activity Report (SAR) triggers
sar_triggers[trigger] if {
# Trigger 1: Transactions with no apparent business purpose
input.transaction.business_purpose == "unknown"
input.transaction.amount >= 5000
trigger := "no_business_purpose"
}
sar_triggers[trigger] if {
# Trigger 2: Customer refuses to provide information
customer := data.customers[input.transaction.customer_id]
customer.info_refusal_count > 0
trigger := "customer_refused_information"
}
sar_triggers[trigger] if {
# Trigger 3: Funds transfers to high-risk jurisdictions
input.transaction.type == "wire_transfer"
input.transaction.destination_country in data.high_risk_countries
trigger := "high_risk_jurisdiction"
}
# Allow with SAR filing requirement
allow if {
count(sar_triggers) > 0
input.user.roles[_] == "compliance-officer"
input.transaction.sar_filed == true
}
allow if {
count(sar_triggers) == 0
requires_ctr
input.transaction.ctr_filed == true
}
Customer Due Diligence (CDD):
package aml.cdd
import rego.v1
default allow := false
# AML: Customer Due Diligence requirements
allow if {
customer := data.customers[input.resource.attributes.customer_id]
# Standard CDD (all customers)
customer.identity_verified == true
customer.name != null
customer.address != null
customer.date_of_birth != null
customer.identification_number != null
# Beneficial ownership (for entities)
beneficial_ownership_satisfied(customer)
# Risk assessment completed
customer.risk_assessment_date != null
risk_assessment_current(customer.risk_assessment_date)
}
beneficial_ownership_satisfied(customer) if {
customer.type == "individual"
}
beneficial_ownership_satisfied(customer) if {
customer.type == "entity"
count(customer.beneficial_owners) > 0
every owner in customer.beneficial_owners {
owner.ownership_percentage >= 25
owner.identity_verified == true
}
}
risk_assessment_current(assessment_date) if {
days_since := (time.now_ns() - time.parse_rfc3339_ns(assessment_date)) / (24 * 60 * 60 * 1000000000)
days_since < 365 # Annual review required
}
Office of Foreign Assets Control (OFAC) Screening:
package aml.ofac
import rego.v1
default allow := false
# OFAC: Sanctions screening required
allow if {
customer := data.customers[input.transaction.customer_id]
# Not on OFAC SDN (Specially Designated Nationals) list
not customer.id in data.ofac_sdn_list
not customer.name in data.ofac_sdn_names
# Not transacting with sanctioned countries
not input.transaction.destination_country in data.ofac_sanctioned_countries
not input.transaction.origination_country in data.ofac_sanctioned_countries
# Screening completed recently
screening_age := (time.now_ns() - time.parse_rfc3339_ns(customer.last_ofac_screening)) / (24 * 60 * 60 * 1000000000)
screening_age < 1 # Daily screening
}
# OFAC: 50% Rule (entities owned by sanctioned persons)
allow if {
entity := data.entities[input.transaction.entity_id]
# Check all owners
sanctioned_ownership := sum([owner.ownership_percentage |
owner := entity.owners[_]
owner.id in data.ofac_sdn_list
])
# Total sanctioned ownership must be < 50%
sanctioned_ownership < 50
}
FinCEN Regulations (US)
Bank Secrecy Act (BSA) Compliance:
package fincen.bsa
import rego.v1
default allow := false
# BSA: Currency Transaction Report (CTR) for $10,000+
requires_ctr if {
input.transaction.amount >= 10000
input.transaction.currency == "USD"
input.transaction.type in ["cash", "deposit", "withdrawal"]
}
# BSA: Suspicious Activity Report (SAR)
requires_sar if {
count(suspicious_indicators) >= 2
}
suspicious_indicators[indicator] if {
# Multiple transactions just under $10,000
recent := data.transactions_last_7_days[input.transaction.customer_id]
count([t | t := recent[_]; t.amount >= 9000; t.amount < 10000]) >= 2
indicator := "structuring"
}
suspicious_indicators[indicator] if {
# Unusual transaction for customer profile
customer := data.customers[input.transaction.customer_id]
input.transaction.amount > (customer.average_monthly_transactions * 5)
indicator := "unusual_activity"
}
suspicious_indicators[indicator] if {
# Transactions with known high-risk indicators
input.transaction.destination_country in data.high_risk_jurisdictions
input.transaction.amount >= 5000
indicator := "high_risk_jurisdiction"
}
allow if {
requires_ctr
input.transaction.ctr_filed == true
input.user.roles[_] in ["teller", "compliance-officer"]
}
allow if {
requires_sar
input.transaction.sar_filed == true
input.user.roles[_] == "compliance-officer"
input.transaction.senior_management_notified == true
}
Customer Identification Program (CIP):
package fincen.cip
import rego.v1
default allow := false
# FinCEN: CIP requirements for account opening
allow if {
input.action.name == "open_account"
customer := input.request.customer_info
# Four key pieces of information required
customer.name != null
customer.date_of_birth != null
customer.address != null
customer.identification_number != null # SSN or ITIN
# Identification document verified
customer.id_verification.method in ["documentary", "non_documentary"]
customer.id_verification.status == "verified"
# User authorized to open accounts
input.user.roles[_] in ["account-officer", "branch-manager"]
}
GDPR Compliance (EU)
Right to Access (Article 15):
package gdpr.access
import rego.v1
default allow := false
# GDPR: Data subject has right to access their personal data
allow if {
input.action.name == "read"
input.resource.type == "personal_data"
# User accessing their own data
input.resource.attributes.data_subject_id == input.user.id
}
# Allow data protection officer to access for compliance
allow if {
input.user.roles[_] == "data-protection-officer"
input.context.purpose == "compliance_review"
input.context.audit_enabled == true
}
Right to Erasure (Article 17):
package gdpr.erasure
import rego.v1
default allow := false
# GDPR: Right to be forgotten
allow if {
input.action.name == "delete"
input.resource.type == "personal_data"
# Verify erasure request
erasure_request := data.erasure_requests[input.resource.attributes.data_subject_id]
erasure_request.status == "approved"
erasure_request.legal_grounds_verified == true
# Only DPO can execute erasure
input.user.roles[_] == "data-protection-officer"
# No legal obligation to retain
not legal_retention_required(input.resource)
}
legal_retention_required(resource) if {
# Financial records must be kept for 7 years
resource.attributes.type == "financial_record"
record_age_days := (time.now_ns() - time.parse_rfc3339_ns(resource.attributes.created_at)) / (24 * 60 * 60 * 1000000000)
record_age_days < 2555 # 7 years
}
Data Processing Purpose Limitation (Article 5):
package gdpr.purpose_limitation
import rego.v1
default allow := false
# GDPR: Data can only be used for specified, explicit purposes
allow if {
consent := data.user_consents[input.resource.attributes.data_subject_id]
# Purpose matches consent
input.context.processing_purpose in consent.approved_purposes
# Consent is still valid
consent.status == "active"
consent.withdrawal_date == null
}
HIPAA Compliance (US Healthcare)
Minimum Necessary Rule:
package hipaa.minimum_necessary
import rego.v1
default allow := false
# HIPAA: Access only minimum necessary PHI
allow if {
input.user.roles[_] in ["physician", "nurse", "healthcare-provider"]
# User is part of patient's care team
patient_id := input.resource.attributes.patient_id
care_team := data.patient_care_teams[patient_id]
input.user.id in care_team.members
# Purpose of use is valid
input.context.purpose_of_use in ["treatment", "payment", "healthcare-operations"]
# Access limited to necessary fields only
requested_fields := input.query.fields
every field in requested_fields {
field in necessary_fields_for_purpose[input.context.purpose_of_use]
}
}
necessary_fields_for_purpose := {
"treatment": ["patient_id", "diagnosis", "medications", "allergies", "vital_signs"],
"payment": ["patient_id", "insurance_info", "billing_codes", "charges"],
"healthcare-operations": ["patient_id", "encounter_date", "provider_id"]
}
Break-the-Glass Emergency Access:
package hipaa.emergency_access
import rego.v1
default allow := false
# HIPAA: Emergency access with logging
allow if {
input.context.emergency == true
input.user.roles[_] in ["physician", "emergency-physician", "nurse"]
# Emergency access logged
input.context.audit_enabled == true
input.context.emergency_justification != null
# Access reviewed post-emergency
schedule_emergency_access_review(input.user.id, input.resource.id)
}
schedule_emergency_access_review(user_id, resource_id) if {
# Creates audit record for review
true
}
PCI-DSS Compliance
Cardholder Data Access Control:
package pci.cardholder_data
import rego.v1
default allow := false
# PCI-DSS Requirement 7: Restrict access to cardholder data
allow if {
input.resource.attributes.contains_cardholder_data == true
# User has business need to know
input.user.attributes.pci_access_justification != null
input.user.roles[_] in data.pci_authorized_roles
# Access from secured network only
input.context.network_segment == "cardholder_data_environment"
# Strong authentication
input.user.attributes.mfa_verified == true
# Automatic logout after 15 minutes
session_age := (time.now_ns() - time.parse_rfc3339_ns(input.context.session_start)) / 1000000000
session_age < 900
}
# PCI-DSS Requirement 8: Unique ID for each user
allow if {
# Ensure unique user identification
input.user.id != null
input.user.id != "shared"
input.user.id != "generic"
# No shared accounts
not is_shared_account(input.user.id)
}
is_shared_account(user_id) if {
user_id in ["admin", "root", "service", "shared"]
}
SOC 2 Compliance
Access Control (CC6.1, CC6.2):
package soc2.access_control
import rego.v1
default allow := false
# SOC 2 CC6.1: Logical access requires authentication
allow if {
# User must be authenticated
input.user.authenticated == true
input.user.authentication_method in ["saml", "oauth", "mfa"]
# Session must be valid
session_valid(input.context.session_id)
# Authorization based on role
input.user.roles[_] in authorized_roles_for_resource[input.resource.type]
}
session_valid(session_id) if {
session := data.active_sessions[session_id]
session.status == "active"
# Session not expired
expiry := time.parse_rfc3339_ns(session.expiry)
time.now_ns() < expiry
}
# SOC 2 CC6.2: Prior to issuing credentials, registry approved
allow if {
input.action.name == "create_user_account"
# Approval workflow completed
request := input.request.account_request
request.manager_approval == true
request.security_approval == true
# User provisioning follows principle of least privilege
every role in request.requested_roles {
role in data.approved_roles
}
}
Change Management (CC8.1):
package soc2.change_management
import rego.v1
default allow := false
# SOC 2 CC8.1: Changes to policies require approval
allow if {
input.action.name in ["create_policy", "update_policy", "delete_policy"]
# Change request submitted
change_request := input.context.change_request
change_request.id != null
# Approvals obtained
change_request.technical_approval == true
change_request.business_approval == true
# Testing completed
change_request.testing_status == "passed"
# User authorized to deploy
input.user.roles[_] in ["policy-admin", "devops-engineer"]
}
🛠️ Troubleshooting
| Issue | What to check |
|---|---|
| Policy validation errors | Ensure syntax matches Rego rules (e.g. no missing if, correct indentation). Use the policy editor's validation or run tests locally. |
| Unexpected allow/deny | Trace rules with sample input; check that input schema (user, resource, action, context) matches what the bouncer sends. |
| Tests pass locally but fail in Control Plane | Confirm the same Rego version and built-ins; check that bundled policies and dependencies (e.g. PIP data) are in sync. |
| Policy slow or timeout | Simplify rules, avoid heavy comprehensions on large collections; use indexing and see Performance Optimization. |
For more, see the Troubleshooting Guide.
📌 External Resources
Official Documentation
- Open Policy Agent: https://www.openpolicyagent.org/docs/latest/
- Rego Language Reference: https://www.openpolicyagent.org/docs/latest/policy-language/
- OPA Playground: https://play.openpolicyagent.org/
- Built-in Functions: https://www.openpolicyagent.org/docs/latest/policy-reference/
Tutorials and Learning
- Rego Tutorial: https://www.openpolicyagent.org/docs/latest/policy-language/
- Policy Testing: https://www.openpolicyagent.org/docs/latest/policy-testing/
- Best Practices: https://www.openpolicyagent.org/docs/latest/policy-performance/
Community Resources
- OPA Slack: https://slack.openpolicyagent.org/
- GitHub: https://github.com/open-policy-agent/opa
- Stack Overflow: Tagged with
open-policy-agentorrego
Style Guides
- Rego Style Guide: https://www.styra.com/rego-style-guide/
- Policy Best Practices: https://www.styra.com/blog/rego-unit-testing-and-policies/
📌 Control Core Specific Resources
- User Guide: Learn to create and manage policies
- Policy Templates: Pre-built policy examples
- PBAC Best Practices: Advanced patterns
- PIP Developer Guide: Use external data in policies
- API Reference: Policy API documentation
🔧 Quick Reference
Common Patterns Cheat Sheet
# Basic access control
default allow := false
allow if { input.user.roles[_] == "admin" }
# Multiple conditions (AND)
allow if {
input.user.roles[_] == "developer"
input.action.name == "read"
}
# Alternative conditions (OR)
allow if { input.user.roles[_] == "admin" }
allow if { input.user.roles[_] == "manager" }
# Set membership
allow if { input.user.roles[_] in ["admin", "manager"] }
# Negation
allow if { not user_suspended }
# Iteration
user_ids := [user.id | user := data.users[_]]
# Helper function
is_admin if { input.user.roles[_] == "admin" }
# Time-based
is_business_hours if {
hour := time.clock([time.now_ns()])[0]
hour >= 9; hour < 17
}
# Resource ownership
allow if { input.resource.owner == input.user.id }
# Data filtering
visible_rows[row] if {
some row in input.query.result
row.owner == input.user.id
}
🛡️ Advanced Rego Features
This section covers advanced Rego capabilities for building sophisticated policies that go beyond basic conditions.
Advanced Conditions & Logic
Nested Conditions with some/every
The some and every keywords enable powerful iteration and validation patterns:
Using some for iteration:
# Check if ANY user has admin role
has_admin_user if {
some user in data.users
user.role == "admin"
}
# Find admin user with specific permission
admin_with_permission(permission) if {
some user in data.users
user.role == "admin"
permission in user.permissions
}
Using every for validation:
# ALL users must have MFA enabled
all_users_have_mfa if {
every user in data.users {
user.mfa_enabled == true
}
}
# All transactions must be under limit
transactions_valid if {
every txn in input.transactions {
txn.amount < data.limits.max_transaction
}
}
Complex boolean logic:
# Multi-level approval workflow
allow if {
# Manager approval required
some approval in input.approvals
approval.role == "manager"
approval.status == "approved"
# AND either VP approval OR amount under threshold
{
some vp_approval in input.approvals
vp_approval.role == "vp"
vp_approval.status == "approved"
} or {
input.transaction.amount < 10000
}
}
Nested Rule Structures
# Hierarchical permission checking
allow if {
resource_accessible
action_permitted
}
resource_accessible if {
input.resource.type == "document"
document_access_granted
}
document_access_granted if {
input.resource.classification == "public"
}
document_access_granted if {
input.resource.classification == "internal"
employee_access
}
document_access_granted if {
input.resource.classification == "confidential"
manager_access
}
employee_access if {
input.user.employee_id != ""
}
manager_access if {
input.user.role in ["manager", "director", "vp"]
}
action_permitted if {
input.action in allowed_actions
}
allowed_actions contains "read" if { employee_access }
allowed_actions contains "write" if { manager_access }
allowed_actions contains "delete" if { input.user.role == "admin" }
Comprehensions
Comprehensions are powerful tools for transforming and filtering data.
Array Comprehensions
Filtering:
# Get all admin users
admin_users := [user |
some user in data.users
user.role == "admin"
]
# Get user IDs with high clearance
high_clearance_ids := [user.id |
some user in data.users
user.clearance_level >= 5
]
# Complex filtering with multiple conditions
eligible_approvers := [user |
some user in data.users
user.role in ["manager", "director"]
user.department == input.resource.department
user.active == true
]
Transformation:
# Extract just the names
user_names := [user.name | some user in data.users]
# Create structured list
user_summaries := [{"id": user.id, "name": user.name} |
some user in data.users
]
# Permission aggregation
all_user_permissions := [perm |
some user in data.users
some perm in user.permissions
]
Object Comprehensions
Creating mappings:
# Map user ID to role
user_roles := {user.id: user.role |
some user in data.users
}
# Build permission matrix
permission_matrix := {user.id: user.permissions |
some user in data.users
user.active == true
}
# Department user count
dept_counts := {dept: count(users) |
some dept in department_list
users := [u | some u in data.users; u.department == dept]
}
Data transformation:
# Transform array to lookup object
resource_by_id := {resource.id: resource |
some resource in input.resources
}
# Index by multiple keys
users_by_email := {user.email: user |
some user in data.users
}
Set Comprehensions
Creating unique sets:
# All unique roles in the system
all_roles := {user.role |
some user in data.users
}
# All departments with active users
active_departments := {user.department |
some user in data.users
user.active == true
}
# Permissions across all admins
admin_permissions := {perm |
some user in data.users
user.role == "admin"
some perm in user.permissions
}
Functions and Helper Rules
Create reusable logic with functions and helper rules.
Helper Rules
# Reusable validation functions
is_admin if {
input.user.role == "admin"
}
is_manager if {
input.user.role == "manager"
}
is_employee if {
input.user.employee_id != ""
}
# Composite helpers
has_elevated_access if { is_admin }
has_elevated_access if { is_manager }
# Parameterized helpers
user_has_role(role) if {
input.user.roles[_] == role
}
user_has_permission(permission) if {
input.user.permissions[_] == permission
}
Functions with Arguments
# Calculate risk score
risk_score(transaction) := score if {
base_score := transaction.amount / 1000
multiplier := transaction_type_multiplier(transaction.type)
score := base_score * multiplier
}
transaction_type_multiplier("wire_transfer") := 2.0
transaction_type_multiplier("ach") := 1.0
transaction_type_multiplier("check") := 0.5
transaction_type_multiplier(_) := 1.0 # default
# Permission calculation
effective_permissions(user) := permissions if {
# Direct permissions
direct := user.permissions
# Role-based permissions
role_perms := [perm |
some role in user.roles
some perm in data.role_permissions[role]
]
# Combine and deduplicate
permissions := array.concat(direct, role_perms)
}
Custom Validation Functions
# Validate email format
is_valid_email(email) if {
regex.match(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`, email)
}
# Validate phone number
is_valid_phone(phone) if {
regex.match(`^\+?[1-9]\d{1,14}$`, phone)
}
# Validate credit card (Luhn algorithm)
is_valid_card(number) if {
# Implementation of Luhn algorithm
digits := [to_number(d) | some d in split(number, "")]
sum := luhn_sum(digits, count(digits) - 1, false)
sum % 10 == 0
}
luhn_sum(digits, index, double) := sum if {
index < 0
sum := 0
} else := sum if {
digit := digits[index]
value := digit * (1 + to_number(double))
adjusted := value - (9 * (value > 9))
sum := adjusted + luhn_sum(digits, index - 1, not double)
}
Data Lookups and External Data
Integrate external data sources into policy decisions.
Accessing the Data Document
# Direct data access
allow if {
user_role := data.user_roles[input.user.id]
user_role == "admin"
}
# Hierarchical data access
allow if {
dept := data.departments[input.user.department]
dept.classification == "secure"
input.resource.classification <= dept.max_resource_level
}
PIP Integration Patterns
Policy Information Points (PIPs) provide real-time context:
# User data from Auth0 PIP
allow if {
# PIP data available at data.pip.auth0
user_profile := data.pip.auth0.users[input.user.id]
user_profile.email_verified == true
user_profile.mfa_enabled == true
}
# Location data from IP Geolocation PIP
allow if {
# PIP data available at data.pip.ipgeo
location := data.pip.ipgeo.lookup[input.request.ip]
location.country in data.allowed_countries
}
# Real-time risk scoring from custom PIP
deny if {
risk_data := data.pip.risk_engine.score[input.user.id]
risk_data.score > data.thresholds.max_risk
}
Dynamic Permission Lookups
# Load permissions from external system
user_permissions contains permission if {
# Fetch from PIP at policy evaluation time
user_data := data.pip.permissions.users[input.user.id]
some permission in user_data.permissions
}
# Resource-specific permissions
allow if {
resource_acl := data.pip.acls.resources[input.resource.id]
input.user.id in resource_acl.allowed_users
}
# Role hierarchy from LDAP
user_roles contains role if {
ldap_user := data.pip.ldap.users[input.user.dn]
some group in ldap_user.groups
role := data.role_mappings[group]
}
Query Patterns
Advanced query patterns for complex scenarios.
Universal Quantification (every)
# All team members must approve
allow if {
team := data.teams[input.resource.team_id]
every member in team.members {
some approval in input.approvals
approval.user_id == member.id
approval.status == "approved"
}
}
# All documents must be classified
documents_valid if {
every doc in input.documents {
doc.classification in ["public", "internal", "confidential", "secret"]
}
}
# All regions must be compliant
compliant if {
every region in input.deployment.regions {
region in data.compliant_regions
data.region_config[region].encryption_enabled == true
}
}
Existential Quantification (some)
# At least one admin must approve
allow if {
some approval in input.approvals
approval.role == "admin"
approval.status == "approved"
}
# Any developer can read
allow if {
input.action == "read"
some role in input.user.roles
role == "developer"
}
# Check if any violation exists
has_violations if {
some check in security_checks
not check.passed
}
Combined Patterns
# At least 2 managers must approve, and no director can reject
allow if {
# Count manager approvals
manager_approvals := count([a |
some a in input.approvals
a.role == "manager"
a.status == "approved"
])
manager_approvals >= 2
# Check no director rejections
not director_rejected
}
director_rejected if {
some approval in input.approvals
approval.role == "director"
approval.status == "rejected"
}
OPA v1.10.0+ Features
Take advantage of the latest OPA capabilities.
Enhanced Built-in Functions
import rego.v1
# New string functions (OPA 0.60+)
normalized_email := strings.trim_space(lower(input.user.email))
# Template rendering (OPA 0.58+)
message := sprintf("User %s attempted %s on %s", [
input.user.name,
input.action,
input.resource.name
])
# Advanced JSON operations
filtered_data := json.remove(input.sensitive_data, ["/ssn", "/credit_card"])
# UUID generation for audit trails
audit_id := uuid.rfc4122()
Performance Optimizations
Partial evaluation (Enterprise feature):
# Mark rules for partial evaluation
allow if {
# This will be partially evaluated at compile time
input.user.department in data.authorized_departments
}
Caching strategies:
# Cache expensive computations
user_permissions := cached_permissions(input.user.id)
cached_permissions(user_id) := permissions if {
# Complex permission calculation
permissions := compute_effective_permissions(user_id)
}
API Extensions
Custom built-in functions (requires OPA bundle):
# Use custom functions from your bundle
allow if {
# Custom cryptographic verification
custom.verify_signature(input.payload, input.signature, data.public_key)
}
# Custom geospatial functions
allow if {
distance := custom.geo_distance(
input.user.location,
input.resource.location
)
distance < data.max_distance_km
}
📌 Real-World Advanced Examples
Financial Compliance (FINTRAC, OSFI)
package controlcore.compliance.fintrac
import rego.v1
# FINTRAC: Large cash transaction reporting
reportable_transaction if {
input.transaction.type == "cash"
input.transaction.amount >= 10000 # CAD
input.transaction.currency == "CAD"
}
# FINTRAC: Suspicious transaction monitoring
suspicious_transaction if {
# Multiple transactions just below reporting threshold
recent_transactions := [t |
some t in data.transactions[input.user.id]
t.timestamp > time.now_ns() - (24 * 3600 * 1000000000) # 24 hours
t.amount >= 9000
t.amount < 10000
]
count(recent_transactions) >= 3
}
# OSFI: Know Your Client validation
allow if {
input.action == "high_risk_transaction"
kyc_complete(input.user.id)
risk_assessment_current(input.user.id)
}
kyc_complete(user_id) if {
kyc := data.kyc_records[user_id]
kyc.identity_verified == true
kyc.address_verified == true
kyc.documents_valid == true
}
risk_assessment_current(user_id) if {
assessment := data.risk_assessments[user_id]
age_days := (time.now_ns() - assessment.timestamp) / (24 * 3600 * 1000000000)
age_days <= 365 # Annual refresh required
}
Healthcare (HIPAA)
package controlcore.compliance.hipaa
import rego.v1
# HIPAA: Minimum necessary standard
allow if {
input.action == "read"
input.resource.type == "patient_record"
# Only necessary fields for user's role
requested_fields := input.query.fields
necessary_fields := minimum_necessary_fields(input.user.role)
every field in requested_fields {
field in necessary_fields
}
}
minimum_necessary_fields("doctor") := [
"patient_id", "name", "dob", "diagnosis",
"medications", "allergies", "treatments"
]
minimum_necessary_fields("nurse") := [
"patient_id", "name", "medications", "vital_signs"
]
minimum_necessary_fields("billing") := [
"patient_id", "name", "insurance", "procedures"
]
# Break-the-glass emergency access
allow if {
input.emergency == true
input.user.role in ["doctor", "nurse"]
# Log emergency access for audit
emergency_access_logged
}
emergency_access_logged if {
# Would trigger external logging via PIP
data.pip.audit.log_emergency_access(input)
}
AI Agent Control with Prompt Filtering
package controlcore.ai.prompt_filter
import rego.v1
# Block prompts trying to extract PII
deny if {
prompt := lower(input.prompt)
# Check for PII extraction attempts
some pattern in pii_extraction_patterns
contains(prompt, pattern)
}
pii_extraction_patterns := [
"show me all email addresses",
"list all phone numbers",
"give me social security",
"display credit card",
"export user data"
]
# Block prompt injection attempts
deny if {
prompt := input.prompt
# Check for instruction override attempts
some pattern in injection_patterns
regex.match(pattern, prompt)
}
injection_patterns := [
`(?i)ignore\s+previous\s+instructions`,
`(?i)disregard\s+.*\s+above`,
`(?i)system\s*:\s*you\s+are`,
`(?i)act\s+as\s+if`,
]
# Content filtering for responses
filtered_response := response if {
response := {
"content": mask_sensitive_data(input.ai_response),
"metadata": input.metadata
}
}
mask_sensitive_data(text) := masked if {
# Mask SSNs
step1 := regex.replace(text, `\d{3}-\d{2}-\d{4}`, "***-**-****")
# Mask credit cards
step2 := regex.replace(step1, `\d{4}\s?\d{4}\s?\d{4}\s?\d{4}`, "**** **** **** ****")
# Mask emails
masked := regex.replace(step2, `[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`, "***@***.***")
}
Data Masking Patterns
package controlcore.data.masking
import rego.v1
# Field-level masking based on user clearance
masked_fields contains field if {
some field in input.response.fields
field_classification := data.field_classifications[field]
field_classification > input.user.clearance_level
}
filtered_response := response if {
response := {k: v |
some k, v in input.response
not k in masked_fields
}
}
# Partial masking for different user types
mask_value(field, value, user_role) := masked if {
field == "ssn"
user_role == "customer_service"
masked := concat("", ["***-**-", substring(value, 7, -1)])
} else := masked if {
field == "credit_card"
masked := concat("", ["**** **** **** ", substring(value, 15, -1)])
} else := value # No masking needed
Time-Based Access Control
package controlcore.temporal
import rego.v1
# Business hours check
is_business_hours if {
[hour, _, _] := time.clock([time.now_ns(), "America/New_York"])
[_, _, weekday] := time.date([time.now_ns()])
weekday >= 1 # Monday
weekday <= 5 # Friday
hour >= 9
hour < 17
}
# Maintenance window check
is_maintenance_window if {
now := time.now_ns()
some window in data.maintenance_windows
now >= window.start_time
now <= window.end_time
}
# Time-limited permissions
allow if {
input.action == "write"
temp_access := data.temporary_access[input.user.id]
time.now_ns() <= temp_access.expires_at
}
Location-Based Restrictions
package controlcore.geo
import rego.v1
# Allowed countries
allow if {
input.action == "access"
location := data.pip.ipgeo.lookup[input.request.ip]
location.country in data.allowed_countries
}
# Distance-based access
allow if {
user_location := input.user.location
resource_location := data.resources[input.resource.id].location
distance_km := geo_distance(user_location, resource_location)
distance_km <= data.max_distance_km
}
# Calculate distance between coordinates
geo_distance(loc1, loc2) := distance_km if {
# Haversine formula
lat1 := loc1.lat * 3.14159265359 / 180
lat2 := loc2.lat * 3.14159265359 / 180
dlat := lat2 - lat1
dlon := (loc2.lon - loc1.lon) * 3.14159265359 / 180
a := (sin(dlat/2) * sin(dlat/2)) +
(cos(lat1) * cos(lat2) * sin(dlon/2) * sin(dlon/2))
c := 2 * atan2(sqrt(a), sqrt(1-a))
distance_km := 6371 * c # Earth's radius in km
}
You're now equipped to write powerful, secure, and efficient Rego policies! Start with simple patterns and gradually incorporate more advanced techniques as needed. The no-code policy builder is great for basic policies, but use the code editor when you need these advanced features.
For hands-on practice, use the Control Core policy editor with live testing.