1
use std::{
2
    fmt::Debug,
3
    fs::OpenOptions,
4
    io::{self, ErrorKind, Write},
5
    net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
6
    sync::mpsc,
7
    thread,
8
    time::Duration,
9
};
10

            
11
use cfdp::{
12
    dest::DestinationHandler,
13
    filestore::NativeFilestore,
14
    request::{PutRequestOwned, StaticPutRequestCacher},
15
    source::SourceHandler,
16
    user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
17
    EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
18
    RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider,
19
};
20
use clap::Parser;
21
use log::{debug, info, warn};
22
use spacepackets::{
23
    cfdp::{
24
        pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError},
25
        ChecksumType, ConditionCode, TransmissionMode,
26
    },
27
    seq_count::SeqCountProviderSyncU16,
28
    util::{UnsignedByteFieldU16, UnsignedEnum},
29
};
30

            
31
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
32
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
33

            
34
const RUST_PORT: u16 = 5111;
35
const PY_PORT: u16 = 5222;
36

            
37
const LOG_LEVEL: log::LevelFilter = log::LevelFilter::Info;
38

            
39
const FILE_DATA: &str = "Hello World!";
40

            
41
#[derive(Debug, Copy, Clone, clap::ValueEnum)]
42
pub enum TransmissionModeCli {
43
    Nak,
44
    Ack,
45
}
46

            
47
#[derive(clap::Parser)]
48
#[command(about = "Arguments for executing a file copy operation")]
49
pub struct Cli {
50
    #[arg(short, help = "Perform a file copy operation")]
51
    file_copy: bool,
52
    #[arg(short, default_value = "nak")]
53
    mode: Option<TransmissionModeCli>,
54
    #[arg(short)]
55
    closure_requested: Option<bool>,
56
}
57

            
58
#[derive(Default)]
59
pub struct ExampleFaultHandler {}
60

            
61
impl UserFaultHookProvider for ExampleFaultHandler {
62
    fn notice_of_suspension_cb(
63
        &mut self,
64
        transaction_id: TransactionId,
65
        cond: ConditionCode,
66
        progress: u64,
67
    ) {
68
        panic!(
69
            "unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
70
            transaction_id, cond, progress
71
        );
72
    }
73

            
74
    fn notice_of_cancellation_cb(
75
        &mut self,
76
        transaction_id: TransactionId,
77
        cond: ConditionCode,
78
        progress: u64,
79
    ) {
80
        panic!(
81
            "unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
82
            transaction_id, cond, progress
83
        );
84
    }
85

            
86
    fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
87
        panic!(
88
            "unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
89
            transaction_id, cond, progress
90
        );
91
    }
92

            
93
    fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
94
        panic!(
95
            "ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
96
            transaction_id, cond, progress
97
        );
98
    }
99
}
100

            
101
pub struct ExampleCfdpUser {
102
    entity_type: EntityType,
103
}
104

            
105
impl ExampleCfdpUser {
106
    pub fn new(entity_type: EntityType) -> Self {
107
        Self { entity_type }
108
    }
109
}
110

            
111
impl CfdpUser for ExampleCfdpUser {
112
    fn transaction_indication(&mut self, id: &crate::TransactionId) {
113
        println!(
114
            "{:?} entity: Transaction indication for {:?}",
115
            self.entity_type, id
116
        );
117
    }
118

            
119
    fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
120
        println!(
121
            "{:?} entity: EOF sent for transaction {:?}",
122
            self.entity_type, id
123
        );
124
    }
125

            
126
    fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
127
        println!(
128
            "{:?} entity: Transaction finished: {:?}",
129
            self.entity_type, finished_params
130
        );
131
    }
132

            
133
    fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
134
        println!(
135
            "{:?} entity: Metadata received: {:?}",
136
            self.entity_type, md_recvd_params
137
        );
138
    }
139

            
140
    fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
141
        println!(
142
            "{:?} entity: File segment {:?} received",
143
            self.entity_type, segment_recvd_params
144
        );
145
    }
146

            
147
    fn report_indication(&mut self, _id: &crate::TransactionId) {}
148

            
149
    fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
150
        panic!("unexpected suspended indication");
151
    }
152

            
153
    fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
