BLAKE3 KDF is a key derivation function based on the BLAKE3 cryptographic hash function. It leverages BLAKE3’s tree structure and unlimited output capability to provide fast, secure key derivation with excellent performance characteristics. BLAKE3 KDF is particularly well-suited for applications requiring high-speed key generation or large amounts of derived key material.
| Parameter | Description | Default | Range/Type |
|---|---|---|---|
input_key_material |
Source key material | Required | Any length bytes |
context |
Domain separation string | Empty | Any length bytes |
output_length |
Desired key length | Required | 1 to unlimited bytes |
derive_key_context |
BLAKE3 derive key mode | “BLAKE3 KDF” | String |
from metamui_crypto import BLAKE3_KDF, generate_random_bytes
import os
# Basic key derivation
blake3_kdf = BLAKE3_KDF()
# Derive a 32-byte key
input_key = generate_random_bytes(32)
context = b"application-specific-context"
derived_key = blake3_kdf.derive(
input_key_material=input_key,
context=context,
output_length=32
)
# Derive multiple keys of different lengths
encryption_key = blake3_kdf.derive(input_key, b"encryption", 32)
mac_key = blake3_kdf.derive(input_key, b"authentication", 32)
iv = blake3_kdf.derive(input_key, b"initialization_vector", 16)
# Derive very long key material
long_key = blake3_kdf.derive(
input_key_material=input_key,
context=b"long_key_material",
output_length=1024 # 1KB of key material
)
# Stream key derivation
def derive_stream_keys(master_key: bytes, count: int, key_size: int = 32):
"""Derive multiple keys efficiently"""
kdf = BLAKE3_KDF()
keys = []
for i in range(count):
context = f"stream_key_{i}".encode()
key = kdf.derive(master_key, context, key_size)
keys.append(key)
return keys
# Generate 100 keys
master_key = generate_random_bytes(32)
stream_keys = derive_stream_keys(master_key, 100, 32)
# High-performance key server
class BLAKE3KeyServer:
def __init__(self, master_secret: bytes):
self.master_secret = master_secret
self.kdf = BLAKE3_KDF()
self.key_cache = {}
def derive_user_key(self, user_id: str, purpose: str,
key_length: int = 32) -> bytes:
"""Derive user-specific key"""
context = f"user:{user_id}:purpose:{purpose}".encode()
# Check cache first
cache_key = (user_id, purpose, key_length)
if cache_key in self.key_cache:
return self.key_cache[cache_key]
# Derive new key
derived_key = self.kdf.derive(
input_key_material=self.master_secret,
context=context,
output_length=key_length
)
# Cache for future use
self.key_cache[cache_key] = derived_key
return derived_key
def derive_session_keys(self, session_id: str) -> dict:
"""Derive complete set of session keys"""
base_context = f"session:{session_id}".encode()
return {
'encryption': self.kdf.derive(
self.master_secret,
base_context + b":encryption",
32
),
'authentication': self.kdf.derive(
self.master_secret,
base_context + b":authentication",
32
),
'key_confirmation': self.kdf.derive(
self.master_secret,
base_context + b":confirmation",
16
)
}
def derive_hierarchical_key(self, path: list) -> bytes:
"""Derive key using hierarchical path"""
current_key = self.master_secret
for level, component in enumerate(path):
context = f"level:{level}:component:{component}".encode()
current_key = self.kdf.derive(
input_key_material=current_key,
context=context,
output_length=32
)
return current_key
# Parallel key derivation
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
class ParallelBLAKE3KDF:
def __init__(self, num_workers=None):
self.num_workers = num_workers or multiprocessing.cpu_count()
self.kdf = BLAKE3_KDF()
def derive_batch(self, master_key: bytes, contexts: list,
key_length: int = 32) -> list:
"""Derive multiple keys in parallel"""
def derive_single(context):
return self.kdf.derive(master_key, context, key_length)
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
results = list(executor.map(derive_single, contexts))
return results
def derive_large_keystream(self, master_key: bytes,
total_length: int,
chunk_size: int = 1024) -> bytes:
"""Derive large keystream in parallel chunks"""
num_chunks = (total_length + chunk_size - 1) // chunk_size
def derive_chunk(chunk_index):
context = f"chunk:{chunk_index}".encode()
actual_size = min(chunk_size, total_length - chunk_index * chunk_size)
return self.kdf.derive(master_key, context, actual_size)
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
chunks = list(executor.map(derive_chunk, range(num_chunks)))
# Combine chunks
result = b''.join(chunks)
return result[:total_length]
# Protocol key derivation
class BLAKE3ProtocolKDF:
def __init__(self, protocol_name: str, version: int):
self.protocol_name = protocol_name
self.version = version
self.kdf = BLAKE3_KDF()
def derive_handshake_keys(self, shared_secret: bytes,
handshake_hash: bytes) -> dict:
"""Derive handshake keys for secure protocol"""
base_context = (
f"{self.protocol_name}:v{self.version}:handshake".encode() +
handshake_hash
)
return {
'client_write_key': self.kdf.derive(
shared_secret, base_context + b":client_write", 32
),
'server_write_key': self.kdf.derive(
shared_secret, base_context + b":server_write", 32
),
'client_write_iv': self.kdf.derive(
shared_secret, base_context + b":client_iv", 12
),
'server_write_iv': self.kdf.derive(
shared_secret, base_context + b":server_iv", 12
),
'finished_key': self.kdf.derive(
shared_secret, base_context + b":finished", 32
)
}
def derive_application_keys(self, handshake_secret: bytes) -> dict:
"""Derive application-level keys"""
context = f"{self.protocol_name}:v{self.version}:application".encode()
return {
'application_secret': self.kdf.derive(
handshake_secret, context + b":secret", 32
),
'exporter_master_secret': self.kdf.derive(
handshake_secret, context + b":exporter", 32
),
'resumption_master_secret': self.kdf.derive(
handshake_secret, context + b":resumption", 32
)
}
def export_key(self, exporter_secret: bytes, label: bytes,
context_value: bytes, length: int) -> bytes:
"""Export key for external use"""
export_context = (
f"{self.protocol_name}:export:".encode() +
label + b":" + context_value
)
return self.kdf.derive(exporter_secret, export_context, length)
# Streaming key derivation
class BLAKE3StreamingKDF:
def __init__(self, master_key: bytes, context: bytes):
self.master_key = master_key
self.context = context
self.kdf = BLAKE3_KDF()
self.position = 0
def read_key_bytes(self, length: int) -> bytes:
"""Read key bytes from stream"""
# Derive key material at current position
position_context = self.context + f":pos:{self.position}".encode()
key_material = self.kdf.derive(
self.master_key,
position_context,
length
)
self.position += length
return key_material
def seek(self, position: int):
"""Seek to specific position in key stream"""
self.position = position
def derive_at_position(self, position: int, length: int) -> bytes:
"""Derive key material at specific position"""
position_context = self.context + f":pos:{position}".encode()
return self.kdf.derive(self.master_key, position_context, length)
# Memory-efficient large key derivation
class BLAKE3LargeKeyDerivation:
def __init__(self, chunk_size: int = 64 * 1024): # 64KB chunks
self.chunk_size = chunk_size
self.kdf = BLAKE3_KDF()
def derive_to_file(self, master_key: bytes, context: bytes,
total_length: int, output_file: str):
"""Derive large key directly to file"""
with open(output_file, 'wb') as f:
remaining = total_length
chunk_index = 0
while remaining > 0:
chunk_length = min(self.chunk_size, remaining)
chunk_context = context + f":chunk:{chunk_index}".encode()
chunk_data = self.kdf.derive(
master_key, chunk_context, chunk_length
)
f.write(chunk_data)
remaining -= chunk_length
chunk_index += 1
# Progress reporting
progress = ((total_length - remaining) / total_length) * 100
if chunk_index % 100 == 0:
print(f"Progress: {progress:.1f}%")
def derive_generator(self, master_key: bytes, context: bytes):
"""Generator for infinite key stream"""
chunk_index = 0
while True:
chunk_context = context + f":chunk:{chunk_index}".encode()
chunk_data = self.kdf.derive(
master_key, chunk_context, self.chunk_size
)
yield chunk_data
chunk_index += 1
# BLAKE3 KDF implementation (simplified)
class BLAKE3_KDF_Core:
def __init__(self):
self.derive_key_context = b"BLAKE3 2019-12-27 16:29:52 Derive"
def derive_key(self, input_key_material: bytes, context: bytes,
output_length: int) -> bytes:
"""Core BLAKE3 key derivation"""
# Create BLAKE3 hasher in derive key mode
hasher = BLAKE3(key=input_key_material,
derive_key_context=self.derive_key_context)
# Add context to input
hasher.update(context)
# Generate output of desired length
return hasher.finalize(output_length)
def derive_key_streaming(self, input_key_material: bytes,
context: bytes) -> 'BLAKE3_KDF_Stream':
"""Create streaming key derivation"""
hasher = BLAKE3(key=input_key_material,
derive_key_context=self.derive_key_context)
hasher.update(context)
hasher.finalize() # Finalize input
return BLAKE3_KDF_Stream(hasher)
class BLAKE3_KDF_Stream:
def __init__(self, finalized_hasher):
self.hasher = finalized_hasher
self.position = 0
def read(self, length: int) -> bytes:
"""Read bytes from key stream"""
output = self.hasher.squeeze(length, offset=self.position)
self.position += length
return output
def seek(self, position: int):
"""Seek to position in stream"""
self.position = position
def read_at(self, position: int, length: int) -> bytes:
"""Read at specific position"""
return self.hasher.squeeze(length, offset=position)
# Optimized batch derivation
class OptimizedBLAKE3KDF:
def __init__(self):
self.kdf = BLAKE3_KDF()
def derive_multiple_contexts(self, master_key: bytes,
contexts: list,
output_length: int) -> list:
"""Efficiently derive keys for multiple contexts"""
# Pre-compute base hasher state
base_hasher = BLAKE3(key=master_key)
results = []
for context in contexts:
# Clone base hasher and add context
hasher = base_hasher.copy()
hasher.update(context)
result = hasher.finalize(output_length)
results.append(result)
return results
def derive_tree_keys(self, master_key: bytes, tree_depth: int,
fanout: int = 2) -> dict:
"""Derive keys in tree structure"""
keys = {}
def derive_level(level: int, parent_path: str = ""):
if level > tree_depth:
return
for i in range(fanout ** level):
path = f"{parent_path}/{i}" if parent_path else str(i)
context = f"tree:level:{level}:path:{path}".encode()
key = self.kdf.derive(master_key, context, 32)
keys[path] = key
# Recursively derive children
if level < tree_depth:
derive_level(level + 1, path)
derive_level(0)
return keys
# DON'T: Use weak input key material
weak_key = b"password123" # Predictable!
# WRONG: Low entropy input
derived = blake3_kdf.derive(weak_key, b"context", 32)
# DO: Use high-entropy input
strong_key = generate_random_bytes(32) # 256 bits of entropy
derived = blake3_kdf.derive(strong_key, b"context", 32)
# DON'T: Reuse contexts for different purposes
context = b"myapp"
# WRONG: Same context for different keys
encryption_key = blake3_kdf.derive(master_key, context, 32)
mac_key = blake3_kdf.derive(master_key, context, 32) # Same context!
# DO: Use unique contexts
encryption_key = blake3_kdf.derive(master_key, b"myapp:encryption", 32)
mac_key = blake3_kdf.derive(master_key, b"myapp:authentication", 32)
# DON'T: Ignore key clearing
derived_key = blake3_kdf.derive(master_key, context, 32)
# Use key...
# WRONG: Key remains in memory
# DO: Clear sensitive data
derived_key = blake3_kdf.derive(master_key, context, 32)
try:
# Use key...
pass
finally:
# Clear key from memory
derived_key = b'\x00' * len(derived_key)
| Operation | Input Size | Output Size | Throughput | Time |
|---|---|---|---|---|
| Single Derivation | 32 B | 32 B | 2.1 GB/s | 15 ns |
| Single Derivation | 32 B | 1 KB | 2.8 GB/s | 357 ns |
| Single Derivation | 32 B | 64 KB | 3.2 GB/s | 20 μs |
| Batch Derivation | 32 B | 32 B × 1000 | 3.5 GB/s | 9.1 μs |
| Streaming | 32 B | 1 MB | 3.4 GB/s | 294 μs |
| KDF | Single Key (32B) | Large Output (1MB) | Relative Speed |
|---|---|---|---|
| BLAKE3 KDF | 15 ns | 294 μs | 1.00x (baseline) |
| HKDF-SHA256 | 2.3 μs | 1.85 ms | 0.006x |
| PBKDF2-SHA256 | 45 μs | N/A | 0.0003x |
| Argon2id | 2.1 ms | N/A | 0.000007x |
# Pre-compute base states for repeated derivation
class OptimizedDerivation:
def __init__(self, master_key: bytes, base_context: bytes):
self.kdf = BLAKE3_KDF()
self.master_key = master_key
self.base_context = base_context
# Pre-compute base hasher state
self.base_hasher = BLAKE3(key=master_key)
self.base_hasher.update(base_context)
def derive_with_suffix(self, suffix: bytes, length: int) -> bytes:
"""Efficiently derive with context suffix"""
hasher = self.base_hasher.copy()
hasher.update(suffix)
return hasher.finalize(length)
# Parallel derivation for multiple keys
def parallel_derive_keys(master_key: bytes, contexts: list,
key_length: int = 32, num_workers: int = None):
"""Derive multiple keys in parallel"""
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
if num_workers is None:
num_workers = multiprocessing.cpu_count()
def derive_single(context):
kdf = BLAKE3_KDF()
return kdf.derive(master_key, context, key_length)
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(derive_single, contexts))
return results
# Memory-efficient large key generation
def generate_large_keyfile(master_key: bytes, context: bytes,
size_gb: float, output_file: str):
"""Generate multi-gigabyte key file efficiently"""
kdf = BLAKE3_KDF()
chunk_size = 1024 * 1024 # 1MB chunks
total_bytes = int(size_gb * 1024 * 1024 * 1024)
with open(output_file, 'wb') as f:
bytes_written = 0
chunk_index = 0
while bytes_written < total_bytes:
remaining = total_bytes - bytes_written
current_chunk_size = min(chunk_size, remaining)
chunk_context = context + f":chunk:{chunk_index}".encode()
chunk_data = kdf.derive(master_key, chunk_context, current_chunk_size)
f.write(chunk_data)
bytes_written += current_chunk_size
chunk_index += 1
# Progress reporting
if chunk_index % 1000 == 0:
progress = (bytes_written / total_bytes) * 100
print(f"Generated {progress:.1f}% ({bytes_written / (1024**3):.2f} GB)")
class BLAKE3VPNKeyManager:
def __init__(self, master_secret: bytes):
self.master_secret = master_secret
self.kdf = BLAKE3_KDF()
def derive_tunnel_keys(self, tunnel_id: str,
client_random: bytes,
server_random: bytes) -> dict:
"""Derive VPN tunnel keys at high speed"""
# Combine randomness
combined_random = client_random + server_random
# Base context for this tunnel
base_context = f"vpn:tunnel:{tunnel_id}:".encode() + combined_random
# Derive all keys in one operation for efficiency
key_material = self.kdf.derive(
input_key_material=self.master_secret,
context=base_context,
output_length=128 # 4 × 32-byte keys
)
return {
'client_encryption': key_material[0:32],
'server_encryption': key_material[32:64],
'client_authentication': key_material[64:96],
'server_authentication': key_material[96:128]
}
def derive_session_keys(self, session_id: str,
key_exchange_output: bytes) -> dict:
"""Derive session keys for established tunnel"""
context = f"vpn:session:{session_id}".encode()
# Derive from key exchange output
session_material = self.kdf.derive(
input_key_material=key_exchange_output,
context=context,
output_length=80 # Multiple keys
)
return {
'encryption_key': session_material[0:32],
'mac_key': session_material[32:64],
'iv_seed': session_material[64:80]
}
class BLAKE3StreamingCrypto:
def __init__(self, master_key: bytes, stream_id: str):
self.master_key = master_key
self.stream_id = stream_id
self.kdf = BLAKE3_KDF()
self.frame_counter = 0
def derive_frame_key(self, frame_number: int) -> bytes:
"""Derive unique key for each frame"""
context = f"stream:{self.stream_id}:frame:{frame_number}".encode()
return self.kdf.derive(self.master_key, context, 32)
def encrypt_frame(self, frame_data: bytes) -> dict:
"""Encrypt streaming frame with derived key"""
frame_key = self.derive_frame_key(self.frame_counter)
# Use frame key for encryption (simplified)
encrypted_frame = self.encrypt_with_key(frame_data, frame_key)
result = {
'frame_number': self.frame_counter,
'encrypted_data': encrypted_frame,
'timestamp': time.time()
}
self.frame_counter += 1
return result
def decrypt_frame(self, encrypted_frame: dict) -> bytes:
"""Decrypt frame using derived key"""
frame_number = encrypted_frame['frame_number']
frame_key = self.derive_frame_key(frame_number)
return self.decrypt_with_key(
encrypted_frame['encrypted_data'],
frame_key
)
def derive_keystream(self, length: int) -> bytes:
"""Derive keystream for stream cipher"""
context = f"stream:{self.stream_id}:keystream".encode()
return self.kdf.derive(self.master_key, context, length)
class BLAKE3DistributedKeys:
def __init__(self, cluster_secret: bytes, node_id: str):
self.cluster_secret = cluster_secret
self.node_id = node_id
self.kdf = BLAKE3_KDF()
def derive_node_identity(self) -> dict:
"""Derive node identity keys"""
base_context = f"cluster:node:{self.node_id}".encode()
# Derive multiple identity keys
key_material = self.kdf.derive(
self.cluster_secret,
base_context + b":identity",
96
)
return {
'signing_key': key_material[0:32],
'encryption_key': key_material[32:64],
'authentication_key': key_material[64:96]
}
def derive_communication_keys(self, peer_node_id: str,
session_id: str) -> dict:
"""Derive keys for node-to-node communication"""
# Create deterministic context for both nodes
nodes = sorted([self.node_id, peer_node_id])
context = f"cluster:comm:{nodes[0]}:{nodes[1]}:{session_id}".encode()
comm_keys = self.kdf.derive(self.cluster_secret, context, 64)
return {
'send_key': comm_keys[0:32],
'receive_key': comm_keys[32:64]
}
def derive_storage_keys(self, data_partition: str) -> dict:
"""Derive keys for data storage"""
context = f"cluster:storage:node:{self.node_id}:partition:{data_partition}".encode()
storage_material = self.kdf.derive(self.cluster_secret, context, 80)
return {
'encryption_key': storage_material[0:32],
'integrity_key': storage_material[32:64],
'backup_key': storage_material[64:80]
}
class BLAKE3AntiCheat:
def __init__(self, game_secret: bytes):
self.game_secret = game_secret
self.kdf = BLAKE3_KDF()
def derive_client_keys(self, player_id: str, session_start: int) -> dict:
"""Derive anti-cheat keys for game client"""
context = f"anticheat:player:{player_id}:session:{session_start}".encode()
# Derive keys for different anti-cheat components
ac_material = self.kdf.derive(self.game_secret, context, 128)
return {
'memory_scan_key': ac_material[0:32],
'process_verify_key': ac_material[32:64],
'network_auth_key': ac_material[64:96],
'heartbeat_key': ac_material[96:128]
}
def derive_challenge_response(self, player_id: str,
challenge_data: bytes) -> bytes:
"""Derive expected response to anti-cheat challenge"""
context = f"anticheat:challenge:player:{player_id}".encode()
# Combine game secret with challenge
challenge_input = self.kdf.derive(
self.game_secret,
context,
32
)
# Derive response from challenge input and data
return self.kdf.derive(challenge_input, challenge_data, 32)
def derive_obfuscation_keys(self, code_section: str) -> bytes:
"""Derive keys for code obfuscation"""
context = f"anticheat:obfuscation:section:{code_section}".encode()
return self.kdf.derive(self.game_secret, context, 32)
class BLAKE3IoTProvisioning:
def __init__(self, manufacturer_secret: bytes):
self.manufacturer_secret = manufacturer_secret
self.kdf = BLAKE3_KDF()
def provision_device(self, device_id: str,
device_type: str,
production_batch: str) -> dict:
"""Provision IoT device with derived keys"""
base_context = (
f"iot:provision:type:{device_type}:"
f"batch:{production_batch}:device:{device_id}"
).encode()
# Derive comprehensive key set
key_material = self.kdf.derive(
self.manufacturer_secret,
base_context,
160 # 5 × 32-byte keys
)
return {
'device_identity': key_material[0:32],
'attestation_key': key_material[32:64],
'communication_key': key_material[64:96],
'storage_encryption': key_material[96:128],
'update_verification': key_material[128:160]
}
def derive_firmware_key(self, device_id: str,
firmware_version: str) -> bytes:
"""Derive firmware-specific encryption key"""
context = f"iot:firmware:device:{device_id}:version:{firmware_version}".encode()
return self.kdf.derive(self.manufacturer_secret, context, 32)
def derive_sensor_keys(self, device_id: str,
sensor_types: list) -> dict:
"""Derive keys for different sensor types"""
keys = {}
for sensor_type in sensor_types:
context = f"iot:sensor:device:{device_id}:type:{sensor_type}".encode()
keys[sensor_type] = self.kdf.derive(
self.manufacturer_secret,
context,
32
)
return keys
| Feature | BLAKE3 KDF | HKDF |
|---|---|---|
| Speed | Very Fast | Moderate |
| Output Length | Unlimited | Limited (255 × hash_len) |
| Parallelization | Yes | No |
| Memory Usage | Low | Low |
| Standardization | Emerging | RFC 5869 |
| Hardware Support | Limited | Widespread |
| Feature | BLAKE3 KDF | PBKDF2 |
|---|---|---|
| Purpose | General KDF | Password-based |
| Speed | Very Fast | Intentionally Slow |
| Memory Usage | Low | Low |
| Iterations | None | High |
| Salt Support | Context parameter | Required |
| Use Case | High-entropy input | Low-entropy passwords |
# Use BLAKE3 KDF for high-performance applications
if performance_critical:
kdf = BLAKE3_KDF()
# Use BLAKE3 KDF for large key derivation
if need_large_amounts_of_key_material:
kdf = BLAKE3_KDF() # Unlimited output
# Use BLAKE3 KDF for parallel processing
if can_utilize_multiple_cores:
kdf = BLAKE3_KDF() # Parallelizable
# Use BLAKE3 KDF for streaming applications
if need_streaming_key_derivation:
kdf = BLAKE3_KDF() # Excellent streaming support
# Before: HKDF
from metamui_crypto import HKDF, SHA256
hkdf = HKDF(hash_function=SHA256())
derived_key = hkdf.derive(ikm=input_key, info=context, length=32)
# After: BLAKE3 KDF
from metamui_crypto import BLAKE3_KDF
blake3_kdf = BLAKE3_KDF()
derived_key = blake3_kdf.derive(
input_key_material=input_key,
context=context,
output_length=32
)
# Migration wrapper
class KDFMigrator:
def __init__(self, use_blake3=True):
self.use_blake3 = use_blake3
def derive(self, input_key: bytes, context: bytes, length: int) -> bytes:
if self.use_blake3:
kdf = BLAKE3_KDF()
return kdf.derive(input_key, context, length)
else:
hkdf = HKDF(hash_function=SHA256())
return hkdf.derive(ikm=input_key, info=context, length=length)
# Before: Custom hash-based KDF
def custom_kdf(key: bytes, context: bytes, length: int) -> bytes:
output = b""
counter = 0
while len(output) < length:
hash_input = key + context + counter.to_bytes(4, 'big')
output += hashlib.sha256(hash_input).digest()
counter += 1
return output[:length]
# After: BLAKE3 KDF (more secure and faster)
blake3_kdf = BLAKE3_KDF()
output = blake3_kdf.derive(key, context, length)
# Test Vector 1: Basic derivation
input_key = bytes.fromhex("0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b")
context = b"BLAKE3 KDF test"
length = 32
blake3_kdf = BLAKE3_KDF()
output = blake3_kdf.derive(input_key, context, length)
# Expected output depends on BLAKE3 implementation
# Test Vector 2: Empty context
output_empty = blake3_kdf.derive(input_key, b"", length)
assert output != output_empty # Different contexts produce different outputs
# Test Vector 3: Different lengths
output_16 = blake3_kdf.derive(input_key, context, 16)
output_64 = blake3_kdf.derive(input_key, context, 64)
assert output_16 == output_64[:16] # Shorter output is prefix of longer
# Test Vector 4: Streaming vs batch
input_key = generate_random_bytes(32)
context = b"streaming test"
# Batch derivation
batch_output = blake3_kdf.derive(input_key, context, 1024)
# Streaming derivation
stream = BLAKE3StreamingKDF(input_key, context)
stream_output = b""
for _ in range(32): # 32 × 32 bytes = 1024 bytes
stream_output += stream.read_key_bytes(32)
assert batch_output == stream_output
# Test Vector 5: Context separation
key = generate_random_bytes(32)
context1 = b"context_one"
context2 = b"context_two"
output1 = blake3_kdf.derive(key, context1, 32)
output2 = blake3_kdf.derive(key, context2, 32)
assert output1 != output2 # Different contexts must produce different outputs