Skip to content

Commit fa77192

Browse files
committed
chore: update examples
1 parent 1cde645 commit fa77192

File tree

8 files changed

+372
-426
lines changed

8 files changed

+372
-426
lines changed

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ libtest-mimic = "0.7.3"
4848
futures = "0.3.30"
4949
anyhow = "1.0.86"
5050
powershell_script = "1.1.0"
51+
widestring = "1.0.2"
52+
ssh2 = "0.9.4"
53+
thiserror = "1.0.30"
54+
ctrlc = "3.4.4"
5155

5256
[features]
5357
# Enable globs in the `info::FetchPlaceholders` struct.
5458
globs = ["globset"]
5559

56-
# TODO: temporarily ignored
57-
[workspace]
58-
members = ["examples/sftp"]
59-
6060
[[test]]
6161
harness = false
6262
name = "behavior"

examples/cloud-mirror.rs

Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
use std::{
2+
env,
3+
ffi::OsStr,
4+
fs::{self, File},
5+
io::{Read, Seek, SeekFrom},
6+
path::{Path, PathBuf},
7+
sync::mpsc,
8+
time::UNIX_EPOCH,
9+
};
10+
11+
use cloud_filter::{
12+
error::{CResult, CloudErrorKind},
13+
filter::{info, ticket, Request, SyncFilter},
14+
metadata::Metadata,
15+
placeholder::{ConvertOptions, Placeholder},
16+
placeholder_file::PlaceholderFile,
17+
root::{HydrationType, PopulationType, SecurityId, Session, SyncRootIdBuilder, SyncRootInfo},
18+
utility::{FileTime, WriteAt},
19+
};
20+
21+
// MUST be a multiple of 4096
22+
const CHUNK_SIZE_BYTES: usize = 65536;
23+
24+
const PROVIDER_NAME: &str = "CloudMirrorProvider";
25+
const DISPLAY_NAME: &str = "Cloud Mirror";
26+
const VERSION: &str = "1.0.0";
27+
28+
fn main() {
29+
let server_path = PathBuf::from(env::var("SERVER").expect("SERVER env var"));
30+
let client_path = PathBuf::from(env::var("CLIENT").expect("CLIENT env var"));
31+
32+
let sync_root_id = SyncRootIdBuilder::new(PROVIDER_NAME)
33+
.user_security_id(SecurityId::current_user().unwrap())
34+
.build();
35+
// register the sync root if it isn't already registered
36+
if !sync_root_id.is_registered().unwrap() {
37+
sync_root_id
38+
.register(
39+
SyncRootInfo::default()
40+
.with_display_name(DISPLAY_NAME)
41+
.with_hydration_type(HydrationType::Full)
42+
.with_population_type(PopulationType::Full)
43+
.with_icon("%SystemRoot%\\system32\\charmap.exe,0")
44+
.with_version(VERSION)
45+
.with_recycle_bin_uri("http://cloudmirror.example.com/recyclebin")
46+
.unwrap()
47+
.with_path(Path::new(&client_path))
48+
.unwrap(),
49+
)
50+
.unwrap()
51+
}
52+
53+
mark_in_sync(&client_path, &client_path, &server_path);
54+
55+
let connection = Session::new()
56+
.connect(
57+
&client_path,
58+
CloudMirror {
59+
client_path: client_path.clone(),
60+
server_path,
61+
},
62+
)
63+
.unwrap();
64+
65+
wait_for_ctrlc();
66+
67+
drop(connection);
68+
sync_root_id.unregister().unwrap();
69+
}
70+
71+
#[derive(Debug)]
72+
struct CloudMirror {
73+
/// Destination folder
74+
client_path: PathBuf,
75+
/// Source folder
76+
server_path: PathBuf,
77+
}
78+
79+
impl SyncFilter for CloudMirror {
80+
fn fetch_data(
81+
&self,
82+
request: Request,
83+
ticket: ticket::FetchData,
84+
info: info::FetchData,
85+
) -> CResult<()> {
86+
// Server path stored in fetch_placeholders
87+
let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) });
88+
89+
let range = info.required_file_range();
90+
let end = range.end;
91+
let mut position = range.start;
92+
93+
println!(
94+
"fetch_data {:?} {:?} {}",
95+
path,
96+
range,
97+
info.interrupted_hydration()
98+
);
99+
let mut server_file = File::open(path).map_err(|_| CloudErrorKind::InvalidRequest)?;
100+
server_file
101+
.seek(SeekFrom::Start(position))
102+
.map_err(|_| CloudErrorKind::InvalidRequest)?;
103+
104+
let mut buffer = [0; CHUNK_SIZE_BYTES];
105+
loop {
106+
let mut bytes_read = server_file
107+
.read(&mut buffer)
108+
.map_err(|_| CloudErrorKind::InvalidRequest)?;
109+
110+
let unaligned = bytes_read % 4096;
111+
if unaligned != 0 && position + (bytes_read as u64) < end {
112+
bytes_read -= unaligned;
113+
server_file
114+
.seek(SeekFrom::Current(-(unaligned as i64)))
115+
.unwrap();
116+
}
117+
ticket
118+
.write_at(&buffer[0..bytes_read], position)
119+
.map_err(|_| CloudErrorKind::InvalidRequest)?;
120+
position += bytes_read as u64;
121+
122+
if position >= end {
123+
break;
124+
}
125+
126+
ticket.report_progress(end, position).unwrap();
127+
}
128+
129+
Ok(())
130+
}
131+
132+
fn cancel_fetch_data(&self, _request: Request, _info: info::CancelFetchData) {
133+
println!("cancel fetch data");
134+
}
135+
136+
fn validate_data(
137+
&self,
138+
_request: Request,
139+
_ticket: ticket::ValidateData,
140+
_info: info::ValidateData,
141+
) -> CResult<()> {
142+
println!("validate data");
143+
Err(CloudErrorKind::NotSupported)
144+
}
145+
146+
fn fetch_placeholders(
147+
&self,
148+
request: Request,
149+
ticket: ticket::FetchPlaceholders,
150+
info: info::FetchPlaceholders,
151+
) -> CResult<()> {
152+
println!(
153+
"fetch_placeholders {:?} {:?}",
154+
request.path(),
155+
info.pattern()
156+
);
157+
let absolute = request.path();
158+
let path = absolute.strip_prefix(&self.client_path).unwrap();
159+
160+
let dirs = fs::read_dir(&self.server_path.join(path)).unwrap();
161+
let mut placeholders = dirs
162+
.into_iter()
163+
.filter_map(|entry| {
164+
entry
165+
.and_then(|e| {
166+
Ok((
167+
e.path()
168+
.strip_prefix(&self.server_path)
169+
.unwrap()
170+
.to_path_buf(),
171+
e.metadata()?,
172+
))
173+
})
174+
.ok()
175+
})
176+
// Only create placeholders that don't exist on client path
177+
.filter(|(relative_path, _)| !self.client_path.join(relative_path).exists())
178+
.map(|(relative_path, stat)| {
179+
println!("relative_path: {:?}, stat {:?}", relative_path, stat);
180+
println!("is file: {}, is dir: {}", stat.is_file(), stat.is_dir());
181+
182+
let accessed = stat
183+
.accessed()
184+
.ok()
185+
.and_then(|t| {
186+
FileTime::from_unix_time(
187+
t.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() as _,
188+
)
189+
.ok()
190+
})
191+
.unwrap_or_default();
192+
PlaceholderFile::new(&relative_path)
193+
.metadata(
194+
match stat.is_dir() {
195+
true => Metadata::directory(),
196+
false => Metadata::file(),
197+
}
198+
.size(stat.len())
199+
.accessed(accessed),
200+
)
201+
.mark_in_sync()
202+
.overwrite()
203+
.blob(
204+
self.server_path
205+
.join(relative_path)
206+
.into_os_string()
207+
.into_encoded_bytes(),
208+
)
209+
})
210+
.collect::<Vec<_>>();
211+
212+
ticket.pass_with_placeholder(&mut placeholders).unwrap();
213+
214+
Ok(())
215+
}
216+
217+
fn cancel_fetch_placeholders(&self, _request: Request, _info: info::CancelFetchPlaceholders) {
218+
println!("cancel fetch placeholders");
219+
}
220+
221+
fn opened(&self, request: Request, _info: info::Opened) {
222+
println!("file opened {:?}", request.path());
223+
}
224+
225+
fn closed(&self, request: Request, _info: info::Closed) {
226+
println!("file closed {:?}", request.path());
227+
}
228+
229+
fn dehydrate(
230+
&self,
231+
_request: Request,
232+
_ticket: ticket::Dehydrate,
233+
_info: info::Dehydrate,
234+
) -> CResult<()> {
235+
println!("dehydrate");
236+
Err(CloudErrorKind::NotSupported)
237+
}
238+
239+
fn dehydrated(&self, _request: Request, _info: info::Dehydrated) {
240+
println!("dehydrated");
241+
}
242+
243+
fn delete(&self, request: Request, ticket: ticket::Delete, info: info::Delete) -> CResult<()> {
244+
println!(
245+
"delete {:?}, is_undeleted: {}",
246+
request.path(),
247+
info.is_undelete()
248+
);
249+
if !info.is_undelete() {
250+
ticket.pass().unwrap();
251+
return Ok(());
252+
}
253+
254+
// Server path stored in fetch_placeholders
255+
let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) });
256+
match info.is_directory() {
257+
true => fs::remove_dir_all(path).map_err(|_| CloudErrorKind::InvalidRequest)?,
258+
false => fs::remove_file(path).map_err(|_| CloudErrorKind::InvalidRequest)?,
259+
}
260+
ticket.pass().unwrap();
261+
Ok(())
262+
}
263+
264+
fn deleted(&self, _request: Request, _info: info::Deleted) {
265+
println!("deleted");
266+
}
267+
268+
fn rename(&self, request: Request, ticket: ticket::Rename, info: info::Rename) -> CResult<()> {
269+
let src = request.path();
270+
let dest = info.target_path();
271+
272+
println!(
273+
"rename {} to {}, source in scope: {}, target in scope: {}",
274+
src.display(),
275+
dest.display(),
276+
info.source_in_scope(),
277+
info.target_in_scope()
278+
);
279+
280+
match (info.source_in_scope(), info.target_in_scope()) {
281+
(true, true) => {
282+
fs::rename(
283+
src.strip_prefix(&self.client_path).unwrap(),
284+
dest.strip_prefix(&self.client_path).unwrap(),
285+
)
286+
.map_err(|_| CloudErrorKind::InvalidRequest)?;
287+
}
288+
(true, false) => {}
289+
(false, true) => Err(CloudErrorKind::NotSupported)?, // TODO
290+
(false, false) => Err(CloudErrorKind::InvalidRequest)?,
291+
}
292+
293+
ticket.pass().unwrap();
294+
Ok(())
295+
}
296+
297+
fn renamed(&self, _request: Request, _info: info::Renamed) {
298+
println!("renamed");
299+
}
300+
301+
fn state_changed(&self, changes: Vec<PathBuf>) {
302+
println!("state_changed: {:?}", changes);
303+
}
304+
}
305+
306+
fn mark_in_sync(path: &Path, client: &Path, server: &Path) {
307+
for entry in path.read_dir().unwrap() {
308+
let entry = entry.unwrap();
309+
let remote_path = entry.path().strip_prefix(&client).unwrap().to_owned();
310+
311+
let Ok(meta) = fs::metadata(server.join(&remote_path)) else {
312+
continue;
313+
};
314+
if meta.is_dir() != entry.file_type().unwrap().is_dir() {
315+
continue;
316+
}
317+
318+
let mut options = ConvertOptions::default()
319+
.mark_in_sync()
320+
.blob(remote_path.clone().into_os_string().into_encoded_bytes());
321+
let mut placeholder = match meta.is_dir() {
322+
true => {
323+
options = options.has_children();
324+
let Ok(placeholder) = Placeholder::open(entry.path()) else {
325+
continue;
326+
};
327+
placeholder
328+
}
329+
false => {
330+
let Ok(file) = File::open(entry.path()) else {
331+
continue;
332+
};
333+
file.into()
334+
}
335+
};
336+
337+
_ = placeholder
338+
.convert_to_placeholder(options, None)
339+
.inspect_err(|e| println!("convert_to_placeholder {:?}", e));
340+
341+
if meta.is_dir() {
342+
mark_in_sync(&entry.path(), client, server);
343+
}
344+
}
345+
}
346+
347+
fn wait_for_ctrlc() {
348+
let (tx, rx) = mpsc::channel();
349+
350+
ctrlc::set_handler(move || {
351+
tx.send(()).unwrap();
352+
})
353+
.expect("Error setting Ctrl-C handler");
354+
355+
rx.recv().unwrap();
356+
}

examples/cloud-mirror/.gitignore

Lines changed: 0 additions & 2 deletions
This file was deleted.

examples/cloud-mirror/Cargo.toml

Lines changed: 0 additions & 11 deletions
This file was deleted.

0 commit comments

Comments
 (0)