1
//! End-to-end integration test using the CFDP abstractions provided by the library.
2
use std::{
3
    fs::OpenOptions,
4
    io::Write,
5
    sync::{atomic::AtomicBool, mpsc, Arc},
6
    thread,
7
    time::Duration,
8
};
9

            
10
use cfdp::{
11
    dest::DestinationHandler,
12
    filestore::NativeFilestore,
13
    request::{PutRequestOwned, StaticPutRequestCacher},
14
    source::SourceHandler,
15
    user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
16
    EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
17
    StdTimerCreator, TransactionId, UserFaultHookProvider,
18
};
19
use spacepackets::{
20
    cfdp::{ChecksumType, ConditionCode, TransmissionMode},
21
    seq_count::SeqCountProviderSyncU16,
22
    util::UnsignedByteFieldU16,
23
};
24

            
25
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
26
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
27

            
28
const FILE_DATA: &str = "Hello World!";
29

            
30
/// Stateless fault hook provider used by both test entities.
///
/// `Debug` is derived in addition to `Default` so the handler can be printed in diagnostics
/// and embedded in other `Debug` types.
#[derive(Debug, Default)]
pub struct ExampleFaultHandler {}
32

            
33
impl UserFaultHookProvider for ExampleFaultHandler {
34
    fn notice_of_suspension_cb(
35
        &mut self,
36
        transaction_id: TransactionId,
37
        cond: ConditionCode,
38
        progress: u64,
39
    ) {
40
        panic!(
41
            "unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
42
            transaction_id, cond, progress
43
        );
44
    }
45

            
46
    fn notice_of_cancellation_cb(
47
        &mut self,
48
        transaction_id: TransactionId,
49
        cond: ConditionCode,
50
        progress: u64,
51
    ) {
52
        panic!(
53
            "unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
54
            transaction_id, cond, progress
55
        );
56
    }
57

            
58
    fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
59
        panic!(
60
            "unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
61
            transaction_id, cond, progress
62
        );
63
    }
64

            
65
    fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
66
        panic!(
67
            "ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
68
            transaction_id, cond, progress
69
        );
70
    }
71
}
72

            
73
pub struct ExampleCfdpUser {
74
    entity_type: EntityType,
75
    completion_signal: Arc<AtomicBool>,
76
}
77

            
78
impl ExampleCfdpUser {
79
8
    pub fn new(entity_type: EntityType, completion_signal: Arc<AtomicBool>) -> Self {
80
8
        Self {
81
8
            entity_type,
82
8
            completion_signal,
83
8
        }
84
8
    }
85
}
86

            
87
impl CfdpUser for ExampleCfdpUser {
88
4
    fn transaction_indication(&mut self, id: &crate::TransactionId) {
89
4
        println!(
90
4
            "{:?} entity: Transaction indication for {:?}",
91
4
            self.entity_type, id
92
4
        );
93
4
    }
94

            
95
4
    fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
96
4
        println!(
97
4
            "{:?} entity: EOF sent for transaction {:?}",
98
4
            self.entity_type, id
99
4
        );
100
4
    }
101

            
102
8
    fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
103
8
        println!(
104
8
            "{:?} entity: Transaction finished: {:?}",
105
8
            self.entity_type, finished_params
106
8
        );
107
8
        self.completion_signal
108
8
            .store(true, std::sync::atomic::Ordering::Relaxed);
109
8
    }
110

            
111
4
    fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
112
4
        println!(
113
4
            "{:?} entity: Metadata received: {:?}",
114
4
            self.entity_type, md_recvd_params
115
4
        );
116
4
    }
117

            
118
4
    fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
119
4
        println!(
120
4
            "{:?} entity: File segment {:?} received",
121
4
            self.entity_type, segment_recvd_params
122
4
        );
123
4
    }
124

            
125
    fn report_indication(&mut self, _id: &crate::TransactionId) {}
126

            
127
    fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
128
        panic!("unexpected suspended indication");
129
    }
130

            
131
    fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
132

            
133
    fn fault_indication(
134
        &mut self,
135
        _id: &crate::TransactionId,
136
        _condition_code: ConditionCode,
137
        _progress: u64,
138
    ) {
139
        panic!("unexpected fault indication");
140
    }
