Skip to content

File: DepthFlow/Server.py

DepthFlow.Server

PydanticImage

# Accepted image specifiers for the API: a local filesystem path (str or Path)
# or a remote HTTP(S) URL to be downloaded
PydanticImage = Union[str, Path, HttpUrl]

DepthInput

Bases: BrokenModel

Source code in Projects/DepthFlow/DepthFlow/Server.py
49
50
51
class DepthInput(BrokenModel):
    """Input images for a render job: the base image and an optional depthmap."""
    # Source image to animate; defaults to the scene's bundled example image
    image: PydanticImage = DepthScene.DEFAULT_IMAGE
    # Pre-computed depthmap; when None, the payload's estimator generates one
    depth: Optional[PydanticImage] = None

image

image: PydanticImage = DepthScene.DEFAULT_IMAGE

depth

depth: Optional[PydanticImage] = None

DepthPayload

Bases: BrokenModel

Source code in Projects/DepthFlow/DepthFlow/Server.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class DepthPayload(BrokenModel):
    """Full specification of a render job submitted to the server's worker queue."""
    input:     DepthInput     = Field(default_factory=DepthInput)
    estimator: DepthEstimator = Field(default_factory=DepthAnythingV2)
    animation: DepthAnimation = Field(default_factory=DepthAnimation)
    upscaler:  BrokenUpscaler = Field(default_factory=NoUpscaler)
    render:    RenderSettings = Field(default_factory=RenderSettings)
    ffmpeg:    BrokenFFmpeg   = Field(default_factory=BrokenFFmpeg)
    # Bookkeeping fields, excluded from serialization so they don't affect the
    # job's serialized form (and thus any content-derived caching/hashing)
    expire:    int            = Field(3600, exclude=True)
    hash:      int            = Field(0, exclude=True)
    priority:  int            = Field(0, exclude=True)

    # Priority queue sorting: comparisons are intentionally inverted so that a
    # HIGHER priority value sorts FIRST in a min-heap-style queue

    def __lt__(self, other: Self) -> bool:
        return (self.priority > other.priority)

    def __gt__(self, other: Self) -> bool:
        return (self.priority < other.priority)

input

input: DepthInput = Field(default_factory=DepthInput)

estimator

estimator: DepthEstimator = Field(
    default_factory=DepthAnythingV2
)

animation

animation: DepthAnimation = Field(
    default_factory=DepthAnimation
)

upscaler

upscaler: BrokenUpscaler = Field(default_factory=NoUpscaler)

render

render: RenderSettings = Field(
    default_factory=RenderSettings
)

ffmpeg

ffmpeg: BrokenFFmpeg = Field(default_factory=BrokenFFmpeg)

expire

expire: int = Field(3600, exclude=True)

hash

hash: int = Field(0, exclude=True)

priority

priority: int = Field(0, exclude=True)

__lt__

__lt__(other: Self) -> bool
Source code in Projects/DepthFlow/DepthFlow/Server.py
66
67
def __lt__(self, other: Self) -> bool:
    """Inverted ordering: a higher priority compares as 'less than', so it
    surfaces first in a min-heap-style priority queue."""
    return other.priority < self.priority

__gt__

__gt__(other: Self) -> bool
Source code in Projects/DepthFlow/DepthFlow/Server.py
69
70
def __gt__(self, other: Self) -> bool:
    """Inverted ordering counterpart of __lt__: a lower priority compares as
    'greater than' for min-heap-style priority queues."""
    return other.priority > self.priority

DepthWorker

Bases: ParallelQueue

Source code in Projects/DepthFlow/DepthFlow/Server.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class DepthWorker(ParallelQueue):
    def worker(self):
        """Consume DepthPayload tasks forever, rendering each to a video and
        reporting the bytes (or the failure) back through self.done()."""
        scene = DepthScene(backend="headless")

        for endurance in itertools.count(1):
            task: DepthPayload = self.next()
            print("Rendering payload:", task.json())

            # Track the temporary output path explicitly: the original code
            # referenced 'temp.name' in the finally block, which raised an
            # unhandled NameError whenever an exception fired before the
            # NamedTemporaryFile was ever created (e.g. scene.input failing)
            output: Optional[Path] = None

            try:
                # The classes are already cooked by fastapi!
                scene.estimator = task.estimator
                scene.animation = task.animation
                scene.upscaler  = task.upscaler
                scene.ffmpeg    = task.ffmpeg
                scene.ffmpeg.empty_audio()
                scene.input(
                    image=task.input.image,
                    depth=task.input.depth
                )

                # Render the video, read contents, delete temp file
                with tempfile.NamedTemporaryFile(
                    suffix=("."+task.render.format),
                    delete=False,
                ) as temp:
                    output = Path(temp.name)
                    video: bytes = scene.main(
                        **task.render.dict(),
                        output=output,
                        progress=False
                    )[0].read_bytes()

                    self.done(task, video)

            except Exception as error:
                log.error(f"Error rendering video: {error}")
                # Hand the exception itself back so the caller can report it
                self.done(task, error)

            finally:
                # Only clean up when the temp file was actually created
                if (output is not None):
                    with contextlib.suppress(FileNotFoundError):
                        os.unlink(output)

