BLAKE3 Key Derivation Function

Overview

BLAKE3 KDF is a key derivation function based on the BLAKE3 cryptographic hash function. It leverages BLAKE3’s tree structure and unlimited output capability to provide fast, secure key derivation with excellent performance characteristics. BLAKE3 KDF is particularly well-suited for applications requiring high-speed key generation or large amounts of derived key material.

Key Features

Common Use Cases

Algorithm Details

Parameters

| Parameter | Description | Default | Range/Type |
|---|---|---|---|
| input_key_material | Source key material | Required | Any length bytes |
| context | Domain separation string | Empty | Any length bytes |
| output_length | Desired key length | Required | 1 to unlimited bytes |
| derive_key_context | BLAKE3 derive key mode | “BLAKE3 KDF” | String |

Security Properties

Implementation

Python Example

from metamui_crypto import BLAKE3_KDF, generate_random_bytes
import os

# Basic key derivation
# One BLAKE3_KDF instance is created here and reused by every call below.
blake3_kdf = BLAKE3_KDF()

# Derive a 32-byte key
# The input key material is 32 freshly generated random bytes.
input_key = generate_random_bytes(32)
context = b"application-specific-context"
derived_key = blake3_kdf.derive(
    input_key_material=input_key,
    context=context,
    output_length=32
)

# Derive multiple keys of different lengths
# Same input key each time; only the context (and, for the IV, the length)
# changes. Arguments are passed positionally: (input key, context, length).
encryption_key = blake3_kdf.derive(input_key, b"encryption", 32)
mac_key = blake3_kdf.derive(input_key, b"authentication", 32)
iv = blake3_kdf.derive(input_key, b"initialization_vector", 16)

# Derive very long key material
# A single call requests all 1024 output bytes at once.
long_key = blake3_kdf.derive(
    input_key_material=input_key,
    context=b"long_key_material",
    output_length=1024  # 1KB of key material
)

# Stream key derivation
def derive_stream_keys(master_key: bytes, count: int, key_size: int = 32):
    """Derive `count` keys of `key_size` bytes each from `master_key`.

    Key number ``i`` is derived under the context ``b"stream_key_<i>"``, so
    every key in the returned list uses a distinct context. The keys are
    returned in index order; a non-positive `count` yields an empty list.
    """
    kdf = BLAKE3_KDF()
    return [
        kdf.derive(master_key, f"stream_key_{i}".encode(), key_size)
        for i in range(count)
    ]

# Generate 100 keys
# A fresh 32-byte random master key feeds derive_stream_keys, which returns
# a list of 100 keys of 32 bytes each.
master_key = generate_random_bytes(32)
stream_keys = derive_stream_keys(master_key, 100, 32)

Advanced Usage

# High-performance key server
class BLAKE3KeyServer:
    """Derives per-user, per-session and hierarchical keys from one master secret.

    User keys are memoised in ``key_cache`` (a plain dict that is never
    evicted), keyed by ``(user_id, purpose, key_length)``.
    """

    def __init__(self, master_secret: bytes):
        self.master_secret = master_secret
        self.kdf = BLAKE3_KDF()
        self.key_cache = {}

    def derive_user_key(self, user_id: str, purpose: str,
                       key_length: int = 32) -> bytes:
        """Return the key for (`user_id`, `purpose`), deriving it on a cache miss.

        The derivation context is ``user:<user_id>:purpose:<purpose>``.
        """
        context = f"user:{user_id}:purpose:{purpose}".encode()
        slot = (user_id, purpose, key_length)

        # Serve a previously derived key when one exists.
        try:
            return self.key_cache[slot]
        except KeyError:
            pass

        # Cache miss: derive, remember, return.
        fresh_key = self.kdf.derive(
            input_key_material=self.master_secret,
            context=context,
            output_length=key_length
        )
        self.key_cache[slot] = fresh_key
        return fresh_key

    def derive_session_keys(self, session_id: str) -> dict:
        """Return the encryption, authentication and key-confirmation keys
        for `session_id`.

        Each key is derived from the master secret under
        ``session:<session_id>`` plus a per-key suffix; the confirmation key
        is 16 bytes, the other two are 32 bytes.
        """
        base_context = f"session:{session_id}".encode()
        plan = (
            ('encryption', b":encryption", 32),
            ('authentication', b":authentication", 32),
            ('key_confirmation', b":confirmation", 16),
        )
        return {
            label: self.kdf.derive(self.master_secret, base_context + suffix, size)
            for label, suffix, size in plan
        }

    def derive_hierarchical_key(self, path: list) -> bytes:
        """Walk `path`, deriving a 32-byte key per component from the previous key.

        Starts from the master secret; an empty `path` returns the master
        secret itself unchanged.
        """
        node_key = self.master_secret
        for level, component in enumerate(path):
            node_key = self.kdf.derive(
                input_key_material=node_key,
                context=f"level:{level}:component:{component}".encode(),
                output_length=32
            )
        return node_key

