Skip to content

Commit

Permalink
[improve][misc] Replace dependencies on PositionImpl with Position in…
Browse files Browse the repository at this point in the history
…terface (#22891)

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
lhotari and merlimat authored Jun 13, 2024
1 parent a91a172 commit 411f697
Show file tree
Hide file tree
Showing 151 changed files with 2,214 additions and 1,910 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
* A ManagedCursor is a persisted cursor inside a ManagedLedger.
Expand Down Expand Up @@ -152,7 +151,7 @@ enum IndividualDeletedEntries {
* max position can read
*/
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);
Position maxPosition);


/**
Expand All @@ -165,7 +164,7 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
* @param maxPosition max position can read
*/
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);
Object ctx, Position maxPosition);

/**
* Asynchronously read entries from the ManagedLedger.
Expand All @@ -178,7 +177,7 @@ void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesC
* @param skipCondition predicate of read filter out
*/
default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
Object ctx, Position maxPosition, Predicate<Position> skipCondition) {
asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition);
}

Expand Down Expand Up @@ -256,7 +255,7 @@ List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
* max position can read
*/
void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);
Position maxPosition);

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
Expand All @@ -277,7 +276,7 @@ void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callb
* max position can read
*/
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);
Position maxPosition);

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
Expand All @@ -298,7 +297,7 @@ void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallba
* predicate of read filter out
*/
default void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
Position maxPosition, Predicate<Position> skipCondition) {
asyncReadEntriesOrWait(maxEntries, callback, ctx, maxPosition);
}

Expand All @@ -323,15 +322,15 @@ default void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback
* predicate of read filter out
*/
default void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
Object ctx, Position maxPosition,
Predicate<Position> skipCondition) {
asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition);
}

/**
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
*
* @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object, PositionImpl)
* @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object, Position)
* @return true if the read operation was canceled or false if there was no pending operation
*/
boolean cancelPendingReadRequest();
Expand Down Expand Up @@ -837,7 +836,7 @@ default void skipNonRecoverableLedger(long ledgerId){}
* Get last individual deleted range.
* @return range
*/
Range<PositionImpl> getLastIndividualDeletedRange();
Range<Position> getLastIndividualDeletedRange();

/**
* Trim delete entries for the given entries.
Expand All @@ -847,7 +846,7 @@ default void skipNonRecoverableLedger(long ledgerId){}
/**
* Get deleted batch indexes list for a batch message.
*/
long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);
long[] getDeletedBatchIndexesAsLongArray(Position position);

/**
* @return the managed cursor stats MBean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.Optional;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;

Expand All @@ -26,16 +27,108 @@
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface Position {
public interface Position extends Comparable<Position> {
/**
* Get the ledger id of the entry pointed by this position.
*
* @return the ledger id
*/
long getLedgerId();

/**
* Get the entry id of the entry pointed by this position.
*
* @return the entry id
*/
long getEntryId();

/**
* Compare this position with another position.
* The comparison is first based on the ledger id, and then on the entry id.
* This is implements the Comparable interface.
* @param that the other position to be compared.
* @return -1 if this position is less than the other, 0 if they are equal, 1 if this position is greater than
* the other.
*/
default int compareTo(Position that) {
if (getLedgerId() != that.getLedgerId()) {
return Long.compare(getLedgerId(), that.getLedgerId());
}

return Long.compare(getEntryId(), that.getEntryId());
}

/**
* Compare this position with another position based on the ledger id and entry id.
* @param ledgerId the ledger id to compare
* @param entryId the entry id to compare
* @return -1 if this position is less than the other, 0 if they are equal, 1 if this position is greater than
* the other.
*/
default int compareTo(long ledgerId, long entryId) {
if (getLedgerId() != ledgerId) {
return Long.compare(getLedgerId(), ledgerId);
}

return Long.compare(getEntryId(), entryId);
}

/**
* Calculate the hash code for the position based on ledgerId and entryId.
* This is used in Position implementations to implement the hashCode method.
* @return hash code
*/
default int hashCodeForPosition() {
int result = Long.hashCode(getLedgerId());
result = 31 * result + Long.hashCode(getEntryId());
return result;
}

/**
* Get the position of the entry next to this one. The returned position might point to a non-existing, or not-yet
* existing entry
*
* @return the position of the next logical entry
*/
Position getNext();
default Position getNext() {
if (getEntryId() < 0) {
return PositionFactory.create(getLedgerId(), 0);
} else {
return PositionFactory.create(getLedgerId(), getEntryId() + 1);
}
}

long getLedgerId();
/**
* Position after moving entryNum messages,
* if entryNum < 1, then return the current position.
* */
default Position getPositionAfterEntries(int entryNum) {
if (entryNum < 1) {
return this;
}
if (getEntryId() < 0) {
return PositionFactory.create(getLedgerId(), entryNum - 1);
} else {
return PositionFactory.create(getLedgerId(), getEntryId() + entryNum);
}
}

long getEntryId();
/**
* Check if the position implementation has an extension of the given class or interface.
*
* @param extensionClass the class of the extension
* @return true if the position has an extension of the given class, false otherwise
*/
default boolean hasExtension(Class<?> extensionClass) {
return getExtension(extensionClass).isPresent();
}

/**
* Get the extension instance of the given class or interface that is attached to this position.
* If the position does not have an extension of the given class, an empty optional is returned.
* @param extensionClass the class of the extension
*/
default <T> Optional<T> getExtension(Class<T> extensionClass) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;

/**
* Factory for creating {@link Position} instances.
*/
public final class PositionFactory {
/**
* Earliest position.
*/
public static final Position EARLIEST = create(-1, -1);
/**
* Latest position.
*/
public static final Position LATEST = create(Long.MAX_VALUE, Long.MAX_VALUE);

private PositionFactory() {
}

/**
* Create a new position.
*
* @param ledgerId ledger id
* @param entryId entry id
* @return new position
*/
public static Position create(long ledgerId, long entryId) {
return new ImmutablePositionImpl(ledgerId, entryId);
}

/**
* Create a new position.
*
* @param other other position
* @return new position
*/
public static Position create(Position other) {
return new ImmutablePositionImpl(other);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
Expand All @@ -48,7 +47,7 @@ public interface ReadOnlyCursor {
* @see #readEntries(int)
*/
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);
Object ctx, Position maxPosition);

/**
* Asynchronously read entries from the ManagedLedger.
Expand All @@ -60,7 +59,7 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
* @param maxPosition max position can read
*/
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);
Object ctx, Position maxPosition);

/**
* Get the read position. This points to the next message to be read from the cursor.
Expand Down Expand Up @@ -116,7 +115,7 @@ Position findNewestMatching(ManagedCursor.FindPositionConstraint constraint, Pre
* @param range the range between two positions
* @return the number of entries in range
*/
long getNumberOfEntries(Range<PositionImpl> range);
long getNumberOfEntries(Range<Position> range);

/**
* Close the cursor and releases the associated resources.
Expand Down
Loading

0 comments on commit 411f697

Please sign in to comment.