Skip to content

Commit a6f63df

Browse files
Introduce BlobStoreRepository CAS Mechanism (#93825)
Only for testing purposes through the `FsRepository` for now and rather simple, but should get the job done and technically be correct for a compliant NFS implementation. Co-authored-by: David Turner <david.turner@elastic.co>
1 parent f8e306e commit a6f63df

File tree

12 files changed

+222
-0
lines changed

12 files changed

+222
-0
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

+15
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,19 @@ public Map<String, BlobContainer> children() throws IOException {
154154
protected String buildKey(String blobName) {
155155
return keyPath + (blobName == null ? "" : blobName);
156156
}
157+
158+
@Override
159+
public long compareAndExchangeRegister(String key, long expected, long updated) {
160+
throw new UnsupportedOperationException(); // TODO
161+
}
162+
163+
@Override
164+
public boolean compareAndSetRegister(String key, long expected, long updated) {
165+
throw new UnsupportedOperationException(); // TODO
166+
}
167+
168+
@Override
169+
public long getRegister(String key) throws IOException {
170+
throw new UnsupportedOperationException(); // TODO
171+
}
157172
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

+15
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,19 @@ private String buildKey(String blobName) {
117117
assert blobName != null;
118118
return path + blobName;
119119
}
120+
121+
@Override
122+
public long compareAndExchangeRegister(String key, long expected, long updated) {
123+
throw new UnsupportedOperationException(); // TODO
124+
}
125+
126+
@Override
127+
public boolean compareAndSetRegister(String key, long expected, long updated) {
128+
throw new UnsupportedOperationException(); // TODO
129+
}
130+
131+
@Override
132+
public long getRegister(String key) throws IOException {
133+
throw new UnsupportedOperationException(); // TODO
134+
}
120135
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

+15
Original file line numberDiff line numberDiff line change
@@ -607,4 +607,19 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
607607
return Tuple.tuple(parts + 1, remaining);
608608
}
609609
}
610+
611+
@Override
612+
public long compareAndExchangeRegister(String key, long expected, long updated) {
613+
throw new UnsupportedOperationException(); // TODO
614+
}
615+
616+
@Override
617+
public boolean compareAndSetRegister(String key, long expected, long updated) {
618+
throw new UnsupportedOperationException(); // TODO
619+
}
620+
621+
@Override
622+
public long getRegister(String key) throws IOException {
623+
throw new UnsupportedOperationException(); // TODO
624+
}
610625
}

modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

+5
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,9 @@ private static InputStream getInputStream(URL url) throws IOException {
147147
}
148148
}
149149

150+
@Override
151+
public long compareAndExchangeRegister(String key, long expected, long updated) {
152+
throw new UnsupportedOperationException("URL repository doesn't support this operation");
153+
}
154+
150155
}

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

+5
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,9 @@ public synchronized void reset() throws IOException {
316316
});
317317
}
318318
}
319+
320+
@Override
321+
public long compareAndExchangeRegister(String key, long expected, long updated) {
322+
throw new UnsupportedOperationException("HDFS repositories do not support this operation");
323+
}
319324
}

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

+36
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,40 @@ void writeMetadataBlob(String blobName, boolean failIfAlreadyExists, boolean ato
194194
* @throws IOException if there were any failures in reading from the blob container.
195195
*/
196196
Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws IOException;
197+
198+
/**
199+
* Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}.
200+
* Keys not yet used start at initial value 0. Returns the current value (before it was updated).
201+
*
202+
* @param key key of the value to update
203+
* @param expected the expected value
204+
* @param updated the new value
205+
* @return the value read from the register (before it was updated)
206+
*/
207+
long compareAndExchangeRegister(String key, long expected, long updated) throws IOException;
208+
209+
/**
210+
* Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}.
211+
* Keys not yet used start at initial value 0.
212+
*
213+
* @param key key of the value to update
214+
* @param expected the expected value
215+
* @param updated the new value
216+
* @return true if successful, false if the expected value did not match the updated value
217+
*/
218+
default boolean compareAndSetRegister(String key, long expected, long updated) throws IOException {
219+
return compareAndExchangeRegister(key, expected, updated) == expected;
220+
}
221+
222+
/**
223+
* Gets the value set by {@link #compareAndSetRegister(String, long, long)} for a given key.
224+
* If a key has not yet been used, the initial value is 0.
225+
*
226+
* @param key key of the value to get
227+
* @return value found
228+
*/
229+
default long getRegister(String key) throws IOException {
230+
return compareAndExchangeRegister(key, 0, 0);
231+
}
232+
197233
}

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

