Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,8 @@ public final boolean isBackup() {
}

@Override
public void updateMemoryStats(Object oldValue, Object newValue) {
public void updateMemoryStats(Object oldValue, Object newValue,
AbstractRegionEntry re) {
// only used by BucketRegion as of now
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,14 +1441,14 @@ final void _setValue(RegionEntryContext context, @Unretained final Object val) {
if (rawOldVal instanceof SerializedDiskBuffer) {
synchronized (rawOldVal) {
setValueField(val);
if (context != null) context.updateMemoryStats(rawOldVal, val);
if (context != null) context.updateMemoryStats(rawOldVal, val, this);
((SerializedDiskBuffer)rawOldVal).release();
}
return;
} else if (val instanceof SerializedDiskBuffer) {
synchronized (val) {
setValueField(val);
if (context != null) context.updateMemoryStats(rawOldVal, val);
if (context != null) context.updateMemoryStats(rawOldVal, val, this);
}
return;
}
Expand All @@ -1463,7 +1463,7 @@ final void _setValue(RegionEntryContext context, @Unretained final Object val) {
|| (Token.isRemoved(val) && getValueAsToken() != Token.NOT_A_TOKEN)) {
setValueField(val);
if (!isOffHeap && context != null) {
context.updateMemoryStats(rawOldVal, val);
context.updateMemoryStats(rawOldVal, val, this);
}
}
else {
Expand Down Expand Up @@ -1501,7 +1501,7 @@ final void _setValue(RegionEntryContext context, @Unretained final Object val) {
setContainerInfo(null, val);
}
if (!isOffHeap && context != null) {
context.updateMemoryStats(rawOldVal, val);
context.updateMemoryStats(rawOldVal, val, this);
}
return;
} catch (IllegalAccessException e) {
Expand Down Expand Up @@ -1739,10 +1739,10 @@ public final void setOwner(LocalRegion owner, Object previousOwner) {
// set the context into the value if required
initContextForDiskBuffer(owner, val);
// add for new owner
if (owner != null) owner.updateMemoryStats(null, val);
if (owner != null) owner.updateMemoryStats(null, val, this);
// reduce from previous owner
if (previousOwner instanceof RegionEntryContext) {
((RegionEntryContext)previousOwner).updateMemoryStats(val, null);
((RegionEntryContext)previousOwner).updateMemoryStats(val, null, this);
}
}
final StaticSystemCallbacks sysCb =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -72,6 +77,7 @@
import com.gemstone.gemfire.internal.cache.locks.LockingPolicy.ReadEntryUnderLock;
import com.gemstone.gemfire.internal.cache.locks.ReentrantReadWriteWriteShareLock;
import com.gemstone.gemfire.internal.cache.partitioned.*;
import com.gemstone.gemfire.internal.cache.store.ColumnBatchKey;
import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientTombstoneMessage;
Expand Down Expand Up @@ -126,6 +132,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
* Contains size in bytes of the direct byte buffers stored in memory.
*/
private final AtomicLong directBufferBytesInMemory = new AtomicLong();
private final AtomicLong numRowsInColumnTable = new AtomicLong();
private final AtomicLong inProgressSize = new AtomicLong();

public static final ReadEntryUnderLock READ_SER_VALUE = new ReadEntryUnderLock() {
Expand Down Expand Up @@ -2765,6 +2772,7 @@ else if (GemFireCacheImpl.gfxdSystem()) {
prDs.updateMemoryStats(-oldMemValue);
}
this.directBufferBytesInMemory.set(0);
this.numRowsInColumnTable.set(0);
// explicitly clear overflow counters if no diskRegion is present
// (for latter the counters are cleared by DiskRegion.statsClear)
if (getDiskRegion() == null) {
Expand Down Expand Up @@ -2982,9 +2990,16 @@ protected void closeCallbacksExceptListener() {
closeCacheCallback(getEvictionController());
}

public long getSizeInMemory() {
return Math.max(this.bytesInMemory.get(), 0L) +
this.directBufferBytesInMemory.get();
/**
 * Returns the total in-memory size of this bucket in bytes: the heap
 * portion (clamped at zero, since the counter can transiently go
 * negative) plus the direct (off-heap) buffer bytes.
 */
public final long getSizeInMemory() {
  final long heapBytes = this.bytesInMemory.get();
  // negative transient values are treated as zero
  return (heapBytes > 0L ? heapBytes : 0L) + getDirectBytesSizeInMemory();
}

/**
 * Returns the size in bytes of the direct byte buffers currently held
 * in memory by this bucket.
 */
public final long getDirectBytesSizeInMemory() {
  return this.directBufferBytesInMemory.get();
}

/**
 * Returns the current number of rows accounted in the column table for
 * this bucket (as tracked via column batch key row counts).
 */
public final long getNumRowsInColumnTable() {
  return this.numRowsInColumnTable.get();
}

public long getInProgressSize() {
Expand Down Expand Up @@ -3102,22 +3117,44 @@ void updateBucket2Size(int oldSize, int newSize,
}

@Override
public void updateMemoryStats(final Object oldValue, final Object newValue) {
public void updateMemoryStats(final Object oldValue, final Object newValue,
final AbstractRegionEntry re) {
if (newValue != oldValue) {
int oldValueSize = calcMemSize(oldValue);
int newValueSize = calcMemSize(newValue);
updateBucketMemoryStats(newValueSize - oldValueSize);

if (this.cache.getMemorySize() > 0) {
int directBufferDelta = 0;
if (oldValue instanceof SerializedDiskBuffer) {
directBufferDelta -= ((SerializedDiskBuffer)oldValue).getOffHeapSizeInBytes();
}
if (newValue instanceof SerializedDiskBuffer) {
directBufferDelta += ((SerializedDiskBuffer)newValue).getOffHeapSizeInBytes();
}
if (directBufferDelta != 0) {
this.directBufferBytesInMemory.getAndAdd(directBufferDelta);
// update number of rows in table and off-heap size if applicable
if (re != null) {
int numColumns = this.partitionedRegion.getNumColumns();
if (numColumns > 0) {
Object key = re.getRawKey();
final boolean hasNewOffHeap = this.cache.getMemorySize() > 0;
if (key instanceof ColumnBatchKey) {
ColumnBatchKey batchKey = (ColumnBatchKey)key;
int directBufferDelta = 0;
int numRowsDelta = 0;
if (oldValue instanceof SerializedDiskBuffer) {
SerializedDiskBuffer oldBuffer = (SerializedDiskBuffer)oldValue;
if (hasNewOffHeap) {
directBufferDelta -= oldBuffer.getOffHeapSizeInBytes();
}
numRowsDelta -= batchKey.getColumnBatchRowCount(this, oldBuffer);
}
if (newValue instanceof SerializedDiskBuffer) {
SerializedDiskBuffer newBuffer = (SerializedDiskBuffer)newValue;
if (hasNewOffHeap) {
directBufferDelta += newBuffer.getOffHeapSizeInBytes();
}
numRowsDelta += batchKey.getColumnBatchRowCount(this, newBuffer);
}
if (directBufferDelta != 0) {
this.directBufferBytesInMemory.getAndAdd(directBufferDelta);
}
if (numRowsDelta != 0) {
this.numRowsInColumnTable.getAndAdd(numRowsDelta);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.gemstone.gemfire.internal.cache;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -25,6 +26,7 @@ public ExternalTableMetaData(String entityName,
Object schema,
String tableType,
Object externalStore,
int numColumns,
int columnBatchSize,
int columnMaxDeltaRows,
String compressionCodec,
Expand All @@ -37,6 +39,7 @@ public ExternalTableMetaData(String entityName,
this.schema = schema;
this.tableType = tableType;
this.externalStore = externalStore;
this.numColumns = numColumns;
this.columnBatchSize = columnBatchSize;
this.columnMaxDeltaRows = columnMaxDeltaRows;
this.compressionCodec = compressionCodec;
Expand All @@ -45,23 +48,25 @@ public ExternalTableMetaData(String entityName,
this.dependents = dependents;
this.dataSourcePath = dataSourcePath;
this.driverClass = driverClass;
this.columns = Collections.emptyList();
}

public String entityName;
public Object schema;
public String tableType;
public final String entityName;
public final Object schema;
public final String tableType;
// No type specified as the class is in snappy core
public Object externalStore;
public int columnBatchSize;
public int columnMaxDeltaRows;
public String compressionCodec;
public String baseTable;
public String dml;
public String[] dependents;
public final Object externalStore;
public final int numColumns;
public final int columnBatchSize;
public final int columnMaxDeltaRows;
public final String compressionCodec;
public final String baseTable;
public final String dml;
public final String[] dependents;
public String provider;
public String shortProvider;
public String dataSourcePath;
public String driverClass;
public final String dataSourcePath;
public final String driverClass;
public String viewText;
// columns for metadata queries
public List<Column> columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6587,7 +6587,7 @@ public void beforeReturningOffHeapMemoryToAllocator(long address,
/**
* Fetches hive meta data for Snappy tables.
*/
public ExternalTableMetaData fetchSnappyTablesHiveMetaData(PartitionedRegion region);
public ExternalTableMetaData fetchTableMetaData(PartitionedRegion region);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
import com.gemstone.gemfire.internal.shared.ClientSharedUtils;
import com.gemstone.gemfire.internal.shared.SystemProperties;
import com.gemstone.gemfire.internal.shared.Version;
import com.gemstone.gemfire.internal.size.ReflectionObjectSizer;
Expand Down Expand Up @@ -680,10 +681,10 @@ protected LocalRegion(String regionName, RegionAttributes attrs,
Assert.assertTrue(regionName != null, "regionName must not be null");
this.sharedDataView = buildDataView();
this.regionName = regionName;
this.isInternalColumnTable = regionName.toUpperCase(Locale.ENGLISH)
.endsWith(StoreCallbacks.SHADOW_TABLE_SUFFIX);
this.parentRegion = parentRegion;
this.fullPath = calcFullPath(regionName, parentRegion);
this.isInternalColumnTable = ClientSharedUtils.isColumnTable(
this.fullPath.toUpperCase(Locale.ENGLISH));
// cannot support patterns like "..._/..." due to ambiguity in encoding
// of bucket regions
if (this.fullPath.contains("_/")) {
Expand Down Expand Up @@ -3186,7 +3187,8 @@ public boolean isBackup() {
}

@Override
public void updateMemoryStats(Object oldValue, Object newValue) {
public void updateMemoryStats(Object oldValue, Object newValue,
AbstractRegionEntry re) {
// only used by BucketRegion as of now
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,17 +394,20 @@ public class PartitionedRegion extends LocalRegion implements

private volatile int shutDownAllStatus = RUNNING_MODE;

/** Number of columns if this region is a store for a column table. */
private volatile int numColumns = -1;

/** Maximum size in bytes for ColumnBatches. */
private int columnBatchSize = -1;
private volatile int columnBatchSize = -1;

/** Maximum rows to keep in the delta buffer. */
private int columnMaxDeltaRows = -1;
private volatile int columnMaxDeltaRows = -1;

/** Minimum size for ColumnBatches. */
private int columnMinDeltaRows = SystemProperties.SNAPPY_MIN_COLUMN_DELTA_ROWS;
private volatile int columnMinDeltaRows = SystemProperties.SNAPPY_MIN_COLUMN_DELTA_ROWS;

/** default compression used by the column store */
private String columnCompressionCodec;
private volatile String columnCompressionCodec;

public void setColumnBatchSizes(int size, int maxDeltaRows,
int minDeltaRows) {
Expand All @@ -417,19 +420,27 @@ private void initFromHiveMetaData() {
final GemFireCacheImpl.StaticSystemCallbacks sysCb = GemFireCacheImpl
.getInternalProductCallbacks();
if (sysCb != null) {
ExternalTableMetaData metadata = sysCb.fetchSnappyTablesHiveMetaData(this);
if (this.columnBatchSize == -1) {
this.columnBatchSize = metadata.columnBatchSize;
}
if (this.columnMaxDeltaRows == -1) {
this.columnMaxDeltaRows = metadata.columnMaxDeltaRows;
}
if (this.columnCompressionCodec == null) {
this.columnCompressionCodec = metadata.compressionCodec;
synchronized (this) {
ExternalTableMetaData metadata = sysCb.fetchTableMetaData(this);
if (this.numColumns == -1) {
this.numColumns = metadata.numColumns;
this.columnBatchSize = metadata.columnBatchSize;
this.columnMaxDeltaRows = metadata.columnMaxDeltaRows;
this.columnCompressionCodec = metadata.compressionCodec;
}
}
}
}

/**
 * Returns the number of columns if this region backs a column table,
 * lazily initializing from the hive metadata on first access.
 * A value of -1 means "not yet initialized"; if initialization cannot
 * populate it (e.g. no system callbacks), -1 is returned.
 */
public int getNumColumns() {
  final int cachedColumns = this.numColumns;
  if (cachedColumns != -1) {
    return cachedColumns;
  }
  // not initialized yet; fetch from hive metadata and re-read the field
  initFromHiveMetaData();
  return this.numColumns;
}

public int getColumnBatchSize() {
int columnBatchSize = this.columnBatchSize;
if (columnBatchSize == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ public interface RegionEntryContext extends HasCachePerfStats {
*/
public boolean isBackup();

public void updateMemoryStats(Object oldValue, Object newValue);
public void updateMemoryStats(Object oldValue, Object newValue,
AbstractRegionEntry re);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,23 @@
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.pivotal.gemfirexd.internal.snappy;
package com.gemstone.gemfire.internal.cache.store;

import com.gemstone.gemfire.internal.cache.AbstractRegionEntry;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.lru.Sizeable;
import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;

/**
* Interface for a key object in the column store.
*/
public interface ColumnBatchKey extends Sizeable {

/**
* Get the number of columns defined for the given
* column table (qualified name).
*/
int getNumColumnsInTable(String columnTableName);
public abstract class ColumnBatchKey implements Sizeable {

/**
* Get the number of rows in this column batch.
* This will return a non-zero result only for the STATS keys while
* for a key of DELETE bitmask it will return negative value
* indicating the delete count.
*/
int getColumnBatchRowCount(BucketRegion bucketRegion, AbstractRegionEntry re,
int numColumnsInTable);
public abstract int getColumnBatchRowCount(BucketRegion bucketRegion,
SerializedDiskBuffer value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ public List<String> getInternalTableSchemas() {
return Collections.emptyList();
}

@Override
public boolean isColumnTable(String qualifiedName) {
return false;
}

@Override
public boolean skipEvictionForEntry(LRUEntry entry) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ Set<Object> createColumnBatch(BucketRegion region, long batchID,

List<String> getInternalTableSchemas();

boolean isColumnTable(String qualifiedName);

boolean skipEvictionForEntry(LRUEntry entry);

int getHashCodeSnappy(Object dvd, int numPartitions);
Expand Down
Loading