worker

worker()
Source code in Projects/DepthFlow/DepthFlow/Server.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def worker(self):
    """Consume DepthPayload tasks forever, rendering each to a video and
    reporting the bytes (or the failure) back through self.done()."""
    scene = DepthScene(backend="headless")

    for endurance in itertools.count(1):
        task: DepthPayload = self.next()
        print("Rendering payload:", task.json())

        # Track the temporary output path explicitly: the original code
        # referenced 'temp.name' in the finally block, which raised an
        # unhandled NameError whenever an exception fired before the
        # NamedTemporaryFile was ever created (e.g. scene.input failing)
        output: Optional[Path] = None

        try:
            # The classes are already cooked by fastapi!
            scene.estimator = task.estimator
            scene.animation = task.animation
            scene.upscaler  = task.upscaler
            scene.ffmpeg    = task.ffmpeg
            scene.ffmpeg.empty_audio()
            scene.input(
                image=task.input.image,
                depth=task.input.depth
            )

            # Render the video, read contents, delete temp file
            with tempfile.NamedTemporaryFile(
                suffix=("."+task.render.format),
                delete=False,
            ) as temp:
                output = Path(temp.name)
                video: bytes = scene.main(
                    **task.render.dict(),
                    output=output,
                    progress=False
                )[0].read_bytes()

                self.done(task, video)

        except Exception as error:
            log.error(f"Error rendering video: {error}")
            # Hand the exception itself back so the caller can report it
            self.done(task, error)

        finally:
            # Only clean up when the temp file was actually created
            if (output is not None):
                with contextlib.suppress(FileNotFoundError):
                    os.unlink(output)

DepthServer

Bases: BrokenFastAPI

