use crate::{pagecache::*, *};

/// A pending log reservation which can be aborted or completed.
/// NB: the holder should call `complete` or `abort` promptly, as
/// taking too long to decide will keep the underlying IO
/// buffer blocked.
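///
/// A hedged usage sketch (illustrative only; `obtain_reservation` and
/// `log` below are hypothetical stand-ins for the internal `Log` code
/// that hands out a `Reservation`):
///
/// ```ignore
/// // hypothetical helper; not part of this module
/// let res: Reservation<'_> = obtain_reservation(&log)?;
/// // decide quickly: either persist the reserved message...
/// let (lsn, ptr) = res.complete()?;
/// // ...or cancel it with `res.abort()?`, which records a failed flush.
/// // Simply dropping the reservation aborts it automatically.
/// ```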
#[derive(Debug)]
pub struct Reservation<'a> {
    /// The `Log` this reservation was made against.
    pub(super) log: &'a Log,
    /// The io buffer that holds this reservation.
    pub(super) iobuf: Arc<IoBuf>,
    /// The reserved slice of the io buffer: header bytes, then payload.
    pub(super) buf: &'a mut [u8],
    /// Whether `flush` has already run; `Drop` auto-aborts when it hasn't.
    pub(super) flushed: bool,
    /// Where the written value will live on disk.
    pub(super) pointer: DiskPtr,
    /// The log sequence number of this reservation.
    pub(super) lsn: Lsn,
    /// True when rewriting a pointer to an existing blob, which `abort`
    /// must therefore not remove.
    pub(super) is_blob_rewrite: bool,
    /// The number of header bytes at the start of `buf`.
    pub(super) header_len: usize,
}

impl<'a> Drop for Reservation<'a> {
    fn drop(&mut self) {
        // We auto-abort if the user never uses a reservation.
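        // `Drop::drop` cannot return a `Result`, so a flush failure is
        // surfaced through the config's global error instead.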
        if !self.flushed {
            if let Err(e) = self.flush(false) {
                self.log.config.set_global_error(e);
            }
        }
    }
}

impl<'a> Reservation<'a> {
    /// Cancel the reservation, placing a failed flush on disk, returning
    /// the (cancelled) log sequence number and file offset.
    pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
        if self.pointer.is_blob() && !self.is_blob_rewrite {
            // the `is_blob_rewrite` guard keeps us from removing a blob
            // that something else may still be using.
            trace!(
                "removing blob for aborted reservation at {}",
                self.pointer
            );
            remove_blob(self.pointer.blob().1, &self.log.config)?;
        }

        self.flush(false)
    }

    /// Complete the reservation, placing the buffer on disk, returning
    /// the log sequence number of the write and the file offset.
    pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
        self.flush(true)
    }

    /// Get the log sequence number for this update.
    pub const fn lsn(&self) -> Lsn {
        self.lsn
    }

    /// Get the underlying storage location for the written value.
    /// Note that a blob write still has a pointer in the
    /// log at the provided lid location.
    pub const fn pointer(&self) -> DiskPtr {
        self.pointer
    }

    /// Returns the length of the on-log reservation.
    pub(crate) fn reservation_len(&self) -> usize {
        self.buf.len()
    }

    /// Overwrites the reserved message in place as a `BatchManifest`
    /// whose payload is `peg_lsn`, the stable Lsn required by the
    /// associated write batch. If `peg_lsn` equals this reservation's
    /// own Lsn (which can happen when a high-level tree update results
    /// in no work), the reservation is aborted instead. Don't use this
    /// on messages subject to compression.
    ///
    /// # Panics
    ///
    /// Will panic if the reservation is not the correct
    /// size to hold a serialized Lsn.
    #[doc(hidden)]
    pub fn mark_writebatch(self, peg_lsn: Lsn) -> Result<(Lsn, DiskPtr)> {
        trace!(
            "writing batch required stable lsn {} into \
             BatchManifest at lid {}, reservation lsn {}",
            peg_lsn,
            self.pointer.lid(),
            self.lsn
        );

        if self.lsn == peg_lsn {
            // this can happen because high-level tree updates
            // may result in no work happening.
            self.abort()
        } else {
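            // byte 4 of the header holds the MessageKind; bytes 0..4 are
            // the CRC32 that `flush` fills in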
            self.buf[4] = MessageKind::BatchManifest.into();

            let buf = lsn_to_arr(peg_lsn);
            let dst = &mut self.buf[self.header_len..];
            dst.copy_from_slice(&buf);
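
            // record the (manifest lsn, required stable lsn) pair in the
            // iobuf interval bookkeeping before completing the write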
            let mut intervals = self.log.iobufs.intervals.lock();
            intervals.mark_batch((self.lsn, peg_lsn));
            drop(intervals);

            self.complete()
        }
    }

    fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
        if self.flushed {
            panic!("flushing already-flushed reservation!");
        }
        self.flushed = true;

        if !valid {
            // don't actually zero the message; its CRC is still checked
            // on recovery to detect corruption.
            self.buf[4] = MessageKind::Canceled.into();
        }
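
        // compute the message CRC32 and stamp it into the first four
        // bytes of the header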
        let crc32 = calculate_message_crc32(
            self.buf[..self.header_len].as_ref(),
            &self.buf[self.header_len..],
        );
        let crc32_arr = u32_to_arr(crc32);

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                crc32_arr.as_ptr(),
                self.buf.as_mut_ptr(),
                std::mem::size_of::<u32>(),
            );
        }
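
        // release our hold on the io buffer so it is no longer blocked
        // on this reservation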
        self.log.exit_reservation(&self.iobuf)?;

        Ok((self.lsn(), self.pointer()))
    }
}