001package com.nimbusds.infinispan.persistence.ldap;
002
003
004import java.util.concurrent.Executor;
005import java.util.concurrent.atomic.AtomicInteger;
006
007import com.nimbusds.infinispan.persistence.common.InfinispanEntry;
008import com.nimbusds.infinispan.persistence.common.InfinispanStore;
009import com.nimbusds.infinispan.persistence.ldap.backend.LDAPConnector;
010import com.unboundid.ldap.sdk.DN;
011import com.unboundid.ldap.sdk.ReadOnlyEntry;
012import net.jcip.annotations.ThreadSafe;
013import org.infinispan.commons.configuration.ConfiguredBy;
014import org.infinispan.filter.KeyFilter;
015import org.infinispan.marshall.core.MarshalledEntry;
016import org.infinispan.marshall.core.MarshalledEntryFactory;
017import org.infinispan.persistence.TaskContextImpl;
018import org.infinispan.persistence.spi.InitializationContext;
019import org.infinispan.persistence.spi.PersistenceException;
020import org.kohsuke.MetaInfServices;
021
022
023/**
024 * LDAP store for Infinispan 8.2+ caches and maps.
025 */
026@ThreadSafe
027@MetaInfServices
028@ConfiguredBy(LDAPStoreConfiguration.class)
029public class LDAPStore<K,V> extends InfinispanStore<K,V> {
030
031
032        /**
033         * The LDAP store configuration.
034         */
035        private LDAPStoreConfiguration config;
036
037
038        /**
039         * The LDAP backend connector.
040         */
041        private LDAPConnector ldapConnector;
042
043
044        /**
045         * The LDAP entry transformer (to / from Infinispan entries).
046         */
047        private LDAPEntryTransformer<K,V> ldapEntryTransformer;
048
049
050        /**
051         * The marshalled Infinispan entry factory.
052         */
053        private MarshalledEntryFactory<K, V> marshalledEntryFactory;
054
055
056        /**
057         * Purges expired entries found in the LDAP store, as indicated by
058         * their persisted metadata (optional, may be ignored / not stored).
059         */
060        private ExpiredEntryReaper<K,V> reaper;
061
062
063        /**
064         * Loads an LDAP entry transformer with the specified class name.
065         *
066         * @param className The class name. Must not be {@code null}.
067         *
068         * @return The LDAP entry transformer.
069         */
070        @SuppressWarnings( "unchecked" )
071        private LDAPEntryTransformer<K,V> loadEntryTransformerClass(final String className) {
072
073                try {
074                        Class<LDAPEntryTransformer<K,V>> clazz = (Class<LDAPEntryTransformer<K,V>>)Class.forName(className);
075                        return clazz.newInstance();
076                } catch (Exception e) {
077                        throw new PersistenceException("Couldn't load LDAP entry transformer class: " + e.getMessage(), e);
078                }
079        }
080
081
082        @Override
083        @SuppressWarnings("unchecked")
084        public void init(final InitializationContext ctx) {
085
086                // This method will be invoked by the PersistenceManager during initialization. The InitializationContext
087                // contains:
088                // - this CacheLoader's configuration
089                // - the cache to which this loader is applied. Your loader might want to use the cache's name to construct
090                //   cache-specific identifiers
091                // - the StreamingMarshaller that needs to be used to marshall/unmarshall the entries
092                // - a TimeService which the loader can use to determine expired entries
093                // - a ByteBufferFactory which needs to be used to construct ByteBuffers
094                // - a MarshalledEntryFactory which needs to be used to construct entries from the data retrieved by the loader
095
096                super.init(ctx);
097                
098                this.config = ctx.getConfiguration();
099
100                Loggers.MAIN_LOG.info("[IL0201] LDAP store configuration properties for cache {}:", getCacheName());
101                config.log();
102
103                Loggers.MAIN_LOG.debug("[IL0202] Loading LDAP entry transformer class {} for cache {}...",
104                        config.ldapDirectory.entryTransformer,
105                        getCacheName());
106
107                ldapEntryTransformer = loadEntryTransformerClass(config.ldapDirectory.entryTransformer);
108
109                marshalledEntryFactory = (MarshalledEntryFactory<K, V>)ctx.getMarshalledEntryFactory();
110
111                Loggers.MAIN_LOG.info("[IL0203] Initialized LDAP external store for cache {}", getCacheName());
112        }
113
114
115        @Override
116        public void start() {
117
118                // This method will be invoked by the PersistenceManager to start the CacheLoader. At this stage configuration
119                // is complete and the loader can perform operations such as opening a connection to the external storage,
120                // initialize internal data structures, etc.
121
122                if (ldapConnector != null) {
123                        throw new IllegalStateException("LDAP store connector already started");
124                }
125
126                ldapConnector = new LDAPConnector(
127                        config,
128                        getCacheName(),
129                        ldapEntryTransformer.getModifiableAttributes(),
130                        ldapEntryTransformer.includesAttributesWithOptions());
131
132                Loggers.MAIN_LOG.info("[IL0204] Started LDAP external store connector for cache {}", getCacheName());
133
134                reaper = new ExpiredEntryReaper<>(ldapConnector, ldapEntryTransformer);
135        }
136
137
138        @Override
139        public void stop() {
140
141                if (ldapConnector != null) {
142                        ldapConnector.shutdown();
143                        Loggers.MAIN_LOG.info("[IL0205] Stopped LDAP external store connector for cache {}", getCacheName());
144                }
145                
146                super.stop();
147        }
148
149
150        @SuppressWarnings("unchecked")
151        private K resolveKey(final Object key) {
152
153                if (key instanceof byte[]) {
154                        throw new PersistenceException("Cannot resolve " + getCacheName() + " cache key from byte[], enable compatibility mode");
155                }
156
157                return (K)key;
158        }
159
160
161        @Override
162        public boolean contains(final Object key) {
163
164                // This method will be invoked by the PersistenceManager to determine if the loader contains the specified key.
165                // The implementation should be as fast as possible, e.g. it should strive to transfer the least amount of data possible
166                // from the external storage to perform the check. Also, if possible, make sure the field is indexed on the external storage
167                // so that its existence can be determined as quickly as possible.
168                //
169                // Note that keys will be in the cache's native format, which means that if the cache is being used by a remoting protocol
170                // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[].
171
172                Loggers.LDAP_LOG.trace("[IL0250] LDAP store: Checking {} cache key {}", getCacheName(), key);
173
174                DN dn = new DN(ldapEntryTransformer.resolveRDN(resolveKey(key)), config.ldapDirectory.baseDN);
175
176                return ldapConnector.entryExists(dn);
177        }
178
179
180        @Override
181        @SuppressWarnings("unchecked")
182        public MarshalledEntry<K,V> load(final Object key) {
183
184                // Fetches an entry from the storage using the specified key. The CacheLoader should retrieve from the external storage all
185                // of the data that is needed to reconstruct the entry in memory, i.e. the value and optionally the metadata. This method
186                // needs to return a MarshalledEntry which can be constructed as follows:
187                //
188                // ctx.getMarshalledEntryFactory().new MarshalledEntry(key, value, metadata);
189                //
190                // If the entry does not exist or has expired, this method should return null.
191                // If an error occurs while retrieving data from the external storage, this method should throw a PersistenceException
192                //
193                // Note that keys and values will be in the cache's native format, which means that if the cache is being used by a remoting protocol
194                // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[].
195                // If the loader needs to have knowledge of the key/value data beyond their binary representation, then it needs access to the key's and value's
196                // classes and the marshaller used to encode them.
197
198                Loggers.LDAP_LOG.trace("[IL0251] LDAP store: Loading {} cache entry with key {}", getCacheName(), key);
199
200                DN dn = new DN(ldapEntryTransformer.resolveRDN(resolveKey(key)), config.ldapDirectory.baseDN);
201
202                Loggers.LDAP_LOG.trace("[IL0257] LDAP store: Resolved DN {}", dn);
203
204                ReadOnlyEntry ldapEntry = ldapConnector.retrieveEntry(dn);
205
206                if (ldapEntry == null) {
207                        // Not found
208                        Loggers.LDAP_LOG.trace("[IL0258] LDAP store: Entry not found");
209                        return null;
210                }
211
212
213                if (Loggers.LDAP_LOG.isTraceEnabled()) {
214                        Loggers.LDAP_LOG.trace("[IL0259] LDAP store: Retrieved entry: {}", ldapEntry.toLDIFString());
215                }
216
217                // Transform LDAP entry to Infinispan entry
218                InfinispanEntry<K,V> infinispanEntry = ldapEntryTransformer.toInfinispanEntry(new LDAPEntry(ldapEntry));
219
220                return marshalledEntryFactory.newMarshalledEntry(infinispanEntry.getKey(), infinispanEntry.getValue(), infinispanEntry.getMetadata());
221        }
222
223
224        @Override
225        public boolean delete(final Object key) {
226
227                // The CacheWriter should remove from the external storage the entry identified by the specified key.
228                // Note that keys will be in the cache's native format, which means that if the cache is being used by a remoting protocol
229                // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[].
230
231                Loggers.LDAP_LOG.trace("[IL0252] LDAP store: Deleting {} entry with key {}", getCacheName(), key);
232
233                DN dn = new DN(ldapEntryTransformer.resolveRDN(resolveKey(key)), config.ldapDirectory.baseDN);
234
235                return ldapConnector.deleteEntry(dn);
236        }
237
238
239        @Override
240        public void write(final MarshalledEntry<? extends K, ? extends V> marshalledEntry) {
241
242                // The CacheWriter should write the specified entry to the external storage.
243                //
244                // The PersistenceManager uses MarshalledEntry as the default format so that CacheWriters can efficiently store data coming
245                // from a remote node, thus avoiding any additional transformation steps.
246                //
247                // Note that keys and values will be in the cache's native format, which means that if the cache is being used by a remoting protocol
248                // such as HotRod or REST and compatibility mode has not been enabled, then they will be encoded in a byte[].
249
250                Loggers.LDAP_LOG.trace("[IL0253] LDAP store: Writing {} entry {}", getCacheName(), marshalledEntry);
251
252                LDAPEntry ldapEntry = ldapEntryTransformer.toLDAPEntry(
253                        config.ldapDirectory.baseDN,
254                        new InfinispanEntry<>(
255                                marshalledEntry.getKey(),
256                                marshalledEntry.getValue(),
257                                marshalledEntry.getMetadata()));
258
259                // Resolve the LDAP write strategy
260                LDAPWriteStrategy writeStrategy = ldapEntry.getWriteStrategy();
261
262                if (writeStrategy != null) {
263                        Loggers.LDAP_LOG.trace("[IL0263] LDAP store: Entry transformer suggested {} write strategy", writeStrategy);
264                } else {
265                        writeStrategy = LDAPWriteStrategy.getDefault();
266                        Loggers.LDAP_LOG.trace("[IL0264] LDAP store: Defaulted to {} write strategy", writeStrategy);
267                }
268
269                // Entry metadata created timestamp unreliable, cannot be used
270                // as hint whether LDAP ADD or LDAP MODIFY should be attempted
271                // first
272                // InternalMetadataImpl{actual=EmbeddedExpirableMetadata{lifespan=-1, maxIdle=-1, version=null}, created=-1, lastUsed=-1}
273
274
275                switch (writeStrategy) {
276
277                        case TRY_LDAP_ADD_FIRST:
278
279                                if (ldapConnector.addEntry(ldapEntry.getEntry())) {
280                                        Loggers.LDAP_LOG.trace("[IL0256] LDAP store: Added new {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN());
281                                        return; // success
282                                }
283
284                                // Entry already exists, attempt LDAP modify
285                                if (ldapConnector.replaceEntry(ldapEntry.getEntry())) {
286                                        Loggers.LDAP_LOG.trace("[IL0257] LDAP store: Replaced {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN());
287                                        return; // success
288                                }
289
290                                // Try to recover from concurrent LDAP delete
291                                // (entry deleted between first failed add and second modify attempt)
292                                if (! ldapConnector.addEntry(ldapEntry.getEntry())) {
293                                        // This should be highly unlikely
294                                        throw new PersistenceException("Failed recovery from concurrent LDAP delete (" + getCacheName() + " cache): " + ldapEntry.getEntry().getDN());
295                                }
296
297                                break;
298
299                        case TRY_LDAP_MODIFY_FIRST:
300
301                                if (ldapConnector.replaceEntry(ldapEntry.getEntry())) {
302                                        Loggers.LDAP_LOG.trace("[IL0265] LDAP store: Replaced {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN());
303                                        return; // success
304                                }
305
306                                // Entry doesn't exist, try LDAP add
307                                if (ldapConnector.addEntry(ldapEntry.getEntry())) {
308                                        Loggers.LDAP_LOG.trace("[IL0266] LDAP store: Added new {} entry with DN {}", getCacheName(), ldapEntry.getEntry().getDN());
309                                        return; // success
310                                }
311
312                                // Try to recover from concurrent LDAP add
313                                // (entry added between first failed replace and second add attempt)
314                                if (! ldapConnector.replaceEntry(ldapEntry.getEntry())) {
315                                        // This should be highly unlikely
316                                        throw new PersistenceException("Failed recovery from concurrent LDAP add (" + getCacheName() + " cache): " + ldapEntry.getEntry().getDN());
317                                }
318
319                                break;
320
321                        default:
322                                throw new PersistenceException("Unexpected LDAP write strategy: " + writeStrategy);
323                }
324        }
325
326
327        @Override
328        public void process(final KeyFilter<? super K> keyFilter,
329                            final CacheLoaderTask<K, V> cacheLoaderTask,
330                            final Executor executor,
331                            final boolean fetchValue,
332                            final boolean fetchMetadata) {
333
334                Loggers.LDAP_LOG.trace("[IL0262] LDAP store: Processing key filter for {} cache: fetchValue={} fetchMetadata=", getCacheName(), fetchValue, fetchMetadata);
335
336                final TaskContext taskContext = new TaskContextImpl();
337
338                // TODO consider multi-threaded LDAP retrieval?
339                executor.execute(() -> ldapConnector.retrieveEntries(ldapEntry -> {
340
341                        // Retrieves entire entry, fetchValue / fetchMetadata params are ignored TODO consider
342
343                        if (taskContext.isStopped()) {
344                                // TODO Consider pushing task context to LDAP connector routine
345                                return;
346                        }
347
348                        InfinispanEntry<K,V> infinispanEntry = ldapEntryTransformer.toInfinispanEntry(new LDAPEntry(ldapEntry));
349
350                        if (keyFilter.accept(infinispanEntry.getKey())) {
351
352                                MarshalledEntry<K,V> marshalledEntry = marshalledEntryFactory.newMarshalledEntry(
353                                        infinispanEntry.getKey(),
354                                        infinispanEntry.getValue(),
355                                        infinispanEntry.getMetadata());
356
357                                try {
358                                        cacheLoaderTask.processEntry(marshalledEntry, taskContext);
359
360                                } catch (InterruptedException e) {
361                                        throw new PersistenceException(e.getMessage(), e);
362                                }
363                        }
364                }));
365        }
366
367
368        @Override
369        public int size() {
370
371                // Infinispan code analysis on 8.2 shows that this method is never called in practise, and
372                // is not wired to the data / cache container API
373
374                Loggers.LDAP_LOG.trace("[IL0258] LDAP store: Counting {} entries", getCacheName());
375
376                final int count = ldapConnector.countEntries();
377
378                Loggers.LDAP_LOG.trace("[IL0259] LDAP store: Counted {} {} entries", count, getCacheName());
379
380                return count;
381        }
382
383
384        @Override
385        public void clear() {
386
387                Loggers.LDAP_LOG.trace("[IL0260] LDAP store: Clearing {} entries", getCacheName());
388
389                int numDeleted = ldapConnector.deleteEntries();
390
391                Loggers.LDAP_LOG.debug("[IL0254] LDAP store: Cleared {} {} entries", numDeleted, getCacheName());
392        }
393
394
395        @Override
396        public void purge(final Executor executor, final PurgeListener<? super K> purgeListener) {
397
398                Loggers.LDAP_LOG.trace("[IL0261] LDAP store: Purging {} entries", getCacheName());
399
400                final AtomicInteger numPurged = new AtomicInteger();
401
402                executor.execute(() -> numPurged.set(reaper.purge(purgeListener)));
403
404                Loggers.LDAP_LOG.debug("[IL0255] LDAP store: Purged {} expired {} entries", numPurged.get(), getCacheName());
405        }
406}