+56
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,21 @@
2020
import org.elasticsearch.common.bytes.BytesReference;
2121
import org.elasticsearch.common.collect.Iterators;
2222
import org.elasticsearch.common.io.Streams;
23+
import org.elasticsearch.common.util.concurrent.KeyedLock;
2324
import org.elasticsearch.core.CheckedConsumer;
2425
import org.elasticsearch.core.IOUtils;
26+
import org.elasticsearch.core.Releasable;
2527
import org.elasticsearch.core.Strings;
28+
import org.elasticsearch.core.SuppressForbidden;
2629

2730
import java.io.FileNotFoundException;
2831
import java.io.IOException;
2932
import java.io.InputStream;
3033
import java.io.OutputStream;
34+
import java.nio.ByteBuffer;
3135
import java.nio.channels.Channels;
36+
import java.nio.channels.FileChannel;
37+
import java.nio.channels.FileLock;
3238
import java.nio.channels.SeekableByteChannel;
3339
import java.nio.file.AccessDeniedException;
3440
import java.nio.file.DirectoryStream;
@@ -369,4 +375,54 @@ public static boolean isTempBlobName(final String blobName) {
369375
private static OutputStream blobOutputStream(Path file) throws IOException {
370376
return Files.newOutputStream(file, StandardOpenOption.CREATE_NEW);
371377
}
378+
379+
private static final KeyedLock<String> registerLocks = new KeyedLock<>();
380+
381+
@Override
382+
@SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly")
383+
public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException {
384+
try (
385+
FileChannel channel = openOrCreateAtomic(path.resolve(key));
386+
FileLock ignored1 = channel.lock();
387+
Releasable ignored2 = registerLocks.acquire(key)
388+
) {
389+
final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
390+
final long found;
391+
while (buf.remaining() > 0) {
392+
if (channel.read(buf) == -1) {
393+
break;
394+
}
395+
}
396+
if (buf.position() == 0) {
397+
found = 0L;
398+
} else if (buf.position() == Long.BYTES) {
399+
found = buf.getLong(0);
400+
buf.clear();
401+
if (channel.read(buf) != -1) {
402+
throw new IllegalStateException("Read file of length greater than [" + Long.BYTES + "] for [" + key + "]");
403+
}
404+
} else {
405+
throw new IllegalStateException("Read file of length [" + buf.position() + "] for [" + key + "]");
406+
}
407+
if (found == expected) {
408+
buf.clear().putLong(updated).flip();
409+
while (buf.remaining() > 0) {
410+
channel.write(buf, buf.position());
411+
}
412+
channel.force(true);
413+
}
414+
return found;
415+
}
416+
}
417+
418+
private static FileChannel openOrCreateAtomic(Path path) throws IOException {
419+
try {
420+
if (Files.exists(path) == false) {
421+
return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
422+
}
423+
} catch (FileAlreadyExistsException e) {
424+
// ok, created concurrently
425+
}
426+
return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE);
427+
}
372428
}

server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