154

            
155
    fn fault_indication(
156
        &mut self,
157
        _id: &crate::TransactionId,
158
        _condition_code: ConditionCode,
159
        _progress: u64,
160
    ) {
161
        panic!("unexpected fault indication");
162
    }
163

            
164
    fn abandoned_indication(
165
        &mut self,
166
        _id: &crate::TransactionId,
167
        _condition_code: ConditionCode,
168
        _progress: u64,
169
    ) {
170
        panic!("unexpected abandoned indication");
171
    }
172

            
173
    fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
174
        println!(
175
            "{:?} entity: EOF received for transaction {:?}",
176
            self.entity_type, id
177
        );
178
    }
179
}
180

            
181
pub struct UdpServer {
182
    pub socket: UdpSocket,
183
    recv_buf: Vec<u8>,
184
    remote_addr: SocketAddr,
185
    source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
186
    dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
187
    source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
188
    dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
189
}
190

            
191
#[derive(Debug, thiserror::Error)]
192
pub enum UdpServerError {
193
    #[error(transparent)]
194
    Io(#[from] io::Error),
195
    #[error("pdu error: {0}")]
196
    Pdu(#[from] PduError),
197
    #[error("send error")]
198
    Send,
199
}
200

            
201
impl UdpServer {
202
    pub fn new<A: ToSocketAddrs>(
203
        addr: A,
204
        remote_addr: SocketAddr,
205
        max_recv_size: usize,
206
        source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
207
        dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
208
        source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
209
        dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
210
    ) -> Result<Self, io::Error> {
211
        let server = Self {
212
            socket: UdpSocket::bind(addr)?,
213
            recv_buf: vec![0; max_recv_size],
214
            source_tc_tx,
215
            dest_tc_tx,
216
            remote_addr,
217
            source_tm_rx,
218
            dest_tm_rx,
219
        };
220
        server.socket.set_nonblocking(true)?;
221
        Ok(server)
222
    }
223

            
224
    pub fn try_recv_tc(
225
        &mut self,
226
    ) -> Result<Option<(PduOwnedWithInfo, SocketAddr)>, UdpServerError> {
227
        let res = match self.socket.recv_from(&mut self.recv_buf) {
228
            Ok(res) => res,
229
            Err(e) => {
230
                return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut {
231
                    Ok(None)
232
                } else {
233
                    Err(e.into())
234
                }
235
            }
236
        };
237
        let (_, from) = res;
238
        self.remote_addr = from;
239
        let pdu_owned = PduOwnedWithInfo::new_from_raw_packet(&self.recv_buf)?;
240
        match pdu_owned.packet_target()? {
241
            cfdp::PacketTarget::SourceEntity => {
242
                self.source_tc_tx
243
                    .send(pdu_owned.clone())
244
                    .map_err(|_| UdpServerError::Send)?;
245
            }
246
            cfdp::PacketTarget::DestEntity => {
247
                self.dest_tc_tx
248
                    .send(pdu_owned.clone())
249
                    .map_err(|_| UdpServerError::Send)?;
250
            }
251
        }
252
        Ok(Some((pdu_owned, from)))
253
    }
254

            
255
    pub fn recv_and_send_telemetry(&mut self) {
256
        let tm_handler = |receiver: &mpsc::Receiver<PduOwnedWithInfo>| {
257
            while let Ok(tm) = receiver.try_recv() {
258
                debug!("Sending PDU: {:?}", tm);
259
                pdu_printout(&tm);
260
                let result = self.socket.send_to(tm.pdu(), self.remote_addr());
261
                if let Err(e) = result {
262
                    warn!("Sending TM with UDP socket failed: {e}")
263
                }
264
            }
265
        };
266
        tm_handler(&self.source_tm_rx);
267
        tm_handler(&self.dest_tm_rx);
268
    }
269

            
270
    pub fn remote_addr(&self) -> SocketAddr {
271
        self.remote_addr
272
    }
273
}
274

            
275
fn pdu_printout(pdu: &PduOwnedWithInfo) {
276
    match pdu.pdu_type() {
277
        spacepackets::cfdp::PduType::FileDirective => match pdu.file_directive_type().unwrap() {
278
            spacepackets::cfdp::pdu::FileDirectiveType::EofPdu => (),
279
            spacepackets::cfdp::pdu::FileDirectiveType::FinishedPdu => (),
280
            spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
281
            spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
282
                let meta_pdu =
283
                    MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed");
284
                debug!("Metadata PDU: {:?}", meta_pdu)
285
            }
286
            spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
287
            spacepackets::cfdp::pdu::FileDirectiveType::PromptPdu => (),
288
            spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
289
        },
290
        spacepackets::cfdp::PduType::FileData => {
291
            let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed");
292
            debug!("File data PDU: {:?}", fd_pdu);
293
        }
294
    }
295
}
296

            
297
fn main() {
298
    let cli_args = Cli::parse();
299
    fern::Dispatch::new()
300
        .format(|out, message, record| {
301
            out.finish(format_args!(
302
                "{}[{}][{}] {}",
303
                chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
304
                std::thread::current().name().expect("thread is not named"),
305
                record.level(),
306
                message
307
            ))
308
        })
309
        .level(LOG_LEVEL)
310
        .chain(std::io::stdout())
311
        .apply()
312
        .unwrap();
313

            
314
    let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
315
    let mut file = OpenOptions::new()
316
        .write(true)
317
        .open(&srcfile)
318
        .expect("opening file failed");
319
    info!("created test source file {:?}", srcfile);
320
    file.write_all(FILE_DATA.as_bytes())
321
        .expect("writing file content failed");
322
    let destdir = tempfile::tempdir().expect("creating temp directory failed");
323
    let destfile = destdir.path().join("test.txt");
324

            
325
    let local_cfg_source = LocalEntityConfig::new(
326
        RUST_ID.into(),
327
        IndicationConfig::default(),
328
        ExampleFaultHandler::default(),
329
    );
330
    let (source_tm_tx, source_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
331
    let (dest_tm_tx, dest_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
332
    let put_request_cacher = StaticPutRequestCacher::new(2048);
333
    let remote_cfg_python = RemoteEntityConfig::new_with_default_values(
334
        PYTHON_ID.into(),
335
        1024,
336
        true,
337
        false,
338
        spacepackets::cfdp::TransmissionMode::Unacknowledged,
339
        ChecksumType::Crc32C,
340
    );
341
    let seq_count_provider = SeqCountProviderSyncU16::default();
342
    let mut source_handler = SourceHandler::new(
343
        local_cfg_source,
344
        source_tm_tx,
345
        NativeFilestore::default(),
346
        put_request_cacher,
347
        2048,
348
        remote_cfg_python,
349
        StdTimerCreator::default(),
350
        seq_count_provider,
351
    );
352
    let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending);
353

            
354
    let local_cfg_dest = LocalEntityConfig::new(
355
        RUST_ID.into(),
356
        IndicationConfig::default(),
357
        ExampleFaultHandler::default(),
358
    );
359
    let mut dest_handler = DestinationHandler::new(
360
        local_cfg_dest,
361
        1024,
362
        dest_tm_tx,
363
        NativeFilestore::default(),
364
        remote_cfg_python,
365
        StdTimerCreator::default(),
366
    );
367
    let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);
368

            
369
    let put_request = if cli_args.file_copy {
370
        Some(
371
            PutRequestOwned::new_regular_request(
372
                PYTHON_ID.into(),
373
                srcfile.to_str().expect("invaid path string"),
374
                destfile.to_str().expect("invaid path string"),
375
                cli_args.mode.map(|m| match m {
376
                    TransmissionModeCli::Ack => TransmissionMode::Acknowledged,
377
                    TransmissionModeCli::Nak => TransmissionMode::Unacknowledged,
378
                }),
379
                cli_args.closure_requested,
380
            )
381
            .expect("put request creation failed"),
382
        )
383
    } else {
384
        None
385
    };
386

            
387
    let (source_tc_tx, source_tc_rx) = mpsc::channel();
388
    let (dest_tc_tx, dest_tc_rx) = mpsc::channel();
389

            
390
    let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT);
391
    let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), PY_PORT);