# Parallel key derivation
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

class ParallelBLAKE3KDF:
    """Fans key derivations out over a thread pool.

    `num_workers` defaults to ``multiprocessing.cpu_count()`` when it is
    None (or any other falsy value such as 0).
    """

    def __init__(self, num_workers=None):
        self.num_workers = num_workers or multiprocessing.cpu_count()
        self.kdf = BLAKE3_KDF()

    def derive_batch(self, master_key: bytes, contexts: list,
                    key_length: int = 32) -> list:
        """Derive one `key_length`-byte key per context, preserving input order."""
        with ThreadPoolExecutor(max_workers=self.num_workers) as pool:
            return list(pool.map(
                lambda ctx: self.kdf.derive(master_key, ctx, key_length),
                contexts,
            ))

    def derive_large_keystream(self, master_key: bytes,
                              total_length: int,
                              chunk_size: int = 1024) -> bytes:
        """Build `total_length` bytes from independently derived chunks.

        Chunk ``k`` is derived under the context ``chunk:<k>``; the final
        chunk is shortened so the concatenation is exactly `total_length`
        bytes long.
        """
        # Ceiling division: number of chunks needed to cover total_length.
        num_chunks = (total_length + chunk_size - 1) // chunk_size

        def derive_chunk(chunk_index):
            bytes_left = total_length - chunk_index * chunk_size
            return self.kdf.derive(
                master_key,
                f"chunk:{chunk_index}".encode(),
                min(chunk_size, bytes_left),
            )

        with ThreadPoolExecutor(max_workers=self.num_workers) as pool:
            pieces = list(pool.map(derive_chunk, range(num_chunks)))

        # Join in chunk order and cap at the requested length.
        return b''.join(pieces)[:total_length]

# Protocol key derivation
class BLAKE3ProtocolKDF:
    """Derives handshake, application and exported keys for a named protocol.

    Every context starts with the protocol name; handshake and application
    contexts also embed the protocol version.
    """

    def __init__(self, protocol_name: str, version: int):
        self.protocol_name = protocol_name
        self.version = version
        self.kdf = BLAKE3_KDF()

    def derive_handshake_keys(self, shared_secret: bytes,
                             handshake_hash: bytes) -> dict:
        """Return the five handshake keys derived from `shared_secret`.

        The shared context is ``<protocol>:v<version>:handshake`` with
        `handshake_hash` appended directly (no separator); each key adds its
        own suffix. Write keys and the finished key are 32 bytes, IVs 12.
        """
        base_context = (
            f"{self.protocol_name}:v{self.version}:handshake".encode() +
            handshake_hash
        )
        layout = (
            ('client_write_key', b":client_write", 32),
            ('server_write_key', b":server_write", 32),
            ('client_write_iv', b":client_iv", 12),
            ('server_write_iv', b":server_iv", 12),
            ('finished_key', b":finished", 32),
        )
        return {
            label: self.kdf.derive(shared_secret, base_context + suffix, size)
            for label, suffix, size in layout
        }

    def derive_application_keys(self, handshake_secret: bytes) -> dict:
        """Return three 32-byte application-phase secrets derived from
        `handshake_secret` under ``<protocol>:v<version>:application``."""
        context = f"{self.protocol_name}:v{self.version}:application".encode()
        layout = (
            ('application_secret', b":secret"),
            ('exporter_master_secret', b":exporter"),
            ('resumption_master_secret', b":resumption"),
        )
        return {
            label: self.kdf.derive(handshake_secret, context + suffix, 32)
            for label, suffix in layout
        }

    def export_key(self, exporter_secret: bytes, label: bytes,
                  context_value: bytes, length: int) -> bytes:
        """Derive `length` bytes for external use under
        ``<protocol>:export:<label>:<context_value>``."""
        prefix = f"{self.protocol_name}:export:".encode()
        export_context = prefix + label + b":" + context_value
        return self.kdf.derive(exporter_secret, export_context, length)

# Streaming key derivation
class BLAKE3StreamingKDF:
    """Position-addressed key material reader.

    Each read is a separate derivation whose context is the base context
    followed by ``:pos:<position>``, so what a read returns depends on the
    position at which it starts.
    """

    def __init__(self, master_key: bytes, context: bytes):
        self.master_key = master_key
        self.context = context
        self.kdf = BLAKE3_KDF()
        self.position = 0

    def read_key_bytes(self, length: int) -> bytes:
        """Derive `length` bytes at the current position, then advance by `length`."""
        tagged_context = self.context + f":pos:{self.position}".encode()
        material = self.kdf.derive(self.master_key, tagged_context, length)
        # Only advance once the derivation has succeeded.
        self.position += length
        return material

    def seek(self, position: int):
        """Set the position used by the next `read_key_bytes` call."""
        self.position = position

    def derive_at_position(self, position: int, length: int) -> bytes:
        """Derive `length` bytes for `position` without touching the cursor."""
        tagged_context = self.context + f":pos:{position}".encode()
        return self.kdf.derive(self.master_key, tagged_context, length)

