This article describes how we overcame the ‘failed-to-reset-stream’ problem in AWS’s TransferManager while performing Cassandra backups.
Why would you want to use backups with Cassandra anyway?
Cassandra is designed to be resilient to failure. Nodes can transition between up and down and the cluster will still perform as if it were nobody’s business, as long as the number of online replicas satisfies the required consistency level. However, it’s always good to have extra safety factors so we (and our clients) can sleep more soundly at night, hence we perform daily external backups of the data stored in our managed clusters.
Enter the TransferManager
We use both AWS and Azure platforms to store backups, but I’ll be focusing on AWS in this discussion. The backups are uploaded to Amazon Simple Storage Service (S3) buckets using AWS’s TransferManager API. The TransferManager can accept either an InputStream or a file as the source of an upload, and it automatically decides the best way to perform the upload: serially in a single part, or as a multipart upload.
We have found the TransferManager to be quite reliable – it can do multipart uploads concurrently to speed up throughput for really large files (which would be the case for big data) and also offers automatic retries.
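For reference, a minimal sketch of how such a TransferManager might be constructed with the AWS SDK for Java v1 looks something like the following (the default client, the multipart threshold and the part size here are illustrative placeholders, not our production configuration):

final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
final TransferManager transferManager = TransferManagerBuilder.standard()
        .withS3Client(s3)
        // uploads larger than this threshold are split into multiple parts
        .withMultipartUploadThreshold(16L * 1024 * 1024)
        // size of each part when a multipart upload is used
        .withMinimumUploadPartSize(8L * 1024 * 1024)
        .build();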
The failure to mark and reset problem
There is just one catch – we do not want to let the TransferManager have its own fun uploading as many parts as it can, as fast as it can go, because that would saturate the instance’s network bandwidth. Cassandra, being a distributed database, needs that bandwidth for gossip as well as basic read and write operations. Ideally the backup should run as a trickling process in the background. It does not have to be fast; it just has to be reliable and complete successfully, because that is what we set out to do in the first place – improve our chances of a restful sleep!
With that in mind, we actually want to throttle the speed at which the TransferManager performs its uploads. We do so by passing the Cassandra snapshot files to upload as an InputStream, and rate limiting reads from that stream using Guava’s RateLimiter class.
public static class RateLimitedInputStream extends InputStream {
    private final InputStream delegate;
    final RateLimiter limiter;

    public RateLimitedInputStream(final InputStream delegate, final RateLimiter limiter) {
        this.delegate = delegate;
        this.limiter = limiter;
    }

    @Override
    public int read() throws IOException {
        limiter.acquire();
        return delegate.read();
    }

    @Override
    public int read(final byte[] b) throws IOException {
        limiter.acquire(b.length);
        return delegate.read(b);
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        limiter.acquire(len);
        return delegate.read(b, off, len);
    }
    ...
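Each permit acquired from the RateLimiter corresponds to one byte read from the delegate stream, so the throughput cap is simply the limiter’s permits-per-second rate. A sketch of how the wrapping function used in the next snippet might be set up, using java.util.function.Function (the 1 MB/s figure is illustrative, not our actual production rate):

// roughly 1 MB/s: one permit per byte read
final RateLimiter limiter = RateLimiter.create(1024 * 1024);
final Function<InputStream, InputStream> wrapWithRateLimitedStream =
        in -> new RateLimitedInputStream(in, limiter);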
We wrap the stream with a RateLimitedInputStream, which limits reads from the underlying stream according to the number of permits available, and then hand the stream to the TransferManager:
try (final InputStream s = Files.newInputStream(largeFile)) {
    final InputStream rateLimitedStream = wrapWithRateLimitedStream.apply(s);
    final Upload upload = transferManager.upload(backupBucket, "TestBucket", rateLimitedStream,
            new ObjectMetadata() {{
                setContentLength(Files.size(largeFile));
            }});
    upload.waitForCompletion();
}
For the full example check out the gist here. When tested with large files and an intermittently poor connection, this approach leads to the following problem:
com.amazonaws.ResetException: Failed to reset the request input stream; If the request involves an input stream, the maximum stream buffer size can be configured via request.getRequestClientOptions().setReadLimit(int)
Caused by: java.io.IOException: Resetting to invalid mark
    at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
    at com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
Streams support the concept of ‘mark’ and ‘reset’. Marking the stream at its current position allows us to reset to that position later (a stream that can seek forwards and backwards is also described as ‘seekable’), and this is what allows the TransferManager to retry an upload. Our RateLimitedInputStream is further wrapped by the S3 API in an SdkBufferedInputStream before being uploaded, and this is what the exception is hinting at: the stream was marked and data was read into the buffer, the upload proceeded until the marked position was evicted from the buffer, and when a loss of connectivity made a reset necessary, the reset failed because the buffer no longer contained the marked position.
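The exception message itself points at one possible mitigation: increasing the read limit so the buffer retains enough data to reset over. A sketch of what that could look like with an explicit PutObjectRequest, where metadata is the ObjectMetadata from the earlier snippet and the 16 MB value is purely illustrative:

final PutObjectRequest request =
        new PutObjectRequest(backupBucket, "TestBucket", rateLimitedStream, metadata);
// allow the SDK to buffer, and therefore reset over, up to ~16 MB of the stream
request.getRequestClientOptions().setReadLimit(16 * 1024 * 1024);
final Upload upload = transferManager.upload(request);

This trades memory for resettability, though, which makes it an unattractive fix for the multi-gigabyte snapshot files a Cassandra backup produces.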
FileInputStream, FileInputStream, FileInputStream
The general recommendation from Amazon was to pass the TransferManager a file or a FileInputStream. If the input stream provided to the TransferManager is an instance of FileInputStream (which does not itself support mark and reset), it will be wrapped with a ResettableInputStream:
if (is instanceof FileInputStream)
    return ResettableInputStream.newResettableInputStream((FileInputStream)is);
Directly before the upload, the SDK checks whether the stream supports mark; if it does not (and cannot be converted to a ResettableInputStream), it is wrapped in an SdkBufferedInputStream. Our RateLimitedInputStream is not an instance of FileInputStream and does not support mark/reset, so it ends up wrapped in an SdkBufferedInputStream, which has a default buffer size of 128K:
if (!content.markSupported()) {
    // try to wrap the content input stream to become
    // mark-and-resettable for signing and retry purposes.
    if (content instanceof FileInputStream) {
        try {
            // ResettableInputStream supports mark-and-reset without
            // memory buffering
            content = new ResettableInputStream((FileInputStream) content);
        } catch (IOException e) {
            if (log.isDebugEnabled())
                log.debug("For the record; ignore otherwise", e);
        }
    }
}
if (!content.markSupported())
    content = new SdkBufferedInputStream(content);
The FileChannel way to achieve Zen
The solution to the problem is to pass a seekable stream to the TransferManager. FileChannels don’t have the concept of marking or resetting, but they do support positioning. Per the docs, a FileChannel is connected to a file and has a current position within that file which can be both queried and modified. We can ‘teach’ it to mark and reset this way, which incidentally is the same way the AWS SDK wraps a non-seekable stream into a seekable one:
@Override
public long skip(final long n) throws IOException {
    final long position = Math.max(0, Math.min(ch.size(), ch.position() + n));
    final long skipped = Math.abs(position - ch.position());
    ch.position(position);
    return skipped;
}

@Override
public synchronized void mark(final int readlimit) {
    try {
        markPos = ch.position();
    } catch (IOException e) {
        throw Throwables.propagate(e);
    }
}

@Override
public synchronized void reset() throws IOException {
    if (markPos < 0)
        throw new IOException("Resetting to invalid mark");
    ch.position(markPos);
}
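For context, here is a sketch of what the rest of such a wrapper could look like, with the skip/mark/reset methods above slotted in. The class name FileChannelInputStream and the read methods are our illustration rather than the exact production code:

public static class FileChannelInputStream extends InputStream {
    private final FileChannel ch;
    private long markPos = -1;

    public FileChannelInputStream(final FileChannel ch) {
        this.ch = ch;
    }

    @Override
    public boolean markSupported() {
        // advertising mark support stops the SDK from wrapping us in an SdkBufferedInputStream
        return true;
    }

    @Override
    public int read() throws IOException {
        final ByteBuffer buf = ByteBuffer.allocate(1);
        return ch.read(buf) == -1 ? -1 : buf.get(0) & 0xFF;
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        return ch.read(ByteBuffer.wrap(b, off, len));
    }

    // skip(), mark() and reset() as shown above
}

Putting it together, and assuming the rate-limiting wrapper also delegates markSupported, mark and reset to its delegate (the elided methods of the RateLimitedInputStream shown earlier) so the seekability is not hidden again, the upload might look like:

try (final FileChannel ch = FileChannel.open(largeFile, StandardOpenOption.READ)) {
    final InputStream seekable = new FileChannelInputStream(ch);
    final InputStream throttled = wrapWithRateLimitedStream.apply(seekable);
    final ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(ch.size());
    final Upload upload = transferManager.upload(backupBucket, "TestBucket", throttled, metadata);
    upload.waitForCompletion();
}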
There was a GitHub issue which tracked this, and you can see our solution there as well. If your use case forces you into a situation where you need to upload to AWS S3 without being able to use FileInputStreams, this could be a viable solution for you. It allowed us to complete our production Cassandra backups successfully as a silent background process instead of potentially interfering with the instance’s available bandwidth.