diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 985369289e..208b6837fd 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -42,6 +42,7 @@ import com.netflix.hollow.core.util.HollowObjectHashCodeFinder; import com.netflix.hollow.core.util.HollowWriteStateCreator; import com.netflix.hollow.core.write.HollowBlobWriter; +import com.netflix.hollow.core.write.HollowTypeWriteState; import com.netflix.hollow.core.write.HollowWriteStateEngine; import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper; import com.netflix.hollow.core.write.objectmapper.RecordPrimaryKey; @@ -50,8 +51,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -839,7 +843,12 @@ private void announce(ProducerListeners listeners, HollowProducer.ReadState read "Fail the announcement because current producer is not primary (aka leader)"); throw new HollowProducer.NotPrimaryMidCycleException("Announcement failed primary (aka leader) check"); } - announcer.announce(readState.getVersion(), readState.getStateEngine().getHeaderTags()); + + Map announceTags = new HashMap(); + announceTags.putAll(changeCounts(getWriteEngine())); // change counts + announceTags.putAll(readState.getStateEngine().getHeaderTags()); // blob headers + + announcer.announce(readState.getVersion(), announceTags); } finally { singleProducerEnforcer.unlock(); } @@ -853,6 +862,35 @@ private void announce(ProducerListeners listeners, HollowProducer.ReadState read } } + public Map changeCounts(HollowWriteStateEngine writeStateEngine) { + OptionalLong totalAdded = OptionalLong.empty(); + OptionalLong totalRemoved = OptionalLong.empty(); + Map changeCounts = new HashMap(); + + for (HollowTypeWriteState typeState : writeStateEngine.getOrderedTypeStates()) { + if (typeState.getDeltaAddedOrdinalCount().isPresent()) { + int added = typeState.getDeltaAddedOrdinalCount().getAsInt(); + changeCounts.put("hollow.ordinals.added.type." + typeState.getSchema().getName(), + String.valueOf(added)); + totalAdded = OptionalLong.of(totalAdded.isPresent() + ? (totalAdded.getAsLong() + added) + : added); + } + if (typeState.getDeltaRemovedOrdinalCount().isPresent()) { + int removed = typeState.getDeltaRemovedOrdinalCount().getAsInt(); + changeCounts.put("hollow.ordinals.removed.type." + typeState.getSchema().getName(), + String.valueOf(removed)); + totalRemoved = OptionalLong.of(totalRemoved.isPresent() + ? (totalRemoved.getAsLong() + removed) + : removed); + } + } + totalAdded.ifPresent(v -> changeCounts.put("hollow.ordinals.added.total", String.valueOf(v))); + totalRemoved.ifPresent(v -> changeCounts.put("hollow.ordinals.removed.total", String.valueOf(v))); + + return changeCounts; + } + static final class Artifacts { HollowProducer.Blob snapshot = null; HollowProducer.Blob delta = null; diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java index 649e630d02..550e5174f8 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java @@ -16,6 +16,8 @@ */ package com.netflix.hollow.core.write; +import com.netflix.hollow.api.consumer.HollowConsumer; +import com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType; import com.netflix.hollow.core.memory.ByteData; import com.netflix.hollow.core.memory.ByteDataArray; import com.netflix.hollow.core.memory.ThreadSafeBitSet; @@ -25,6 +27,7 @@ import com.netflix.hollow.core.schema.HollowListSchema; import java.io.DataOutputStream; import java.io.IOException; +import java.util.OptionalInt; public class HollowListTypeWriteState extends HollowTypeWriteState { @@ -232,7 +235,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE @Override public void calculateDelta() { - calculateDelta(previousCyclePopulated, currentCyclePopulated); + calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA); } @Override @@ -242,7 +245,7 @@ public void writeDelta(DataOutputStream dos) throws IOException { @Override public void calculateReverseDelta() { - calculateDelta(currentCyclePopulated, previousCyclePopulated); + calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA); } @Override @@ -250,7 +253,7 @@ public void writeReverseDelta(DataOutputStream dos) throws IOException { writeCalculatedDelta(dos); } - private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) { + private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) { maxOrdinal = ordinalMap.maxOrdinal(); numListsInDelta = new int[numShards]; @@ -260,7 +263,7 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe elementArray = new FixedLengthElementArray[numShards]; deltaAddedOrdinals = new ByteDataArray[numShards]; deltaRemovedOrdinals = new ByteDataArray[numShards]; - + ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated); int shardMask = numShards - 1; @@ -287,6 +290,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe long elementCounter[] = new long[numShards]; int previousRemovedOrdinal[] = new int[numShards]; int previousAddedOrdinal[] = new int[numShards]; + int addedCount = 0; + int removedCount = 0; for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) { int shardNumber = ordinal & shardMask; @@ -309,13 +314,20 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe int shardOrdinal = ordinal / numShards; VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]); + addedCount++; previousAddedOrdinal[shardNumber] = shardOrdinal; } else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) { int shardOrdinal = ordinal / numShards; VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]); + removedCount++; previousRemovedOrdinal[shardNumber] = shardOrdinal; } } + + if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) { + deltaAddedOrdinalCount = OptionalInt.of(addedCount); + deltaRemovedOrdinalCount = OptionalInt.of(removedCount); + } } private void writeCalculatedDelta(DataOutputStream os) throws IOException { diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java index 1d2c814221..96ffc035db 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java @@ -16,6 +16,8 @@ */ package com.netflix.hollow.core.write; +import com.netflix.hollow.api.consumer.HollowConsumer; +import com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType; import com.netflix.hollow.core.memory.ByteData; import com.netflix.hollow.core.memory.ByteDataArray; import com.netflix.hollow.core.memory.ThreadSafeBitSet; @@ -26,6 +28,7 @@ import com.netflix.hollow.core.schema.HollowMapSchema; import java.io.DataOutputStream; import java.io.IOException; +import java.util.OptionalInt; public class HollowMapTypeWriteState extends HollowTypeWriteState { @@ -317,7 +320,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE @Override public void calculateDelta() { - calculateDelta(previousCyclePopulated, currentCyclePopulated); + calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA); } @Override @@ -327,7 +330,7 @@ public void writeDelta(DataOutputStream dos) throws IOException { @Override public void calculateReverseDelta() { - calculateDelta(currentCyclePopulated, previousCyclePopulated); + calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA); } @Override @@ -335,7 +338,7 @@ public void writeReverseDelta(DataOutputStream dos) throws IOException { writeCalculatedDelta(dos); } - private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) { + private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) { maxOrdinal = ordinalMap.maxOrdinal(); int bitsPerMapFixedLengthPortion = bitsPerMapSizeValue + bitsPerMapPointer; int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement; @@ -346,7 +349,7 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe entryData = new FixedLengthElementArray[numShards]; deltaAddedOrdinals = new ByteDataArray[numShards]; deltaRemovedOrdinals = new ByteDataArray[numShards]; - + ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated); int shardMask = numShards - 1; @@ -380,6 +383,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe if(getSchema().getHashKey() != null) primaryKeyHasher = new HollowWriteStateEnginePrimaryKeyHasher(getSchema().getHashKey(), getStateEngine()); + int addedCount = 0; + int removedCount = 0; for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) { int shardNumber = ordinal & shardMask; if(deltaAdditions.get(ordinal)) { @@ -430,13 +435,20 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe int shardOrdinal = ordinal / numShards; VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]); + addedCount++; previousAddedOrdinal[shardNumber] = shardOrdinal; } else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) { int shardOrdinal = ordinal / numShards; VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]); + removedCount++; previousRemovedOrdinal[shardNumber] = shardOrdinal; } } + + if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) { + deltaAddedOrdinalCount = OptionalInt.of(addedCount); + deltaRemovedOrdinalCount = OptionalInt.of(removedCount); + } } private void writeCalculatedDelta(DataOutputStream os) throws IOException { diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java index 244295a559..b0b3c02b08 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java @@ -16,6 +16,8 @@ */ package com.netflix.hollow.core.write; +import com.netflix.hollow.api.consumer.HollowConsumer; +import com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType; import com.netflix.hollow.core.memory.ByteData; import com.netflix.hollow.core.memory.ByteDataArray; import com.netflix.hollow.core.memory.ThreadSafeBitSet; @@ -26,6 +28,7 @@ import com.netflix.hollow.core.schema.HollowObjectSchema.FieldType; import java.io.DataOutputStream; import java.io.IOException; +import java.util.OptionalInt; public class HollowObjectTypeWriteState extends HollowTypeWriteState { @@ -237,7 +240,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE @Override public void calculateDelta() { - calculateDelta(previousCyclePopulated, currentCyclePopulated); + calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA); } @Override @@ -247,7 +250,7 @@ public void writeDelta(DataOutputStream dos) throws IOException { @Override public void calculateReverseDelta() { - calculateDelta(currentCyclePopulated, previousCyclePopulated); + calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA); } @Override @@ -255,7 +258,7 @@ public void writeReverseDelta(DataOutputStream dos) throws IOException { writeCalculatedDelta(dos); } - private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) { + private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) { maxOrdinal = ordinalMap.maxOrdinal(); int numBitsPerRecord = fieldStats.getNumBitsPerRecord(); @@ -267,9 +270,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe varLengthByteArrays = new ByteDataArray[numShards][]; recordBitOffset = new long[numShards]; int numAddedRecordsInShard[] = new int[numShards]; - int shardMask = numShards - 1; - + int addedOrdinal = deltaAdditions.nextSetBit(0); while(addedOrdinal != -1) { numAddedRecordsInShard[addedOrdinal & shardMask]++; @@ -286,6 +288,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe int previousRemovedOrdinal[] = new int[numShards]; int previousAddedOrdinal[] = new int[numShards]; + int addedCount = 0; + int removedCount = 0; for(int i=0;i<=maxOrdinal;i++) { int shardNumber = i & shardMask; if(deltaAdditions.get(i)) { @@ -293,13 +297,20 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe recordBitOffset[shardNumber] += numBitsPerRecord; int shardOrdinal = i / numShards; VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]); + addedCount++; previousAddedOrdinal[shardNumber] = shardOrdinal; } else if(fromCyclePopulated.get(i) && !toCyclePopulated.get(i)) { int shardOrdinal = i / numShards; VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]); + removedCount++; previousRemovedOrdinal[shardNumber] = shardOrdinal; } } + + if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) { + deltaAddedOrdinalCount = OptionalInt.of(addedCount); + deltaRemovedOrdinalCount = OptionalInt.of(removedCount); + } } private void writeCalculatedDelta(DataOutputStream os) throws IOException { diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowSetTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowSetTypeWriteState.java index a8e630ecb1..e76b4a9ba5 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowSetTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowSetTypeWriteState.java @@ -16,6 +16,9 @@ */ package com.netflix.hollow.core.write; +import static com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType; + +import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.core.memory.ByteData; import com.netflix.hollow.core.memory.ByteDataArray; import com.netflix.hollow.core.memory.ThreadSafeBitSet; @@ -26,6 +29,7 @@ import com.netflix.hollow.core.schema.HollowSetSchema; import java.io.DataOutputStream; import java.io.IOException; +import java.util.OptionalInt; public class HollowSetTypeWriteState extends HollowTypeWriteState { @@ -290,7 +294,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE @Override public void calculateDelta() { - calculateDelta(previousCyclePopulated, currentCyclePopulated); + calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA); } @Override @@ -300,7 +304,7 @@ public void writeDelta(DataOutputStream dos) throws IOException { @Override public void calculateReverseDelta() { - calculateDelta(currentCyclePopulated, previousCyclePopulated); + calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA); } @Override @@ -308,7 +312,7 @@ public void writeReverseDelta(DataOutputStream dos) throws IOException { writeCalculatedDelta(dos); } - public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) { + public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) { maxOrdinal = ordinalMap.maxOrdinal(); int bitsPerSetFixedLengthPortion = bitsPerSetSizeValue + bitsPerSetPointer; @@ -352,6 +356,8 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet if(getSchema().getHashKey() != null) primaryKeyHasher = new HollowWriteStateEnginePrimaryKeyHasher(getSchema().getHashKey(), getStateEngine()); + int addedCount = 0; + int removedCount = 0; for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) { int shardNumber = ordinal & shardMask; if(deltaAdditions.get(ordinal)) { @@ -397,13 +403,20 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet int shardOrdinal = ordinal / numShards; VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]); + addedCount++; previousAddedOrdinal[shardNumber] = shardOrdinal; } else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) { int shardOrdinal = ordinal / numShards; VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]); + removedCount++; previousRemovedOrdinal[shardNumber] = shardOrdinal; } } + + if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) { + deltaAddedOrdinalCount = OptionalInt.of(addedCount); + deltaRemovedOrdinalCount = OptionalInt.of(removedCount); + } } private void writeCalculatedDelta(DataOutputStream os) throws IOException { diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java index f5d02d6d83..fb4d5bc9eb 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java @@ -32,6 +32,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.BitSet; +import java.util.OptionalInt; /** * The {@link HollowTypeWriteState} contains and is the root handle to all of the records of a specific type in @@ -52,6 +53,18 @@ public abstract class HollowTypeWriteState { protected ThreadSafeBitSet currentCyclePopulated; protected ThreadSafeBitSet previousCyclePopulated; + /// a modification is counted as 1 ordinal added and 1 ordinal removed + protected OptionalInt deltaAddedOrdinalCount = OptionalInt.empty(); + protected OptionalInt deltaRemovedOrdinalCount = OptionalInt.empty(); + + public OptionalInt getDeltaAddedOrdinalCount() { + return deltaAddedOrdinalCount; + } + + public OptionalInt getDeltaRemovedOrdinalCount() { + return deltaRemovedOrdinalCount; + } + private final ThreadLocal serializedScratchSpace; protected HollowWriteStateEngine stateEngine; @@ -262,6 +275,9 @@ public void prepareForNextCycle() { restoredMap = null; restoredSchema = null; restoredReadState = null; + + deltaAddedOrdinalCount = OptionalInt.empty(); + deltaRemovedOrdinalCount = OptionalInt.empty(); } public void prepareForWrite() {