: bu ana fikri bulunabilir
def test_delete_add_no_close(self):
fname_list = ["foo.txt", "bar.txt", "blu.bla", "sup.bro", "rollah"]
data_list = [''.join([chr(randint(0, 255)) for i in range(100)]) for i in range(len(fname_list))]
# add some files to the zip
with zipfile.ZipFile(TESTFN, "w") as zf:
for fname, data in zip(fname_list, data_list):
zf.writestr(fname, data)
for no in range(0, 2):
with zipfile.ZipFile(TESTFN, "a") as zf:
zf.remove(fname_list[no])
zf.writestr(fname_list[no], data_list[no])
zf.remove(fname_list[no+1])
zf.writestr(fname_list[no+1], data_list[no+1])
# try to access prior deleted/added file and prior last file (which got moved, while delete)
for fname, data in zip(fname_list, data_list):
self.assertEqual(zf.read(fname), data)
My modifiye zipfile modülü ve tam unittest dosyasını: Bu unittest bu hatayı çoğalır kalan parçalar. (Kaldırılan dosyadan sonra saklananlar) Bu yüzden bu kod bölümünü yeniden yazdım ve bu dosyaları/parçaları her seferinde kopyalar. Ayrıca her biri için dosya üstbilgisini (geçerli olduğundan emin olmak için) ve zip dosyasının sonundaki merkezi dizini yeniden yazarım. Benim kaldır işlevi artık şuna benzer:
veya kitaplığın repo https://gist.github.com/FreakyBytes/30a6f9866154d82f1c3863f2e4969cc4 yazıyorum: https://github.com/FreakyBytes/pyCombineArchive
def remove(self, member):
"""Remove a file from the archive. Only works if the ZipFile was opened
with mode 'a'."""
if "a" not in self.mode:
raise RuntimeError('remove() requires mode "a"')
if not self.fp:
raise RuntimeError(
"Attempt to modify ZIP archive that was already closed")
fp = self.fp
# Make sure we have an info object
if isinstance(member, ZipInfo):
# 'member' is already an info object
zinfo = member
else:
# Get info object for member
zinfo = self.getinfo(member)
# start at the pos of the first member (smallest offset)
position = min([info.header_offset for info in self.filelist]) # start at the beginning of first file
for info in self.filelist:
fileheader = info.FileHeader()
# is member after delete one?
if info.header_offset > zinfo.header_offset and info != zinfo:
# rewrite FileHeader and copy compressed data
# Skip the file header:
fp.seek(info.header_offset)
fheader = fp.read(sizeFileHeader)
if fheader[0:4] != stringFileHeader:
raise BadZipFile("Bad magic number for file header")
fheader = struct.unpack(structFileHeader, fheader)
fname = fp.read(fheader[_FH_FILENAME_LENGTH])
if fheader[_FH_EXTRA_FIELD_LENGTH]:
fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
if zinfo.flag_bits & 0x800:
# UTF-8 filename
fname_str = fname.decode("utf-8")
else:
fname_str = fname.decode("cp437")
if fname_str != info.orig_filename:
if not self._filePassed:
fp.close()
raise BadZipFile(
'File name in directory %r and header %r differ.'
% (zinfo.orig_filename, fname))
# read the actual data
data = fp.read(fheader[_FH_COMPRESSED_SIZE])
# modify info obj
info.header_offset = position
# jump to new position
fp.seek(info.header_offset, 0)
# write fileheader and data
fp.write(fileheader)
fp.write(data)
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR:
# Write CRC and file sizes after the file data
fp.write(struct.pack("<LLL", info.CRC, info.compress_size,
info.file_size))
# update position
fp.flush()
position = fp.tell()
elif info != zinfo:
# move to next position
position = position + info.compress_size + len(fileheader) + self._get_data_descriptor_size(info)
# Fix class members with state
self.start_dir = position
self._didModify = True
self.filelist.remove(zinfo)
del self.NameToInfo[zinfo.filename]
# write new central directory (includes truncate)
fp.seek(position, 0)
self._write_central_dir()
fp.seek(self.start_dir, 0) # jump to the beginning of the central directory, so it gets overridden at close()
Sen özünün son revizyonunda tam kod bulabilirsiniz