Source code in Projects/DepthFlow/DepthFlow/Server.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@define
class DepthServer(BrokenFastAPI):
    """FastAPI server exposing depth estimation, upscaling and parallax video
    rendering endpoints, plus a CLI to launch and stress-test it."""

    cli: BrokenTyper = Factory(lambda: BrokenTyper(chain=True))

    queue: ParallelQueue = None
    """The queue of workers processing tasks"""

    def __attrs_post_init__(self):
        """Configure API metadata, register CLI commands and HTTP routes, and
        start the pool of render workers backing /render."""
        self.api.title = "DepthFlow Project API"
        self.api.version = Runtime.Version
        self.cli.description = DEPTHFLOW_ABOUT

        # Commands
        with self.cli.panel("📦 Server endpoints"):
            self.cli.command(self.launch)
            self.cli.command(self.runpod)

        with self.cli.panel("🚀 Core"):
            self.cli.command(self.config)
            self.cli.command(self.test)

        # Endpoints
        self.api.post("/estimate")(self.estimate)
        self.api.post("/upscale")(self.upscale)
        self.api.post("/render")(self.render)

        # Processing: worker count and cache size are environment-tunable
        self.queue = DepthWorker(
            cache_path=(DEPTHFLOW.DIRECTORIES.CACHE/"ServerRender"),
            cache_size=Environment.float("DEPTHSERVER_CACHE_SIZE_MB", 500),
            size=Environment.int("DEPTHSERVER_WORKERS", 3),
        ).start()

    def config(self,
        workers: WorkersType=3
    ) -> None:
        """Set how many parallel render workers process the queue."""
        self.queue.size = workers

    # -------------------------------------------|
    # Routes

    async def estimate(self,
        image: PydanticImage,
        estimator: DepthEstimator=DepthAnythingV2(),
    ) -> Response:
        """Estimate a depthmap for 'image' and return it PNG-encoded."""
        import io

        # Fix: Image.tobytes() yields raw uncompressed pixel data, which does
        # not match the advertised image/png media type — encode a real PNG
        buffer = io.BytesIO()
        Image.open(estimator.estimate(image)).save(buffer, format="PNG")
        return Response(
            media_type="image/png",
            content=buffer.getvalue(),
        )

    async def upscale(self,
        image: PydanticImage,
        upscaler: BrokenUpscaler=NoUpscaler(),
    ) -> Response:
        """Upscale 'image' with the given upscaler and return it PNG-encoded."""
        import io

        # Fix: same raw-bytes issue as /estimate — PNG-encode the result
        # (assumes upscale() returns a PIL Image, as the original .tobytes()
        # call implied — TODO confirm against BrokenUpscaler)
        buffer = io.BytesIO()
        upscaler.upscale(image).save(buffer, format="PNG")
        return Response(
            media_type="image/png",
            content=buffer.getvalue(),
        )

    async def render(self, task: DepthPayload) -> Response:
        """Queue a render job and poll until its video is ready, failed, or the
        30 second timeout elapses."""
        start: float = time.perf_counter()
        task = DepthPayload.load(task)
        # The content hash doubles as the queue/cache key
        task.hash = hash(task)

        for index in itertools.count(1):

            # Video is already cached or finished
            if (video := self.queue.get(task)):
                # Workers report failures by storing the exception itself
                if isinstance((error := video), Exception):
                    return Response(
                        status_code=500,
                        media_type="text/plain",
                        content=str(error),
                    )

                return Response(
                    media_type=f"video/{task.render.format}",
                    content=video,
                    headers=dict(
                        took=f"{time.perf_counter() - start:.2f}",
                        # A first-poll hit means the result was already cached
                        cached=str(index == 1).lower(),
                    ),
                )

            # Timeout logic to prevent hanging
            elif (start + 30 < time.perf_counter()):
                return Response(
                    status_code=503,
                    media_type="text/plain",
                    content="Request timed out",
                )

            # NOTE(review): put() runs on every poll iteration — presumably
            # deduplicated by task hash inside ParallelQueue; confirm
            self.queue.put(task)
            await asyncio.sleep(0.100)

    # -------------------------------------------|
    # Testing

    def test(self,
        jobs: Annotated[int, Option("--jobs", "-j",
            help="How many jobs to queue up")]=1,
    ) -> None:
        """Stress-test the server by firing 'jobs' parallel render requests."""
        def request(client: int) -> None:
            config = DepthPayload(
                priority=client,
                input=DepthInput(
                    image="https://w.wallhaven.cc/full/ex/wallhaven-ex1yxk.jpg"
                    # image="/home/tremeschin/plant.jpg"
                ),
                ffmpeg=BrokenFFmpeg().h264_nvenc(),
                render=RenderSettings(
                    ssaa=1.0,
                    width=1280,
                    fps=60,
                    loop=1,
                    # Slightly different duration per client defeats the cache
                    time=0.01 + client/1000000,
                ),
                animation=DepthAnimation(
                    steps=[
                        Actions.Orbital(),
                        # Actions.Lens(),
                    ]
                )
            )

            # Debug print the payload
            from rich.pretty import pprint
            pprint(f"POST: {config.json()}")

            # Actually send the job request
            response = requests.post(
                url=f"{self.api_url}/render",
                json=config.dict()
            )

            # Save the video to disk
            Path(path := f"/tmp/video-{client}.mp4") \
                .write_bytes(response.content)

            headers = DotMap(response.headers)
            log.success(f"Saved video to {path}, cached: {headers.cached}")

        # Stress test parallel requests
        with ThreadPoolExecutor(max_workers=10) as pool:
            for worker in range(jobs):
                pool.submit(request, worker)

cli

cli: BrokenTyper = Factory(lambda: BrokenTyper(chain=True))

queue

queue: ParallelQueue = None

The queue of workers processing tasks

__attrs_post_init__

__attrs_post_init__()
Source code in Projects/DepthFlow/DepthFlow/Server.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def __attrs_post_init__(self):
    """Configure API metadata, register CLI commands and HTTP routes, and
    start the pool of render workers backing /render."""
    self.api.title = "DepthFlow Project API"
    self.api.version = Runtime.Version
    self.cli.description = DEPTHFLOW_ABOUT

    # Commands
    with self.cli.panel("📦 Server endpoints"):
        self.cli.command(self.launch)
        self.cli.command(self.runpod)

    with self.cli.panel("🚀 Core"):
        self.cli.command(self.config)
        self.cli.command(self.test)

    # Endpoints
    self.api.post("/estimate")(self.estimate)
    self.api.post("/upscale")(self.upscale)
    self.api.post("/render")(self.render)

    # Processing: worker count and cache size are environment-tunable
    self.queue = DepthWorker(
        cache_path=(DEPTHFLOW.DIRECTORIES.CACHE/"ServerRender"),
        cache_size=Environment.float("DEPTHSERVER_CACHE_SIZE_MB", 500),
        size=Environment.int("DEPTHSERVER_WORKERS", 3),
    ).start()

config

config(workers: WorkersType = 3) -> None
Source code in Projects/DepthFlow/DepthFlow/Server.py
150
151
152
153
def config(self, workers: WorkersType = 3) -> None:
    """Set how many parallel render workers process the queue."""
    self.queue.size = workers