# Memory-efficient large key derivation
class BLAKE3LargeKeyDerivation:
    """Produces large amounts of key material one chunk at a time.

    Chunk ``k`` is derived under ``<context>:chunk:<k>``; at most one chunk
    of `chunk_size` bytes is held by this class at any moment.
    """

    def __init__(self, chunk_size: int = 64 * 1024):  # 64KB chunks
        self.chunk_size = chunk_size
        self.kdf = BLAKE3_KDF()

    def derive_to_file(self, master_key: bytes, context: bytes,
                      total_length: int, output_file: str):
        """Write `total_length` derived bytes to `output_file` (opened 'wb').

        Prints a progress line after every 100th chunk. Nothing is derived
        when `total_length` is not positive, but the file is still created.
        """
        with open(output_file, 'wb') as sink:
            written = 0
            chunk_index = 0

            while written < total_length:
                # The last chunk may be shorter than chunk_size.
                size = min(self.chunk_size, total_length - written)
                chunk_context = context + f":chunk:{chunk_index}".encode()
                sink.write(self.kdf.derive(master_key, chunk_context, size))

                written += size
                chunk_index += 1

                # Progress reporting
                if chunk_index % 100 == 0:
                    progress = (written / total_length) * 100
                    print(f"Progress: {progress:.1f}%")

    def derive_generator(self, master_key: bytes, context: bytes):
        """Yield `chunk_size`-byte chunks forever, chunk index starting at 0."""
        chunk_index = 0
        while True:
            yield self.kdf.derive(
                master_key,
                context + f":chunk:{chunk_index}".encode(),
                self.chunk_size,
            )
            chunk_index += 1

Implementation Details

# BLAKE3 KDF implementation (simplified)
class BLAKE3_KDF_Core:
    """Simplified sketch of the derivation core.

    NOTE(review): the `BLAKE3` hasher used here (its `key=` /
    `derive_key_context=` constructor arguments, `update`, `finalize`) is not
    defined in this snippet; confirm the exact API against the real binding.
    """

    def __init__(self):
        self.derive_key_context = b"BLAKE3 2019-12-27 16:29:52 Derive"

    def derive_key(self, input_key_material: bytes, context: bytes,
                  output_length: int) -> bytes:
        """Return `output_length` bytes derived from `input_key_material`.

        The hasher is built from the input key material and the fixed
        derive-key context, fed `context`, and finalized at the requested
        length.
        """
        state = BLAKE3(key=input_key_material,
                       derive_key_context=self.derive_key_context)
        state.update(context)
        return state.finalize(output_length)

    def derive_key_streaming(self, input_key_material: bytes,
                           context: bytes) -> 'BLAKE3_KDF_Stream':
        """Return a `BLAKE3_KDF_Stream` wrapping a hasher that has absorbed
        `context` and been finalized."""
        state = BLAKE3(key=input_key_material,
                       derive_key_context=self.derive_key_context)
        state.update(context)
        state.finalize()  # Finalize input
        return BLAKE3_KDF_Stream(state)

class BLAKE3_KDF_Stream:
    """Cursor over the output of a finalized hasher.

    All reads delegate to ``hasher.squeeze(length, offset=...)``; this class
    only tracks the current offset.
    """

    def __init__(self, finalized_hasher):
        self.hasher = finalized_hasher
        self.position = 0

    def read(self, length: int) -> bytes:
        """Return `length` bytes at the cursor and advance the cursor by `length`."""
        data = self.hasher.squeeze(length, offset=self.position)
        self.position = self.position + length
        return data

    def seek(self, position: int):
        """Move the cursor to `position`."""
        self.position = position

    def read_at(self, position: int, length: int) -> bytes:
        """Return `length` bytes starting at `position`; the cursor is not moved."""
        return self.hasher.squeeze(length, offset=position)

