# Mirror of https://github.com/deepseek-ai/smallpond
# Synced 2025-06-26 18:27:45 +00:00
# 41 lines, 1.7 KiB, Python
import os
|
|
|
|
from smallpond.dataframe import Session
|
|
|
|
|
|
def test_shutdown_cleanup(sp: Session):
    """Verify that shutting down after a completed job removes runtime directories.

    The queue, staging and temp roots of the session's runtime context must
    exist before any work is done. After a parquet write and ``sp.shutdown()``
    they must no longer exist, and the job status file must contain
    ``"success"``.
    """
    # Every runtime directory has to be present before shutdown.
    present_checks = [
        (sp._runtime_ctx.queue_root, "queue directory should exist"),
        (sp._runtime_ctx.staging_root, "staging directory should exist"),
        (sp._runtime_ctx.temp_root, "temp directory should exist"),
    ]
    for directory, message in present_checks:
        assert os.path.exists(directory), message

    # create some tasks and complete them
    items_df = sp.from_items([1, 2, 3])
    items_df.write_parquet(sp._runtime_ctx.output_root)
    sp.shutdown()

    # shutdown should clean up directories
    cleared_checks = [
        (sp._runtime_ctx.queue_root, "queue directory should be cleared"),
        (sp._runtime_ctx.staging_root, "staging directory should be cleared"),
        (sp._runtime_ctx.temp_root, "temp directory should be cleared"),
    ]
    for directory, message in cleared_checks:
        assert not os.path.exists(directory), message

    # The status file is read after shutdown and must report success.
    with open(sp._runtime_ctx.job_status_path) as status_file:
        job_status = status_file.read()
    assert "success" in job_status, "job status should be success"
|
|
|
|
|
|
def test_shutdown_no_cleanup_on_failure(sp: Session):
    """Verify that shutting down after a failed task keeps runtime directories.

    A map that divides by zero is computed and is required to raise. After
    ``sp.shutdown()`` the queue, staging and temp roots must still exist, and
    the job status file must contain ``"failure"``.

    Raises:
        RuntimeError: if computing the failing map does not raise.
    """
    failing_df = sp.from_items([1, 2, 3])

    task_failed = False
    try:
        # create a task that will fail
        failing_df.map(lambda x: x / 0).compute()
    except Exception:
        # Any exception counts as the expected failure.
        task_failed = True
    if not task_failed:
        raise RuntimeError("task should fail")

    sp.shutdown()

    # shutdown should not clean up directories
    retained_checks = [
        (sp._runtime_ctx.queue_root, "queue directory should not be cleared"),
        (sp._runtime_ctx.staging_root, "staging directory should not be cleared"),
        (sp._runtime_ctx.temp_root, "temp directory should not be cleared"),
    ]
    for directory, message in retained_checks:
        assert os.path.exists(directory), message

    # The status file is read after shutdown and must report failure.
    with open(sp._runtime_ctx.job_status_path) as status_file:
        job_status = status_file.read()
    assert "failure" in job_status, "job status should be failure"
|