This guide covers best practices for deploying Blindfold in production, optimizing performance, and ensuring security.
Choosing the Right Privacy Method
Different use cases require different privacy approaches. Here’s how to choose:
Use Tokenization for AI Processing
When to use:
AI chatbots and assistants
LLM-powered applications
When you need to restore original data after processing
Why:
Reversible - you can restore original PII after AI responds
Maintains context for AI (tokens preserve sentence structure)
Best for user-facing applications
# AI Chatbot Example
protected = client.tokenize(user_input, policy = "gdpr_eu" )
ai_response = send_to_openai(protected.text)
final = client.detokenize(ai_response, protected.mapping)
return final.text # User gets personalized response
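To make the reversible round trip concrete, here is a minimal local sketch of what restoring tokens amounts to. It does not call Blindfold and is not the SDK's implementation; `restore_tokens` is a hypothetical helper, and the `<email_address_1>` token format is assumed from the unit-test examples later in this guide.

```python
def restore_tokens(text: str, mapping: dict[str, str]) -> str:
    """Replace each placeholder token in `text` with its original value."""
    for token, original_value in mapping.items():
        text = text.replace(token, original_value)
    return text


# Hypothetical mapping of the kind a tokenize call might return
mapping = {"<person_1>": "John Doe", "<email_address_1>": "john@example.com"}
ai_response = "Hello <person_1>, I will write to <email_address_1>."
restored = restore_tokens(ai_response, mapping)
print(restored)
```

Because the AI only ever sees the tokens, the original values never leave your system; the mapping is what must be protected.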
Use Masking for Display
When to use:
Showing data to users (e.g., “Card ending in 3456”)
Audit logs that need partial visibility
Customer support interfaces
Why:
Not reversible - safe for display
Shows enough context to be useful
Prevents accidental exposure
# Display to User
masked = client.mask(
"Card: 4532-7562-9102-3456" ,
policy = "pci_dss"
)
# "Card: ***************3456"
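The masked output above can be reproduced locally to see why masking is not reversible: the hidden characters are simply discarded. This is a sketch only; `mask_keep_last` is a hypothetical helper, not part of the Blindfold SDK, and it assumes the last four characters are the ones kept visible.

```python
def mask_keep_last(value: str, visible: int = 4, mask_char: str = "*") -> str:
    """Mask every character except the last `visible` ones."""
    if visible <= 0:
        return mask_char * len(value)
    hidden = max(len(value) - visible, 0)
    return mask_char * hidden + value[hidden:]


masked_card = "Card: " + mask_keep_last("4532-7562-9102-3456")
print(masked_card)
```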
Use Redaction for Permanent Removal
When to use:
Audit logs with no PII requirement
Public data sharing
Compliance with “right to be forgotten”
Why:
Completely removes PII
Cannot be reversed
Safest for long-term storage
# Audit Logs
logged = client.redact(
"User John Doe (SSN: 123-45-6789) logged in" ,
policy = "strict"
)
# "User (SSN: ) logged in"
Use Hashing for Analytics
When to use:
User tracking across sessions
Analytics and aggregation
Deduplication without storing PII
Why:
Same input = same hash (consistent IDs)
Cannot reverse to original value
Safe for analytics databases
# Analytics
user_id = client.hash( "john@example.com" )
# Always "ID_a3f8b9c2" for this email
analytics.track(user_id, event = "login" )
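The "same input = same hash" property can be illustrated with a keyed hash from the standard library. This is a sketch under stated assumptions, not how Blindfold computes its IDs: `pseudonymize` is a hypothetical helper, the `ID_` prefix is modeled on the comment above, and the secret shown is a placeholder that would normally come from a secret store. A keyed hash (HMAC) is used because plain unsalted hashes of guessable values such as email addresses can be reversed by brute-force guessing.

```python
import hashlib
import hmac


def pseudonymize(value: str, secret: bytes, length: int = 8) -> str:
    """Derive a stable, non-reversible ID from a value using HMAC-SHA256."""
    # Normalize so trivially different spellings map to the same ID
    normalized = value.strip().lower()
    digest = hmac.new(secret, normalized.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"ID_{digest[:length]}"


secret = b"example-secret-load-from-a-secret-store"  # placeholder, not a real key
id_a = pseudonymize("john@example.com", secret)
id_b = pseudonymize(" John@Example.com ", secret)
id_c = pseudonymize("jane@example.com", secret)
print(id_a == id_b, id_a == id_c)
```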
Use Encryption for Secure Storage
When to use:
Long-term data storage
Database encryption
Regulatory compliance requiring encrypted PII
Why:
Reversible with your encryption key
Industry-standard AES-256 encryption
You control the decryption key
# Secure Storage
encrypted = client.encrypt(
"Patient: Jane Smith, DOB: 1985-03-15" ,
encryption_key = "your-32-byte-key" ,
policy = "hipaa_us"
)
# Store encrypted.text safely
# Later retrieve
original = client.decrypt(encrypted.text, encryption_key = "your-32-byte-key" )
Use Synthesis for Testing
When to use:
Creating test datasets
Demos and screenshots
Development environments
Why:
Generates realistic fake data
Maintains format and structure
Safe for public sharing
# Test Data
synthetic = client.synthesize(
"John Doe, john@example.com, +1-555-1234" ,
policy = "gdpr_eu"
)
# "Jane Smith, jane.smith@sample.com, +1-555-9876"
Choosing the Right Policy
Policies simplify compliance by providing pre-configured entity sets.
Policy Selection Guide
Policy | Use Case | Entity Count | Compliance
basic | General PII protection | 3 types | General privacy
gdpr_eu | European applications | 15+ types | GDPR Article 4
hipaa_us | Healthcare applications | 11+ types | HIPAA Privacy Rule
pci_dss | Payment processing | 8+ types | PCI DSS 3.2.1
strict | Maximum protection | 60+ types | All regulations
When to Create Custom Policies
Create custom policies when:
Industry-Specific Requirements
Your industry has unique PII definitions not covered by standard policies.
# Legal Industry Example
legal_policy = client.create_policy(
name = "legal_discovery" ,
entities = [
"person" , "organization" , "email address" ,
"case number" , "attorney name" , "client name"
],
threshold = 0.40
)
Performance Optimization
You only need specific entity types and want faster processing.
# Fast Contact Detection
contact_policy = client.create_policy(
name = "contact_only" ,
entities = [ "email address" , "phone number" ],
threshold = 0.35 # Lower threshold = more detections
)
Custom Entity Detection
You need to detect domain-specific identifiers.
# E-commerce Example
ecommerce_policy = client.create_policy(
name = "ecommerce_sensitive" ,
entities = [
"person" , "email address" , "phone number" ,
"order number" , "tracking number" , "customer id"
],
threshold = 0.40
)
Security Best Practices
API Key Management
Never commit API keys to version control or expose them in client-side code.
Recommended Approaches:
# .env file (add to .gitignore)
BLINDFOLD_API_KEY = sk_live_...
OPENAI_API_KEY = sk-...
# Python
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv( "BLINDFOLD_API_KEY" )
// JavaScript
import dotenv from 'dotenv' ;
dotenv . config ();
const apiKey = process . env . BLINDFOLD_API_KEY ;
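Note that `os.getenv` returns `None` when the variable is missing, which tends to surface later as a confusing authentication failure. A minimal sketch of a fail-fast loader follows; `require_env` is a hypothetical helper (not part of any SDK), and the variable name used here is for demonstration only.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear error."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value


os.environ["EXAMPLE_API_KEY"] = "sk_test_example"  # set here only for demonstration
api_key = require_env("EXAMPLE_API_KEY")
```

Calling this once at startup means a missing key stops the deployment immediately instead of failing on the first user request.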
AWS Secrets Manager:
import boto3
from botocore.exceptions import ClientError

def get_secret():
    session = boto3.session.Session()
    client = session.client("secretsmanager")
    try:
        response = client.get_secret_value(SecretId="blindfold/api-key")
        return response["SecretString"]
    except ClientError:
        # Add logging or alerting here before propagating the failure
        raise

api_key = get_secret()
Google Secret Manager:
from google.cloud import secretmanager

def access_secret():
    client = secretmanager.SecretManagerServiceClient()
    name = "projects/PROJECT_ID/secrets/blindfold-api-key/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")

api_key = access_secret()
Never in Browser:
// ❌ WRONG - Client-side code
const client = new Blindfold ({ apiKey: 'sk_live_...' });
Use API Routes:
// ✅ CORRECT - Next.js API Route
// app/api/protect/route.js
import { Blindfold } from '@blindfold/sdk' ;
const client = new Blindfold ({
apiKey: process . env . BLINDFOLD_API_KEY // Server-side only
});
export async function POST ( request ) {
const { text } = await request . json ();
const result = await client . tokenize ( text , { policy: "gdpr_eu" });
return Response . json ( result );
}
Mapping Storage
Mappings must be stored securely to enable detokenization.
Redis:
Benefits:
Fast access
Built-in expiration
Encrypted in transit
import json
import os
from datetime import timedelta

import redis

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    ssl=True,  # Use TLS
    password=os.getenv("REDIS_PASSWORD"),
)

# Store mapping with 24-hour expiration
protected = client.tokenize(user_input, policy="gdpr_eu")
session_id = generate_session_id()
redis_client.setex(
    f"mapping:{session_id}",
    timedelta(hours=24),
    json.dumps(protected.mapping),
)

# Retrieve later; get() returns None once the key has expired
mapping_json = redis_client.get(f"mapping:{session_id}")
if mapping_json is None:
    raise KeyError("Mapping expired or not found")
mapping = json.loads(mapping_json)
original = client.detokenize(ai_response, mapping)
Encrypted Database:
Benefits:
Persistent storage
Query capabilities
Backup support
import json
from datetime import datetime, timedelta

from cryptography.fernet import Fernet

# Generate the key once and store it securely (for example in a secrets manager);
# a key regenerated on every start cannot decrypt previously stored mappings
encryption_key = Fernet.generate_key()
cipher = Fernet(encryption_key)

# Encrypt mapping before storing
protected = client.tokenize(user_input, policy="gdpr_eu")
encrypted_mapping = cipher.encrypt(json.dumps(protected.mapping).encode())

# Store in database (session_id identifies the conversation, as in the Redis example)
db.execute(
    "INSERT INTO mappings (session_id, encrypted_data, expires_at) VALUES (?, ?, ?)",
    (session_id, encrypted_mapping, datetime.now() + timedelta(hours=24)),
)

# Retrieve and decrypt (assumes a sqlite3-style connection where execute() returns a cursor)
row = db.execute(
    "SELECT encrypted_data FROM mappings WHERE session_id = ?", (session_id,)
).fetchone()
decrypted = cipher.decrypt(row[0])
mapping = json.loads(decrypted)
Session Storage:
Benefits:
Simple implementation
Automatic cleanup
No external dependencies
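Use only for short-lived sessions (< 1 hour). Flask's default session cookie is signed but not encrypted, so the client can read its contents; use a server-side session backend (for example Flask-Session) or encrypt the mapping before storing it.
# Flask example
from datetime import datetime, timedelta

from flask import jsonify, request, session

@app.route("/protect", methods=["POST"])
def protect():
    protected = client.tokenize(request.json["text"], policy="gdpr_eu")
    # Store in the session (server-side or encrypted, see the note above)
    session["mapping"] = protected.mapping
    session["expires"] = (datetime.now() + timedelta(hours=1)).isoformat()
    return jsonify({"text": protected.text})

@app.route("/restore", methods=["POST"])
def restore():
    mapping = session.get("mapping")
    expires = session.get("expires")
    # Reject missing or expired mappings
    if not mapping or not expires or datetime.fromisoformat(expires) < datetime.now():
        session.pop("mapping", None)
        return jsonify({"error": "Mapping expired"}), 400
    original = client.detokenize(request.json["text"], mapping)
    return jsonify({"text": original.text})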
Mapping Security Checklist:
Use Async for Concurrency
Process multiple requests in parallel for better throughput.
import asyncio

from blindfold import AsyncBlindfold

async def process_messages(messages):
    async with AsyncBlindfold(api_key=api_key) as client:
        # Process concurrently
        tasks = [client.tokenize(msg, policy="gdpr_eu") for msg in messages]
        results = await asyncio.gather(*tasks)
        return results

# Process 100 messages concurrently
messages = get_user_messages()
results = asyncio.run(process_messages(messages))
Performance:
Sequential: 100 requests × 200ms = 20 seconds
Async: ~2-3 seconds (limited by API rate limits)
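Launching every request at once can run straight into the rate limits mentioned above. One common way to cap the number of in-flight requests is `asyncio.Semaphore`. The sketch below is illustrative only: `gather_bounded` is a hypothetical helper, `fake_tokenize` stands in for the real client call, and `max_concurrency` values are arbitrary assumptions rather than documented Blindfold limits.

```python
import asyncio


async def gather_bounded(items, worker, max_concurrency=10):
    """Run `worker(item)` for every item with at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:
            return await worker(item)

    # gather() returns results in the same order as the inputs
    return await asyncio.gather(*(run_one(item) for item in items))


async def fake_tokenize(text):
    # Hypothetical stand-in for `client.tokenize(...)`
    await asyncio.sleep(0.01)
    return text.upper()


bounded_results = asyncio.run(
    gather_bounded(["a", "b", "c"], fake_tokenize, max_concurrency=2)
)
print(bounded_results)
```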
import { Blindfold } from '@blindfold/sdk' ;
const client = new Blindfold ({ apiKey: apiKey });
async function processMessages ( messages ) {
// Process concurrently with Promise.all
const promises = messages . map ( msg =>
client . tokenize ( msg , { policy: "gdpr_eu" })
);
const results = await Promise . all ( promises );
return results ;
}
// Process 100 messages concurrently
const messages = getUserMessages ();
const results = await processMessages ( messages );
Batch Similar Requests
Combine similar text into single requests when possible.
# ❌ Inefficient - 3 API calls
result1 = client.tokenize( "User 1: john@example.com" )
result2 = client.tokenize( "User 2: jane@example.com" )
result3 = client.tokenize( "User 3: bob@example.com" )
# ✅ Efficient - 1 API call
combined_text = """
User 1: john@example.com
User 2: jane@example.com
User 3: bob@example.com
"""
result = client.tokenize(combined_text, policy = "basic" )
# Parse results by line (strip the leading and trailing blank lines first)
lines = result.text.strip().split("\n")
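The batching idea above can be wrapped in a small helper that also checks the result can be split back safely. This sketch assumes that tokenization leaves newline characters untouched and that each input is a single line; neither is confirmed by this guide, so verify against the real API. `tokenize_batch` and `fake_tokenize` are hypothetical names.

```python
from typing import Callable


def tokenize_batch(texts: list[str], tokenize: Callable[[str], str]) -> list[str]:
    """Send several single-line texts as one request and split the result back."""
    if not texts:
        return []
    if any("\n" in text for text in texts):
        raise ValueError("Inputs must be single-line for newline-delimited batching")
    protected = tokenize("\n".join(texts))
    lines = protected.split("\n")
    if len(lines) != len(texts):
        raise RuntimeError("Line count changed; fall back to per-item requests")
    return lines


def fake_tokenize(text: str) -> str:
    # Hypothetical stand-in that replaces one known address
    return text.replace("john@example.com", "<email_address_1>")


batch = tokenize_batch(["User 1: john@example.com", "User 2: no pii"], fake_tokenize)
print(batch)
```

With the real client, the `tokenize` argument would be a small wrapper that calls the API and returns the protected text; the single mapping returned for the batch then covers every line.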
Cache Results
Cache tokenization results for frequently used text.
import hashlib

from blindfold import Blindfold

class CachedBlindfold:
    def __init__(self, api_key):
        self.client = Blindfold(api_key=api_key)
        # Unbounded in-memory cache; cached results contain PII mappings,
        # so keep it inside a trusted process and bound it for long-running services
        self._cache = {}

    def tokenize(self, text, policy="basic"):
        # Create cache key
        cache_key = hashlib.sha256(f"{text}:{policy}".encode()).hexdigest()
        # Check cache
        if cache_key in self._cache:
            return self._cache[cache_key]
        # Call API
        result = self.client.tokenize(text, policy=policy)
        # Store in cache
        self._cache[cache_key] = result
        return result

# Use cached client
client = CachedBlindfold(api_key=api_key)
# First call - hits API
result1 = client.tokenize("john@example.com", policy="basic")
# Second call - uses cache (no API call)
result2 = client.tokenize("john@example.com", policy="basic")
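A plain dictionary grows without limit and keeps PII mappings in memory indefinitely. A bounded cache with expiry is one way to address both concerns. The following is a minimal standard-library sketch, not part of the Blindfold SDK; `TTLCache`, its default size, and its default lifetime are all assumptions chosen for illustration.

```python
import time
from collections import OrderedDict


class TTLCache:
    """Small LRU cache whose entries expire after `ttl_seconds`."""

    def __init__(self, max_entries=1024, ttl_seconds=300.0, clock=time.monotonic):
        self._entries = OrderedDict()  # key -> (expires_at, value)
        self._max_entries = max_entries
        self._ttl = ttl_seconds
        self._clock = clock

    def get(self, key):
        item = self._entries.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self._clock() >= expires_at:
            del self._entries[key]  # expired
            return None
        self._entries.move_to_end(key)  # mark as recently used
        return value

    def set(self, key, value):
        self._entries[key] = (self._clock() + self._ttl, value)
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used


cache = TTLCache(max_entries=2, ttl_seconds=60)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # touch "a" so "b" becomes the eviction candidate
cache.set("c", 3)  # evicts "b"
```

Swapping such a cache in for `self._cache` keeps the lookup logic the same while bounding both memory use and how long a mapping stays resident.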
Optimize Detection Threshold
Higher thresholds = faster processing, fewer detections.
# Using GDPR policy (threshold: 0.35)
result = client.tokenize(text, policy = "gdpr_eu" )
# Using custom entities and threshold for specific needs
result = client.tokenize(
text,
entities = [ "person" , "email address" , "phone number" ],
score_threshold = 0.60 # Higher threshold for fewer false positives
)
# Lower threshold to catch more edge cases
result = client.tokenize(
text,
entities = [ "person" , "email address" , "phone number" ],
score_threshold = 0.25
)
Threshold Selection:
0.60+ : High confidence only, fast processing
0.35-0.60 : Balanced (recommended for most use cases)
0.25-0.35 : Catch more edge cases, may have false positives
< 0.25 : Maximum detection, slower, more false positives
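The effect of the threshold ranges above can be pictured as a filter over candidate detections. This sketch does not reflect Blindfold's internals: the `Detection` type is hypothetical and the confidence scores are invented purely to show how the same candidates pass or fail at different thresholds.

```python
from dataclasses import dataclass


@dataclass
class Detection:
    text: str
    entity_type: str
    score: float  # detector confidence in [0, 1]


def filter_by_threshold(detections, score_threshold):
    """Keep only detections whose confidence meets the threshold."""
    return [d for d in detections if d.score >= score_threshold]


# Invented scores for illustration only
candidates = [
    Detection("john@example.com", "email address", 0.98),
    Detection("John", "person", 0.55),
    Detection("May", "person", 0.30),  # ambiguous: a name or a month
]
strict = filter_by_threshold(candidates, 0.60)
balanced = filter_by_threshold(candidates, 0.35)
loose = filter_by_threshold(candidates, 0.25)
print(len(strict), len(balanced), len(loose))
```

The ambiguous "May" only appears at the lowest threshold, which is the trade-off the list describes: more coverage at the cost of more false positives.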
Error Handling
Comprehensive Error Handling
Handle all error types gracefully.
import logging
import time

from blindfold import (
    Blindfold,
    AuthenticationError,
    APIError,
    RateLimitError,
    ValidationError,
)

logger = logging.getLogger(__name__)

def safe_tokenize(text, policy="gdpr_eu"):
    try:
        return client.tokenize(text, policy=policy)
    except AuthenticationError:
        # Invalid API key - alert admin immediately
        logger.critical("Blindfold API key is invalid or expired")
        # Send alert to ops team
        send_admin_alert("Invalid Blindfold API key")
        return None
    except RateLimitError as e:
        # Rate limited - wait for the advertised interval, then retry once
        logger.warning(f"Rate limited. Retry after {e.retry_after}s")
        time.sleep(e.retry_after)
        try:
            return client.tokenize(text, policy=policy)
        except Exception as retry_error:
            logger.error(f"Retry failed: {retry_error}")
            return None
    except ValidationError as e:
        # Invalid input - return user-friendly message
        logger.error(f"Validation error: {e.message}")
        return {"error": "Invalid input provided"}
    except APIError as e:
        # API error (could be 500, 503, etc.) - log details and fail gracefully
        logger.error(f"Blindfold API error ({e.status_code}): {e.message}")
        return None
    except Exception as e:
        # Unexpected error - log with traceback and alert
        logger.exception(f"Unexpected error in tokenization: {e}")
        send_admin_alert(f"Unexpected Blindfold error: {e}")
        return None
import {
Blindfold ,
AuthenticationError ,
APIError ,
RateLimitError ,
ValidationError
} from '@blindfold/sdk' ;
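const logger = console; // Use your logging library
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); // Used for the backoff wait below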
async function safeTokenize ( text , policy = "gdpr_eu" ) {
try {
const result = await client . tokenize ( text , { policy });
return result ;
} catch ( error ) {
if ( error instanceof AuthenticationError ) {
// Invalid API key - alert admin immediately
logger . error ( "Blindfold API key is invalid or expired" );
await sendAdminAlert ( "Invalid Blindfold API key" );
return null ;
} else if ( error instanceof RateLimitError ) {
// Rate limited - implement backoff
logger . warn ( `Rate limited. Retry after ${ error . retryAfter } s` );
await sleep ( error . retryAfter * 1000 );
// Retry once
try {
return await client . tokenize ( text , { policy });
} catch ( retryError ) {
logger . error ( `Retry failed: ${ retryError . message } ` );
return null ;
}
} else if ( error instanceof ValidationError ) {
// Invalid input - return user-friendly message
logger . error ( `Validation error: ${ error . message } ` );
return { error: "Invalid input provided" };
} else if ( error instanceof APIError ) {
// API error - log details and fail gracefully
logger . error ( `Blindfold API error ( ${ error . statusCode } ): ${ error . message } ` );
return null ;
} else {
// Unexpected error - log and alert
logger . error ( `Unexpected error in tokenization: ${ error . message } ` );
await sendAdminAlert ( `Unexpected Blindfold error: ${ error . message } ` );
return null ;
}
}
}
Retry Strategy with Exponential Backoff
import logging
import random
import time

from blindfold import Blindfold, APIError, RateLimitError

logger = logging.getLogger(__name__)

def tokenize_with_retry(text, policy="gdpr_eu", max_retries=3):
    """Tokenize with exponential backoff retry strategy"""
    for attempt in range(max_retries):
        try:
            return client.tokenize(text, policy=policy)
        except RateLimitError as e:
            # Use retry-after header
            wait_time = e.retry_after
            logger.warning(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        except APIError as e:
            if e.status_code < 500:
                # Client error - don't retry
                raise
            # Server error - retry with exponential backoff plus jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(
                f"Server error. Retry {attempt + 1}/{max_retries} after {wait_time:.1f}s"
            )
            time.sleep(wait_time)
        # Any other exception is unexpected and propagates without a retry
    raise RuntimeError(f"Failed after {max_retries} retries")
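To see what waits the backoff formula above produces, the delay calculation can be pulled into a standalone function. The `2 ** attempt` plus up to one second of jitter comes from the example above; the upper `cap` is an added assumption (a common safeguard so waits do not grow unbounded), and `backoff_delay` is a hypothetical helper name.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0, rng=random) -> float:
    """Exponential backoff with up to 1s of jitter, capped at `cap` seconds."""
    return min(cap, base * (2 ** attempt) + rng.uniform(0, 1))


# Attempts 0..5 grow roughly as 1s, 2s, 4s, 8s, 16s, then hit the 30s cap
delays = [backoff_delay(attempt) for attempt in range(6)]
print([round(delay, 2) for delay in delays])
```

The jitter spreads out retries from many clients that failed at the same moment, so they do not all hit the API again in the same instant.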
Monitoring and Logging
Track API Usage
Monitor your API usage to prevent unexpected rate limit hits.
import logging
import time

from blindfold import Blindfold

class MonitoredBlindfold:
    def __init__(self, api_key):
        self.client = Blindfold(api_key=api_key)
        self.request_count = 0
        self.error_count = 0
        self.logger = logging.getLogger(__name__)

    def tokenize(self, text, policy="basic"):
        start_time = time.perf_counter()
        # Count every attempt so error_rate is errors / total requests
        self.request_count += 1
        try:
            result = self.client.tokenize(text, policy=policy)
            # Log successful request
            duration = time.perf_counter() - start_time
            self.logger.info(
                f"Tokenize success: {result.entities_count} entities, "
                f"{duration:.2f}s, policy={policy}"
            )
            return result
        except Exception as e:
            self.error_count += 1
            duration = time.perf_counter() - start_time
            # Log error
            self.logger.error(f"Tokenize error: {e}, {duration:.2f}s, policy={policy}")
            raise

    def get_stats(self):
        return {
            "total_requests": self.request_count,
            "total_errors": self.error_count,
            "error_rate": self.error_count / max(self.request_count, 1),
        }
Set Up Alerts
Monitor critical metrics and set up alerts.
# Example: DataDog monitoring
from datadog import statsd

from blindfold import RateLimitError

def tokenize_with_metrics(text, policy="gdpr_eu"):
    with statsd.timed("blindfold.tokenize.duration"):
        try:
            result = client.tokenize(text, policy=policy)
            # Track success
            statsd.increment("blindfold.tokenize.success")
            statsd.gauge("blindfold.entities_detected", result.entities_count)
            return result
        except RateLimitError:
            statsd.increment("blindfold.tokenize.rate_limited")
            raise
        except Exception:
            statsd.increment("blindfold.tokenize.error")
            raise
Compliance Considerations
GDPR Compliance
Only detect and protect the entities you need.
# ✅ Good - Only detect what's needed
result = client.tokenize(
text,
entities = [ "person" , "email address" ] # Minimal set
)
# ❌ Overkill - Detecting everything
result = client.tokenize(text, policy = "strict" ) # 60+ entities
Implement data deletion for user requests.
def delete_user_data(user_id):
    # Assumes mappings are keyed by user ID and the mappings table has a user_id column;
    # the storage examples earlier key by session ID, so adapt the lookups to your schema
    # 1. Delete mappings from storage
    redis_client.delete(f"mapping:{user_id}")
    # 2. Delete encrypted data from database
    db.execute("DELETE FROM mappings WHERE user_id = ?", (user_id,))
    # 3. Log deletion for audit trail
    logger.info(f"Deleted all data for user {user_id}")
    return {"status": "deleted"}
Data Processing Agreement
Request a DPA from Blindfold for your records. Contact: hello@blindfold.dev
Subject: “DPA Request - [Your Company]”
HIPAA Compliance
# Always use hipaa_us policy for healthcare data
result = client.tokenize(
patient_data,
policy = "hipaa_us"
)
# Ensure TLS/SSL for all API calls (enabled by default)
client = Blindfold(
api_key = api_key,
# SDK uses HTTPS by default
)
Business Associate Agreement
Request a BAA from Blindfold if processing PHI. Contact: hello@blindfold.dev
Subject: “BAA Request - [Your Organization]”
Testing Best Practices
Unit Testing
Test your privacy protection logic thoroughly.
import unittest

from blindfold import Blindfold

class TestPrivacyProtection(unittest.TestCase):
    def setUp(self):
        self.client = Blindfold(api_key="test_key")

    def test_email_detection(self):
        """Test that emails are properly detected"""
        result = self.client.tokenize("Contact: john@example.com", policy="basic")
        self.assertEqual(result.entities_count, 1)
        self.assertIn("<email_address_1>", result.text)
        self.assertIn("<email_address_1>", result.mapping)
        self.assertEqual(result.mapping["<email_address_1>"], "john@example.com")

    def test_detokenization_restores_original(self):
        """Test that detokenization works correctly"""
        original = "My email is john@example.com"
        # Tokenize
        protected = self.client.tokenize(original, policy="basic")
        # Detokenize
        restored = self.client.detokenize(protected.text, protected.mapping)
        self.assertEqual(restored.text, original)

    def test_no_pii_returns_unchanged(self):
        """Test that text without PII is unchanged"""
        text = "The weather is nice today"
        result = self.client.tokenize(text, policy="basic")
        self.assertEqual(result.entities_count, 0)
        self.assertEqual(result.text, text)
        self.assertEqual(result.mapping, {})
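The tests above call the API, so they need network access and a valid key. For tests of your own application logic, the client can be replaced with a mock so they run offline. This is a sketch using only the standard library: `protect_message` is a hypothetical application function, and the shape of the fake result (`text`, `mapping`, `entities_count`) is assumed from the examples in this guide.

```python
import unittest
from types import SimpleNamespace
from unittest.mock import MagicMock


def protect_message(client, text):
    """Hypothetical application function under test."""
    result = client.tokenize(text, policy="basic")
    return result.text


class TestProtectMessage(unittest.TestCase):
    def test_returns_tokenized_text(self):
        # Fake client: no network call is made
        fake_client = MagicMock()
        fake_client.tokenize.return_value = SimpleNamespace(
            text="Contact: <email_address_1>",
            mapping={"<email_address_1>": "john@example.com"},
            entities_count=1,
        )
        protected = protect_message(fake_client, "Contact: john@example.com")
        self.assertEqual(protected, "Contact: <email_address_1>")
        fake_client.tokenize.assert_called_once_with(
            "Contact: john@example.com", policy="basic"
        )


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestProtectMessage)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Mocked tests check how your code uses the client; keep a smaller set of real API tests to confirm that detection itself behaves as expected.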
Integration Testing
Test the complete flow with real AI providers.
import os

import pytest
from blindfold import Blindfold
from openai import OpenAI

@pytest.fixture
def clients():
    return {
        "blindfold": Blindfold(api_key=os.getenv("BLINDFOLD_API_KEY")),
        "openai": OpenAI(api_key=os.getenv("OPENAI_API_KEY")),
    }

def test_ai_integration_flow(clients):
    """Test complete privacy-preserving AI flow"""
    user_input = "My name is John Doe and my email is john@example.com"

    # Step 1: Protect PII
    protected = clients["blindfold"].tokenize(user_input, policy="gdpr_eu")
    assert protected.entities_count > 0
    assert "John Doe" not in protected.text
    assert "john@example.com" not in protected.text

    # Step 2: Send to AI
    completion = clients["openai"].chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": protected.text},
        ],
    )
    ai_response = completion.choices[0].message.content

    # Step 3: Restore original data
    final_response = clients["blindfold"].detokenize(ai_response, protected.mapping)

    # Verify restoration worked (the model may or may not echo each token)
    if "<person_1>" in ai_response:
        assert "John Doe" in final_response.text
    if "<email_address_1>" in ai_response:
        assert "john@example.com" in final_response.text
Production Deployment Checklist
Before deploying to production:
Security
Error Handling
Monitoring
Compliance
Testing