001package com.nimbusds.infinispan.persistence.ldap; 002 003 004import java.util.concurrent.Executor; 005import java.util.concurrent.atomic.AtomicInteger; 006 007import com.nimbusds.infinispan.persistence.common.InfinispanEntry; 008import com.nimbusds.infinispan.persistence.common.InfinispanStore; 009import com.nimbusds.infinispan.persistence.ldap.backend.LDAPConnector; 010import com.unboundid.ldap.sdk.DN; 011import com.unboundid.ldap.sdk.ReadOnlyEntry; 012import net.jcip.annotations.ThreadSafe; 013import org.infinispan.commons.configuration.ConfiguredBy; 014import org.infinispan.filter.KeyFilter; 015import org.infinispan.marshall.core.MarshalledEntry; 016import org.infinispan.marshall.core.MarshalledEntryFactory; 017import org.infinispan.persistence.TaskContextImpl; 018import org.infinispan.persistence.spi.InitializationContext; 019import org.infinispan.persistence.spi.PersistenceException; 020import org.kohsuke.MetaInfServices; 021 022 023/** 024 * LDAP store for Infinispan 8.2+ caches and maps. 025 */ 026@ThreadSafe 027@MetaInfServices 028@ConfiguredBy(LDAPStoreConfiguration.class) 029public class LDAPStore<K,V> extends InfinispanStore<K,V> { 030 031 032 /** 033 * The LDAP store configuration. 034 */ 035 private LDAPStoreConfiguration config; 036 037 038 /** 039 * The LDAP backend connector. 040 */ 041 private LDAPConnector ldapConnector; 042 043 044 /** 045 * The LDAP entry transformer (to / from Infinispan entries). 046 */ 047 private LDAPEntryTransformer<K,V> ldapEntryTransformer; 048 049 050 /** 051 * The marshalled Infinispan entry factory. 052 */ 053 private MarshalledEntryFactory<K, V> marshalledEntryFactory; 054 055 056 /** 057 * Purges expired entries found in the LDAP store, as indicated by 058 * their persisted metadata (optional, may be ignored / not stored). 059 */ 060 private ExpiredEntryReaper<K,V> reaper; 061 062 063 /** 064 * Loads an LDAP entry transformer with the specified class name. 065 * 066 * @param className The class name. Must not be {@code null}. 067 * 068 * @return The LDAP entry transformer. 069 */ 070 @SuppressWarnings( "unchecked" ) 071 private LDAPEntryTransformer<K,V> loadEntryTransformerClass(final String className) { 072 073 try { 074 Class<LDAPEntryTransformer<K,V>> clazz = (Class<LDAPEntryTransformer<K,V>>)Class.forName(className); 075 return clazz.newInstance(); 076 } catch (Exception e) { 077 throw new PersistenceException("Couldn't load LDAP entry transformer class: " + e.getMessage(), e); 078 } 079 } 080 081 082 @Override 083 @SuppressWarnings("unchecked") 084 public void init(final InitializationContext ctx) { 085 086 // This method will be invoked by the PersistenceManager during initialization. The InitializationContext 087 // contains: 088 // - this CacheLoader's configuration 089 // - the cache to which this loader is applied. Your loader might want to use the cache's name to construct 090 // cache-specific identifiers 091 // - the StreamingMarshaller that needs to be used to marshall/unmarshall the entries 092 // - a TimeService which the loader can use to determine expired entries 093 // - a ByteBufferFactory which needs to be used to construct ByteBuffers 094 // - a MarshalledEntryFactory which needs to be used to construct entries from the data retrieved by the loader 095 096 super.init(ctx); 097 098 this.config = ctx.getConfiguration(); 099 100 Loggers.MAIN_LOG.info("[IL0201] LDAP store configuration properties for cache {}:", getCacheName()); 101 config.log(); 102 103 Loggers.MAIN_LOG.debug("[IL0202] Loading LDAP entry transformer class {} for cache {}...", 104 config.ldapDirectory.entryTransformer, 105 getCacheName()); 106 107 ldapEntryTransformer = loadEntryTransformerClass(config.ldapDirectory.entryTransformer); 108 109 marshalledEntryFactory = (MarshalledEntryFactory<K, V>)ctx.getMarshalledEntryFactory(); 110 111 Loggers.MAIN_LOG.info("[IL0203] Initialized LDAP external store for cache {}", getCacheName()); 112 } 113 114 115 @Override 116 public void start() { 117 118 // This method will be invoked by the PersistenceManager to start the CacheLoader. At this stage configuration 119 // is complete and the loader can perform operations such as opening a connection to the external storage, 120 // initialize internal data structures, etc. 121 122 if (ldapConnector != null) { 123 throw new IllegalStateException("LDAP store connector already started"); 124 } 125 126 ldapConnector = new LDAPConnector( 127 config, 128 getCacheName(), 129 ldapEntryTransformer.getModifiableAttributes(), 130 ldapEntryTransformer.includesAttributesWithOptions()); 131 132 Loggers.MAIN_LOG.info("[IL0204] Started LDAP external store connector for cache {}", getCacheName()); 133 134 reaper = new ExpiredEntryReaper<>(ldapConnector, ldapEntryTransformer); 135 } 136 137 138 @Override 139 public void stop() { 140 141 if (ldapConnector != null) { 142 ldapConnector.shutdown(); 143 Loggers.MAIN_LOG.info("[IL0205] Stopped LDAP external store connector for cache {}", getCacheName()); 144 } 145 146 super.stop(); 147 } 148 149 150 @SuppressWarnings("unchecked") 151 private K resolveKey(final Object key) { 152 153 if (key instanceof byte[]) { 154 throw new PersistenceException("Cannot resolve " + getCacheName() + " cache key from byte[], enable compatibility mode"); 155 } 156 157 return (K)key; 158 } 159 160 161 @Override 162 public boolean contains(final Object key) { 163 164 // This method will be invoked by the PersistenceManager to determine if the loader contains the specified key. 165 // The implementation should be as fast as possible, e.g. it should strive to transfer the least amount of data possible 166 // from the external storage to perform the check. Also, if possible, make sure the field is indexed on the external storage 167 // so that its existence can be determined as quickly as possible. 168 // 169 // Note that keys will be in the cache's native format, which means that if the cache is being used by a remoting protocol 170 // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[]. 171 172 Loggers.LDAP_LOG.trace("[IL0250] LDAP store: Checking {} cache key {}", getCacheName(), key); 173 174 DN dn = new DN(ldapEntryTransformer.resolveRDN(resolveKey(key)), config.ldapDirectory.baseDN); 175 176 return ldapConnector.entryExists(dn); 177 } 178 179 180 @Override 181 @SuppressWarnings("unchecked") 182 public MarshalledEntry<K,V> load(final Object key) { 183 184 // Fetches an entry from the storage using the specified key. The CacheLoader should retrieve from the external storage all 185 // of the data that is needed to reconstruct the entry in memory, i.e. the value and optionally the metadata. This method 186 // needs to return a MarshalledEntry which can be constructed as follows: 187 // 188 // ctx.getMarshalledEntryFactory().new MarshalledEntry(key, value, metadata); 189 // 190 // If the entry does not exist or has expired, this method should return null. 191 // If an error occurs while retrieving data from the external storage, this method should throw a PersistenceException 192 // 193 // Note that keys and values will be in the cache's native format, which means that if the cache is being used by a remoting protocol 194 // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[]. 195 // If the loader needs to have knowledge of the key/value data beyond their binary representation, then it needs access to the key's and value's 196 // classes and the marshaller used to encode them. 197 198 Loggers.LDAP_LOG.trace("[IL0251] LDAP store: Loading {} cache entry with key {}", getCacheName(), key); 199 200 DN dn = new DN(ldapEntryTransformer.resolveRDN(resolveKey(key)), config.ldapDirectory.baseDN); 201 202 Loggers.LDAP_LOG.trace("[IL0257] LDAP store: Resolved DN {}", dn); 203 204 ReadOnlyEntry ldapEntry = ldapConnector.retrieveEntry(dn); 205 206 if (ldapEntry == null) { 207 // Not found 208 Loggers.LDAP_LOG.trace("[IL0258] LDAP store: Entry not found"); 209 return null; 210 } 211 212 213 if (Loggers.LDAP_LOG.isTraceEnabled()) { 214 Loggers.LDAP_LOG.trace("[IL0259] LDAP store: Retrieved entry: {}", ldapEntry.toLDIFString()); 215 } 216 217 // Transform LDAP entry to Infinispan entry 218 InfinispanEntry<K,V> infinispanEntry = ldapEntryTransformer.toInfinispanEntry(new LDAPEntry(ldapEntry)); 219 220 return marshalledEntryFactory.newMarshalledEntry(infinispanEntry.getKey(), infinispanEntry.getValue(), infinispanEntry.getMetadata()); 221 } 222 223 224 @Override 225 public boolean delete(final Object key) { 226 227 // The CacheWriter should remove from the external storage the entry identified by the specified key. 228 // Note that keys will be in the cache's native format, which means that if the cache is being used by a remoting protocol 229 // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[]. 230 231 Loggers.LDAP_LOG.trace("[IL0252] LDAP store: Deleting {} entry with key {}", getCacheName(), key); 232 233 DN dn = new DN(ldapEntryTransformer.resolveRDN(resolveKey(key)), config.ldapDirectory.baseDN); 234 235 return ldapConnector.deleteEntry(dn); 236 } 237 238 239 @Override 240 public void write(final MarshalledEntry<? extends K, ? extends V> marshalledEntry) { 241 242 // The CacheWriter should write the specified entry to the external storage. 243 // 244 // The PersistenceManager uses MarshalledEntry as the default format so that CacheWriters can efficiently store data coming 245 // from a remote node, thus avoiding any additional transformation steps. 246 // 247 // Note that keys and values will be in the cache's native format, which means that if the cache is being used by a remoting protocol 248 // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[]. 249 250 Loggers.LDAP_LOG.trace("[IL0253] LDAP store: Writing {} entry {}", getCacheName(), marshalledEntry); 251 252 LDAPEntry ldapEntry = ldapEntryTransformer.toLDAPEntry( 253 config.ldapDirectory.baseDN, 254 new InfinispanEntry<>( 255 marshalledEntry.getKey(), 256 marshalledEntry.getValue(), 257 marshalledEntry.getMetadata())); 258 259 // Resolve the LDAP write strategy 260 LDAPWriteStrategy writeStrategy = ldapEntry.getWriteStrategy(); 261 262 if (writeStrategy != null) { 263 Loggers.LDAP_LOG.trace("[IL0263] LDAP store: Entry transformer suggested {} write strategy", writeStrategy); 264 } else { 265 writeStrategy = LDAPWriteStrategy.getDefault(); 266 Loggers.LDAP_LOG.trace("[IL0264] LDAP store: Defaulted to {} write strategy", writeStrategy); 267 } 268 269 // Entry metadata created timestamp unreliable, cannot be used 270 // as hint whether LDAP ADD or LDAP MODIFY should be attempted 271 // first 272 // InternalMetadataImpl{actual=EmbeddedExpirableMetadata{lifespan=-1, maxIdle=-1, version=null}, created=-1, lastUsed=-1} 273 274 275 switch (writeStrategy) { 276 277 case TRY_LDAP_ADD_FIRST: 278 279 if (ldapConnector.addEntry(ldapEntry.getEntry())) { 280 Loggers.LDAP_LOG.trace("[IL0256] LDAP store: Added new {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN()); 281 return; // success 282 } 283 284 // Entry already exists, attempt LDAP modify 285 if (ldapConnector.replaceEntry(ldapEntry.getEntry())) { 286 Loggers.LDAP_LOG.trace("[IL0257] LDAP store: Replaced {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN()); 287 return; // success 288 } 289 290 // Try to recover from concurrent LDAP delete 291 // (entry deleted between first failed add and second modify attempt) 292 if (! ldapConnector.addEntry(ldapEntry.getEntry())) { 293 // This should be highly unlikely 294 throw new PersistenceException("Failed recovery from concurrent LDAP delete (" + getCacheName() + " cache): " + ldapEntry.getEntry().getDN()); 295 } 296 297 break; 298 299 case TRY_LDAP_MODIFY_FIRST: 300 301 if (ldapConnector.replaceEntry(ldapEntry.getEntry())) { 302 Loggers.LDAP_LOG.trace("[IL0265] LDAP store: Replaced {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN()); 303 return; // success 304 } 305 306 // Entry doesn't exist, try LDAP add 307 if (ldapConnector.addEntry(ldapEntry.getEntry())) { 308 Loggers.LDAP_LOG.trace("[IL0266] LDAP store: Added new {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN()); 309 return; // success 310 } 311 312 // Try to recover from concurrent LDAP add 313 // (entry added between first failed replace and second add attempt) 314 if (! ldapConnector.replaceEntry(ldapEntry.getEntry())) { 315 // This should be highly unlikely 316 throw new PersistenceException("Failed recovery from concurrent LDAP add (" + getCacheName() + " cache): " + ldapEntry.getEntry().getDN()); 317 } 318 319 break; 320 321 default: 322 throw new PersistenceException("Unexpected LDAP write strategy: " + writeStrategy); 323 } 324 } 325 326 327 @Override 328 public void process(final KeyFilter<? super K> keyFilter, 329 final CacheLoaderTask<K, V> cacheLoaderTask, 330 final Executor executor, 331 final boolean fetchValue, 332 final boolean fetchMetadata) { 333 334 Loggers.LDAP_LOG.trace("[IL0262] LDAP store: Processing key filter for {} cache: fetchValue={} fetchMetadata=", getCacheName(), fetchValue, fetchMetadata); 335 336 final TaskContext taskContext = new TaskContextImpl(); 337 338 // TODO consider multi-threaded LDAP retrieval? 339 executor.execute(() -> ldapConnector.retrieveEntries(ldapEntry -> { 340 341 // Retrieves entire entry, fetchValue / fetchMetadata params are ignored TODO consider 342 343 if (taskContext.isStopped()) { 344 // TODO Consider pushing task context to LDAP connector routine 345 return; 346 } 347 348 InfinispanEntry<K,V> infinispanEntry = ldapEntryTransformer.toInfinispanEntry(new LDAPEntry(ldapEntry)); 349 350 if (keyFilter.accept(infinispanEntry.getKey())) { 351 352 MarshalledEntry<K,V> marshalledEntry = marshalledEntryFactory.newMarshalledEntry( 353 infinispanEntry.getKey(), 354 infinispanEntry.getValue(), 355 infinispanEntry.getMetadata()); 356 357 try { 358 cacheLoaderTask.processEntry(marshalledEntry, taskContext); 359 360 } catch (InterruptedException e) { 361 throw new PersistenceException(e.getMessage(), e); 362 } 363 } 364 })); 365 } 366 367 368 @Override 369 public int size() { 370 371 // Infinispan code analysis on 8.2 shows that this method is never called in practise, and 372 // is not wired to the data / cache container API 373 374 Loggers.LDAP_LOG.trace("[IL0258] LDAP store: Counting {} entries", getCacheName()); 375 376 final int count = ldapConnector.countEntries(); 377 378 Loggers.LDAP_LOG.trace("[IL0259] LDAP store: Counted {} {} entries", count, getCacheName()); 379 380 return count; 381 } 382 383 384 @Override 385 public void clear() { 386 387 Loggers.LDAP_LOG.trace("[IL0260] LDAP store: Clearing {} entries", getCacheName()); 388 389 int numDeleted = ldapConnector.deleteEntries(); 390 391 Loggers.LDAP_LOG.debug("[IL0254] LDAP store: Cleared {} {} entries", numDeleted, getCacheName()); 392 } 393 394 395 @Override 396 public void purge(final Executor executor, final PurgeListener<? super K> purgeListener) { 397 398 Loggers.LDAP_LOG.trace("[IL0261] LDAP store: Purging {} entries", getCacheName()); 399 400 final AtomicInteger numPurged = new AtomicInteger(); 401 402 executor.execute(() -> numPurged.set(reaper.purge(purgeListener))); 403 404 Loggers.LDAP_LOG.debug("[IL0255] LDAP store: Purged {} expired {} entries", numPurged.get(), getCacheName()); 405 } 406}