392
    let mut udp_server = UdpServer::new(
393
        local_addr,
394
        remote_addr,
395
        2048,
396
        source_tc_tx,
397
        dest_tc_tx,
398
        source_tm_rx,
399
        dest_tm_rx,
400
    )
401
    .expect("creating UDP server failed");
402

            
403
    let jh_source = thread::Builder::new()
404
        .name("cfdp src entity".to_string())
405
        .spawn(move || {
406
            info!("Starting RUST SRC");
407
            if let Some(put_request) = put_request {
408
                info!("RUST SRC: Performing put request: {:?}", put_request);
409
                source_handler
410
                    .put_request(&put_request)
411
                    .expect("put request failed");
412
            }
413
            loop {
414
                let mut next_delay = None;
415
                let mut undelayed_call_count = 0;
416
                let packet_info = match source_tc_rx.try_recv() {
417
                    Ok(pdu_with_info) => Some(pdu_with_info),
418
                    Err(e) => match e {
419
                        mpsc::TryRecvError::Empty => None,
420
                        mpsc::TryRecvError::Disconnected => {
421
                            panic!("unexpected disconnect from destination channel sender");
422
                        }
423
                    },
424
                };
425
                match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
426
                    Ok(sent_packets) => {
427
                        if sent_packets == 0 {
428
                            next_delay = Some(Duration::from_millis(50));
429
                        }
430
                    }
431
                    Err(e) => {
432
                        warn!("cfdp src entity error: {}", e);
433
                        next_delay = Some(Duration::from_millis(50));
434
                    }
435
                }
436
                if let Some(delay) = next_delay {
437
                    thread::sleep(delay);
438
                } else {
439
                    undelayed_call_count += 1;
440
                }
441
                // Safety feature against configuration errors.
442
                if undelayed_call_count >= 200 {
443
                    panic!("Source handler state machine possible in permanent loop");
444
                }
445
            }
446
        })