# Optimized batch derivation
class OptimizedBLAKE3KDF:
    """Batch-oriented helpers: shared-state multi-context derivation and
    tree-shaped key derivation."""

    def __init__(self):
        self.kdf = BLAKE3_KDF()

    def derive_multiple_contexts(self, master_key: bytes,
                                contexts: list,
                                output_length: int) -> list:
        """Derive one `output_length`-byte output per context from a shared
        keyed hasher state.

        NOTE(review): this path builds `BLAKE3(key=master_key)` directly
        rather than going through `self.kdf`; whether its outputs match
        `BLAKE3_KDF.derive` for the same inputs cannot be told from this
        snippet — confirm before mixing the two.
        """
        # Pre-compute base hasher state once; each context works on a copy.
        base_hasher = BLAKE3(key=master_key)

        results = []
        for context in contexts:
            hasher = base_hasher.copy()
            hasher.update(context)
            results.append(hasher.finalize(output_length))

        return results

    def derive_tree_keys(self, master_key: bytes, tree_depth: int,
                        fanout: int = 2) -> dict:
        """Derive a 32-byte key for every node of a tree.

        The tree has a single root (path ``"0"``, level 0) and every node
        above `tree_depth` has exactly `fanout` children, whose paths are
        ``<parent>/<child index>``. Each key is derived from `master_key`
        under ``tree:level:<level>:path:<path>``.

        Returns a dict mapping path -> key; it is empty when `tree_depth`
        is negative.
        """
        keys = {}

        def derive_level(level: int, parent_path: str = ""):
            if level > tree_depth:
                return

            # One root at level 0; below that each parent gets `fanout`
            # children. (Using fanout ** level here would multiply the
            # per-parent child count at every level and over-populate the
            # tree from depth 2 onwards.)
            node_count = 1 if level == 0 else fanout
            for i in range(node_count):
                path = f"{parent_path}/{i}" if parent_path else str(i)
                context = f"tree:level:{level}:path:{path}".encode()

                keys[path] = self.kdf.derive(master_key, context, 32)

                # Recursively derive children
                if level < tree_depth:
                    derive_level(level + 1, path)

        derive_level(0)
        return keys

Security Considerations

Best Practices

  1. Input Key Material
  2. Context Usage
  3. Output Length
  4. Key Management

Common Pitfalls

# DON'T: Use weak input key material
# The derived key is only as unpredictable as the input it is derived from.
weak_key = b"password123"  # Predictable!
# WRONG: Low entropy input
derived = blake3_kdf.derive(weak_key, b"context", 32)

# DO: Use high-entropy input
strong_key = generate_random_bytes(32)  # 256 bits of entropy
derived = blake3_kdf.derive(strong_key, b"context", 32)

# DON'T: Reuse contexts for different purposes
context = b"myapp"
# WRONG: Same context for different keys
# Both calls below pass identical arguments (same key, context and length).
encryption_key = blake3_kdf.derive(master_key, context, 32)
mac_key = blake3_kdf.derive(master_key, context, 32)  # Same context!

# DO: Use unique contexts
# A distinct, purpose-specific context per key keeps the two derivations apart.
encryption_key = blake3_kdf.derive(master_key, b"myapp:encryption", 32)
mac_key = blake3_kdf.derive(master_key, b"myapp:authentication", 32)

# DON'T: Ignore key clearing
derived_key = blake3_kdf.derive(master_key, context, 32)
# Use key...
# WRONG: Key remains in memory

# DO: Clear sensitive data
# bytes objects are immutable: rebinding the name to a fresh run of zero bytes
# would leave the original key bytes untouched in memory. Hold the key in a
# mutable bytearray instead so it can be overwritten in place.
derived_key = bytearray(blake3_kdf.derive(master_key, context, 32))
try:
    # Use key...
    pass
finally:
    # Clear key from memory by zeroing the buffer itself.
    # Best effort only: the temporary bytes object returned by derive() is
    # outside our control and lives until the interpreter frees it.
    derived_key[:] = bytes(len(derived_key))

Performance Characteristics

Benchmarks

| Operation | Input Size | Output Size | Throughput | Time |
|---|---|---|---|---|
| Single Derivation | 32 B | 32 B | 2.1 GB/s | 15 ns |
| Single Derivation | 32 B | 1 KB | 2.8 GB/s | 357 ns |
| Single Derivation | 32 B | 64 KB | 3.2 GB/s | 20 μs |
| Batch Derivation | 32 B | 32 B × 1000 | 3.5 GB/s | 9.1 μs |
| Streaming | 32 B | 1 MB | 3.4 GB/s | 294 μs |

Performance vs Other KDFs

| KDF | Single Key (32B) | Large Output (1MB) | Relative Speed |
|---|---|---|---|
| BLAKE3 KDF | 15 ns | 294 μs | 1.00x (baseline) |
| HKDF-SHA256 | 2.3 μs | 1.85 ms | 0.006x |
| PBKDF2-SHA256 | 45 μs | N/A | 0.0003x |
| Argon2id | 2.1 ms | N/A | 0.000007x |

Optimization Strategies

# Pre-compute base states for repeated derivation
class OptimizedDerivation:
    """Derives many outputs that share one key and one context prefix.

    The constructor absorbs `base_context` into a keyed hasher once; each
    derivation then works on a copy of that state.

    NOTE(review): `BLAKE3` is not defined in this snippet; its `key=`,
    `update`, `copy` and `finalize` API should be confirmed against the real
    binding.
    """

    def __init__(self, master_key: bytes, base_context: bytes):
        self.master_key = master_key
        self.base_context = base_context
        self.kdf = BLAKE3_KDF()

        # Shared prefix state, built once.
        prefix_state = BLAKE3(key=master_key)
        prefix_state.update(base_context)
        self.base_hasher = prefix_state

    def derive_with_suffix(self, suffix: bytes, length: int) -> bytes:
        """Return `length` bytes for the base context extended by `suffix`.

        The stored base state is copied first, so it is left unmodified.
        """
        working = self.base_hasher.copy()
        working.update(suffix)
        return working.finalize(length)