+15
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,19 @@ public Map<String, BlobContainer> children() throws IOException {
101101
public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws IOException {
102102
return delegate.listBlobsByPrefix(blobNamePrefix);
103103
}
104+
105+
@Override
106+
public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException {
107+
return delegate.compareAndExchangeRegister(key, expected, updated);
108+
}
109+
110+
@Override
111+
public boolean compareAndSetRegister(String key, long expected, long updated) throws IOException {
112+
return delegate.compareAndSetRegister(key, expected, updated);
113+
}
114+
115+
@Override
116+
public long getRegister(String key) throws IOException {
117+
return delegate.getRegister(key);
118+
}
104119
}

server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java

+43
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.tests.mockfile.FilterSeekableByteChannel;
1212
import org.apache.lucene.tests.util.LuceneTestCase;
1313
import org.elasticsearch.common.blobstore.BlobPath;
14+
import org.elasticsearch.common.bytes.BytesArray;
1415
import org.elasticsearch.common.io.Streams;
1516
import org.elasticsearch.core.IOUtils;
1617
import org.elasticsearch.core.PathUtils;
@@ -94,6 +95,48 @@ public void testIsTempBlobName() {
9495
assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
9596
}
9697

98+
public void testCompareAndExchange() throws Exception {
99+
final Path path = PathUtils.get(createTempDir().toString());
100+
final FsBlobContainer container = new FsBlobContainer(
101+
new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false),
102+
BlobPath.EMPTY,
103+
path
104+
);
105+
106+
final String key = randomAlphaOfLength(10);
107+
final AtomicLong expectedValue = new AtomicLong();
108+
109+
for (int i = 0; i < 5; i++) {
110+
switch (between(1, 4)) {
111+
case 1 -> assertEquals(expectedValue.get(), container.getRegister(key));
112+
case 2 -> assertFalse(
113+
container.compareAndSetRegister(key, randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong), randomLong())
114+
);
115+
case 3 -> assertEquals(
116+
expectedValue.get(),
117+
container.compareAndExchangeRegister(
118+
key,
119+
randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong),
120+
randomLong()
121+
)
122+
);
123+
case 4 -> {/* no-op */}
124+
}
125+
126+
final var newValue = randomLong();
127+
if (randomBoolean()) {
128+
assertTrue(container.compareAndSetRegister(key, expectedValue.get(), newValue));
129+
} else {
130+
assertEquals(expectedValue.get(), container.compareAndExchangeRegister(key, expectedValue.get(), newValue));
131+
}
132+
expectedValue.set(newValue);
133+
}
134+
135+
final byte[] corruptContents = new byte[9];
136+
container.writeBlob(key, new BytesArray(corruptContents, 0, randomFrom(1, 7, 9)), false);
137+
expectThrows(IllegalStateException.class, () -> container.compareAndExchangeRegister(key, expectedValue.get(), 0));
138+
}
139+
97140
static class MockFileSystemProvider extends FilterFileSystemProvider {
98141

99142
final Consumer<Long> onRead;

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java

+5
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,11 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) {
228228
throw unsupportedException();
229229
}
230230

231+
@Override
232+
public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException {
233+
throw unsupportedException();
234+
}
235+
231236
private UnsupportedOperationException unsupportedException() {
232237
assert false : "this operation is not supported and should have not be called";
233238
return new UnsupportedOperationException("This operation is not supported");

x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,12 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws
537537
blobMetadataByName.keySet().removeIf(s -> s.startsWith(blobNamePrefix) == false);
538538
return blobMetadataByName;
539539
}
540+
541+
@Override
542+
public long compareAndExchangeRegister(String key, long expected, long updated) {
543+
assert false : "should not have been called";
544+
throw new UnsupportedOperationException();
545+
}
540546
}
541547

542548
}

x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,12 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) {
404404
blobMetadataByName.keySet().removeIf(s -> s.startsWith(blobNamePrefix) == false);
405405
return blobMetadataByName;
406406
}
407+
408+
@Override
409+
public long compareAndExchangeRegister(String key, long expected, long updated) {
410+
assert false : "should not have been called";
411+
throw new UnsupportedOperationException();
412+
}
407413
}
408414

409415
}

0 commit comments

Comments
 (0)