447
        .unwrap();
448

            
449
    let jh_dest = thread::Builder::new()
450
        .name("cfdp dest entity".to_string())
451
        .spawn(move || {
452
            info!("Starting RUST DEST. Local ID {}", RUST_ID.value());
453
            loop {
454
                let mut next_delay = None;
455
                let mut undelayed_call_count = 0;
456
                let packet_info = match dest_tc_rx.try_recv() {
457
                    Ok(pdu_with_info) => Some(pdu_with_info),
458
                    Err(e) => match e {
459
                        mpsc::TryRecvError::Empty => None,
460
                        mpsc::TryRecvError::Disconnected => {
461
                            panic!("unexpected disconnect from destination channel sender");
462
                        }
463
                    },
464
                };
465
                match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
466
                    Ok(sent_packets) => {
467
                        if sent_packets == 0 {
468
                            next_delay = Some(Duration::from_millis(50));
469
                        }
470
                    }
471
                    Err(e) => {
472
                        println!("Dest handler error: {}", e);
473
                        // TODO: I'd prefer a proper cancel request if a transfer is active..
474
                        dest_handler.reset();
475
                        next_delay = Some(Duration::from_millis(50));
476
                    }
477
                }
478
                if let Some(delay) = next_delay {
479
                    thread::sleep(delay);
480
                } else {
481
                    undelayed_call_count += 1;
482
                }
483
                // Safety feature against configuration errors.
484
                if undelayed_call_count >= 200 {
485
                    panic!("Destination handler state machine possible in permanent loop");
486
                }
487
            }
488
        })
489
        .unwrap();
490

            
491
    let jh_udp_server = thread::Builder::new()
492
        .name("cfdp udp server".to_string())
493
        .spawn(move || {
494
            info!("Starting UDP server on {}", remote_addr);
495
            loop {
496
                loop {
497
                    match udp_server.try_recv_tc() {
498
                        Ok(result) => match result {
499
                            Some((pdu, _addr)) => {
500
                                debug!("Received PDU on UDP server: {:?}", pdu);
501
                                pdu_printout(&pdu);
502
                            }
503
                            None => break,
504
                        },
505
                        Err(e) => {
506
                            warn!("UDP server error: {}", e);
507
                            break;
508
                        }
509
                    }
510
                }
511
                udp_server.recv_and_send_telemetry();
512
                thread::sleep(Duration::from_millis(50));
513
            }
514
        })
515
        .unwrap();
516

            
517
    jh_source.join().unwrap();
518
    jh_dest.join().unwrap();
519
    jh_udp_server.join().unwrap();
520
}