use std::{collections::BTreeMap, io};
use super::{
pread_exact_or_eof, read_message, read_segment_header, BasedBuf, DiskPtr,
LogKind, LogOffset, LogRead, Lsn, SegmentHeader, SegmentNumber,
MAX_MSG_HEADER_LEN, SEG_HEADER_LEN,
};
use crate::*;
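/// An iterator over messages in the on-disk log. Segments are visited in
/// `Lsn` order, and each recovered message is yielded as a
/// `(LogKind, PageId, Lsn, DiskPtr, length)` tuple.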
#[derive(Debug)]
pub struct LogIter {
pub config: RunningConfig,
pub segments: BTreeMap<Lsn, LogOffset>,
pub segment_base: Option<BasedBuf>,
pub max_lsn: Option<Lsn>,
pub cur_lsn: Option<Lsn>,
pub last_stage: bool,
}
impl Iterator for LogIter {
type Item = (LogKind, PageId, Lsn, DiskPtr, u64);
fn next(&mut self) -> Option<Self::Item> {
loop {
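// If the remainder of the current segment is too small to hold
// another message, drop its buffer so the next segment gets loaded.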
let remaining_seg_too_small_for_msg = !valid_entry_offset(
LogOffset::try_from(self.cur_lsn.unwrap_or(0)).unwrap(),
self.config.segment_size,
);
if remaining_seg_too_small_for_msg {
self.segment_base = None;
}
if self.segment_base.is_none() {
if let Err(e) = self.read_segment() {
debug!("unable to load new segment: {:?}", e);
return None;
}
}
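// a successful `read_segment` call always sets `cur_lsn`, so this
// unwrap cannot fail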
let lsn = self.cur_lsn.unwrap();
let _measure = Measure::new(&M.read_segment_message);
if let Some(max_lsn) = self.max_lsn {
if let Some(cur_lsn) = self.cur_lsn {
if cur_lsn > max_lsn {
debug!("hit max_lsn {} in iterator, stopping", max_lsn);
return None;
}
}
}
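// translate the logical lsn into a physical offset within the current
// segment, and compute the segment number that the message header is
// expected to carry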
let segment_base = &self.segment_base.as_ref().unwrap();
let lid = segment_base.offset
+ LogOffset::try_from(lsn % self.config.segment_size as Lsn)
.unwrap();
let expected_segment_number = SegmentNumber(
u64::try_from(lsn).unwrap()
/ u64::try_from(self.config.segment_size).unwrap(),
);
match read_message(
&**segment_base,
lid,
expected_segment_number,
&self.config,
) {
Ok(LogRead::Blob(header, _buf, blob_ptr, inline_len)) => {
trace!("read blob flush in LogIter::next");
self.cur_lsn = Some(lsn + Lsn::from(inline_len));
return Some((
LogKind::from(header.kind),
header.pid,
lsn,
DiskPtr::Blob(lid, blob_ptr),
u64::from(inline_len),
));
}
Ok(LogRead::Inline(header, _buf, inline_len)) => {
trace!(
"read inline flush with header {:?} in LogIter::next",
header,
);
self.cur_lsn = Some(lsn + Lsn::from(inline_len));
return Some((
LogKind::from(header.kind),
header.pid,
lsn,
DiskPtr::Inline(lid),
u64::from(inline_len),
));
}
Ok(LogRead::BatchManifest(last_lsn_in_batch, inline_len)) => {
if let Some(max_lsn) = self.max_lsn {
if last_lsn_in_batch > max_lsn {
debug!(
"cutting recovery short due to torn batch. \
required stable lsn: {} actual max possible lsn: {}",
last_lsn_in_batch,
max_lsn
);
return None;
}
}
self.cur_lsn = Some(lsn + Lsn::from(inline_len));
continue;
}
Ok(LogRead::Canceled(inline_len)) => {
trace!("read zeroed in LogIter::next");
self.cur_lsn = Some(lsn + Lsn::from(inline_len));
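// no explicit `continue` needed: the match is the final expression in
// the loop body, so we fall through to the next iteration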
}
Ok(LogRead::Corrupted) => {
trace!(
"read corrupted msg in LogIter::next at lid {} lsn {}",
lid,
lsn
);
if self.last_stage {
let _taken = self.segment_base.take().unwrap();
continue;
} else {
return None;
}
}
Ok(LogRead::Cap(_segment_number)) => {
trace!("read cap in LogIter::next");
let _taken = self.segment_base.take().unwrap();
continue;
}
Ok(LogRead::DanglingBlob(_, blob_ptr, inline_len)) => {
debug!(
"encountered dangling blob \
pointer at lsn {} ptr {}",
lsn, blob_ptr
);
self.cur_lsn = Some(lsn + Lsn::from(inline_len));
continue;
}
Err(e) => {
debug!(
"failed to read log message at lid {} \
with expected lsn {} during iteration: {}",
lid, lsn, e
);
return None;
}
}
}
}
}
impl LogIter {
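/// Loads the next segment from `self.segments`, validating its header
/// before installing it as the current read buffer and advancing
/// `cur_lsn` past the segment header.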
fn read_segment(&mut self) -> Result<()> {
let _measure = Measure::new(&M.segment_read);
if self.segments.is_empty() {
return Err(io::Error::new(
io::ErrorKind::Other,
"no segments remaining to iterate over",
)
.into());
}
let first_ref = self.segments.iter().next().unwrap();
let (lsn, offset) = (*first_ref.0, *first_ref.1);
if let Some(max_lsn) = self.max_lsn {
if lsn > max_lsn {
return Err(io::Error::new(
io::ErrorKind::Other,
"next segment is above our configured max_lsn",
)
.into());
}
}
assert!(
lsn + (self.config.segment_size as Lsn)
>= self.cur_lsn.unwrap_or(0),
"caller is responsible for providing segments \
that contain the initial cur_lsn value or higher"
);
trace!(
"LogIter::read_segment lsn: {:?} cur_lsn: {:?}",
lsn,
self.cur_lsn
);
let f = &self.config.file;
let segment_header = read_segment_header(f, offset)?;
if offset % self.config.segment_size as LogOffset != 0 {
debug!("segment offset not divisible by segment length");
return Err(Error::corruption(None));
}
if segment_header.lsn % self.config.segment_size as Lsn != 0 {
debug!(
"expected a segment header lsn that is divisible \
by the segment_size ({}) instead it was {}",
self.config.segment_size, segment_header.lsn
);
return Err(Error::corruption(None));
}
if segment_header.lsn != lsn {
debug!(
"segment header lsn ({}) != expected lsn ({})",
segment_header.lsn, lsn
);
return Err(io::Error::new(
io::ErrorKind::Other,
"encountered torn segment",
)
.into());
}
trace!("read segment header {:?}", segment_header);
let mut buf = vec![0; self.config.segment_size];
let size = pread_exact_or_eof(f, &mut buf, offset)?;
trace!("setting stored segment buffer length to {} after read", size);
buf.truncate(size);
self.cur_lsn = Some(segment_header.lsn + SEG_HEADER_LEN as Lsn);
self.segment_base = Some(BasedBuf { buf, offset });
self.segments.remove(&lsn);
Ok(())
}
}
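/// Returns whether `lid` is a legal place for a message to begin:
/// past the segment header, and early enough in the segment that a
/// maximum-sized message header still fits before the segment ends.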
fn valid_entry_offset(lid: LogOffset, segment_len: usize) -> bool {
let seg_start = lid / segment_len as LogOffset * segment_len as LogOffset;
let max_lid =
seg_start + segment_len as LogOffset - MAX_MSG_HEADER_LEN as LogOffset;
let min_lid = seg_start + SEG_HEADER_LEN as LogOffset;
lid >= min_lid && lid <= max_lid
}
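/// Scans all segment headers in the file, returning the mapping from
/// segment `Lsn` to file offset for usable segments at or above `min`,
/// along with the lsn of the end of the last contiguous message found
/// in the unstable tail.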
fn scan_segment_headers_and_tail(
min: Lsn,
config: &RunningConfig,
) -> Result<(BTreeMap<Lsn, LogOffset>, Lsn)> {
fn fetch(
idx: u64,
min: Lsn,
config: &RunningConfig,
) -> Option<(LogOffset, SegmentHeader)> {
let segment_len = u64::try_from(config.segment_size).unwrap();
let base_lid = idx * segment_len;
let segment = read_segment_header(&config.file, base_lid).ok()?;
trace!(
"SA scanned header at lid {} during startup: {:?}",
base_lid,
segment
);
if segment.ok && segment.lsn >= min {
assert_ne!(segment.lsn, Lsn::max_value());
Some((base_lid, segment))
} else {
trace!(
"not using segment at lid {}, ok: {} lsn: {} min lsn: {}",
base_lid,
segment.ok,
segment.lsn,
min
);
None
}
}
let segment_len = LogOffset::try_from(config.segment_size).unwrap();
let f = &config.file;
let file_len = f.metadata()?.len();
let segments = (file_len / segment_len)
+ if file_len % segment_len
< LogOffset::try_from(SEG_HEADER_LEN).unwrap()
{
0
} else {
1
};
trace!(
"file len: {} segment len {} segments: {}",
file_len,
segment_len,
segments
);
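// fan the header reads out to the threadpool, one task per candidate
// segment, collecting the spawn results before waiting on them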
let header_promises_res: Result<Vec<_>> = (0..segments)
.map(move |idx| {
threadpool::spawn({
let config = config.clone();
move || fetch(idx, min, &config)
})
})
.collect();
let header_promises = header_promises_res?;
let mut headers: Vec<(LogOffset, SegmentHeader)> = vec![];
for promise in header_promises {
let read_attempt =
promise.wait().expect("thread pool should not crash");
if let Some(completed_result) = read_attempt {
headers.push(completed_result);
}
}
let mut ordering = BTreeMap::new();
let mut max_header_stable_lsn = min;
for (lid, header) in headers {
max_header_stable_lsn =
std::cmp::max(header.max_stable_lsn, max_header_stable_lsn);
if let Some(old) = ordering.insert(header.lsn, lid) {
assert_eq!(
old, lid,
"duplicate segment LSN {} detected at both {} and {}, \
one should have been zeroed out during recovery",
header.lsn, old, lid
);
}
}
debug!(
"ordering before clearing tears: {:?}, \
max_header_stable_lsn: {}",
ordering, max_header_stable_lsn
);
let end_of_last_contiguous_message_in_unstable_tail =
check_contiguity_in_unstable_tail(
max_header_stable_lsn,
&ordering,
config,
)?;
Ok((ordering, end_of_last_contiguous_message_in_unstable_tail))
}
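/// Walks the segments at or above the one containing the highest stable
/// lsn recorded in any segment header, stops at the first gap in the
/// sequence, and then replays the remaining contiguous tail to find
/// where its last readable message ends.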
fn check_contiguity_in_unstable_tail(
max_header_stable_lsn: Lsn,
ordering: &BTreeMap<Lsn, LogOffset>,
config: &RunningConfig,
) -> Result<Lsn> {
let segment_size = config.segment_size as Lsn;
let lowest_lsn_in_tail: Lsn =
std::cmp::max(0, (max_header_stable_lsn / segment_size) * segment_size);
let mut expected_present = lowest_lsn_in_tail;
let mut missing_item_in_tail = None;
let logical_tail = ordering
.range(lowest_lsn_in_tail..)
.map(|(lsn, lid)| (*lsn, *lid))
.take_while(|(lsn, _lid)| {
let matches = expected_present == *lsn;
if !matches {
debug!(
"failed to find expected segment \
at lsn {}, tear detected",
expected_present
);
missing_item_in_tail = Some(expected_present);
}
expected_present += segment_size;
matches
})
.collect();
debug!(
"in check_contiguity_in_unstable_tail, found missing item \
in tail: {:?} and we'll scan segments {:?} above lowest lsn {}",
missing_item_in_tail, logical_tail, lowest_lsn_in_tail
);
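// replay the contiguous tail with a bounded iterator; wherever it
// stops, `cur_lsn` marks the end of the last readable message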
let mut iter = LogIter {
config: config.clone(),
segments: logical_tail,
segment_base: None,
max_lsn: missing_item_in_tail,
cur_lsn: None,
last_stage: false,
};
while let Some(_) = iter.next() {}
let end_of_last_message = iter.cur_lsn.unwrap_or(0) - 1;
debug!(
"filtering out segments after detected tear at lsn {:?}",
end_of_last_message,
);
Ok(end_of_last_message)
}
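/// Builds a `LogIter` over every segment whose lsn is at or above the
/// segment containing `lsn`, bounded above by the end of the last
/// contiguous message recovered from the unstable tail.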
pub fn raw_segment_iter_from(
lsn: Lsn,
config: &RunningConfig,
) -> Result<LogIter> {
let segment_len = config.segment_size as Lsn;
let normalized_lsn = lsn / segment_len * segment_len;
let (ordering, end_of_last_msg) =
scan_segment_headers_and_tail(normalized_lsn, config)?;
let tip_segment_iter: BTreeMap<_, _> = ordering
.iter()
.next_back()
.map(|(a, b)| (*a, *b))
.into_iter()
.collect();
trace!(
"trying to find the max stable tip for \
bounding batch manifests with segment iter {:?} \
of segments >= end_of_last_msg {}",
tip_segment_iter,
end_of_last_msg,
);
trace!(
"generated iterator over segments {:?} with lsn >= {}",
ordering,
normalized_lsn,
);
let segments = ordering
.into_iter()
.filter(move |&(l, _)| l >= normalized_lsn)
.collect();
Ok(LogIter {
config: config.clone(),
max_lsn: Some(end_of_last_msg),
cur_lsn: None,
segment_base: None,
segments,
last_stage: true,
})
}