141

            
142
    fn abandoned_indication(
143
        &mut self,
144
        _id: &crate::TransactionId,
145
        _condition_code: ConditionCode,
146
        _progress: u64,
147
    ) {
148
        panic!("unexpected abandoned indication");
149
    }
150

            
151
4
    fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
152
4
        println!(
153
4
            "{:?} entity: EOF received for transaction {:?}",
154
4
            self.entity_type, id
155
4
        );
156
4
    }
157
}
158

            
159
4
fn end_to_end_test(with_closure: bool) {
160
4
    // Simplified event handling using atomic signals.
161
4
    let stop_signal_source = Arc::new(AtomicBool::new(false));
162
4
    let stop_signal_dest = stop_signal_source.clone();
163
4
    let stop_signal_ctrl = stop_signal_source.clone();
164
4

            
165
4
    let completion_signal_source = Arc::new(AtomicBool::new(false));
166
4
    let completion_signal_source_main = completion_signal_source.clone();
167
4

            
168
4
    let completion_signal_dest = Arc::new(AtomicBool::new(false));
169
4
    let completion_signal_dest_main = completion_signal_dest.clone();
170
4

            
171
4
    let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
172
4
    let mut file = OpenOptions::new()
173
4
        .write(true)
174
4
        .open(&srcfile)
175
4
        .expect("opening file failed");
176
4
    file.write_all(FILE_DATA.as_bytes())
177
4
        .expect("writing file content failed");
178
4
    let destdir = tempfile::tempdir().expect("creating temp directory failed");
179
4
    let destfile = destdir.path().join("test.txt");
180
4

            
181
4
    let local_cfg_source = LocalEntityConfig::new(
182
4
        LOCAL_ID.into(),
183
4
        IndicationConfig::default(),
184
4
        ExampleFaultHandler::default(),
185
4
    );
186
4
    let (source_tx, source_rx) = mpsc::channel::<PduOwnedWithInfo>();
187
4
    let (dest_tx, dest_rx) = mpsc::channel::<PduOwnedWithInfo>();
188
4
    let put_request_cacher = StaticPutRequestCacher::new(2048);
189
4
    let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values(
190
4
        REMOTE_ID.into(),
191
4
        1024,
192
4
        with_closure,
193
4
        false,
194
4
        spacepackets::cfdp::TransmissionMode::Unacknowledged,
195
4
        ChecksumType::Crc32,
196
4
    );
197
4
    let seq_count_provider = SeqCountProviderSyncU16::default();
198
4
    let mut source_handler = SourceHandler::new(
199
4
        local_cfg_source,
200
4
        source_tx,
201
4
        NativeFilestore::default(),
202
4
        put_request_cacher,
203
4
        2048,
204
4
        remote_cfg_of_dest,
205
4
        StdTimerCreator::default(),
206
4
        seq_count_provider,
207
4
    );
208
4
    let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source);
209
4

            
210
4
    let local_cfg_dest = LocalEntityConfig::new(
211
4
        REMOTE_ID.into(),
212
4
        IndicationConfig::default(),
213
4
        ExampleFaultHandler::default(),
214
4
    );
215
4
    let remote_cfg_of_source = RemoteEntityConfig::new_with_default_values(
216
4
        LOCAL_ID.into(),
217
4
        1024,
218
4
        true,
219
4
        false,
220
4
        spacepackets::cfdp::TransmissionMode::Unacknowledged,
221
4
        ChecksumType::Crc32,
222
4
    );
223
4
    let mut dest_handler = DestinationHandler::new(
224
4
        local_cfg_dest,
225
4
        1024,
226
4
        dest_tx,
227
4
        NativeFilestore::default(),
228
4
        remote_cfg_of_source,
229
4
        StdTimerCreator::default(),
230
4
    );
231
4
    let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);
232
4

            
233
4
    let put_request = PutRequestOwned::new_regular_request(
234
4
        REMOTE_ID.into(),
235
4
        srcfile.to_str().expect("invaid path string"),
236
4
        destfile.to_str().expect("invaid path string"),
237
4
        Some(TransmissionMode::Unacknowledged),
238
4
        Some(with_closure),
239
4
    )
