[KERNEL][TEST] Use makeflop instead of mtools

git-svn-id: svn://kolibrios.org@9316 a494cfbc-eb01-0410-851d-a64ba20cac60
This commit is contained in:
Magomed Kostoev (mkostoevr) 2021-11-25 22:00:43 +00:00
parent 96cf187d5e
commit a849cabad0
3 changed files with 808 additions and 12 deletions

View File

@ -83,8 +83,7 @@ def check_tools(tools):
if __name__ == "__main__": if __name__ == "__main__":
# Check available tools # Check available tools
tools = (("mcopy", "mtools"), tools = (("qemu-system-i386", "qemu-system-x86"),
("qemu-system-i386", "qemu-system-x86"),
("fasm", "fasm")) ("fasm", "fasm"))
check_tools(tools) check_tools(tools)
@ -96,17 +95,18 @@ if __name__ == "__main__":
builds_eng = sys.argv[1] builds_eng = sys.argv[1]
execute(f"cp {builds_eng}/data/data/kolibri.img kolibri_test.img") execute(f"cp {builds_eng}/data/data/kolibri.img kolibri_test.img")
# Open the IMG
with open("kolibri_test.img", "rb") as img:
img_data = img.read()
img = common.Floppy(img_data)
# Remove old kernel (may fail if we removed it before so no check here) # Remove old kernel (may fail if we removed it before so no check here)
os.system("mdel -i kolibri_test.img ::kernel.mnt > /dev/null") img.delete_path("kernel.mnt")
# Check free space after kernel remove # Remove unuseful folders
free_clusters = int(subprocess.check_output("mdu -i kolibri_test.img :: -s", shell=True).split()[-1]) img.delete_path("GAMES")
floppy_image_clusters = 2880 img.delete_path("DEMOS")
if floppy_image_clusters - free_clusters < 500: img.delete_path("3D")
# Remove unuseful files from IMG if lesser than 500 sectors
execute("mdeltree -i kolibri_test.img ::GAMES", mute = True)
execute("mdeltree -i kolibri_test.img ::DEMOS", mute = True)
execute("mdeltree -i kolibri_test.img ::3D", mute = True)
# Get test kernel # Get test kernel
if not os.path.exists("kernel.mnt.pretest"): if not os.path.exists("kernel.mnt.pretest"):
@ -120,7 +120,10 @@ if __name__ == "__main__":
execute(f"cp {builds_eng}/data/kernel/trunk/kernel.mnt.pretest kernel.mnt.pretest", mute = True) execute(f"cp {builds_eng}/data/kernel/trunk/kernel.mnt.pretest kernel.mnt.pretest", mute = True)
# Put the kernel into IMG # Put the kernel into IMG
execute("mcopy -D o -i kolibri_test.img kernel.mnt.pretest ::kernel.mnt", mute = True) with open("kernel.mnt.pretest", "rb") as kernel_mnt_pretest:
kernel_mnt_pretest_data = kernel_mnt_pretest.read()
img.add_file_path("kernel.mnt", kernel_mnt_pretest_data)
img.save("kolibri_test.img")
# Collect tests from test folder (not recursively yet) # Collect tests from test folder (not recursively yet)
for test_folder in os.listdir("test"): for test_folder in os.listdir("test"):

View File

@ -9,6 +9,7 @@ import timeit
import time import time
import shlex import shlex
import signal import signal
from . makeflop import Floppy
class TestTimeoutException(Exception): class TestTimeoutException(Exception):
pass pass

View File