# Parallel derivation for multiple keys
def _derive_key_for_context(master_key: bytes, key_length: int,
                            context: bytes) -> bytes:
    """Worker entry point: derive one key for `context` in a child process.

    Defined at module level so it can be pickled and sent to worker
    processes.
    """
    return BLAKE3_KDF().derive(master_key, context, key_length)


def parallel_derive_keys(master_key: bytes, contexts: list,
                        key_length: int = 32, num_workers: int = None):
    """Derive one `key_length`-byte key per context using a process pool.

    Args:
        master_key: Input key material shared by all derivations.
        contexts: Contexts to derive keys for.
        key_length: Length in bytes of each derived key.
        num_workers: Pool size; defaults to ``multiprocessing.cpu_count()``.

    Returns:
        List of derived keys in the same order as `contexts`. An empty
        `contexts` returns ``[]`` without starting a pool.
    """
    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor
    from functools import partial

    contexts = list(contexts)
    if not contexts:
        return []

    if num_workers is None:
        num_workers = multiprocessing.cpu_count()

    # ProcessPoolExecutor pickles the callable it ships to workers. A function
    # nested inside this one cannot be pickled, so bind the fixed arguments
    # onto the module-level worker with functools.partial instead.
    worker = partial(_derive_key_for_context, master_key, key_length)

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        return list(executor.map(worker, contexts))

# Memory-efficient large key generation
def generate_large_keyfile(master_key: bytes, context: bytes,
                          size_gb: float, output_file: str):
    """Write ``int(size_gb * 1024**3)`` derived bytes to `output_file`.

    Material is produced in chunks of at most 1 MiB; chunk ``k`` is derived
    under ``<context>:chunk:<k>``. A progress line is printed after every
    1000th chunk. The file is opened in 'wb' mode, replacing any existing
    content.
    """
    kdf = BLAKE3_KDF()
    chunk_size = 1024 * 1024  # 1MB chunks
    total_bytes = int(size_gb * 1024 * 1024 * 1024)

    bytes_written = 0
    chunk_index = 0
    with open(output_file, 'wb') as sink:
        while bytes_written < total_bytes:
            # The final chunk may be shorter than chunk_size.
            size = min(chunk_size, total_bytes - bytes_written)
            chunk_context = context + f":chunk:{chunk_index}".encode()
            sink.write(kdf.derive(master_key, chunk_context, size))

            bytes_written += size
            chunk_index += 1

            # Progress reporting
            if chunk_index % 1000 == 0:
                progress = (bytes_written / total_bytes) * 100
                print(f"Generated {progress:.1f}% ({bytes_written / (1024**3):.2f} GB)")

Use Cases

1. High-Speed VPN Key Derivation

class BLAKE3VPNKeyManager:
    """Derives tunnel and session key sets for a VPN.

    Each method makes a single derivation and slices the output into the
    individual keys.
    """

    def __init__(self, master_secret: bytes):
        self.master_secret = master_secret
        self.kdf = BLAKE3_KDF()

    def derive_tunnel_keys(self, tunnel_id: str,
                          client_random: bytes,
                          server_random: bytes) -> dict:
        """Return four 32-byte tunnel keys derived from the master secret.

        The context is ``vpn:tunnel:<tunnel_id>:`` followed by
        `client_random` and then `server_random`. One 128-byte derivation is
        split into consecutive 32-byte keys.
        """
        base_context = (
            f"vpn:tunnel:{tunnel_id}:".encode() + (client_random + server_random)
        )

        key_material = self.kdf.derive(
            input_key_material=self.master_secret,
            context=base_context,
            output_length=128  # 4 × 32-byte keys
        )

        labels = (
            'client_encryption',
            'server_encryption',
            'client_authentication',
            'server_authentication',
        )
        return {
            label: key_material[slot * 32:(slot + 1) * 32]
            for slot, label in enumerate(labels)
        }

    def derive_session_keys(self, session_id: str,
                           key_exchange_output: bytes) -> dict:
        """Return session keys derived from `key_exchange_output`.

        One 80-byte derivation under ``vpn:session:<session_id>`` is split
        into a 32-byte encryption key, a 32-byte MAC key and a 16-byte IV
        seed.
        """
        session_material = self.kdf.derive(
            input_key_material=key_exchange_output,
            context=f"vpn:session:{session_id}".encode(),
            output_length=80  # Multiple keys
        )

        layout = (('encryption_key', 0, 32), ('mac_key', 32, 64), ('iv_seed', 64, 80))
        return {label: session_material[lo:hi] for label, lo, hi in layout}

2. Real-Time Streaming Encryption

