mirror of
https://github.com/github/awesome-copilot.git
synced 2026-04-30 20:25:55 +00:00
update eval-driven-dev skill (#1434)
* update eval-driven-dev skill * fix: update skill update command to use correct repository path * address comments. * update eval driven dev
This commit is contained in:
@@ -0,0 +1,64 @@
|
||||
# Runnable Example: CLI Application
|
||||
|
||||
**When the app is invoked from the command line** (e.g., `python -m myapp`, a CLI tool with argparse/click).
|
||||
|
||||
**Approach**: Use `asyncio.create_subprocess_exec` to invoke the CLI and capture output.
|
||||
|
||||
```python
|
||||
# pixie_qa/run_app.py
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
from pydantic import BaseModel
|
||||
import pixie
|
||||
|
||||
|
||||
class AppArgs(BaseModel):
    # Free-text query; forwarded to the CLI as the value of `--query`.
    query: str
|
||||
|
||||
|
||||
class AppRunnable(pixie.Runnable[AppArgs]):
    """Drives a CLI application via subprocess."""

    @classmethod
    def create(cls) -> "AppRunnable":
        """Return a new runnable; no construction-time state is needed."""
        return cls()

    async def run(self, args: AppArgs) -> None:
        """Invoke ``python -m myapp --query <query>`` and wait for it to exit.

        Raises:
            asyncio.TimeoutError: the process did not finish within 120 s.
                It is killed and reaped before the error propagates.
            RuntimeError: the process exited with a non-zero status; the
                message carries the exit code and the captured stderr.
        """
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-m", "myapp", "--query", args.query,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            _stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); the child keeps running
            # unless it is explicitly killed and reaped here.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited on its own between the timeout and the kill
            await proc.wait()
            raise
        if proc.returncode != 0:
            # errors="replace": undecodable stderr bytes must not hide the
            # real failure behind a UnicodeDecodeError.
            raise RuntimeError(
                f"App failed (exit {proc.returncode}): {stderr.decode(errors='replace')}"
            )
|
||||
```
|
||||
|
||||
## When the CLI needs patched dependencies
|
||||
|
||||
If the CLI reads from external services, create a wrapper entry point that patches dependencies before running the real CLI:
|
||||
|
||||
```python
|
||||
# pixie_qa/patched_app.py
"""Entry point that patches external deps before running the real CLI."""
import myapp.config as config

# Patch BEFORE importing myapp.main, in case that module reads the config
# while it is being imported.
config.redis_url = "mock://localhost"

from myapp.main import main  # noqa: E402  (deliberately after the patch)

if __name__ == "__main__":
    # Runs under `python -m pixie_qa.patched_app`; merely importing this
    # module applies the patch without launching the CLI.
    main()
|
||||
```
|
||||
|
||||
Then point your Runnable at the wrapper:
|
||||
|
||||
```python
|
||||
async def run(self, args: AppArgs) -> None:
    """Invoke the patched wrapper module as a subprocess and wait for it.

    Raises:
        asyncio.TimeoutError: the process did not finish within 120 s.
            It is killed and reaped before the error propagates.
        RuntimeError: the process exited with a non-zero status; the
            message carries the exit code and the captured stderr.
    """
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-m", "pixie_qa.patched_app", "--query", args.query,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        _stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); kill and reap the child so
        # it does not outlive the run.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # exited on its own between the timeout and the kill
        await proc.wait()
        raise
    if proc.returncode != 0:
        # Without this check a wrapper that crashes (e.g. on import) would
        # look like a successful run.
        raise RuntimeError(
            f"App failed (exit {proc.returncode}): {stderr.decode(errors='replace')}"
        )
|
||||
```
|
||||
|
||||
**Note**: For CLI apps, `wrap(purpose="input")` injection only works when the app runs in the same process. If using subprocess, you may need to pass test data via environment variables or config files instead.
|
||||
@@ -0,0 +1,126 @@
|
||||
# Runnable Example: FastAPI / Web Server
|
||||
|
||||
**When the app is a web server** (FastAPI, Flask, Starlette) and you need to exercise the full HTTP request pipeline.
|
||||
|
||||
**Approach**: Use `httpx.AsyncClient` with `ASGITransport` to run the ASGI app in-process. This is the fastest and most reliable approach — no subprocess, no port management.
|
||||
|
||||
```python
|
||||
# pixie_qa/run_app.py
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
import pixie
|
||||
|
||||
|
||||
class AppArgs(BaseModel):
    # Text sent to the app as the "message" field of the POST /chat JSON body.
    user_message: str
|
||||
|
||||
|
||||
class AppRunnable(pixie.Runnable[AppArgs]):
    """Drives a FastAPI app via in-process ASGI transport."""

    # Assigned in setup(), closed in teardown().
    _client: httpx.AsyncClient

    @classmethod
    def create(cls) -> "AppRunnable":
        """Return a new runnable; the HTTP client is built later in setup()."""
        runnable = cls()
        return runnable

    async def setup(self) -> None:
        """Build an AsyncClient that routes requests straight into the ASGI app."""
        from myapp.main import app  # your FastAPI/Starlette app instance

        self._client = httpx.AsyncClient(
            transport=httpx.ASGITransport(app=app),
            base_url="http://test",
        )

    async def run(self, args: AppArgs) -> None:
        """POST the user message to ``/chat``; the response is not inspected."""
        payload = {"message": args.user_message}
        await self._client.post("/chat", json=payload)

    async def teardown(self) -> None:
        """Close the HTTP client created in setup()."""
        await self._client.aclose()
|
||||
```
|
||||
|
||||
## ASGITransport skips lifespan events
|
||||
|
||||
`httpx.ASGITransport` does **not** trigger ASGI lifespan events (`startup` / `shutdown`). If the app initializes resources in its lifespan (database connections, caches, service clients), you must replicate that initialization manually in `setup()`:
|
||||
|
||||
```python
|
||||
async def setup(self) -> None:
    """Initialise by hand what the app's lifespan would, then build the client."""
    # Manually replicate what the app's lifespan does
    from myapp.db import get_connection, init_db, seed_data
    import myapp.main as app_module

    db = get_connection()
    init_db(db)
    seed_data(db)
    # set the module-level global the app expects
    app_module.db_conn = db

    self._client = httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app_module.app),
        base_url="http://test",
    )
|
||||
|
||||
async def teardown(self) -> None:
    """Close the HTTP client, then the manually-initialised DB connection.

    The connection is closed in a ``finally`` block so it is released even
    when closing the client raises.
    """
    try:
        await self._client.aclose()
    finally:
        # Clean up the manually-initialized resources
        import myapp.main as app_module

        conn = getattr(app_module, "db_conn", None)
        if conn:
            conn.close()
|
||||
```
|
||||
|
||||
## Concurrency with shared mutable state
|
||||
|
||||
If the app uses shared mutable state (in-memory SQLite, file-based DB, global caches), add a semaphore to serialise access:
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
class AppRunnable(pixie.Runnable[AppArgs]):
    """Drives the app in-process, letting only one request run at a time."""

    _client: httpx.AsyncClient
    _sem: asyncio.Semaphore

    @classmethod
    def create(cls) -> "AppRunnable":
        """Return a new runnable carrying its own single-permit semaphore."""
        runnable = cls()
        # One permit: run() calls on this instance execute one at a time.
        runnable._sem = asyncio.Semaphore(1)
        return runnable

    async def setup(self) -> None:
        """Build an AsyncClient that routes requests straight into the ASGI app."""
        from myapp.main import app

        self._client = httpx.AsyncClient(
            transport=httpx.ASGITransport(app=app),
            base_url="http://test",
        )

    async def run(self, args: AppArgs) -> None:
        """POST the user message to ``/chat`` while holding the semaphore."""
        payload = {"message": args.user_message}
        async with self._sem:
            await self._client.post("/chat", json=payload)

    async def teardown(self) -> None:
        """Close the HTTP client created in setup()."""
        await self._client.aclose()
|
||||
```
|
||||
|
||||
Only use the semaphore when needed — if the app uses per-session state keyed by unique IDs (call_sid, session_id), concurrent calls are naturally isolated and no lock is needed.
|
||||
|
||||
## Alternative: External server with httpx
|
||||
|
||||
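When the app can't be imported directly (complex startup, `uvicorn.run()` in `__main__`), run it as a separate server process and send requests to it over HTTP. The Runnable below does not launch the server itself — it assumes the server is already listening on `http://localhost:8000`: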
|
||||
|
||||
```python
|
||||
class AppRunnable(pixie.Runnable[AppArgs]):
    """Drives an externally started server over real HTTP."""

    _client: httpx.AsyncClient

    @classmethod
    def create(cls) -> "AppRunnable":
        """Return a new runnable; the HTTP client is built later in setup()."""
        runnable = cls()
        return runnable

    async def setup(self) -> None:
        """Create the client; the server itself is not started here."""
        # Assumes the server is already running (started via run-with-timeout.sh)
        server_url = "http://localhost:8000"
        self._client = httpx.AsyncClient(base_url=server_url)

    async def run(self, args: AppArgs) -> None:
        """POST the user message to ``/chat``; the response is not inspected."""
        payload = {"message": args.user_message}
        await self._client.post("/chat", json=payload)

    async def teardown(self) -> None:
        """Close the HTTP client created in setup()."""
        await self._client.aclose()
|
||||
```
|
||||
|
||||
Start the server before running `pixie trace` or `pixie test`:
|
||||
|
||||
```bash
|
||||
# Launch the server module through run-with-timeout.sh (first argument: 120).
bash resources/run-with-timeout.sh 120 uv run python -m myapp.server
# wait for readiness
sleep 3
|
||||
```
|
||||
@@ -0,0 +1,60 @@
|
||||
# Runnable Example: Standalone Function (No Server)
|
||||
|
||||
**When the app is a plain Python function or module** — no web framework, no server, no infrastructure.
|
||||
|
||||
**Approach**: Import and call the function directly from `run()`. This is the simplest case.
|
||||
|
||||
```python
|
||||
# pixie_qa/run_app.py
|
||||
from pydantic import BaseModel
|
||||
import pixie
|
||||
|
||||
|
||||
class AppArgs(BaseModel):
    # Question text passed as the first argument to `answer_question`.
    question: str
|
||||
|
||||
|
||||
class AppRunnable(pixie.Runnable[AppArgs]):
    """Drives a standalone function for tracing and evaluation."""

    @classmethod
    def create(cls) -> "AppRunnable":
        """Return a new runnable; no construction-time state is needed."""
        runnable = cls()
        return runnable

    async def run(self, args: AppArgs) -> None:
        """Await ``answer_question`` with the question; its result is discarded."""
        from myapp.agent import answer_question

        question = args.question
        await answer_question(question)
|
||||
```
|
||||
|
||||
If the function is synchronous, wrap it with `asyncio.to_thread` so the blocking call runs in a worker thread instead of stalling the event loop:
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
async def run(self, args: AppArgs) -> None:
    """Run the synchronous ``answer_question`` in a worker thread and await it."""
    from myapp.agent import answer_question

    question = args.question
    await asyncio.to_thread(answer_question, question)
|
||||
```
|
||||
|
||||
If the function depends on an external service (e.g., a vector store), the `wrap(purpose="input")` calls you added in Step 2a handle it automatically — the registry injects test data in eval mode.
|
||||
|
||||
### When to use `setup()` / `teardown()`
|
||||
|
||||
Most standalone functions don't need lifecycle methods. Use them only when the function requires a shared resource (e.g., a pre-loaded embedding model, a database connection):
|
||||
|
||||
```python
|
||||
class AppRunnable(pixie.Runnable[AppArgs]):
    """Drives a standalone function that needs a model loaded once up front."""

    # Loaded in setup() and passed to every answer_question() call.
    _model: SomeModel

    @classmethod
    def create(cls) -> "AppRunnable":
        """Return a new runnable; the model is loaded later in setup()."""
        runnable = cls()
        return runnable

    async def setup(self) -> None:
        """Load the model and keep it on the instance."""
        from myapp.models import load_model

        self._model = load_model()

    async def run(self, args: AppArgs) -> None:
        """Await ``answer_question`` with the question and the loaded model."""
        from myapp.agent import answer_question

        question = args.question
        await answer_question(question, model=self._model)
|
||||
```
|
||||
Reference in New Issue
Block a user