@ -0,0 +1,792 @@
#!/usr/bin/env python3
#
# makeflop.py
# Version 3
# Brad Smith, 2019
# http://rainwarrior.ca
#
# Simple file operations for a FAT12 floppy disk image.
# Public domain.
import sys
assert sys.version_info[0] == 3, "Python 3 required."
import struct
import datetime
import os
class Floppy:
"""
Simple file operations for a FAT12 floppy disk image.
Floppy() - creates a blank DOS formatted 1.44MB disk.
Floppy(data) - parses a disk from a given array of bytes.
.open(filename) - loads a file and returns a Floppy from its data.
.save(filename) - saves the disk image to a file.
.flush() - Updates the .data member with any pending changes.
.data - A bytearray of the disk image.
.files() - Returns a list of strings, each is a file or directory. Directories end with a backslash.
.delete_path(path) - Deletes a file or directory (recursive).
.add_dir_path(path) - Creates a new empty directory, if it does not already exist (recursive). Returns cluster of directory, or -1 if failed.
.add_file_path(path,data) - Creates a new file (creating directory if needed) with the given data. Returns False if failed.
.extract_file_path(path) - Returns a bytearray of the file at the given path, None if not found.
.set_volume_id(id=None) - Sets the 32-bit volume ID. Use with no arguments to generate ID from current time.
.set_volume_label(label) - Sets the 11-character volume label.
.boot_info() - Returns a string displaying boot sector information.
.fat_info() - Returns a very long string of all 12-bit FAT entries.
.files_info() - Returns a string displaying the files() list.
.add_all(path,prefix) - Adds all files from local path to disk (uppercased). Use prefix to specify a target directory. Returns False if any failed.
.extract_all(path) - Dumps entire contents of disk to local path.
.find_path(path) - Returns a Floppy.FileEntry.
FileEntry.info() - Returns and information string about a FileEntry.
This class provides some basic interface to a FAT12 floppy disk image.
Some things are fragile, in particular filenames that are too long,
or references to clusters outside the disk may cause exceptions.
The FAT can be accessed directly with some internal functions (see implementation)
and changes will be applied to the stored .data image with .flush().
Example:
f = Floppy.open("disk.img")
print(f.boot_info()) # list boot information about disk
print(f.file_info()) # list files and directories
f.extract_all("disk_dump\\")
f.delete_path("DIR\\FILE.TXT") # delete a file
f.delete_path("DIR") # delete an entire directory
f.add_file_path("DIR\\NEW.TXT",open("new.txt","rb").read()) # add a file, directory automatically created
f.add_dir_path("NEWDIR") # creates a new directory
f.add_all("add\\","ADDED\\") # adds all files from a local directory to to a specified floppy directory.
f.set_volume_id() # generates a new volume ID
f.set_volume_label("MY DISK") # changes the volume label
f.save("new.img")
"""
EMPTY = 0xE5 # incipit value for an empty directory entry
def _filestring(s, length):
"""Creates an ASCII string, padded to length with spaces ($20)."""
b = bytearray(s.encode("ASCII"))
b = b + bytearray([0x20] * (length - len(b)))
if len(b) != length:
raise self.Error("File string '%s' too long? (%d != %d)" % (s,len(b),length))
return b
class Error(Exception):
"""Floppy has had an error."""
pass
class FileEntry:
"""A directory entry for a file."""
def __init__(self, data=None, dir_cluster=-1, dir_index=-1):
"""Unpacks a 32 byte directory entry into a FileEntry structure."""
if data == None:
data = bytearray([Floppy.EMPTY]+([0]*31))
self.data = bytearray(data)
self.path = ""
self.incipit = data[0]
if self.incipit != 0x00 and self.incipit != Floppy.EMPTY:
filename = data[0:8].decode("ASCII")
filename = filename.rstrip(" ")
extension = data[8:11].decode("ASCII")
extension = extension.rstrip(" ")
self.path = filename
if len(extension) > 0:
self.path = self.path + "." + extension
block = struct.unpack("<BHHHHHHHHL",data[11:32])
self.attributes = block[0]
self.write_time = block[6]
self.write_date = block[7]
self.cluster = block[8]
self.size = block[9]
self.dir_cluster = dir_cluster
self.dir_index = dir_index
def compile(self):
"""Commits any changed data to the entry, rebuilds and returns the 32 byte structure."""
filename = ""
extension = ""
period = self.path.find(".")
if (period >= 0):
filename = self.path[0:period]
extension = self.path[period+1:]
else:
filename = self.path
if self.incipit != 0x00 and self.incipit != 0xEF:
self.data[0:8] = Floppy._filestring(filename,8)
self.data[8:11] = Floppy._filestring(extension,3)
else:
self.data[0] = self.incipit
self.data[11] = self.attributes
self.data[22:32] = bytearray(struct.pack("<HHHL",
self.write_time,
self.write_date,
self.cluster,
self.size))
return bytearray(self.data)
def info(self):
"""String of information about a FileEntry."""
s = ""
s += "Path: [%s]\n" % self.path
s += "Incipit: %02X\n" % self.incipit
s += "Attributes: %02X\n" % self.attributes
s += "Write: %04X %04X\n" % (self.write_date, self.write_time)
s += "Cluster: %03X\n" % self.cluster
s += "Size: %d bytes\n" % self.size
s += "Directory: %03X, %d\n" % (self.dir_cluster, self.dir_index)
return s
def fat_time(year, month, day, hour, minute, second):
"""Builds a FAT12 date/time entry."""
date = ((year - 1980) << 9) | ((month) << 5) | day
time = (hour << 11) | (minute << 5) | (second >> 1)
return (date, time)
def fat_time_now():
"""Builds the current time as a FAT12 date/time entry."""
now = datetime.datetime.now()
return Floppy.FileEntry.fat_time(now.year, now.month, now.day, now.hour, now.minute, now.second)
def set_name(self,s):
"""Sets the filename and updates incipit."""
self.path = s
self.incipit = Floppy._filestring(s,12)[0]
def set_now(self):
"""Updates modified date/time to now."""
(date,time) = Floppy.FileEntry.fat_time_now()
self.write_date = date
self.write_time = time
def new_file(name):
"""Generate a new file entry."""
e = Floppy.FileEntry()
e.set_name(name)
e.set_now()
e.attributes = 0x00
return e
def new_dir(name):
"""Generate a new subdirectory entry."""
e = Floppy.FileEntry()
e.set_name(name)
e.set_now()
e.attributes = 0x10
return e
def new_volume(name):
"""Generate a new volume entry."""
e = Floppy.FileEntry()
if len(name) > 8:
name = name[0:8] + "." + name[8:]
e.set_name(name)
e.set_now()
e.attributes = 0x08
return e
def new_terminal():
"""Generate a new directory terminating entry."""
e = Floppy.FileEntry()
e.incipit = 0x00
return e
# blank formatted 1.44MB MS-DOS 5.0 non-system floppy
blank_floppy = [ # boot sector, 2xFAT, "BLANK" volume label, F6 fill
0xEB,0x3C,0x90,0x4D,0x53,0x44,0x4F,0x53,0x35,0x2E,0x30,0x00,0x02,0x01,0x01,0x00,
0x02,0xE0,0x00,0x40,0x0B,0xF0,0x09,0x00,0x12,0x00,0x02,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x29,0xE3,0x16,0x53,0x1B,0x42,0x4C,0x41,0x4E,0x4B,
0x20,0x20,0x20,0x20,0x20,0x20,0x46,0x41,0x54,0x31,0x32,0x20,0x20,0x20,0xFA,0x33,
0xC0,0x8E,0xD0,0xBC,0x00,0x7C,0x16,0x07,0xBB,0x78,0x00,0x36,0xC5,0x37,0x1E,0x56,
0x16,0x53,0xBF,0x3E,0x7C,0xB9,0x0B,0x00,0xFC,0xF3,0xA4,0x06,0x1F,0xC6,0x45,0xFE,
0x0F,0x8B,0x0E,0x18,0x7C,0x88,0x4D,0xF9,0x89,0x47,0x02,0xC7,0x07,0x3E,0x7C,0xFB,
0xCD,0x13,0x72,0x79,0x33,0xC0,0x39,0x06,0x13,0x7C,0x74,0x08,0x8B,0x0E,0x13,0x7C,
0x89,0x0E,0x20,0x7C,0xA0,0x10,0x7C,0xF7,0x26,0x16,0x7C,0x03,0x06,0x1C,0x7C,0x13,
0x16,0x1E,0x7C,0x03,0x06,0x0E,0x7C,0x83,0xD2,0x00,0xA3,0x50,0x7C,0x89,0x16,0x52,
0x7C,0xA3,0x49,0x7C,0x89,0x16,0x4B,0x7C,0xB8,0x20,0x00,0xF7,0x26,0x11,0x7C,0x8B,
0x1E,0x0B,0x7C,0x03,0xC3,0x48,0xF7,0xF3,0x01,0x06,0x49,0x7C,0x83,0x16,0x4B,0x7C,
0x00,0xBB,0x00,0x05,0x8B,0x16,0x52,0x7C,0xA1,0x50,0x7C,0xE8,0x92,0x00,0x72,0x1D,
0xB0,0x01,0xE8,0xAC,0x00,0x72,0x16,0x8B,0xFB,0xB9,0x0B,0x00,0xBE,0xE6,0x7D,0xF3,
0xA6,0x75,0x0A,0x8D,0x7F,0x20,0xB9,0x0B,0x00,0xF3,0xA6,0x74,0x18,0xBE,0x9E,0x7D,
0xE8,0x5F,0x00,0x33,0xC0,0xCD,0x16,0x5E,0x1F,0x8F,0x04,0x8F,0x44,0x02,0xCD,0x19,
0x58,0x58,0x58,0xEB,0xE8,0x8B,0x47,0x1A,0x48,0x48,0x8A,0x1E,0x0D,0x7C,0x32,0xFF,
0xF7,0xE3,0x03,0x06,0x49,0x7C,0x13,0x16,0x4B,0x7C,0xBB,0x00,0x07,0xB9,0x03,0x00,
0x50,0x52,0x51,0xE8,0x3A,0x00,0x72,0xD8,0xB0,0x01,0xE8,0x54,0x00,0x59,0x5A,0x58,
0x72,0xBB,0x05,0x01,0x00,0x83,0xD2,0x00,0x03,0x1E,0x0B,0x7C,0xE2,0xE2,0x8A,0x2E,
0x15,0x7C,0x8A,0x16,0x24,0x7C,0x8B,0x1E,0x49,0x7C,0xA1,0x4B,0x7C,0xEA,0x00,0x00,
0x70,0x00,0xAC,0x0A,0xC0,0x74,0x29,0xB4,0x0E,0xBB,0x07,0x00,0xCD,0x10,0xEB,0xF2,
0x3B,0x16,0x18,0x7C,0x73,0x19,0xF7,0x36,0x18,0x7C,0xFE,0xC2,0x88,0x16,0x4F,0x7C,
0x33,0xD2,0xF7,0x36,0x1A,0x7C,0x88,0x16,0x25,0x7C,0xA3,0x4D,0x7C,0xF8,0xC3,0xF9,
0xC3,0xB4,0x02,0x8B,0x16,0x4D,0x7C,0xB1,0x06,0xD2,0xE6,0x0A,0x36,0x4F,0x7C,0x8B,
0xCA,0x86,0xE9,0x8A,0x16,0x24,0x7C,0x8A,0x36,0x25,0x7C,0xCD,0x13,0xC3,0x0D,0x0A,
0x4E,0x6F,0x6E,0x2D,0x53,0x79,0x73,0x74,0x65,0x6D,0x20,0x64,0x69,0x73,0x6B,0x20,
0x6F,0x72,0x20,0x64,0x69,0x73,0x6B,0x20,0x65,0x72,0x72,0x6F,0x72,0x0D,0x0A,0x52,
0x65,0x70,0x6C,0x61,0x63,0x65,0x20,0x61,0x6E,0x64,0x20,0x70,0x72,0x65,0x73,0x73,
0x20,0x61,0x6E,0x79,0x20,0x6B,0x65,0x79,0x20,0x77,0x68,0x65,0x6E,0x20,0x72,0x65,
0x61,0x64,0x79,0x0D,0x0A,0x00,0x49,0x4F,0x20,0x20,0x20,0x20,0x20,0x20,0x53,0x59,
0x53,0x4D,0x53,0x44,0x4F,0x53,0x20,0x20,0x20,0x53,0x59,0x53,0x00,0x00,0x55,0xAA,
] + \
[0xF0,0xFF,0xFF] + ([0]*(0x1200-3)) + \
[0xF0,0xFF,0xFF] + ([0]*(0x1200-3)) + [
0x42,0x4C,0x41,0x4E,0x4B,0x20,0x20,0x20,0x20,0x20,0x20,0x28,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0xEE,0x7C,0x24,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
] + ([0] * (0x4200-0x2620)) + \
([0xF6] * (0x168000-0x4200))
def __init__(self,data=blank_floppy):
"""Create Floppy() instance from image bytes, or pre-formatted blank DOS floppy by default."""
self.data = bytearray(data)
self._boot_open()
self._fat_open()
def _boot_open(self):
"""Parses the boot sector."""
if (len(self.data) < 38):
raise self.Error("Not enough data in image for boot sector. (%d bytes)" % len(data))
boot = struct.unpack("<HBHBHHBHHH",self.data[11:28])
self.sector_size = boot[0]
self.cluster_sects = boot[1]
self.reserved_sects = boot[2]
self.fat_count = boot[3]
self.root_max = boot[4]
self.sectors = boot[5]
self.fat_sects = boot[7]
self.track_sects = boot[8]
self.heads = boot[9]
self.volume_id = 0
self.volume_label = ""
if self.data[38] == 0x29 and len(self.data) >= 54:
self.volume_id = struct.unpack("<L",self.data[39:43])[0]
self.volume_label = self.data[43:54].decode("ASCII").rstrip(" ")
if (self.sectors * self.sector_size) > len(self.data):
raise self.Error("Not enough data to contain %d x %d byte sectors? (%d bytes)" %
(self.sectors, self.sector_size, len(self.data)))
self.root = self.sector_size * (self.reserved_sects + (self.fat_count * self.fat_sects))
root_sectors = ((self.root_max * 32) + (self.sector_size-1)) // self.sector_size # round up to fill sector
self.cluster2 = self.root + (self.sector_size * root_sectors)
def _boot_flush(self):
"""Commits changes to the boot sector."""
boot = struct.pack("<HBHBHHBHHH",
self.sector_size,
self.cluster_sects,
self.reserved_sects,
self.fat_count,
self.root_max,
self.sectors,
self.data[21],
self.fat_sects,
self.track_sects,
self.heads)
self.data[11:28] = bytearray(boot)
if self.data[38] == 0x29 and len(self.data) >= 54:
self.data[39:43] = bytearray(struct.pack("<L",self.volume_id))
self.data[43:54] = Floppy._filestring(self.volume_label,11)
def boot_info(self):
"""String of information about the boot sector."""
s = ""
s += "Volume Label: [%s]\n" % self.volume_label
s += "Volume ID: %04X\n" % self.volume_id
s += "Sector size: %d bytes\n" % self.sector_size
s += "Cluster size: %d sectors\n" % self.cluster_sects
s += "Reserved sectors: %d\n" % self.reserved_sects
s += "Number of FATs: %d\n" % self.fat_count
s += "Maximum root entries: %d\n" % self.root_max
s += "Total sectors: %d\n" % self.sectors
s += "FAT size: %d sectors\n" % self.fat_sects
s += "Track size: %d sectors\n" % self.track_sects
s += "Heads: %d\n" % self.heads
s += "Root directory: %08X\n" % self.root
s += "Cluster 2: %08X\n" % self.cluster2
return s
def _fat_open(self):
"""Parses the FAT table."""
fat_start = self.reserved_sects * self.sector_size
fat_sects = self.fat_sects * self.sector_size
fat_end = fat_start + fat_sects
# make sure they're in the image
if (self.sectors < (self.reserved_sects + (self.fat_count * self.fat_sects))):
raise self.Error("Not enough sectors to contain %d + %d x %d FAT tables? (%d sectors)" %
(self.reserved_sects, self.fat_count, self.fat_sects, self.sectors))
if (self.fat_count < 1) or (self.fat_sects < 1):
raise self.Error("No FAT tables? (%d x %d FAT sectors)" %
(self.fat_count, self.fat_sects))
# verify FAT tables match
for i in range(1,self.fat_count):
fat2_start = fat_start + (fat_sects * i)
fat2_end = fat2_start + fat_sects
if self.data[fat_start:fat_end] != self.data[fat2_start:fat2_end]:
raise self.Error("FAT mismatch in table %d." % i)
# read FAT 0
self.fat = []
e = fat_start
while (e+2) <= fat_end:
entry = 0
if (len(self.fat) & 1) == 0:
entry = self.data[e+0] | ((self.data[e+1] & 0x0F) << 8)
e += 1
else:
entry = ((self.data[e+0] & 0xF0) >> 4) | (self.data[e+1] << 4)
e += 2
self.fat.append(entry)
def _fat_flush(self):
"""Commits changes to the FAT table."""
fat_start = self.reserved_sects * self.sector_size
fat_sects = self.fat_sects * self.sector_size
fat_end = fat_start + fat_sects
# build FAT 0
e = self.reserved_sects * self.sector_size
for i in range(len(self.fat)):
entry = self.fat[i]
if (i & 1) == 0:
self.data[e+0] = entry & 0xFF
self.data[e+1] = (self.data[e+1] & 0xF0) | ((entry >> 8) & 0x0F)
e += 1
else:
self.data[e+0] = (self.data[e+0] & 0x0F) | ((entry << 4) & 0xF0)
self.data[e+1] = (entry >> 4) & 0xFF
e += 2
# copy to all tables
for i in range(1,self.fat_count):
fat2_start = fat_start + (fat_sects * i)
fat2_end = fat2_start + fat_sects
self.data[fat2_start:fat2_end] = self.data[fat_start:fat_end]
def fat_info(self):
"""String of information about the FAT."""
per_line = 32
s = ""
for i in range(len(self.fat)):
if (i % per_line) == 0:
s += "%03X: " % i
if (i % per_line) == (per_line // 2):
s += " " # extra space to mark 16
s += " %03X" % self.fat[i]
if (i % per_line) == (per_line - 1):
s += "\n"
return s
def _cluster_offset(self, cluster):
"""Image offset of a FAT indexed logical cluster."""
if cluster < 2:
return self.root
return self.cluster2 + (self.sector_size * self.cluster_sects * (cluster-2))
def _read_chain(self, cluster, size):
data = bytearray()
"""Returns up to size bytes from a chain of clusters starting at cluster."""
while cluster < 0xFF0:
offset = self._cluster_offset(cluster)
#print("read_chain(%03X,%d) at %08X" % (cluster,size,offset))
if cluster < 2:
return data + bytearray(self.data[self.root:self.root+size]) # root directory
read = min(size,(self.sector_size * self.cluster_sects))
data = data + bytearray(self.data[offset:offset+read])
size -= read
if (size < 1):
return data
cluster = self.fat[cluster]
return data
def _read_dir_chain(self, cluster):
"""Reads an entire directory chain starting at the given cluster (0 for root)."""
if cluster < 2:
# root directory is contiguous
return self.data[self.root:self.root+(self.root_max*32)]
# directories just occupy as many clusters as in their FAT chain, using a dummy max size
return self._read_chain(cluster, self.sector_size*self.sectors//self.cluster_sects)
def _delete_chain(self, cluster):
"""Deletes a FAT chain."""
while cluster < 0xFF0 and cluster >= 2:
link = self.fat[cluster]
self.fat[cluster] = 0
cluster = link
def _add_chain(self,data):
"""Adds a block of data to the disk and creates its FAT chain. Returns start cluster, or -1 for failure."""
cluster_size = self.sector_size * self.cluster_sects
clusters = (len(data) + cluster_size-1) // cluster_size
if clusters < 1:
clusters = 1
# find a chain of free clusters
chain = []
for i in range(2,len(self.fat)):
if self.fat[i] == 0:
chain.append(i)
if len(chain) >= clusters:
break
if len(chain) < clusters:
return -1 # out of space
# store the FAT chain
start_cluster = chain[0]
for i in range(0,len(chain)-1):
self.fat[chain[i]] = chain[i+1]
self.fat[chain[len(chain)-1]] = 0xFFF
# store the data in the given clusters
data = bytearray(data)
c = 0
while len(data) > 0:
write = min(len(data),cluster_size)
offset = self._cluster_offset(chain[c])
self.data[offset:offset+write] = data[0:write]
c += 1
data = data[write:]
# done
return start_cluster
def _files_dir(self, cluster, path):
"""Returns a list of files in the directory starting at the given cluster. Recursive."""
#print("files_dir(%03X,'%s')" % (cluster, path))
entries = []
directory = self._read_dir_chain(cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],cluster,i)
#print(("entry %d (%08X)\n"%(i,self._dir_entry_offset(cluster,i)))+e.info())
if e.incipit == 0x00: # end of directory
return entries
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # volume label
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == "." or e.path == "..":
continue
subdir = path + e.path + "\\"
entries.append(subdir)
entries = entries + self._files_dir(e.cluster,subdir)
else:
entries.append(path + e.path)
return entries
def files(self):
"""Returns a list of files in the image."""
root = self.sector_size * (self.reserved_sects + (self.fat_count * self.fat_sects))
return self._files_dir(0,"")
def files_info(self):
"""String of the file list."""
s = ""
for path in self.files():
s += path + "\n"
return s
def _dir_entry_offset(self,cluster,dir_index):
"""Find the offset in self.data to a particular directory entry."""
if (cluster < 2):
return self.root + (32 * dir_index)
if (cluster >= 0xFF0):
raise self.Error("Directory entry %d not in its cluster chain?" % dir_index)
per_cluster = (self.sector_size*self.cluster_sects)//32
if (dir_index < per_cluster): # within this cluster
return self._cluster_offset(cluster) + (32 * dir_index)
# continue to next cluster
return self._dir_entry_offset(self.fat[cluster],dir_index-per_cluster)
def delete_file(self, entry):
"""Deletes a FileEntry."""
self._delete_chain(entry.cluster) # delete its FAT chain
offset = self._dir_entry_offset(entry.dir_cluster,entry.dir_index)
self.data[offset+0] = Floppy.EMPTY # empty this entry
def _find_path_dir(self, cluster, path):
"""Recursive find path, breaking out subdirectories progressively."""
#print("_find_path_dir(%03X,'%s')" % (cluster,path))
separator = path.find("\\")
path_seek = path
path_next = ""
if separator >= 0:
path_seek = path[0:separator]
path_next = path[separator+1:]
directory = self._read_dir_chain(cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],cluster,i)
if e.incipit == 0x00: # end of directory
return None
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # volume label
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == "." or e.path == "..":
continue
if e.path == path_seek:
if (len(path_next) > 0):
return self._find_path_dir(e.cluster, path_next)
else:
return e
elif e.path == path_seek and path_next == "":
return e
return None
def find_path(self, path):
"""Finds a FileEntry for a given path."""
return self._find_path_dir(0,path)
def _delete_tree(self,de):
"""Recursively deletes directory entries."""
#print("_delete_tree\n" + de.info())
directory = self._read_dir_chain(de.cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],de.cluster,i)
#print(e.info())
if e.incipit == 0x00: # end of directory
return
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # volume label
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == "." or e.path == "..":
continue
self._delete_tree(e) # recurse
self.delete_file(e)
else:
self.delete_file(e)
def delete_path(self, path):
"""Finds a file or directory and deletes it (recursive), if it exists. Returns True if successful."""
e = self.find_path(path)
if (e == None):
return False
if (e.attributes & 0x10) != 0:
self._delete_tree(e)
self.delete_file(e)
return True
def _add_entry(self, dir_cluster, entry):
"""
Adds an entry to a directory starting at the given cluster, appending a new cluster if needed.
Sets entry.dir_cluster and entry.dir_index to match their new directory.
Returns False if out of space.
"""
#print(("_add_entry(%d):\n"%dir_cluster) + entry.info())
directory = self._read_dir_chain(dir_cluster)
dir_len = len(directory)//32
i = 0
terminal = False
while i < dir_len:
e = self.FileEntry(directory[(i*32):(i*32)+32],dir_cluster,i)
#print(e.info())
if e.incipit == 0x00:
terminal = True # make sure to add another terminal entry after this one
break
if e.incipit == Floppy.EMPTY:
break
i += 1
# extend directory if out of room
if i >= dir_len:
if dir_cluster < 2:
return False # no room in root
# add a zero-filled page to the end of this directory's FAT chain
chain = self._add_chain(bytearray([0]*(self.sector_size*self.cluster_sects)))
if (chain < 0):
return False # no free clusters
tail = dir_cluster
while self.fat[tail] < 0xFF0:
tail = self.fat[tail]
self.fat[tail] = chain
self.fat[chain] = 0xFFF
# insert entry
entry.dir_cluster = dir_cluster
entry.dir_index = i
offset = self._dir_entry_offset(dir_cluster,i)
self.data[offset:offset+32] = entry.compile()
# add a new terminal if needed
if terminal:
i += 1
if i < dir_len: # if it was the last entry, no new terminal is needed
offset = self._dir_entry_offset(dir_cluster,i)
self.data[offset:offset+32] = Floppy.FileEntry.new_terminal().compile()
# success!
return True
def _add_dir_recursive(self, cluster, path):
"""Recursively creates directory, returns cluster of created dir, -1 if failed."""
#print("_add_dir_recursive(%03X,'%s')"%(cluster,path))
separator = path.find("\\")
path_seek = path
path_next = ""
if separator >= 0:
path_seek = path[0:separator]
path_next = path[separator+1:]
directory = self._read_dir_chain(cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],cluster,i)
#print(e.info())
if e.incipit == 0x00: # end of directory
break
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == path_seek: # already exists
if len(path_next) < 1:
return e.cluster # return existing directory
else:
return self._add_dir_recursive(e.cluster,path_next) # keep descending
# not found: create the directory
dp0 = Floppy.FileEntry.new_dir("_") # .
dp1 = Floppy.FileEntry.new_dir("__") # ..
dp1.cluster = cluster
dir_block = dp0.compile() + dp1.compile() + bytearray([0] * ((self.sector_size*self.cluster_sects)-64))
new_cluster = self._add_chain(dir_block)
if new_cluster < 0:
return -1 # out of space
# fix up "." to point to itself
dp0.cluster = new_cluster
offset = self._dir_entry_offset(new_cluster,0)
self.data[offset:offset+32] = dp0.compile()
# fix special directory names (FileEntry.compile() is incapable of . or ..)
self.data[offset+ 0:offset+11] = bytearray(". ".encode("ASCII"))
self.data[offset+32:offset+43] = bytearray(".. ".encode("ASCII"))
# create entry to point to new directory cluster
new_dir = Floppy.FileEntry.new_dir(path_seek)
new_dir.cluster = new_cluster
if not self._add_entry(cluster, new_dir):
self._delete_chain(new_cluster)
return -1 # out of space
# return the entry if tail is reached, or keep descending
if len(path_next) < 1:
return new_cluster
else:
return self._add_dir_recursive(new_cluster,path_next)
def add_dir_path(self, path):
"""
Recursively ensures that the given directory path exists, creating it if necessary.
Path should not end in backslash.
Returns cluster of directory at path, or -1 if failed.
"""
if (len(path) < 1):
return 0 # root
return self._add_dir_recursive(0,path)
def add_file_path(self, path, data):
"""
Adds the given data as a file at the given path.
Will automatically create directories to complete the path.
Returns False if failed.
"""
self.delete_path(path) # remove file if it already exists
dir_path = ""
file_path = path
separator = path.rfind("\\")
if (separator >= 0):
dir_path = path[0:separator]
file_path = path[separator+1:]
dir_cluster = self.add_dir_path(dir_path)
if dir_cluster < 0:
return False # couldn't find or create directory
cluster = self._add_chain(data)
if (cluster < 0):
return False # out of space
entry = Floppy.FileEntry.new_file(file_path)
entry.cluster = cluster
entry.size = len(data)
if not self._add_entry(dir_cluster, entry):
self._delete_chain(cluster)
return False # out of space for directory entry
offset = self._dir_entry_offset(entry.dir_cluster, entry.dir_index)
self.data[offset:offset+32] = entry.compile()
return True
def extract_file_path(self, path):
"""Finds a file and returns all of its data. None on failure."""
e = self.find_path(path)
if e == None:
return None
return self._read_chain(e.cluster, e.size)
def set_volume_id(self, value=None):
"""Sets the volume ID. None for time-based."""
if (value == None):
(date,time) = Floppy.FileEntry.fat_time_now()
value = time | (date << 16)
self.volume_id = value & 0xFFFFFFFF
def set_volume_label(self, label):
"""Sets the volume label. Creates one if necessary."""
self.data[38] = 0x29
self.volume_label = label
self.data[54:62] = Floppy._filestring("FAT12",8)
# adjust existing volume entry in root
directory = self._read_dir_chain(0)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],0,i)
if e.incipit == 0x00: # end of directory
break
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # existing volume label
offset = self._dir_entry_offset(0,i)
self.data[offset:offset+11] = Floppy._filestring(label,11)
return
# volume entry does not exist in root, add one
self._add_entry(0,Floppy.FileEntry.new_volume(label))
return
def open(filename):
"""Opens a floppy image file and creates a Floppy() instance from it."""
return Floppy(open(filename,"rb").read())
def flush(self):
"""Commits all unfinished changes to self.data image."""
self._fat_flush()
self._boot_flush()
def save(self, filename):
"""Saves image (self.data) to file. Implies flush()."""
self.flush()
open(filename,"wb").write(self.data)
def extract_all(self, out_directory):
"""Extracts all files from image to specified directory."""
for in_path in self.files():
out_path = os.path.join(out_directory,in_path)
out_dir = os.path.dirname(out_path)
if not os.path.exists(out_dir):
try:
os.makedirs(out_dir)
except OSError as e:
if e.errono != errno.EEXIST:
raise
if not in_path.endswith("\\"):
open(out_path,"wb").write(self.extract_file_path(in_path))
print(out_path)
else:
print(out_path)
def add_all(self, in_directory, prefix=""):
"""
Adds all files from specified directory to image.
Files will be uppercased. Long filenames are not checked and will cause an exception.
prefix can be used to prefix a directory path (ending with \) to the added files.
"""
result = True
def dospath(s):
s = s.upper()
s = s.replace("/","\\")
return s
if len(in_directory) < 1:
in_directory = "."
in_directory = os.path.normpath(in_directory) + os.sep
for (root, dirs, files) in os.walk(in_directory):
base = root[len(in_directory):]
for d in dirs:
dir_path = prefix + dospath(os.path.join(base,d))
result = result and (self.add_dir_path(dir_path) >= 0)
print(dir_path + "\\")
for f in files:
file_path = prefix + dospath(os.path.join(base,f))
data = open(os.path.join(root,f),"rb").read()
result = result and self.add_file_path(file_path,data)
print(file_path + " (%d bytes)" % len(data))
return result