class BLAKE3StreamingCrypto:
    """Per-frame key derivation for a single encrypted stream.

    Every frame gets its own 32-byte key derived from the master key under
    ``stream:<stream_id>:frame:<frame_number>``.

    NOTE: `encrypt_with_key` and `decrypt_with_key` are called below but are
    not defined in this class; a subclass (or the instance) must provide
    them with the signature ``(data: bytes, key: bytes) -> bytes``.
    """

    def __init__(self, master_key: bytes, stream_id: str):
        self.master_key = master_key
        self.stream_id = stream_id
        self.kdf = BLAKE3_KDF()
        self.frame_counter = 0

    def derive_frame_key(self, frame_number: int) -> bytes:
        """Derive the unique 32-byte key for frame `frame_number`."""
        context = f"stream:{self.stream_id}:frame:{frame_number}".encode()
        return self.kdf.derive(self.master_key, context, 32)

    def encrypt_frame(self, frame_data: bytes) -> dict:
        """Encrypt `frame_data` under the key of the next frame number.

        Returns a dict with the frame number used, the encrypted data and a
        wall-clock timestamp, then advances the frame counter.
        """
        # Imported here because the example never imports `time` at module
        # level; without it the timestamp line raises NameError.
        import time

        frame_key = self.derive_frame_key(self.frame_counter)

        # Use frame key for encryption (simplified)
        encrypted_frame = self.encrypt_with_key(frame_data, frame_key)

        result = {
            'frame_number': self.frame_counter,
            'encrypted_data': encrypted_frame,
            'timestamp': time.time()
        }

        self.frame_counter += 1
        return result

    def decrypt_frame(self, encrypted_frame: dict) -> bytes:
        """Decrypt a dict produced by `encrypt_frame`.

        The key is re-derived from the frame number stored in the dict, so
        frames can be decrypted in any order.
        """
        frame_number = encrypted_frame['frame_number']
        frame_key = self.derive_frame_key(frame_number)

        return self.decrypt_with_key(
            encrypted_frame['encrypted_data'],
            frame_key
        )

    def derive_keystream(self, length: int) -> bytes:
        """Derive `length` bytes under ``stream:<stream_id>:keystream``."""
        context = f"stream:{self.stream_id}:keystream".encode()
        return self.kdf.derive(self.master_key, context, length)

3. Distributed System Key Management

class BLAKE3DistributedKeys:
    """Derives a node's identity, peer-communication and storage keys from
    a secret shared by the whole cluster."""

    def __init__(self, cluster_secret: bytes, node_id: str):
        self.cluster_secret = cluster_secret
        self.node_id = node_id
        self.kdf = BLAKE3_KDF()

    def derive_node_identity(self) -> dict:
        """Return this node's signing, encryption and authentication keys.

        One 96-byte derivation under ``cluster:node:<node_id>:identity`` is
        split into three 32-byte keys.
        """
        base_context = f"cluster:node:{self.node_id}".encode()

        # Derive multiple identity keys
        key_material = self.kdf.derive(
            self.cluster_secret,
            base_context + b":identity",
            96
        )

        return {
            'signing_key': key_material[0:32],
            'encryption_key': key_material[32:64],
            'authentication_key': key_material[64:96]
        }

    def derive_communication_keys(self, peer_node_id: str,
                                 session_id: str) -> dict:
        """Return the directional keys for talking to `peer_node_id`.

        Both nodes build the same context (node ids sorted) and therefore
        the same 64 bytes. The first half protects traffic from the
        lower-sorted node to the higher one and the second half the reverse
        direction, so one node's ``send_key`` equals its peer's
        ``receive_key`` and vice versa.
        """
        # Create deterministic context for both nodes
        low_node, high_node = sorted([self.node_id, peer_node_id])
        context = f"cluster:comm:{low_node}:{high_node}:{session_id}".encode()

        comm_keys = self.kdf.derive(self.cluster_secret, context, 64)
        low_to_high = comm_keys[0:32]
        high_to_low = comm_keys[32:64]

        # Assign halves by direction. Handing both nodes the same halves
        # under the same names would leave each side sending with a key the
        # other side never uses for receiving.
        if self.node_id == low_node:
            return {'send_key': low_to_high, 'receive_key': high_to_low}
        return {'send_key': high_to_low, 'receive_key': low_to_high}

    def derive_storage_keys(self, data_partition: str) -> dict:
        """Return storage keys for `data_partition` on this node.

        One 80-byte derivation is split into a 32-byte encryption key, a
        32-byte integrity key and a 16-byte backup key.
        """
        context = f"cluster:storage:node:{self.node_id}:partition:{data_partition}".encode()

        storage_material = self.kdf.derive(self.cluster_secret, context, 80)

        return {
            'encryption_key': storage_material[0:32],
            'integrity_key': storage_material[32:64],
            'backup_key': storage_material[64:80]
        }

4. Gaming Anti-Cheat System