240
4
    .expect("put request creation failed");
241
4

            
242
4
    let start = std::time::Instant::now();
243
4

            
244
6
    let jh_source = thread::spawn(move || {
245
4
        source_handler
246
4
            .put_request(&put_request)
247
4
            .expect("put request failed");
248
        loop {
249
26
            let mut next_delay = None;
250
26
            let mut undelayed_call_count = 0;
251
26
            let packet_info = match dest_rx.try_recv() {
252
2
                Ok(pdu_with_info) => Some(pdu_with_info),
253
24
                Err(e) => match e {
254
24
                    mpsc::TryRecvError::Empty => None,
255
                    mpsc::TryRecvError::Disconnected => {
256
                        panic!("unexpected disconnect from destination channel sender");
257
                    }
258
                },
259
            };
260
26
            match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
261
26
                Ok(sent_packets) => {
262
26
                    if sent_packets == 0 {
263
18
                        next_delay = Some(Duration::from_millis(50));
264
18
                    }
265
                }
266
                Err(e) => {
267
                    println!("Source handler error: {}", e);
268
                    next_delay = Some(Duration::from_millis(50));
269
                }
270
            }
271
26
            if let Some(delay) = next_delay {
272
18
                thread::sleep(delay);
273
18
            } else {
274
8
                undelayed_call_count += 1;
275
8
            }
276
26
            if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) {
277
4
                break;
278
22
            }
279
22
            // Safety feature against configuration errors.
280
22
            if undelayed_call_count >= 200 {
281
                panic!("Source handler state machine possible in permanent loop");
282
22
            }
283
        }
284
6
    });
285
4

            
286
6
    let jh_dest = thread::spawn(move || {
287
        loop {
288
20
            let mut next_delay = None;
289
20
            let mut undelayed_call_count = 0;
290
20
            let packet_info = match source_rx.try_recv() {
291
12
                Ok(pdu_with_info) => Some(pdu_with_info),
292
8
                Err(e) => match e {
293
8
                    mpsc::TryRecvError::Empty => None,
294
                    mpsc::TryRecvError::Disconnected => {
295
                        panic!("unexpected disconnect from destination channel sender");
296
                    }
297
                },
298
            };
299
20
            match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
300
20
                Ok(sent_packets) => {
301
20
                    if sent_packets == 0 {
302
18
                        next_delay = Some(Duration::from_millis(50));
303
18
                    }
304
                }
305
                Err(e) => {
306
                    println!("Source handler error: {}", e);
307
                    next_delay = Some(Duration::from_millis(50));
308
                }
309
            }
310
20
            if let Some(delay) = next_delay {
311
18
                thread::sleep(delay);
312
18
            } else {
313
2
                undelayed_call_count += 1;
314
2
            }
315
20
            if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) {
316
4
                break;
317
16
            }
318
16
            // Safety feature against configuration errors.
319
16
            if undelayed_call_count >= 200 {
320
                panic!("Destination handler state machine possible in permanent loop");
321
16
            }
322
        }
323
6
    });
324

            
325
    loop {
326
20
        if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed)
327
10
            && completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed)
328
        {
329
4
            let file = std::fs::read_to_string(destfile).expect("reading file failed");
330
4
            assert_eq!(file, FILE_DATA);
331
            // Stop the threads gracefully.
332
4
            stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed);
333
4
            break;
334
16
        }
335
16
        if std::time::Instant::now() - start > Duration::from_secs(2) {
336
            panic!("file transfer not finished in 2 seconds");
337
16
        }
338
16
        std::thread::sleep(Duration::from_millis(50));
339
    }
340

            
341
4
    jh_source.join().unwrap();
342
4
    jh_dest.join().unwrap();
343
4
}
344

            
345
/// Runs the end-to-end transfer with `with_closure` set to `false`.
#[test]
fn end_to_end_test_no_closure() {
    let with_closure = false;
    end_to_end_test(with_closure);
}
349

            
350
/// Runs the end-to-end transfer with `with_closure` set to `true`.
#[test]
fn end_to_end_test_with_closure() {
    let with_closure = true;
    end_to_end_test(with_closure);
}