001    /*
002    // $Id: //open/mondrian-release/3.0/src/main/mondrian/rolap/RolapStar.java#4 $
003    // This software is subject to the terms of the Common Public License
004    // Agreement, available at the following URL:
005    // http://www.opensource.org/licenses/cpl.html.
006    // Copyright (C) 2001-2002 Kana Software, Inc.
007    // Copyright (C) 2001-2007 Julian Hyde and others
008    // All Rights Reserved.
009    // You must accept the terms of that agreement to use this software.
010    //
011    // jhyde, 12 August, 2001
012    */
013    
014    package mondrian.rolap;
015    
016    import mondrian.olap.*;
017    import mondrian.rolap.agg.Aggregation;
018    import mondrian.rolap.agg.AggregationKey;
019    import mondrian.rolap.aggmatcher.AggStar;
020    import mondrian.rolap.sql.SqlQuery;
021    import mondrian.spi.DataSourceChangeListener;
022    import mondrian.util.Bug;
023    import org.apache.log4j.Logger;
024    import org.eigenbase.util.property.Property;
025    import org.eigenbase.util.property.TriggerBase;
026    
027    import javax.sql.DataSource;
028    import java.io.PrintWriter;
029    import java.io.StringWriter;
030    import java.sql.Connection;
031    import java.sql.*;
032    import java.util.*;
033    
034    /**
035     * A <code>RolapStar</code> is a star schema. It is the means to read cell
036     * values.
037     *
 * <p>todo: put this in a package which specializes in relational
 * aggregation and doesn't know anything about hierarchies etc.
040     *
041     * @author jhyde
042     * @since 12 August, 2001
043     * @version $Id: //open/mondrian-release/3.0/src/main/mondrian/rolap/RolapStar.java#4 $
044     */
045    public class RolapStar {
    /** Log4j logger shared by all <code>RolapStar</code> instances. */
    private static final Logger LOGGER = Logger.getLogger(RolapStar.class);

    /**
     * Controls the aggregate data cache for all RolapStars.
     * An administrator or tester might selectively enable or
     * disable in memory caching to allow direct measurement of database
     * performance.
     *
     * <p>The initial value is read from the <code>DisableCaching</code>
     * Mondrian property; the trigger registered in the static initializer
     * of this class updates it whenever that property changes.
     */
    private static boolean disableCaching =
        MondrianProperties.instance().DisableCaching.get();
056    
057        static {
058            // Trigger is used to lookup and change the value of the
059            // variable that controls aggregate data caching
060            // Using a trigger means we don't have to look up the property eveytime.
061            MondrianProperties.instance().DisableCaching.addTrigger(
062                new TriggerBase(true) {
063                    public void execute(Property property, String value) {
064                        disableCaching = property.booleanValue();
065                        // must flush all caches
066                        if (disableCaching) {
067                            // REVIEW: could replace following code with call to
068                            // CacheControl.flush(CellRegion)
069                            for (Iterator<RolapSchema> itSchemas =
070                                RolapSchema.getRolapSchemas();
071                                 itSchemas.hasNext(); )
072                            {
073                                RolapSchema schema1 = itSchemas.next();
074                                for (RolapStar star : schema1.getStars()) {
075                                    star.clearCachedAggregations(true);
076                                }
077                            }
078                        }
079                    }
080                }
081            );
082        }
083    
084    
    /** Schema this star was created for; assigned in the constructor. */
    private final RolapSchema schema;

    // not final for test purposes (it can be replaced via setDataSource)
    private DataSource dataSource;

    /** Fact table at the center of this star; created in the constructor. */
    private final Table factTable;

    /** Holds all global aggregations of this star. */
    private final Map<AggregationKey,Aggregation> sharedAggregations;

    /**
     * Holds all thread-local aggregations of this star. Each thread starts
     * with its own empty map.
     */
    private final ThreadLocal<Map<AggregationKey, Aggregation>>
        localAggregations =
            new ThreadLocal<Map<AggregationKey, Aggregation>>() {
            protected Map<AggregationKey, Aggregation> initialValue() {
                return new HashMap<AggregationKey, Aggregation>();
            }
        };

    /**
     * Holds all pending aggregations of this star that are waiting to
     * be pushed into the global cache.  They cannot be pushed yet, because
     * the aggregates in question are currently in use by other threads.
     */
    private final Map<AggregationKey, Aggregation> pendingAggregations;

    /**
     * Holds all requests for aggregations, across all threads. A key may
     * occur more than once, once for each thread that requested it.
     */
    private final List<AggregationKey> aggregationRequests;

    /**
     * Holds all requests of aggregations per thread. Each thread starts
     * with its own empty list.
     */
    private final ThreadLocal<List<AggregationKey>>
        localAggregationRequests =
            new ThreadLocal<List<AggregationKey>>() {
            protected List<AggregationKey> initialValue() {
                return new ArrayList<AggregationKey>();
            }
        };
126    
    /**
     * Number of columns (column and columnName). Incremented by
     * {@link #nextColumnCount()} and decremented by
     * {@link #decrementColumnCount()}.
     */
    private int columnCount;

    /** SQL dialect of this star; taken from the schema in the constructor. */
    private final SqlQuery.Dialect sqlQueryDialect;

    /**
     * If true, then database aggregation information is cached, otherwise
     * it is flushed after each query. Set to true by the constructor.
     */
    private boolean cacheAggregations;

    /**
     * Partially ordered list of AggStars associated with this RolapStar's fact
     * table
     */
    private List<AggStar> aggStars;

    /**
     * Listener asked whether an aggregation has changed; taken from the
     * schema in the constructor. May be null, in which case it is not
     * consulted.
     */
    private DataSourceChangeListener changeListener;

    // temporary model, should eventually use RolapStar.Table and RolapStar.Column
    // factNode is the root of that model and carries the fact table's alias;
    // nodeLookup maps each alias handed out by getUniqueRelation to its node.
    private StarNetworkNode factNode;
    private Map<String, StarNetworkNode> nodeLookup =
        new HashMap<String, StarNetworkNode>();
152    
153        /**
154         * Creates a RolapStar. Please use
155         * {@link RolapSchema.RolapStarRegistry#getOrCreateStar} to create a
156         * {@link RolapStar}.
157         */
158        RolapStar(
159            final RolapSchema schema,
160            final DataSource dataSource,
161            final MondrianDef.Relation fact)
162        {
163            this.cacheAggregations = true;
164            this.schema = schema;
165            this.dataSource = dataSource;
166            this.factTable = new RolapStar.Table(this, fact, null, null);
167    
168            // phase out and replace with Table, Column network
169            this.factNode = new StarNetworkNode(null, factTable.alias, null, null, null);
170    
171            this.sharedAggregations = new HashMap<AggregationKey, Aggregation>();
172            
173            this.pendingAggregations = new HashMap<AggregationKey, Aggregation>();
174    
175            this.aggregationRequests = new ArrayList<AggregationKey>();
176            
177            clearAggStarList();
178    
179            this.sqlQueryDialect = schema.getDialect();
180    
181            this.changeListener = schema.getDataSourceChangeListener();
182        }
183    
184        private static class StarNetworkNode {
185            private StarNetworkNode parent;
186            private MondrianDef.Relation origRel;
187            private String foreignKey;
188            private String joinKey;
189    
190            private StarNetworkNode(
191                StarNetworkNode parent,
192                String alias,
193                MondrianDef.Relation origRel,
194                String foreignKey,
195                String joinKey)
196            {
197                this.parent = parent;
198                this.origRel = origRel;
199                this.foreignKey = foreignKey;
200                this.joinKey = joinKey;
201            }
202    
203            private boolean isCompatible(
204                StarNetworkNode compatibleParent,
205                MondrianDef.Relation rel,
206                String compatibleForeignKey,
207                String compatibleJoinKey)
208            {
209                return (parent == compatibleParent &&
210                    origRel.getClass().equals(rel.getClass()) &&
211                    foreignKey.equals(compatibleForeignKey) &&
212                    joinKey.equals(compatibleJoinKey));
213            }
214        }
215    
216        private MondrianDef.RelationOrJoin cloneRelation(
217            MondrianDef.Relation rel,
218            String possibleName)
219        {
220            if (rel instanceof MondrianDef.Table) {
221                MondrianDef.Table tbl = (MondrianDef.Table)rel;
222                return new MondrianDef.Table(tbl.schema, tbl.name, possibleName);
223            } else if (rel instanceof MondrianDef.View) {
224                MondrianDef.View view = (MondrianDef.View)rel;
225                MondrianDef.View newView = new MondrianDef.View(view);
226                newView.alias = possibleName;
227                return newView;
228            } else if (rel instanceof MondrianDef.InlineTable) {
229                MondrianDef.InlineTable inlineTable =
230                    (MondrianDef.InlineTable) rel;
231                MondrianDef.InlineTable newInlineTable =
232                    new MondrianDef.InlineTable(inlineTable);
233                newInlineTable.alias = possibleName;
234                return newInlineTable;
235            } else {
236                throw new UnsupportedOperationException();
237            }
238        }
239    
240        /**
241         * Generates a unique relational join to the fact table via re-aliasing
242         * MondrianDef.Relations
243         *
244         * currently called in the RolapCubeHierarchy constructor.  This should
245         * eventually be phased out and replaced with RolapStar.Table and
246         * RolapStar.Column references
247         *
248         * @param rel the relation needing uniqueness
249         * @param factForeignKey the foreign key of the fact table
250         * @param primaryKey the join key of the relation
251         * @param primaryKeyTable the join table of the relation
252         * @return if necessary a new relation that has been re-aliased
253         */
254        public MondrianDef.RelationOrJoin getUniqueRelation(
255            MondrianDef.RelationOrJoin rel,
256            String factForeignKey,
257            String primaryKey,
258            String primaryKeyTable)
259        {
260            return getUniqueRelation(
261                factNode, rel, factForeignKey, primaryKey, primaryKeyTable);
262        }
263    
264        private MondrianDef.RelationOrJoin getUniqueRelation(
265            StarNetworkNode parent,
266            MondrianDef.RelationOrJoin relOrJoin,
267            String foreignKey,
268            String joinKey,
269            String joinKeyTable)
270        {
271            if (relOrJoin == null) {
272                return null;
273            } else if (relOrJoin instanceof MondrianDef.Relation) {
274                int val = 0;
275                MondrianDef.Relation rel =
276                    (MondrianDef.Relation) relOrJoin;
277                String newAlias =
278                    joinKeyTable != null ? joinKeyTable : rel.getAlias();
279                while (true) {
280                    StarNetworkNode node = nodeLookup.get(newAlias);
281                    if (node == null) {
282                        if (val != 0) {
283                            rel = (MondrianDef.Relation)
284                                cloneRelation(rel, newAlias);
285                        }
286                        node =
287                            new StarNetworkNode(
288                                parent, newAlias, rel, foreignKey, joinKey);
289                        nodeLookup.put(newAlias, node);
290                        return rel;
291                    } else if (node.isCompatible(
292                        parent, rel, foreignKey, joinKey))
293                    {
294                        return node.origRel;
295                    }
296                    newAlias = rel.getAlias() + "_" + (++val);
297                }
298            } else if (relOrJoin instanceof MondrianDef.Join) {
299                // determine if the join starts from the left or right side
300                MondrianDef.Join join = (MondrianDef.Join)relOrJoin;
301                MondrianDef.RelationOrJoin left = null;
302                MondrianDef.RelationOrJoin right = null;
303                if (join.getLeftAlias().equals(joinKeyTable)) {
304                    // first manage left then right
305                    left =
306                        getUniqueRelation(
307                            parent, join.left, foreignKey,
308                            joinKey, joinKeyTable);
309                    parent = nodeLookup.get(
310                        ((MondrianDef.Relation) left).getAlias());
311                    right =
312                        getUniqueRelation(
313                            parent, join.right, join.leftKey,
314                            join.rightKey, join.getRightAlias());
315                } else if (join.getRightAlias().equals(joinKeyTable)) {
316                    // right side must equal
317                    right =
318                        getUniqueRelation(
319                            parent, join.right, foreignKey,
320                            joinKey, joinKeyTable);
321                    parent = nodeLookup.get(
322                        ((MondrianDef.Relation) right).getAlias());
323                    left =
324                        getUniqueRelation(
325                            parent, join.left, join.rightKey,
326                            join.leftKey, join.getLeftAlias());
327                } else {
328                    new MondrianException(
329                        "failed to match primary key table to join tables");
330                }
331    
332                if (join.left != left || join.right != right) {
333                    join =
334                        new MondrianDef.Join(
335                            left instanceof MondrianDef.Relation
336                                ? ((MondrianDef.Relation) left).getAlias()
337                                : null,
338                            join.leftKey,
339                            left, 
340                            right instanceof MondrianDef.Relation
341                                ? ((MondrianDef.Relation) right).getAlias()
342                                : null,
343                            join.rightKey,
344                            right);
345                }
346                return join;
347            }
348            return null;
349        }
350    
351        /**
352         * Returns this RolapStar's column count. After a star has been created with
353         * all of its columns, this is the number of columns in the star.
354         */
355        public int getColumnCount() {
356            return columnCount;
357        }
358    
359        /**
360         * This is used by the {@link Column} constructor to get a unique id (per
361         * its parent {@link RolapStar}).
362         */
363        private int nextColumnCount() {
364            return columnCount++;
365        }
366    
367        /**
368         * This is used to decrement the column counter and is used if a newly
369         * created column is found to already exist.
370         */
371        private int decrementColumnCount() {
372            return columnCount--;
373        }
374    
375        /**
376         * This is a place holder in case in the future we wish to be able to
377         * reload aggregates. In that case, if aggregates had already been loaded,
378         * i.e., this star has some aggstars, then those aggstars are cleared.
379         */
380        public void prepareToLoadAggregates() {
381            aggStars = Collections.emptyList();
382        }
383    
384        /**
385         * Adds an {@link AggStar} to this star.
386         *
387         * <p>Internally the AggStars are added in sort order, smallest row count
388         * to biggest, so that the most efficient AggStar is encountered first;
389         * ties do not matter.
390         */
391        public void addAggStar(AggStar aggStar) {
392            if (aggStars == Collections.EMPTY_LIST) {
393                // if this is NOT a LinkedList, then the insertion time is longer.
394                aggStars = new LinkedList<AggStar>();
395            }
396    
397            // Add it before the first AggStar which is larger, if there is one.
398            int size = aggStar.getSize();
399            ListIterator<AggStar> lit = aggStars.listIterator();
400            while (lit.hasNext()) {
401                AggStar as = lit.next();
402                if (as.getSize() >= size) {
403                    lit.previous();
404                    lit.add(aggStar);
405                    return;
406                }
407            }
408    
409            // There is no larger star. Add at the end of the list.
410            aggStars.add(aggStar);
411        }
412    
413        /**
414         * Set the agg star list to empty.
415         */
416        void clearAggStarList() {
417            aggStars = Collections.emptyList();
418        }
419    
420        /**
421         * Reorder the list of aggregate stars. This should be called if the
422         * algorithm used to order the AggStars has been changed.
423         */
424        public void reOrderAggStarList() {
425            // the order of these two lines is important
426            List<AggStar> l = aggStars;
427            clearAggStarList();
428    
429            for (AggStar aggStar : l) {
430                addAggStar(aggStar);
431            }
432        }
433    
434        /**
435         * Returns this RolapStar's aggregate table AggStars, ordered in ascending
436         * order of size.
437         */
438        public List<AggStar> getAggStars() {
439            return aggStars;
440        }
441    
442        /**
443         * Returns the fact table at the center of this RolapStar.
444         *
445         * @return fact table
446         */
447        public Table getFactTable() {
448            return factTable;
449        }
450    
451        /**
452         * Clones an existing SqlQuery to create a new one (this cloning creates one
453         * with an empty sql query).
454         */
455        public SqlQuery getSqlQuery() {
456            return new SqlQuery(getSqlQueryDialect());
457        }
458    
459        /**
460         * Returns this RolapStar's SQL dialect.
461         */
462        public SqlQuery.Dialect getSqlQueryDialect() {
463            return sqlQueryDialect;
464        }
465    
466        /**
467         * Sets whether to cache database aggregation information; if false, cache
468         * is flushed after each query.
469         *
470         * <p>This method is called only by the RolapCube and is only called if
471         * caching is to be turned off. Note that the same RolapStar can be
472         * associated with more than on RolapCube. If any one of those cubes has
473         * caching turned off, then caching is turned off for all of them.
474         *
475         * @param cacheAggregations Whether to cache database aggregation
476         */
477        void setCacheAggregations(boolean cacheAggregations) {
478            // this can only change from true to false
479            this.cacheAggregations = cacheAggregations;
480            clearCachedAggregations(false);
481        }
482    
483        /**
484         * Returns whether the this RolapStar cache aggregates.
485         *
486         * @see #setCacheAggregations(boolean)
487         */
488        boolean isCacheAggregations() {
489            return this.cacheAggregations;
490        }
491    
492        /**
493         * Clears the aggregate cache. This only does something if aggregate caching
494         * is disabled (see {@link #setCacheAggregations(boolean)}).
495         *
496         * @param forced If true, clears cached aggregations regardless of any other
497         *   settings.  If false, clears only cache from the current thread
498         */
499        void clearCachedAggregations(boolean forced) {
500            if (forced || !cacheAggregations || RolapStar.disableCaching) {
501                if (LOGGER.isDebugEnabled()) {
502                    StringBuilder buf = new StringBuilder(100);
503                    buf.append("RolapStar.clearCachedAggregations: schema=");
504                    buf.append(schema.getName());
505                    buf.append(", star=");
506                    buf.append(getFactTable().getAlias());
507                    LOGGER.debug(buf.toString());
508                }
509    
510                if (forced) {
511                    synchronized (sharedAggregations) {
512                        sharedAggregations.clear();
513                    }
514                    localAggregations.get().clear();
515                } else {
516                    // Only clear aggregation cache for the currect thread context.
517                    localAggregations.get().clear();
518                }
519            }
520    
521        }
522    
523        /**
524         * Looks up an aggregation or creates one if it does not exist in an
525         * atomic (synchronized) operation.
526         *
527         * <p>When a new aggregation is created, it is marked as thread local.
528         *
529         * @param aggregationKey this is the contrained column bitkey
530         */
531        public Aggregation lookupOrCreateAggregation(AggregationKey aggregationKey) {
532    
533            Aggregation aggregation = lookupAggregation(aggregationKey);
534    
535            if (aggregation == null) {
536                aggregation = new Aggregation(aggregationKey);
537    
538                this.localAggregations.get().put(aggregationKey, aggregation);
539    
540                // Let the change listener get the opportunity to register the
541                // first time the aggregation is used
542                if ((this.cacheAggregations) && (!RolapStar.disableCaching)) {
543                    if (changeListener != null) {
544                        Util.discard(changeListener.isAggregationChanged(aggregation));
545                    }
546                }
547            }
548            return aggregation;
549        }
550    
551        /**
552         * Looks for an existing aggregation over a given set of columns, or
553         * returns <code>null</code> if there is none.
554         *
555         * <p>Thread local cache is taken first.
556         *
557         * <p>Must be called from synchronized context.
558         */
559        public Aggregation lookupAggregation(AggregationKey aggregationKey) {
560            // First try thread local cache
561            Aggregation aggregation = localAggregations.get().get(aggregationKey);
562            if (aggregation != null) {
563                return aggregation;
564            }
565    
566            if (cacheAggregations && !RolapStar.disableCaching) {
567                // Look in global cache
568                synchronized (sharedAggregations) {
569                    aggregation = sharedAggregations.get(aggregationKey);
570                    if (aggregation != null) {
571                        // Keep track of global aggregates that a query is using
572                        recordAggregationRequest(aggregationKey);
573                    }
574                }
575            }
576    
577            return aggregation;
578        }
579    
580        /**
581         * Checks whether an aggregation has changed since the last the time
582         * loaded.
583         *
584         * <p>If so, a new thread local aggregation will be made and added after
585         * the query has finished.
586         *
587         * <p>This method should be called before a query is executed and afterwards
588         * the function {@link #pushAggregateModificationsToGlobalCache()} should
589         * be called.
590         */
591        public void checkAggregateModifications() {
592    
593            // Clear own aggregation requests at the beginning of a query
594            // made by request to materialize results after RolapResult constructor
595            // is finished
596            clearAggregationRequests();
597    
598            if (changeListener != null) {
599                if (cacheAggregations && !RolapStar.disableCaching) {
600                    synchronized (sharedAggregations) {
601                        for (Map.Entry<AggregationKey, Aggregation> e :
602                            sharedAggregations.entrySet())
603                        {
604                            AggregationKey aggregationKey = e.getKey();
605    
606                            Aggregation aggregation = e.getValue();
607                            if (changeListener.isAggregationChanged(aggregation)) {
608                                // Create new thread local aggregation
609                                // This thread will renew aggregations
610                                // And these will be checked in if all queries
611                                // that are currently using these aggregates
612                                // are finished
613                                aggregation = new Aggregation(aggregationKey);
614    
615                                localAggregations.get().put(aggregationKey, aggregation);
616                            }
617                        }
618                    }
619                }
620            }
621        }
622    
623        /**
624         * Checks whether changed modifications may be pushed into global cache.
625         *
626         * <p>The method checks whether there are other running queries that are
627         * using the requested modifications.  If this is the case, modifications
628         * are not pushed yet.
629         */
630        public void pushAggregateModificationsToGlobalCache() {
631            // Need synchronized access to both aggregationRequests as to
632            // aggregations, synchronize this instead
633            synchronized (this) {
634                if (cacheAggregations && !RolapStar.disableCaching) {
635    
636                    // Push pending modifications other thread could not push
637                    // to global cache, because it was in use
638                    Iterator<Map.Entry<AggregationKey, Aggregation>>
639                        it = pendingAggregations.entrySet().iterator();
640                    while (it.hasNext()) {
641                        Map.Entry<AggregationKey, Aggregation> e = it.next();
642                        AggregationKey aggregationKey = e.getKey();
643                        Aggregation aggregation = e.getValue();
644                        // In case this aggregation is not requested by anyone
645                        // this aggregation may be pushed into global cache
646                        // otherwise put it in pending cache, that will be pushed
647                        // when another query finishes
648                        if (!isAggregationRequested(aggregationKey)) {
649                            pushAggregateModification(
650                                aggregationKey, aggregation,sharedAggregations);
651                            it.remove();
652                        }
653                    }
654                    // Push thread local modifications
655                    it = localAggregations.get().entrySet().iterator();
656                    while (it.hasNext()) {
657                        Map.Entry<AggregationKey, Aggregation> e = it.next();
658                        AggregationKey aggregationKey = e.getKey();
659                        Aggregation aggregation = e.getValue();
660                        // In case this aggregation is not requested by anyone
661                        // this aggregation may be pushed into global cache
662                        // otherwise put it in pending cache, that will be pushed
663                        // when another query finishes
664                        if (!isAggregationRequested(aggregationKey)) {
665                            pushAggregateModification(
666                                aggregationKey, aggregation, sharedAggregations);
667                        } else {
668                            pushAggregateModification(
669                                aggregationKey, aggregation, pendingAggregations);
670                        }
671                    }
672                    localAggregations.get().clear();
673                }
674                // Clear own aggregation requests
675                clearAggregationRequests();
676            }
677        }
678    
679        /**
680         * Pushes aggregations in destination aggregations, replacing older
681         * entries.
682         */
683        private void pushAggregateModification(
684            AggregationKey localAggregationKey,
685            Aggregation localAggregation,
686            Map<AggregationKey,Aggregation> destAggregations)
687        {
688            if (cacheAggregations && !RolapStar.disableCaching) {
689                synchronized (destAggregations) {
690    
691                    boolean found = false;
692                    Iterator<Map.Entry<AggregationKey, Aggregation>>
693                            it = destAggregations.entrySet().iterator();
694                    while (it.hasNext()) {
695                        Map.Entry<AggregationKey, Aggregation> e =
696                            it.next();
697                        AggregationKey aggregationKey = e.getKey();
698                        Aggregation aggregation = e.getValue();
699    
700                        if (localAggregationKey.equals(aggregationKey)) {
701    
702                            if (localAggregation.getCreationTimestamp().after(
703                                aggregation.getCreationTimestamp())) {
704                                it.remove();
705                            } else {
706                                // Entry is newer, do not replace
707                                found = true;
708                            }
709                            break;
710                        }
711                    }
712                    if (!found) {
713                        destAggregations.put(localAggregationKey, localAggregation);
714                    }
715                }
716            }
717        }
718    
719        /**
720         * Records global cache requests per thread.
721         */
722        private void recordAggregationRequest(AggregationKey aggregationKey) {
723            if (!localAggregationRequests.get().contains(aggregationKey)) {
724                synchronized(aggregationRequests) {
725                    aggregationRequests.add(aggregationKey);
726                }
727                // Store own request for cleanup afterwards
728                localAggregationRequests.get().add(aggregationKey);
729            }
730        }
731    
732        /**
733         * Checks whether an aggregation is requested by another thread.
734         */
735        private boolean isAggregationRequested(AggregationKey aggregationKey) {
736            synchronized (aggregationRequests) {
737                return aggregationRequests.contains(aggregationKey);
738            }
739        }
740    
741        /**
742         * Clears the aggregation requests created by the current thread.
743         */
744        private void clearAggregationRequests() {
745            synchronized (aggregationRequests) {
746                if (localAggregationRequests.get().isEmpty()) {
747                    return;
748                }
749                // Build a set of requests for efficient probing. Negligible cost
750                // if this thread's localAggregationRequests is small, but avoids a
751                // quadratic algorithm if it is large.
752                Set<AggregationKey> localAggregationRequestSet =
753                    new HashSet<AggregationKey>(localAggregationRequests.get());
754                Iterator<AggregationKey> iter = aggregationRequests.iterator();
755                while (iter.hasNext()) {
756                    AggregationKey aggregationKey = iter.next();
757                    if (localAggregationRequestSet.contains(aggregationKey)) {
758                        iter.remove();
759                        // Make sure that bitKey is not removed more than once:
760                        // other occurrences might exist for other threads.
761                        localAggregationRequestSet.remove(aggregationKey);
762                        if (localAggregationRequestSet.isEmpty()) {
763                            // Nothing further to do
764                            break;
765                        }
766                    }
767                }
768                localAggregationRequests.get().clear();
769            }
770        }
771        
772        /** For testing purposes only.  */
773        public void setDataSource(DataSource dataSource) {
774            this.dataSource = dataSource;
775        }
776    
777        /**
778         * Returns the DataSource used to connect to the underlying DBMS.
779         *
780         * @return DataSource
781         */
782        public DataSource getDataSource() {
783            return dataSource;
784        }
785    
786        /**
787         * Retrieves the {@link RolapStar.Measure} in which a measure is stored.
788         */
789        public static Measure getStarMeasure(Member member) {
790            return (Measure) ((RolapStoredMeasure) member).getStarMeasure();
791        }
792    
793        /**
794         * Retrieves a named column, returns null if not found.
795         */
796        public Column[] lookupColumns(String tableAlias, String columnName) {
797            final Table table = factTable.findDescendant(tableAlias);
798            return (table == null) ? null : table.lookupColumns(columnName);
799        }
800    
801        /**
802         * This is used by TestAggregationManager only.
803         */
804        public Column lookupColumn(String tableAlias, String columnName) {
805            final Table table = factTable.findDescendant(tableAlias);
806            return (table == null) ? null : table.lookupColumn(columnName);
807        }
808    
809        public BitKey getBitKey(String[] tableAlias, String[] columnName) {
810            BitKey bitKey = BitKey.Factory.makeBitKey(getColumnCount());
811            Column starColumn;
812            for (int i = 0; i < tableAlias.length; i ++) {
813                starColumn = lookupColumn(tableAlias[i], columnName[i]);
814                if (starColumn != null) {
815                    bitKey.set(starColumn.getBitPosition());
816                }
817            }
818            return bitKey;
819        }
820        
821        /**
822         * Returns a list of all aliases used in this star.
823         */
824        public List<String> getAliasList() {
825            List<String> aliasList = new ArrayList<String>();
826            if (factTable != null) {
827                collectAliases(aliasList, factTable);
828            }
829            return aliasList;
830        }
831    
832        /**
833         * Finds all of the table aliases in a table and its children.
834         */
835        private static void collectAliases(List<String> aliasList, Table table) {
836            aliasList.add(table.getAlias());
837            for (Table child : table.children) {
838                collectAliases(aliasList, child);
839            }
840        }
841    
842        /**
843         * Collects all columns in this table and its children.
844         * If <code>joinColumn</code> is specified, only considers child tables
845         * joined by the given column.
846         */
847        public static void collectColumns(
848            Collection<Column> columnList,
849            Table table,
850            MondrianDef.Column joinColumn)
851        {
852            if (joinColumn == null) {
853                columnList.addAll(table.columnList);
854            }
855            for (Table child : table.children) {
856                if (joinColumn == null ||
857                    child.getJoinCondition().left.equals(joinColumn)) {
858                    collectColumns(columnList, child, null);
859                }
860            }
861        }
862    
863        private boolean containsColumn(String tableName, String columnName) {
864            Connection jdbcConnection;
865            try {
866                jdbcConnection = dataSource.getConnection();
867            } catch (SQLException e1) {
868                throw Util.newInternal(
869                    e1, "Error while creating connection from data source");
870            }
871            try {
872                final DatabaseMetaData metaData = jdbcConnection.getMetaData();
873                final ResultSet columns =
874                    metaData.getColumns(null, null, tableName, columnName);
875                return columns.next();
876            } catch (SQLException e) {
877                throw Util.newInternal("Error while retrieving metadata for table '" +
878                                tableName + "', column '" + columnName + "'");
879            } finally {
880                try {
881                    jdbcConnection.close();
882                } catch (SQLException e) {
883                    // ignore
884                }
885            }
886        }
887    
888        public RolapSchema getSchema() {
889            return schema;
890        }
891    
892        /**
893         * Generates a SQL statement to read all instances of the given attributes.
894         *
895         * <p>The SQL statement is of the form {@code SELECT ... FROM ... JOIN ...
896         * GROUP BY ...}. It is useful for populating an aggregate table.
897         *
898         * @param columnList List of columns (attributes and measures)
899         * @param columnNameList List of column names (must have same cardinality
900         *     as {@code columnList})
901         * @return SQL SELECT statement
902         */
903        public String generateSql(
904            List<Column> columnList,
905            List<String> columnNameList)
906        {
907            final SqlQuery query = new SqlQuery(sqlQueryDialect, true);
908            query.addFrom(
909                factTable.relation,
910                factTable.relation.getAlias(),
911                false);
912            int k = -1;
913            for (Column column : columnList) {
914                ++k;
915                column.table.addToFrom(query,  false, true);
916                String columnExpr = column.generateExprString(query);
917                if (column instanceof Measure) {
918                    Measure measure = (Measure) column;
919                    columnExpr = measure.getAggregator().getExpression(columnExpr);
920                }
921                final String columnName = columnNameList.get(k);
922                query.addSelect(columnExpr, columnName);
923                if (!(column instanceof Measure)) {
924                    query.addGroupBy(columnExpr);
925                }
926            }
927            // remove whitespace from query - in particular, the trailing newline
928            return query.toString().trim();
929        }
930    
931        public String toString() {
932            StringWriter sw = new StringWriter(256);
933            PrintWriter pw = new PrintWriter(sw);
934            print(pw, "", true);
935            pw.flush();
936            return sw.toString();
937        }
938    
939        /**
940         * Prints the state of this <code>RolapStar</code>
941         *
942         * @param pw Writer
943         * @param prefix Prefix to print at the start of each line
944         * @param structure Whether to print the structure of the star
945         */
946        public void print(PrintWriter pw, String prefix, boolean structure) {
947            if (structure) {
948                pw.print(prefix);
949                pw.println("RolapStar:");
950                String subprefix = prefix + "  ";
951                factTable.print(pw, subprefix);
952    
953                for (AggStar aggStar : getAggStars()) {
954                    aggStar.print(pw, subprefix);
955                }
956            }
957    
958            List<Aggregation> aggregationList =
959                new ArrayList<Aggregation>(sharedAggregations.values());
960            Collections.sort(
961                aggregationList,
962                new Comparator<Aggregation>() {
963                    public int compare(Aggregation o1, Aggregation o2) {
964                        return o1.getConstrainedColumnsBitKey().compareTo(
965                            o2.getConstrainedColumnsBitKey());
966                    }
967                }
968            );
969    
970            for (Aggregation aggregation : aggregationList) {
971                aggregation.print(pw);
972            }
973        }
974    
975        /**
976         * Flushes the contents of a given region of cells from this star.
977         *
978         * @param cacheControl Cache control API
979         * @param region Predicate defining a region of cells
980         */
981        public void flush(
982            CacheControl cacheControl,
983            CacheControl.CellRegion region)
984        {
985            // Translate the region into a set of (column, value) constraints.
986            final RolapCacheRegion cacheRegion =
987                RolapAggregationManager.makeCacheRegion(this, region);
988            for (Aggregation aggregation : sharedAggregations.values()) {
989                aggregation.flush(cacheControl, cacheRegion);
990            }
991        }
992    
993    
994        /**
995         * Returns the listener for changes to this star's underlying database.
996         *
997         * @return Returns the Data source change listener.
998         */
999        public DataSourceChangeListener getChangeListener() {
1000            return changeListener;
1001        }
1002    
1003        /**
1004         * Sets the listener for changes to this star's underlying database.
1005         *
1006         * @param changeListener The Data source change listener to set
1007         */
1008        public void setChangeListener(DataSourceChangeListener changeListener) {
1009            this.changeListener = changeListener;
1010        }
1011    
1012        // -- Inner classes --------------------------------------------------------
1013    
1014        /**
1015         * A column in a star schema.
1016         */
1017        public static class Column {
1018            private final Table table;
1019            private final MondrianDef.Expression expression;
1020            private final SqlQuery.Datatype datatype;
1021            private final String name;
1022            /**
1023             * When a Column is a column, and not a Measure, the parent column
1024             * is the coloumn associated with next highest Level.
1025             */
1026            private final Column parentColumn;
1027    
1028            /**
1029             * This is used during both aggregate table recognition and aggregate
1030             * table generation. For multiple dimension usages, multiple shared
1031             * dimension or unshared dimension with the same column names,
1032             * this is used to disambiguate aggregate column names.
1033             */
1034            private final String usagePrefix;
1035            /**
1036             * This is only used in RolapAggregationManager and adds
1037             * non-constraining columns making the drill-through queries easier for
1038             * humans to understand.
1039             */
1040            private final Column nameColumn;
1041            private boolean isNameColumn;
1042    
1043            /** this has a unique value per star */
1044            private final int bitPosition;
1045    
1046            private int cardinality = -1;
1047    
1048            private Column(
1049                String name,
1050                Table table,
1051                MondrianDef.Expression expression,
1052                SqlQuery.Datatype datatype)
1053            {
1054                this(name, table, expression, datatype, null, null, null);
1055            }
1056    
1057            private Column(
1058                String name,
1059                Table table,
1060                MondrianDef.Expression expression,
1061                SqlQuery.Datatype datatype,
1062                Column nameColumn,
1063                Column parentColumn,
1064                String usagePrefix)
1065            {
1066                this.name = name;
1067                this.table = table;
1068                this.expression = expression;
1069                this.datatype = datatype;
1070                this.bitPosition = table.star.nextColumnCount();
1071                this.nameColumn = nameColumn;
1072                this.parentColumn = parentColumn;
1073                this.usagePrefix = usagePrefix;
1074                if (nameColumn != null) {
1075                    nameColumn.isNameColumn = true;
1076                }
1077            }
1078    
1079            /**
1080             * Fake column.
1081             *
1082             * @param datatype Datatype
1083             */
1084            protected Column(SqlQuery.Datatype datatype)
1085            {
1086                this.table = null;
1087                this.expression = null;
1088                this.datatype = datatype;
1089                this.name = null;
1090                this.parentColumn = null;
1091                this.nameColumn = null;
1092                this.usagePrefix = null;
1093                this.bitPosition = 0;
1094            }
1095    
1096            public boolean equals(Object obj) {
1097                if (! (obj instanceof RolapStar.Column)) {
1098                    return false;
1099                }
1100                RolapStar.Column other = (RolapStar.Column) obj;
1101                // Note: both columns have to be from the same table
1102                return (other.table == this.table) &&
1103                       other.expression.equals(this.expression) &&
1104                       (other.datatype == this.datatype) &&
1105                       other.name.equals(this.name);
1106            }
1107    
1108            public int hashCode() {
1109                int h = name.hashCode();
1110                h = Util.hash(h, table);
1111                return h;
1112            }
1113    
1114            public String getName() {
1115                return name;
1116            }
1117    
1118            public int getBitPosition() {
1119                return bitPosition;
1120            }
1121    
1122            public RolapStar getStar() {
1123                return table.star;
1124            }
1125    
1126            public RolapStar.Table getTable() {
1127                return table;
1128            }
1129    
1130            public SqlQuery getSqlQuery() {
1131                return getTable().getStar().getSqlQuery();
1132            }
1133    
1134            public RolapStar.Column getNameColumn() {
1135                return nameColumn;
1136            }
1137    
1138            public RolapStar.Column getParentColumn() {
1139                return parentColumn;
1140            }
1141    
1142            public String getUsagePrefix() {
1143                return usagePrefix;
1144            }
1145    
1146            public boolean isNameColumn() {
1147                return isNameColumn;
1148            }
1149    
1150            public MondrianDef.Expression getExpression() {
1151                return expression;
1152            }
1153    
1154            /**
1155             * Generates a SQL expression, which typically this looks like
1156             * this: <code><i>tableName</i>.<i>columnName</i></code>.
1157             */
1158            public String generateExprString(SqlQuery query) {
1159                return getExpression().getExpression(query);
1160            }
1161    
1162            /**
1163             * Get column cardinality from the schema cache if possible;
1164             * otherwise issue a select count(distinct) query to retrieve
1165             * the cardinality and stores it in the cache.
1166             *
1167             * @return the column cardinality.
1168             */
1169            public int getCardinality() {
1170                if (cardinality == -1) {
1171                    RolapStar star = getStar();
1172                    RolapSchema schema = star.getSchema();
1173                    Integer card =
1174                        schema.getCachedRelationExprCardinality(
1175                            table.getRelation(),
1176                            expression);
1177    
1178                    if (card != null) {
1179                        cardinality = card.intValue();
1180                    } else {
1181                        // If not cached, issue SQL to get the cardinality for
1182                        // this column.
1183                        cardinality = getCardinality(star.getDataSource());
1184                        schema.putCachedRelationExprCardinality(
1185                            table.getRelation(),
1186                            expression,
1187                            cardinality);
1188                    }
1189                }
1190                return cardinality;
1191            }
1192    
1193            private int getCardinality(DataSource dataSource) {
1194                SqlQuery sqlQuery = getSqlQuery();
1195                if (sqlQuery.getDialect().allowsCountDistinct()) {
1196                    // e.g. "select count(distinct product_id) from product"
1197                    sqlQuery.addSelect("count(distinct "
1198                        + generateExprString(sqlQuery) + ")");
1199    
1200                    // no need to join fact table here
1201                    table.addToFrom(sqlQuery, true, false);
1202                } else if (sqlQuery.getDialect().allowsFromQuery()) {
1203                    // Some databases (e.g. Access) don't like 'count(distinct)',
1204                    // so use, e.g., "select count(*) from (select distinct
1205                    // product_id from product)"
1206                    SqlQuery inner = sqlQuery.cloneEmpty();
1207                    inner.setDistinct(true);
1208                    inner.addSelect(generateExprString(inner));
1209                    boolean failIfExists = true,
1210                        joinToParent = false;
1211                    table.addToFrom(inner, failIfExists, joinToParent);
1212                    sqlQuery.addSelect("count(*)");
1213                    sqlQuery.addFrom(inner, "init", failIfExists);
1214                } else {
1215                    throw Util.newInternal("Cannot compute cardinality: this " +
1216                        "database neither supports COUNT DISTINCT nor SELECT in " +
1217                        "the FROM clause.");
1218                }
1219                String sql = sqlQuery.toString();
1220                final SqlStatement stmt =
1221                    RolapUtil.executeQuery(
1222                        dataSource, sql,
1223                        "RolapStar.Column.getCardinality",
1224                        "while counting distinct values of column '" +
1225                        expression.getGenericExpression());
1226                try {
1227                    ResultSet resultSet = stmt.getResultSet();
1228                    Util.assertTrue(resultSet.next());
1229                    ++stmt.rowCount;
1230                    return resultSet.getInt(1);
1231                } catch (SQLException e) {
1232                    throw stmt.handle(e);
1233                } finally {
1234                    stmt.close();
1235                }
1236            }
1237    
1238            /**
1239             * Generates a predicate that a column matches one of a list of values.
1240             *
1241             * <p>
1242             * Several possible outputs, depending upon whether the there are
1243             * nulls:<ul>
1244             *
1245             * <li>One not-null value: <code>foo.bar = 1</code>
1246             *
1247             * <li>All values not null: <code>foo.bar in (1, 2, 3)</code></li
1248             *
1249             * <li>Null and not null values:
1250             * <code>(foo.bar is null or foo.bar in (1, 2))</code></li>
1251             *
1252             * <li>Only null values:
1253             * <code>foo.bar is null</code></li>
1254             *
1255             * <li>String values: <code>foo.bar in ('a', 'b', 'c')</code></li>
1256             *
1257             * </ul>
1258             */
1259            public static String createInExpr(
1260                final String expr,
1261                StarColumnPredicate predicate,
1262                SqlQuery.Datatype datatype,
1263                SqlQuery sqlQuery)
1264            {
1265                // Sometimes a column predicate is created without a column. This
1266                // is unfortunate, and we will fix it some day. For now, create
1267                // a fake column with all of the information needed by the toSql
1268                // method, and a copy of the predicate wrapping that fake column.
1269                if (!Bug.Bug1767775Fixed ||
1270                    !Bug.Bug1767779Fixed && predicate.getConstrainedColumn() == null)
1271                {
1272                    Column column = new Column(datatype) {
1273                        public String generateExprString(SqlQuery query) {
1274                            return expr;
1275                        }
1276                    };
1277                    predicate = predicate.cloneWithColumn(column);
1278                }
1279    
1280                StringBuilder buf = new StringBuilder(64);
1281                predicate.toSql(sqlQuery, buf);
1282                return buf.toString();
1283            }
1284    
1285            public String toString() {
1286                StringWriter sw = new StringWriter(256);
1287                PrintWriter pw = new PrintWriter(sw);
1288                print(pw, "");
1289                pw.flush();
1290                return sw.toString();
1291            }
1292    
1293            /**
1294             * Prints this column.
1295             *
1296             * @param pw Print writer
1297             * @param prefix Prefix to print first, such as spaces for indentation
1298             */
1299            public void print(PrintWriter pw, String prefix) {
1300                SqlQuery sqlQuery = getSqlQuery();
1301                pw.print(prefix);
1302                pw.print(getName());
1303                pw.print(" (");
1304                pw.print(getBitPosition());
1305                pw.print("): ");
1306                pw.print(generateExprString(sqlQuery));
1307            }
1308    
1309            public SqlQuery.Datatype getDatatype() {
1310                return datatype;
1311            }
1312    
1313            /**
1314             * Returns a string representation of the datatype of this column, in
1315             * the dialect specified. For example, 'DECIMAL(10, 2) NOT NULL'.
1316             *
1317             * @param dialect Dialect
1318             * @return String representation of column's datatype
1319             */
1320            public String getDatatypeString(SqlQuery.Dialect dialect) {
1321                final SqlQuery query = new SqlQuery(dialect);
1322                query.addFrom(
1323                    table.star.factTable.relation, table.star.factTable.alias,
1324                    false);
1325                query.addFrom(table.relation, table.alias, false);
1326                query.addSelect(expression.getExpression(query));
1327                final String sql = query.toString();
1328                Connection jdbcConnection = null;
1329                try {
1330                    jdbcConnection = table.star.dataSource.getConnection();
1331                    final PreparedStatement pstmt =
1332                        jdbcConnection.prepareStatement(sql);
1333                    final ResultSetMetaData resultSetMetaData =
1334                        pstmt.getMetaData();
1335                    assert resultSetMetaData.getColumnCount() == 1;
1336                    final String type = resultSetMetaData.getColumnTypeName(1);
1337                    int precision = resultSetMetaData.getPrecision(1);
1338                    final int scale = resultSetMetaData.getScale(1);
1339                    if (type.equals("DOUBLE")) {
1340                        precision = 0;
1341                    }
1342                    String typeString;
1343                    if (precision == 0) {
1344                        typeString = type;
1345                    } else if (scale == 0) {
1346                        typeString = type + "(" + precision + ")";
1347                    } else {
1348                        typeString = type + "(" + precision + ", " + scale + ")";
1349                    }
1350                    pstmt.close();
1351                    jdbcConnection.close();
1352                    jdbcConnection = null;
1353                    return typeString;
1354                } catch (SQLException e) {
1355                    throw Util.newError(
1356                        e,
1357                        "Error while deriving type of column " + toString());
1358                } finally {
1359                    if (jdbcConnection != null) {
1360                        try {
1361                            jdbcConnection.close();
1362                        } catch (SQLException e) {
1363                            // ignore
1364                        }
1365                    }
1366                }
1367            }
1368        }
1369    
1370        /**
1371         * Definition of a measure in a star schema.
1372         *
1373         * <p>A measure is basically just a column; except that its
1374         * {@link #aggregator} defines how it is to be rolled up.
1375         */
1376        public static class Measure extends Column {
1377            private final String cubeName;
1378            private final RolapAggregator aggregator;
1379    
1380            public Measure(
1381                String name,
1382                String cubeName,
1383                RolapAggregator aggregator,
1384                Table table,
1385                MondrianDef.Expression expression,
1386                SqlQuery.Datatype datatype)
1387            {
1388                super(name, table, expression, datatype);
1389                this.cubeName = cubeName;
1390                this.aggregator = aggregator;
1391            }
1392    
1393            public RolapAggregator getAggregator() {
1394                return aggregator;
1395            }
1396