class BLAKE3AntiCheat:
    """Derives anti-cheat keys and challenge responses from a game-wide secret."""

    def __init__(self, game_secret: bytes):
        self.game_secret = game_secret
        self.kdf = BLAKE3_KDF()

    def derive_client_keys(self, player_id: str, session_start: int) -> dict:
        """Return four 32-byte component keys for one player session.

        One 128-byte derivation under
        ``anticheat:player:<player_id>:session:<session_start>`` is split
        into consecutive 32-byte keys.
        """
        context = f"anticheat:player:{player_id}:session:{session_start}".encode()
        ac_material = self.kdf.derive(self.game_secret, context, 128)

        labels = (
            'memory_scan_key',
            'process_verify_key',
            'network_auth_key',
            'heartbeat_key',
        )
        return {
            label: ac_material[32 * slot:32 * (slot + 1)]
            for slot, label in enumerate(labels)
        }

    def derive_challenge_response(self, player_id: str,
                                 challenge_data: bytes) -> bytes:
        """Return the expected 32-byte response to `challenge_data`.

        Two-step derivation: a per-player key is derived from the game
        secret first, then the response is derived from that key with
        `challenge_data` as the context.
        """
        per_player_key = self.kdf.derive(
            self.game_secret,
            f"anticheat:challenge:player:{player_id}".encode(),
            32
        )
        return self.kdf.derive(per_player_key, challenge_data, 32)

    def derive_obfuscation_keys(self, code_section: str) -> bytes:
        """Return the 32-byte obfuscation key for `code_section`."""
        return self.kdf.derive(
            self.game_secret,
            f"anticheat:obfuscation:section:{code_section}".encode(),
            32,
        )

5. IoT Device Key Provisioning

class BLAKE3IoTProvisioning:
    """Derives per-device key sets from a manufacturer-wide secret."""

    def __init__(self, manufacturer_secret: bytes):
        self.manufacturer_secret = manufacturer_secret
        self.kdf = BLAKE3_KDF()

    def provision_device(self, device_id: str,
                        device_type: str,
                        production_batch: str) -> dict:
        """Return the five 32-byte provisioning keys for one device.

        One 160-byte derivation, whose context embeds device type,
        production batch and device id, is split into consecutive 32-byte
        keys.
        """
        base_context = (
            f"iot:provision:type:{device_type}:"
            f"batch:{production_batch}:device:{device_id}"
        ).encode()

        key_material = self.kdf.derive(
            self.manufacturer_secret,
            base_context,
            160  # 5 × 32-byte keys
        )

        labels = (
            'device_identity',
            'attestation_key',
            'communication_key',
            'storage_encryption',
            'update_verification',
        )
        return {
            label: key_material[32 * slot:32 * (slot + 1)]
            for slot, label in enumerate(labels)
        }

    def derive_firmware_key(self, device_id: str,
                           firmware_version: str) -> bytes:
        """Return the 32-byte key for `firmware_version` on `device_id`."""
        return self.kdf.derive(
            self.manufacturer_secret,
            f"iot:firmware:device:{device_id}:version:{firmware_version}".encode(),
            32,
        )

    def derive_sensor_keys(self, device_id: str,
                          sensor_types: list) -> dict:
        """Return a dict mapping each sensor type to its own 32-byte key."""
        return {
            sensor_type: self.kdf.derive(
                self.manufacturer_secret,
                f"iot:sensor:device:{device_id}:type:{sensor_type}".encode(),
                32
            )
            for sensor_type in sensor_types
        }

Comparison with Other KDFs

BLAKE3 KDF vs HKDF

| Feature | BLAKE3 KDF | HKDF |
|---|---|---|
| Speed | Very Fast | Moderate |
| Output Length | Unlimited | Limited (255 × hash_len) |
| Parallelization | Yes | No |
| Memory Usage | Low | Low |
| Standardization | Emerging | RFC 5869 |
| Hardware Support | Limited | Widespread |

BLAKE3 KDF vs PBKDF2

| Feature | BLAKE3 KDF | PBKDF2 |
|---|---|---|
| Purpose | General KDF | Password-based |
| Speed | Very Fast | Intentionally Slow |
| Memory Usage | Low | Low |
| Iterations | None | High |
| Salt Support | Context parameter | Required |
| Use Case | High-entropy input | Low-entropy passwords |

When to Use BLAKE3 KDF

# NOTE: the boolean flags tested below (performance_critical, etc.) are
# illustrative placeholders; they are not defined anywhere in this snippet.

# Use BLAKE3 KDF for high-performance applications
if performance_critical:
    kdf = BLAKE3_KDF()

# Use BLAKE3 KDF for large key derivation
if need_large_amounts_of_key_material:
    kdf = BLAKE3_KDF()  # Unlimited output

# Use BLAKE3 KDF for parallel processing
if can_utilize_multiple_cores:
    kdf = BLAKE3_KDF()  # Parallelizable

# Use BLAKE3 KDF for streaming applications
if need_streaming_key_derivation:
    kdf = BLAKE3_KDF()  # Excellent streaming support

Migration Guide

From HKDF to BLAKE3 KDF

