1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::*;

/// A simple map that can be used to store metadata
/// for the pagecache tenant.
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct Meta {
    pub(crate) inner: BTreeMap<IVec, PageId>,
}

impl Meta {
    /// Retrieve the `PageId` associated with an identifier
    pub(crate) fn get_root(&self, table: &[u8]) -> Option<PageId> {
        self.inner.get(table).cloned()
    }

    /// Set the `PageId` associated with an identifier
    pub(crate) fn set_root(
        &mut self,
        name: IVec,
        pid: PageId,
    ) -> Option<PageId> {
        self.inner.insert(name, pid)
    }

    /// Remove the page mapping for a given identifier
    pub(crate) fn del_root(&mut self, name: &[u8]) -> Option<PageId> {
        self.inner.remove(name)
    }

    /// Return the current rooted tenants in Meta
    pub(crate) fn tenants(&self) -> BTreeMap<IVec, PageId> {
        self.inner.clone()
    }

    pub(crate) fn rss(&self) -> u64 {
        self.inner
            .iter()
            .map(|(k, _pid)| {
                k.len() as u64 + std::mem::size_of::<PageId>() as u64
            })
            .sum()
    }
}

/// Open or create a new disk-backed Tree with its own keyspace,
/// accessible from the `Db` via the provided identifier.
pub(crate) fn open_tree<V>(
    context: &Context,
    raw_name: V,
    guard: &Guard,
) -> Result<Tree>
where
    V: Into<IVec>,
{
    let name = raw_name.into();

    // we loop because creating this Tree may race with
    // concurrent attempts to open the same one.
    loop {
        match context.pagecache.meta_pid_for_name(&name, guard) {
            Ok(root_id) => {
                return Ok(Tree(Arc::new(TreeInner {
                    tree_id: name,
                    context: context.clone(),
                    subscribers: Subscribers::default(),
                    root: AtomicU64::new(root_id),
                    merge_operator: RwLock::new(None),
                })));
            }
            Err(Error::CollectionNotFound(_)) => {}
            Err(other) => return Err(other),
        }

        // set up empty leaf
        let leaf = Node::default();
        let (leaf_id, leaf_ptr) = context.pagecache.allocate(leaf, guard)?;

        trace!(
            "allocated pid {} for leaf in new_tree for namespace {:?}",
            leaf_id,
            name
        );

        // set up root index

        // vec![0] represents a prefix-encoded empty prefix
        let root = Node::new_root(leaf_id);
        let (root_id, root_ptr) = context.pagecache.allocate(root, guard)?;

        debug!("allocated pid {} for root of new_tree {:?}", root_id, name);

        let res = context.pagecache.cas_root_in_meta(
            &name,
            None,
            Some(root_id),
            guard,
        )?;

        if res.is_err() {
            // clean up the tree we just created if we couldn't
            // install it.
            let _ = context
                .pagecache
                .free(root_id, root_ptr, guard)?
                .expect("could not free allocated page");
            let _ = context
                .pagecache
                .free(leaf_id, leaf_ptr, guard)?
                .expect("could not free allocated page");
            continue;
        }

        return Ok(Tree(Arc::new(TreeInner {
            tree_id: name,
            subscribers: Subscribers::default(),
            context: context.clone(),
            root: AtomicU64::new(root_id),
            merge_operator: RwLock::new(None),
        })));
    }
}