1 /*
2 Copyright (c) 2014-2021 Martin Cejp
3 
4 Boost Software License - Version 1.0 - August 17th, 2003
5 
6 Permission is hereby granted, free of charge, to any person or organization
7 obtaining a copy of the software and accompanying documentation covered by
8 this license (the "Software") to use, reproduce, display, distribute,
9 execute, and transmit the Software, and to prepare derivative works of the
10 Software, and to permit third-parties to whom the Software is furnished to
11 do so, all subject to the following:
12 
13 The copyright notices in the Software and this entire statement, including
14 the above license grant, this restriction and the following disclaimer,
15 must be included in all copies of the Software, in whole or in part, and
16 all derivative works of the Software, unless such copies or derivative
17 works are solely in the form of machine-executable object code generated by
18 a source language processor.
19 
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
23 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
24 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
25 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 DEALINGS IN THE SOFTWARE.
27 */
28 
29 /**
30  * Binary I/O stream interfaces
31  *
32  * Copyright: Martin Cejp 2014-2021.
33  * License: $(LINK2 https://boost.org/LICENSE_1_0.txt, Boost License 1.0).
34  * Authors: Martin Cejp, Timur Gafarov
35  */
36 module dlib.core.stream;
37 
38 import std.bitmanip;
39 import std.stdint;
40 import std.conv;
41 
42 import dlib.core.memory;
43 
44 alias StreamPos = uint64_t;
45 alias StreamSize = uint64_t;
46 alias StreamOffset = int64_t;
47 
48 /// An exception which is throwed on stream errors
49 class SeekException : Exception
50 {
51     this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
52     {
53         super(msg, file, line, next);
54     }
55 }
56 
57 /**
58  * Seekable stream interface
59  *
60  * Description:
61  * Represents a stream container that knows the size of a stream 
62  * and allows to change byte position within the stream.
63  */
64 interface Seekable
65 {
66     /// Returns current position
67     StreamPos getPosition() @property;
68     
69     /**
70      * Attempts to set current position to pos.
71      * Returns true on success, false on failure
72      */
73     bool setPosition(StreamPos pos);
74     
75     /// Returns the size of a stream in bytes
76     StreamSize size();
77 
78     /**
79      * Attempts to set current position to pos.
80      * Throws SeekException on failure
81      */
82     final StreamPos position(StreamPos pos)
83     {
84         if (!setPosition(pos))
85             throw new SeekException("Cannot set Seekable position to " ~ pos.to!string);
86         return pos;
87     }
88 
89     /// ditto
90     final StreamPos position()
91     {
92         return getPosition();
93     }
94 
95     /**
96      * Relatively changes position.
97      * amount defines an offset from the current position (can be negative).
98      * Throws SeekException on failure
99      */
100     final StreamPos seek(StreamOffset amount)
101     {
102         immutable StreamPos seekTo = getPosition() + amount;
103 
104         if (!setPosition(seekTo))
105             throw new SeekException("Cannot set Seekable position to " ~ seekTo.to!string);
106 
107         return seekTo;
108     }
109 }
110 
111 /**
112  * A parent interface for all stream types
113  */
114 interface Stream: Seekable
115 {
116     /// Closes the stream. Closed stream cannot be read or written any more
117     void close();
118     
119     /// Returns true if it is legal to use Seekable functionality on this stream
120     bool seekable();
121 }
122 
123 /**
124  * A stream inteface that allows to read data from it.
125  * Reading any data implies position advance by corresponding number of bytes.
126  * Methods shouldn't throw on EOF, may throw on a more serious error
127  */
128 interface InputStream: Stream
129 {
130     /// Returns true if there are any data to read. false means end of the stream.
131     bool readable();
132     
133     /**
134      * Attempts to read count bytes from stream and stores them in memory 
135      * pointing by buffer. Returns number of bytes actually read
136      */
137     size_t readBytes(void* buffer, size_t count);
138 
139     /**
140      * Attempts to fill an array with raw data from stream. 
141      * Returns true if the array was filled, false otherwise
142      */
143     final bool fillArray(T)(T[] array)
144     {
145         immutable size_t len = array.length * T.sizeof;
146         return readBytes(array.ptr, len) == len;
147     }
148 
149     /**
150      * Reads little-endian integer, converts to native-endian 
151      * and stores in value
152      */
153     final bool readLE(T)(T* value)
154     {
155         ubyte[T.sizeof] buffer;
156 
157         if (readBytes(buffer.ptr, buffer.length) != buffer.length)
158             return false;
159 
160         *value = littleEndianToNative!T(buffer);
161         return true;
162     }
163 
164     /**
165      * Reads big-endian integer, converts to native-endian
166      * and stores in value
167      */
168     final bool readBE(T)(T* value)
169     {
170         ubyte[T.sizeof] buffer;
171 
172         if (readBytes(buffer.ptr, buffer.length) != buffer.length)
173             return false;
174 
175         *value = bigEndianToNative!T(buffer);
176         return true;
177     }
178 }
179 
180 /**
181  * A stream interface that allows to write data into it.
182  * Methods shouldn't throw on full disk, may throw on a more serious error
183  */
184 interface OutputStream: Stream
185 {
186     /// Implementation-specific method. Usually it writes any unwritten data from output buffer
187     void flush();
188     
189     /// Returns true if stream can be written to, false otherwise
190     bool writeable();
191     
192     /**
193      * Attempts to write count bytes from the memory pointed by buffer. 
194      * Returns number of bytes actually written
195      */
196     size_t writeBytes(const void* buffer, size_t count);
197 
198     /** 
199      * Attempts to write an array.
200      * Returns true if all elements were written, false otherwise
201      */
202     final bool writeArray(T)(const T[] array)
203     {
204         immutable size_t len = array.length * T.sizeof;
205         return writeBytes(array.ptr, len) == len;
206     }
207 
208     /**
209      * Attempts to write a string as zero-terminated. 
210      * Returns true if entire string was written, false otherwise
211      */
212     final bool writeStringz(string text)
213     {
214         ubyte[1] zero = [0];
215 
216         return writeBytes(text.ptr, text.length)
217             && writeBytes(zero.ptr, zero.length);
218     }
219 
220     /// Writes an integer in little-endian encoding
221     final bool writeLE(T)(const T value)
222     {
223         ubyte[T.sizeof] buffer = nativeToLittleEndian!T(value);
224 
225         return writeBytes(buffer.ptr, buffer.length) == buffer.length;
226     }
227 
228     /// Writes an integer in big-endian encoding
229     final bool writeBE(T)(const T value)
230     {
231         ubyte[T.sizeof] buffer = nativeToBigEndian!T(value);
232 
233         return writeBytes(buffer.ptr, buffer.length) == buffer.length;
234     }
235 }
236 
237 /**
238  * A stream that allows both reading and writing data
239  */
240 interface IOStream: InputStream, OutputStream
241 {
242 }
243 
244 /**
245  * While input is readable, reads data from input and writes it to output. 
246  * Returns number of bytes read
247  */
248 StreamSize copyFromTo(InputStream input, OutputStream output)
249 {
250     ubyte[0x1000] buffer;
251     StreamSize total = 0;
252 
253     while (input.readable)
254     {
255         size_t have = input.readBytes(buffer.ptr, buffer.length);
256 
257         if (have == 0)
258             break;
259 
260         output.writeBytes(buffer.ptr, have);
261         total += have;
262     }
263 
264     return total;
265 }
266 
267 /**
268  * An InputStream that encapsulates contents of an array
269  */
270 class ArrayStream: InputStream
271 {
272     // TODO: Add OutputStream methods
273     
274     import std.algorithm;
275 
276     /// Constructor. Initializes stream as empty
277     this()
278     {
279     }
280 
281     /**
282      * Constructor. Initializes stream with an array of bytes.
283      * size delimits maximum read size
284      */
285     this(ubyte[] data, size_t size)
286     {
287         assert(size_ <= data.length);
288 
289         this.size_ = size;
290         this.data = data;
291     }
292 
293     /**
294      * Constructor. Initializes stream with an array of bytes
295      */
296     this(ubyte[] data)
297     {
298         this(data, data.length);
299     }
300 
301     override void close()
302     {
303         this.pos = 0;
304         this.size_ = 0;
305         this.data = null;
306     }
307 
308     override bool readable()
309     {
310         return pos < size_;
311     }
312 
313     override size_t readBytes(void* buffer, size_t count)
314     {
315         import core.stdc..string;
316 
317         count = min(count, size_ - pos);
318 
319         // whoops, memcpy out of nowhere, can we do better than that?
320         memcpy(buffer, data.ptr + pos, count);
321 
322         pos += count;
323         return count;
324     }
325 
326     override bool seekable()
327     {
328         return true;
329     }
330 
331     override StreamPos getPosition()
332     {
333         return pos;
334     }
335 
336     override bool setPosition(StreamPos pos)
337     {
338         if (pos > size_)
339             return false;
340 
341         this.pos = cast(size_t)pos;
342         return true;
343     }
344 
345     override StreamSize size()
346     {
347         return size_;
348     }
349 
350     private:
351     size_t pos = 0, size_ = 0;
352     ubyte[] data;       // data.length is capacity
353 }
354 
355 ///
356 unittest
357 {
358     ubyte[] arr = [23,13,42,71,0,1,1,2,3,5,8];
359 
360     auto stream = new ArrayStream(arr);
361     assert(stream.size() == arr.length);
362 
363     ubyte[4] buf;
364     assert(stream.readBytes(buf.ptr, buf.length) == buf.length);
365     assert(buf == [23,13,42,71]);
366     assert(stream.getPosition() == buf.length);
367 
368     assert(stream.setPosition(6));
369     assert(stream.getPosition == 6);
370     assert(stream.readBytes(buf.ptr, buf.length) == buf.length);
371     assert(buf == [1,2,3,5]);
372 
373     assert(stream.readBytes(buf.ptr, buf.length) == 1);
374     assert(buf[0] == 8);
375     assert(!stream.readable);
376 
377     stream.setPosition(1);
378     assert(stream.readable);
379     stream.seek(4);
380     assert(stream.readBytes(buf.ptr, buf.length) == buf.length);
381     assert(buf == [1,1,2,3]);
382 
383     assert(stream.setPosition(arr.length));
384     assert(!stream.setPosition(arr.length+1));
385 
386     stream.close();
387     assert(!stream.readable);
388 }
389 
390 /**
391  * An input range that reads data from InputStream by fixed chunks, 
392  * storing them in user-provided array
393  */
394 struct BufferedStreamReader
395 {
396     InputStream stream;
397     ubyte[] buffer;
398     ubyte[] front;
399     
400     /// Constructor. Initializes range with a stream and a buffer
401     this(InputStream istrm, ubyte[] buffer)
402     {
403         stream = istrm;
404         this.buffer = buffer;
405         popFront();
406     }
407     
408     bool empty = false;
409     
410     void popFront()
411     {
412         empty = !stream.readable();
413         size_t readLen = stream.readBytes(buffer.ptr, buffer.length);
414         if (readLen > 0)
415             front = buffer[0..readLen];
416     }
417 }