# Before: HKDF
# `input_key` and `context` are assumed to be defined already (for example by
# the basic usage snippet); this snippet does not define them.
from metamui_crypto import HKDF, SHA256
hkdf = HKDF(hash_function=SHA256())
derived_key = hkdf.derive(ikm=input_key, info=context, length=32)

# After: BLAKE3 KDF
# Argument mapping: ikm -> input_key_material, info -> context,
# length -> output_length. No hash function is passed to the constructor.
from metamui_crypto import BLAKE3_KDF
blake3_kdf = BLAKE3_KDF()
derived_key = blake3_kdf.derive(
    input_key_material=input_key,
    context=context,
    output_length=32
)

# Migration wrapper
class KDFMigrator:
    """Single `derive` entry point that switches between BLAKE3 KDF and
    HKDF-SHA256 according to the `use_blake3` flag."""

    def __init__(self, use_blake3=True):
        self.use_blake3 = use_blake3

    def derive(self, input_key: bytes, context: bytes, length: int) -> bytes:
        """Derive `length` bytes with whichever backend is selected.

        A new KDF object is constructed on every call.
        """
        # Legacy path first; BLAKE3 is the fall-through.
        if not self.use_blake3:
            legacy = HKDF(hash_function=SHA256())
            return legacy.derive(ikm=input_key, info=context, length=length)

        return BLAKE3_KDF().derive(input_key, context, length)

From Custom KDF to BLAKE3 KDF

# Before: Custom hash-based KDF
def custom_kdf(key: bytes, context: bytes, length: int) -> bytes:
    """Counter-mode SHA-256 expansion (the legacy scheme being migrated away from).

    Block ``n`` is ``SHA256(key || context || n)`` with ``n`` encoded as a
    4-byte big-endian integer; blocks are concatenated and truncated to
    `length` bytes. A non-positive `length` yields ``b""``.
    """
    blocks = []
    produced = 0
    block_number = 0
    while produced < length:
        digest = hashlib.sha256(
            key + context + block_number.to_bytes(4, 'big')
        ).digest()
        blocks.append(digest)
        produced += len(digest)
        block_number += 1
    return b"".join(blocks)[:length]

# After: BLAKE3 KDF (more secure and faster)
# `key`, `context` and `length` stand for the same arguments custom_kdf took;
# they are not defined at this level of the snippet.
blake3_kdf = BLAKE3_KDF()
output = blake3_kdf.derive(key, context, length)

Test Vectors

Basic Derivation Test Vectors

# Test Vector 1: Basic derivation
# The input key is a run of repeated 0x0b bytes.
input_key = bytes.fromhex("0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b")
context = b"BLAKE3 KDF test"
length = 32

blake3_kdf = BLAKE3_KDF()
output = blake3_kdf.derive(input_key, context, length)
# Expected output depends on BLAKE3 implementation
# (no fixed expected value is pinned here; the vectors below only check
# relationships between outputs)

# Test Vector 2: Empty context
output_empty = blake3_kdf.derive(input_key, b"", length)
assert output != output_empty  # Different contexts produce different outputs

# Test Vector 3: Different lengths
# Same key and context at two lengths: the 16-byte output must equal the
# first 16 bytes of the 64-byte output.
output_16 = blake3_kdf.derive(input_key, context, 16)
output_64 = blake3_kdf.derive(input_key, context, 64)
assert output_16 == output_64[:16]  # Shorter output is prefix of longer

Streaming Derivation Test

# Test Vector 4: Streaming vs batch
input_key = generate_random_bytes(32)
context = b"streaming test"

# Batch derivation
batch_output = blake3_kdf.derive(input_key, context, 1024)
assert len(batch_output) == 1024

# Streaming derivation
# BLAKE3StreamingKDF derives every read under its own ":pos:<offset>" context,
# so the concatenated reads are a different byte string from batch_output and
# must not be compared with it. What the stream does guarantee, and what is
# checked here, is that a sequential read equals the random-access derivation
# at the same offset.
stream = BLAKE3StreamingKDF(input_key, context)
chunks = []
for index in range(32):  # 32 × 32 bytes = 1024 bytes
    chunk = stream.read_key_bytes(32)
    assert chunk == stream.derive_at_position(index * 32, 32)
    chunks.append(chunk)
stream_output = b"".join(chunks)

assert len(stream_output) == 1024
assert stream.position == 1024

# A fresh stream over the same key and context reproduces the same bytes.
replay = BLAKE3StreamingKDF(input_key, context)
assert replay.read_key_bytes(32) == chunks[0]

Context Separation Test

# Test Vector 5: Context separation
# Same random key, same output length; only the context differs between the
# two derivations.
key = generate_random_bytes(32)
context1 = b"context_one"
context2 = b"context_two"

output1 = blake3_kdf.derive(key, context1, 32)
output2 = blake3_kdf.derive(key, context2, 32)

assert output1 != output2  # Different contexts must produce different outputs

References