estimate

estimate(
    image: PydanticImage,
    estimator: DepthEstimator = DepthAnythingV2(),
) -> Response
Source code in Projects/DepthFlow/DepthFlow/Server.py
158
159
160
161
162
163
164
165
async def estimate(self,
    image: PydanticImage,
    estimator: DepthEstimator=DepthAnythingV2(),
) -> Response:
    """Estimate a depthmap for 'image' and return it PNG-encoded."""
    import io

    # Fix: Image.tobytes() yields raw uncompressed pixel data, which does
    # not match the advertised image/png media type — encode a real PNG
    buffer = io.BytesIO()
    Image.open(estimator.estimate(image)).save(buffer, format="PNG")
    return Response(
        media_type="image/png",
        content=buffer.getvalue(),
    )

upscale

upscale(
    image: PydanticImage,
    upscaler: BrokenUpscaler = NoUpscaler(),
) -> Response
Source code in Projects/DepthFlow/DepthFlow/Server.py
167
168
169
170
171
172
173
174
async def upscale(self,
    image: PydanticImage,
    upscaler: BrokenUpscaler=NoUpscaler(),
) -> Response:
    """Upscale 'image' with the given upscaler and return it PNG-encoded."""
    import io

    # Fix: .tobytes() returns raw pixel data, not PNG despite the media type
    # (assumes upscale() returns a PIL Image, as .tobytes() implied — confirm)
    buffer = io.BytesIO()
    upscaler.upscale(image).save(buffer, format="PNG")
    return Response(
        media_type="image/png",
        content=buffer.getvalue(),
    )

render

render(task: DepthPayload) -> Response
Source code in Projects/DepthFlow/DepthFlow/Server.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
async def render(self, task: DepthPayload) -> Response:
    """Queue a render job and poll until its video is ready, failed, or the
    30 second timeout elapses."""
    start: float = time.perf_counter()
    task = DepthPayload.load(task)
    # The content hash doubles as the queue/cache key
    task.hash = hash(task)

    for index in itertools.count(1):

        # Video is already cached or finished
        if (video := self.queue.get(task)):
            # Workers report failures by storing the exception itself
            if isinstance((error := video), Exception):
                return Response(
                    status_code=500,
                    media_type="text/plain",
                    content=str(error),
                )

            return Response(
                media_type=f"video/{task.render.format}",
                content=video,
                headers=dict(
                    took=f"{time.perf_counter() - start:.2f}",
                    # A first-poll hit means the result was already cached
                    cached=str(index == 1).lower(),
                ),
            )

        # Timeout logic to prevent hanging
        elif (start + 30 < time.perf_counter()):
            return Response(
                status_code=503,
                media_type="text/plain",
                content="Request timed out",
            )

        # NOTE(review): put() runs on every poll iteration — presumably
        # deduplicated by task hash inside ParallelQueue; confirm
        self.queue.put(task)
        await asyncio.sleep(0.100)

test

test(
    jobs: Annotated[
        int,
        Option(
            --jobs, -j, help="How many jobs to queue up"
        ),
    ] = 1
) -> None
Source code in Projects/DepthFlow/DepthFlow/Server.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def test(self,
    jobs: Annotated[int, Option("--jobs", "-j",
        help="How many jobs to queue up")]=1,
) -> None:
    """Stress-test the server by firing 'jobs' parallel render requests and
    saving each resulting video to /tmp for manual inspection."""

    def send_job(client: int) -> None:
        # Build a small, fast render job; the per-client 'time' perturbation
        # makes every payload hash differently so none is served from cache
        payload = DepthPayload(
            priority=client,
            input=DepthInput(
                image="https://w.wallhaven.cc/full/ex/wallhaven-ex1yxk.jpg"
            ),
            ffmpeg=BrokenFFmpeg().h264_nvenc(),
            render=RenderSettings(
                ssaa=1.0,
                width=1280,
                fps=60,
                loop=1,
                time=0.01 + client/1000000,
            ),
            animation=DepthAnimation(
                steps=[
                    Actions.Orbital(),
                ]
            )
        )

        # Debug print the payload being sent
        from rich.pretty import pprint
        pprint(f"POST: {payload.json()}")

        # Fire the render request and block until the server responds
        response = requests.post(
            url=f"{self.api_url}/render",
            json=payload.dict(),
        )

        # Persist the received video to disk
        path = f"/tmp/video-{client}.mp4"
        Path(path).write_bytes(response.content)

        headers = DotMap(response.headers)
        log.success(f"Saved video to {path}, cached: {headers.cached}")

    # Stress test parallel requests
    with ThreadPoolExecutor(max_workers=10) as pool:
        for client in range(jobs):
            pool.submit(send_job, client)