From 50d1686b1e8fbe94146de0c131e135ec45b315dd Mon Sep 17 00:00:00 2001 From: Aleksander Cynarski Date: Thu, 16 Apr 2026 19:39:02 +0200 Subject: [PATCH] feat: initial commit --- .env.example | 18 ++ README.md | 304 ++++++++++++++++++ Taskfile.yml | 53 +++ cmd/bot/main.go | 125 +++++++ deploy/Dockerfile | 15 + deploy/docker-compose.yml | 31 ++ go.mod | 19 ++ go.sum | 26 ++ internal/application/dto/incoming_message.go | 17 + internal/application/dto/workflow_result.go | 7 + .../usecase/handle_text_message.go | 115 +++++++ .../usecase/handle_text_message_test.go | 131 ++++++++ .../usecase/handle_voice_message.go | 80 +++++ .../usecase/handle_voice_message_test.go | 99 ++++++ internal/config/config.go | 69 ++++ internal/domain/apperror/errors.go | 25 ++ internal/domain/entity/intent.go | 28 ++ internal/domain/entity/message.go | 29 ++ internal/domain/entity/user.go | 22 ++ internal/domain/entity/workflow.go | 29 ++ internal/domain/port/file_downloader.go | 7 + internal/domain/port/intent_router.go | 10 + internal/domain/port/message_gateway.go | 8 + internal/domain/port/session_store.go | 12 + internal/domain/port/speech_transcriber.go | 7 + internal/domain/port/workflow_dispatcher.go | 10 + internal/infrastructure/n8n/payload.go | 8 + .../infrastructure/n8n/webhook_dispatcher.go | 124 +++++++ .../router/rule_based_router.go | 44 +++ .../infrastructure/speech/audio_converter.go | 63 ++++ .../infrastructure/speech/openai_whisper.go | 89 +++++ .../storage/redis_session_store.go | 62 ++++ .../infrastructure/telegram/bot_gateway.go | 37 +++ .../telegram/file_downloader.go | 48 +++ .../infrastructure/telegram/update_poller.go | 50 +++ internal/interfaces/health_handler.go | 27 ++ internal/interfaces/telegram_handler.go | 74 +++++ test/testutil/fake_file_downloader.go | 25 ++ test/testutil/fake_intent_router.go | 15 + test/testutil/fake_message_gateway.go | 22 ++ test/testutil/fake_session_store.go | 43 +++ test/testutil/fake_speech_transcriber.go | 14 + test/testutil/fake_workflow_dispatcher.go | 19 ++ test/testutil/fixtures.go | 29 ++ 44 files changed, 2089 insertions(+) create mode 100644 .env.example create mode 100644 README.md create mode 100644 Taskfile.yml create mode 100644 cmd/bot/main.go create mode 100644 deploy/Dockerfile create mode 100644 deploy/docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/application/dto/incoming_message.go create mode 100644 internal/application/dto/workflow_result.go create mode 100644 internal/application/usecase/handle_text_message.go create mode 100644 internal/application/usecase/handle_text_message_test.go create mode 100644 internal/application/usecase/handle_voice_message.go create mode 100644 internal/application/usecase/handle_voice_message_test.go create mode 100644 internal/config/config.go create mode 100644 internal/domain/apperror/errors.go create mode 100644 internal/domain/entity/intent.go create mode 100644 internal/domain/entity/message.go create mode 100644 internal/domain/entity/user.go create mode 100644 internal/domain/entity/workflow.go create mode 100644 internal/domain/port/file_downloader.go create mode 100644 internal/domain/port/intent_router.go create mode 100644 internal/domain/port/message_gateway.go create mode 100644 internal/domain/port/session_store.go create mode 100644 internal/domain/port/speech_transcriber.go create mode 100644 internal/domain/port/workflow_dispatcher.go create mode 100644 internal/infrastructure/n8n/payload.go create mode 100644 internal/infrastructure/n8n/webhook_dispatcher.go create mode 100644 internal/infrastructure/router/rule_based_router.go create mode 100644 internal/infrastructure/speech/audio_converter.go create mode 100644 internal/infrastructure/speech/openai_whisper.go create mode 100644 internal/infrastructure/storage/redis_session_store.go create mode 100644 internal/infrastructure/telegram/bot_gateway.go create mode 100644 internal/infrastructure/telegram/file_downloader.go create mode 100644 internal/infrastructure/telegram/update_poller.go create mode 100644 internal/interfaces/health_handler.go create mode 100644 internal/interfaces/telegram_handler.go create mode 100644 test/testutil/fake_file_downloader.go create mode 100644 test/testutil/fake_intent_router.go create mode 100644 test/testutil/fake_message_gateway.go create mode 100644 test/testutil/fake_session_store.go create mode 100644 test/testutil/fake_speech_transcriber.go create mode 100644 test/testutil/fake_workflow_dispatcher.go create mode 100644 test/testutil/fixtures.go diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..4d77b9c --- /dev/null +++ b/.env.example @@ -0,0 +1,18 @@ +TELEGRAM_BOT_TOKEN=your_bot_token_here +BOT_MODE=polling +TELEGRAM_DEBUG=false + +N8N_BASE_URL=http://localhost:5678 +N8N_AUTH_TOKEN= + +OPENAI_API_KEY=your_openai_key_here +STT_PROVIDER=openai +WHISPER_MODEL=whisper-1 +WHISPER_LANGUAGE= + +REDIS_URL=redis://localhost:6379 +SESSION_TTL=24 + +LOG_LEVEL=info +LOG_FORMAT=json +SERVER_PORT=8080 diff --git a/README.md b/README.md new file mode 100644 index 0000000..d41c066 --- /dev/null +++ b/README.md @@ -0,0 +1,304 @@ +# gw_telegram + +Bramka Telegram do n8n i systemów agentów AI. Bot przyjmuje wiadomości tekstowe i głosowe od użytkowników i routuje je do odpowiednich workflow w n8n lub agentów AI. + +## Funkcje + +- **Wiadomości tekstowe** — routing oparty na regułach regex do dowolnego workflow n8n +- **Wiadomości głosowe** — pipeline: pobieranie OGG → konwersja ffmpeg (WAV 16kHz) → transkrypcja Whisper API → routing jak tekst +- **Sesje użytkowników** — historia konwersacji przechowywana w Redis z konfigurowalnym TTL +- **Graceful shutdown** — obsługa sygnałów `SIGINT`/`SIGTERM`, bot kończy obsługę aktualnych wiadomości przed zatrzymaniem +- **Tryby pracy** — polling (domyślny) lub webhook + +## Architektura + +Projekt stosuje **Clean Architecture** z pełnym oddzieleniem warstw: + +``` +cmd/bot/main.go # composition root — łączy wszystkie warstwy +internal/ + domain/ # logika biznesowa, zero zależności zewnętrznych + entity/ # modele: Message, Session, Intent, Route, WorkflowRequest + port/ # interfejsy (bramy zależności) + apperror/ # typowane błędy domenowe + application/ # przypadki użycia + usecase/ # HandleTextMessage, HandleVoiceMessage + dto/ # obiekty transferu danych + infrastructure/ # implementacje portów + telegram/ # BotGateway, UpdatePoller, FileDownloader + n8n/ # WebhookDispatcher + speech/ # OpenAIWhisper, FFmpegConverter + router/ # RuleBasedRouter + storage/ # RedisSessionStore + interfaces/ # warstwa wejściowa + telegram_handler.go # mapowanie update → use case + health_handler.go # endpointy /health i /ready + config/ # ładowanie konfiguracji z env +test/testutil/ # fake'i dla wszystkich portów (TDD) +deploy/ # Dockerfile, docker-compose.yml +``` + +### Pipeline wiadomości głosowej + +``` +Telegram Voice Update + │ + ▼ +TelegramHandler.Handle() + │ + ▼ +HandleVoiceMessage.Execute() + 1. SendTyping (wskaźnik pisania) + 2. FileDownloader.Download(fileID) → OGG Opus bytes + 3. FFmpegConverter.Convert() → WAV 16kHz mono + 4. OpenAIWhisper.Transcribe() → tekst + │ + ▼ +HandleTextMessage.Execute() → routing → n8n → odpowiedź +``` + +### Kontrakt JSON z n8n + +Bot wysyła do webhooka n8n: + +```json +{ + "request_id": "uuid-v4", + "chat_id": 123456789, + "user_id": 987654321, + "username": "jankowalski", + "message_text": "Gdzie jest moje zamówienie #12345?", + "intent_name": "order_inquiry", + "timestamp": "2026-04-16T10:00:00Z", + "metadata": { + "route_target_type": "n8n" + } +} +``` + +n8n odpowiada: + +```json +{ + "reply": "Twoje zamówienie #12345 jest w drodze.", + "actions": [], + "next_workflow": null +} +``` + +Jeśli n8n zwróci surowy tekst (nie JSON), bot wyśle go bezpośrednio do użytkownika. + +## Wymagania + +- Go 1.22+ +- Redis 7+ +- ffmpeg (wymagany do obsługi wiadomości głosowych) +- [Task](https://taskfile.dev) (`go install github.com/go-task/task/v3/cmd/task@latest`) +- Token bota Telegram ([@BotFather](https://t.me/BotFather)) +- Klucz API OpenAI (do transkrypcji głosu) +- Działająca instancja n8n + +## Konfiguracja + +Konfiguracja odbywa się wyłącznie przez zmienne środowiskowe. Skopiuj `.env.example` jako punkt startowy: + +```bash +cp .env.example .env +``` + +### Zmienne środowiskowe + +#### Bot Telegram (wymagane) + +| Zmienna | Domyślna | Opis | +|---|---|---| +| `TELEGRAM_BOT_TOKEN` | — | Token bota z @BotFather | +| `BOT_MODE` | `polling` | Tryb pracy: `polling` lub `webhook` | +| `TELEGRAM_WEBHOOK_URL` | — | URL webhooka (tylko dla `BOT_MODE=webhook`) | +| `TELEGRAM_DEBUG` | `false` | Włącza szczegółowe logi Telegram API | + +#### n8n (wymagane) + +| Zmienna | Domyślna | Opis | +|---|---|---| +| `N8N_BASE_URL` | — | Bazowy URL instancji n8n, np. `http://localhost:5678` | +| `N8N_AUTH_TOKEN` | — | Token Bearer do uwierzytelniania webhooka | +| `N8N_TIMEOUT` | `30` | Timeout żądania do n8n (sekundy) | +| `N8N_RETRY_COUNT` | `3` | Liczba ponownych prób przy błędzie | + +#### Speech-to-Text + +| Zmienna | Domyślna | Opis | +|---|---|---| +| `STT_PROVIDER` | `openai` | Dostawca transkrypcji: `openai` | +| `OPENAI_API_KEY` | — | Klucz API OpenAI (wymagany jeśli `STT_PROVIDER=openai`) | +| `WHISPER_MODEL` | `whisper-1` | Model Whisper | +| `WHISPER_LANGUAGE` | — | Kod języka BCP-47 (np. `pl`, `en`). Puste = autodetekcja | +| `FFMPEG_PATH` | `ffmpeg` | Ścieżka do binarki ffmpeg | + +#### Redis + +| Zmienna | Domyślna | Opis | +|---|---|---| +| `REDIS_URL` | `redis://localhost:6379` | URL połączenia z Redis | +| `SESSION_TTL` | `24` | Czas życia sesji użytkownika (godziny) | + +#### Serwer HTTP + +| Zmienna | Domyślna | Opis | +|---|---|---| +| `SERVER_PORT` | `8080` | Port serwera HTTP (`/health`, `/ready`) | + +#### Logowanie + +| Zmienna | Domyślna | Opis | +|---|---|---| +| `LOG_LEVEL` | `info` | Poziom logów: `debug`, `info`, `warn`, `error` | +| `LOG_FORMAT` | `json` | Format logów: `json` lub `text` | + +## Uruchomienie + +### Lokalne (bez Dockera) + +Wymagania: Go 1.22+, Redis, ffmpeg zainstalowane lokalnie. + +```bash +# 1. Sklonuj i przejdź do katalogu +git clone +cd gw_telegram + +# 2. Pobierz zależności +go mod download + +# 3. Skonfiguruj zmienne środowiskowe +cp .env.example .env +# edytuj .env — uzupełnij TELEGRAM_BOT_TOKEN, N8N_BASE_URL, OPENAI_API_KEY + +# 4. Eksportuj zmienne +export $(grep -v '^#' .env | xargs) + +# 5. Uruchom +task run +``` + +### Docker Compose (zalecane) + +```bash +# 1. Skonfiguruj zmienne +cp .env.example .env +# uzupełnij .env + +# 2. Zbuduj i uruchom +task docker:up + +# 3. Sprawdź logi +docker compose -f deploy/docker-compose.yml logs -f bot + +# 4. Zatrzymaj +task docker:down +``` + +Docker Compose uruchamia bota razem z Redisem. Redis automatycznie persystuje dane sesji w woluminie `redis_data`. + +### Binarka produkcyjna + +```bash +task build +# binarka: ./bin/bot + +TELEGRAM_BOT_TOKEN=xxx N8N_BASE_URL=http://n8n:5678 ./bin/bot +``` + +## Routing wiadomości + +Routing jest oparty na regułach regex z priorytetem. Reguły są definiowane w kodzie (`cmd/bot/main.go`) i dopasowywane od najwyższego priorytetu. Pierwsza pasująca reguła wygrywa. + +### Domyślne reguły + +| Pattern | Intent | Target | Priorytet | +|---|---|---|---| +| `^/start` | `start` | builtin | 100 | +| `^/help` | `help` | builtin | 100 | +| `.*` | `general_query` | n8n: `default` | 0 | + +### Dodawanie nowego workflow n8n + +1. Dodaj regułę w `cmd/bot/main.go`: + +```go +{ + Pattern: regexp.MustCompile(`(?i)zamówienie|order`), + IntentName: "order_inquiry", + Target: entity.RouteTarget{ + Type: entity.RouteTargetN8n, + WorkflowID: "order-webhook", + }, + Priority: 50, +}, +``` + +2. Dodaj konfigurację workflow w mapie `workflows`: + +```go +workflows := map[string]n8n.WorkflowConfig{ + "default": { + WebhookURL: cfg.N8n.BaseURL + "/webhook/default", + AuthToken: cfg.N8n.AuthToken, + }, + "order-webhook": { + WebhookURL: cfg.N8n.BaseURL + "/webhook/order-inquiry", + AuthToken: cfg.N8n.AuthToken, + }, +} +``` + +## Testy + +```bash +# Testy jednostkowe +task test + +# Testy z raportem pokrycia (HTML) +task test:cover +# otwórz coverage.html + +# Testy integracyjne (wymaga Dockera) +task test:int +``` + +Projekt stosuje TDD. Każdy przypadek użycia ma testy oparte na ręcznie pisanych fake'ach portów (`test/testutil/`), bez generowanych mocków. + +### Pokrycie testami + +| Pakiet | Testowane scenariusze | +|---|---| +| `HandleTextMessage` | Happy path, pusty tekst, brak trasy, błąd dispatchera, persystencja sesji | +| `HandleVoiceMessage` | Happy path, błąd pobierania pliku, błąd transkrypcji, konwersja audio | + +## Linting + +```bash +task lint +``` + +Projekt używa `golangci-lint`. Konfiguracja w `.golangci.yml`. + +## Health Check + +Serwer HTTP (`SERVER_PORT`) udostępnia: + +- `GET /health` — zawsze zwraca `200 OK` jeśli proces działa +- `GET /ready` — zwraca `200 OK` gdy połączenie z Telegram API i Redis jest aktywne + +Przydatne do probes w Kubernetes i healthcheck w Docker Compose. + +## Zależności + +| Biblioteka | Zastosowanie | +|---|---| +| `go-telegram-bot-api/v5` | Telegram Bot API (polling + wysyłanie wiadomości) | +| `kelseyhightower/envconfig` | Ładowanie konfiguracji ze zmiennych środowiskowych | +| `redis/go-redis/v9` | Klient Redis do przechowywania sesji | +| `google/uuid` | Generowanie RequestID | +| `stretchr/testify` | Asercje w testach | +| `log/slog` (stdlib) | Strukturalne logowanie JSON | diff --git a/Taskfile.yml b/Taskfile.yml new file mode 100644 index 0000000..ec3b41b --- /dev/null +++ b/Taskfile.yml @@ -0,0 +1,53 @@ +version: "3" + +tasks: + build: + desc: Build the bot binary + cmds: + - go build -o bin/bot ./cmd/bot + + test: + desc: Run unit tests + cmds: + - go test ./... + + test:cover: + desc: Run unit tests with HTML coverage report + cmds: + - go test -coverprofile=coverage.out ./... + - go tool cover -html=coverage.out -o coverage.html + + test:int: + desc: Run integration tests (requires Docker) + cmds: + - go test -tags=integration ./test/integration/... + + lint: + desc: Run golangci-lint + cmds: + - golangci-lint run ./... + + run: + desc: Run the bot locally + cmds: + - go run ./cmd/bot + + tidy: + desc: Tidy go modules + cmds: + - go mod tidy + + docker:build: + desc: Build Docker image + cmds: + - docker build -t gw_telegram -f deploy/Dockerfile . + + docker:up: + desc: Start services via docker compose + cmds: + - docker compose -f deploy/docker-compose.yml up -d + + docker:down: + desc: Stop services via docker compose + cmds: + - docker compose -f deploy/docker-compose.yml down diff --git a/cmd/bot/main.go b/cmd/bot/main.go new file mode 100644 index 0000000..78b28cf --- /dev/null +++ b/cmd/bot/main.go @@ -0,0 +1,125 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "regexp" + "syscall" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" + "github.com/redis/go-redis/v9" + + "github.com/paramah/gw_telegram/internal/application/usecase" + "github.com/paramah/gw_telegram/internal/config" + "github.com/paramah/gw_telegram/internal/domain/entity" + "github.com/paramah/gw_telegram/internal/infrastructure/n8n" + "github.com/paramah/gw_telegram/internal/infrastructure/router" + "github.com/paramah/gw_telegram/internal/infrastructure/speech" + "github.com/paramah/gw_telegram/internal/infrastructure/storage" + infratelegram "github.com/paramah/gw_telegram/internal/infrastructure/telegram" + "github.com/paramah/gw_telegram/internal/interfaces" +) + +func main() { + cfg := config.MustLoad() + logger := newLogger(cfg.Log) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + // Telegram bot + bot, err := tgbotapi.NewBotAPI(cfg.Bot.Token) + if err != nil { + logger.Error("failed to create telegram bot", "error", err) + os.Exit(1) + } + bot.Debug = cfg.Bot.Debug + logger.Info("telegram bot authorized", "username", bot.Self.UserName) + + // Redis + redisOpts, err := redis.ParseURL(cfg.Redis.URL) + if err != nil { + logger.Error("invalid redis url", "error", err) + os.Exit(1) + } + redisClient := redis.NewClient(redisOpts) + + // Infrastructure + gateway := infratelegram.NewBotGateway(bot, logger) + downloader := infratelegram.NewTelegramFileDownloader(bot) + sessionStore := storage.NewRedisSessionStore(redisClient, cfg.Redis.TTLHours) + + // n8n workflows (configure via env: N8N_WORKFLOW_=) + workflows := map[string]n8n.WorkflowConfig{ + "default": { + ID: "default", + WebhookURL: cfg.N8n.BaseURL + "/webhook/default", + AuthToken: cfg.N8n.AuthToken, + }, + } + dispatcher := n8n.NewWebhookDispatcher(workflows, logger) + + // Intent router with default rules + rules := []router.Rule{ + { + Pattern: regexp.MustCompile(`(?i)^/start`), + IntentName: "start", + Target: entity.RouteTarget{Type: entity.RouteTargetBuiltin, WorkflowID: "start"}, + Priority: 100, + }, + { + Pattern: regexp.MustCompile(`(?i)^/help`), + IntentName: "help", + Target: entity.RouteTarget{Type: entity.RouteTargetBuiltin, WorkflowID: "help"}, + Priority: 100, + }, + { + Pattern: regexp.MustCompile(`.*`), + IntentName: "general_query", + Target: entity.RouteTarget{Type: entity.RouteTargetN8n, WorkflowID: "default"}, + Priority: 0, + }, + } + intentRouter := router.NewRuleBasedRouter(rules) + + // Speech + transcriber := speech.NewOpenAIWhisper(cfg.Speech.OpenAIKey, cfg.Speech.WhisperModel, cfg.Speech.Language) + converter := speech.NewFFmpegConverter(cfg.Speech.FFmpegPath, "") + + // Use cases + textUC := usecase.NewHandleTextMessage(intentRouter, dispatcher, sessionStore, gateway, logger) + voiceUC := usecase.NewHandleVoiceMessage(downloader, converter, transcriber, textUC, gateway, logger) + + // Handler + poller + handler := interfaces.NewTelegramHandler(textUC, voiceUC, logger) + poller := infratelegram.NewUpdatePoller(bot, handler, logger) + + logger.Info("starting bot", "mode", cfg.Bot.Mode) + if err := poller.Start(ctx); err != nil && err != context.Canceled { + logger.Error("poller stopped with error", "error", err) + os.Exit(1) + } + + logger.Info("bot stopped gracefully") +} + +func newLogger(cfg config.LogConfig) *slog.Logger { + var level slog.Level + switch cfg.Level { + case "debug": + level = slog.LevelDebug + case "warn": + level = slog.LevelWarn + case "error": + level = slog.LevelError + default: + level = slog.LevelInfo + } + opts := &slog.HandlerOptions{Level: level} + if cfg.Format == "text" { + return slog.New(slog.NewTextHandler(os.Stdout, opts)) + } + return slog.New(slog.NewJSONHandler(os.Stdout, opts)) +} diff --git a/deploy/Dockerfile b/deploy/Dockerfile new file mode 100644 index 0000000..0000d44 --- /dev/null +++ b/deploy/Dockerfile @@ -0,0 +1,15 @@ +# Stage 1: Build +FROM golang:1.22-alpine AS builder +WORKDIR /build +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-w -s" -o /bot ./cmd/bot + +# Stage 2: Runtime +FROM alpine:3.19 +RUN apk add --no-cache ffmpeg ca-certificates tzdata +COPY --from=builder /bot /bot +RUN adduser -D -u 1001 botuser +USER botuser +ENTRYPOINT ["/bot"] diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml new file mode 100644 index 0000000..d90e0b3 --- /dev/null +++ b/deploy/docker-compose.yml @@ -0,0 +1,31 @@ +services: + bot: + build: + context: .. + dockerfile: deploy/Dockerfile + restart: unless-stopped + environment: + - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN} + - N8N_BASE_URL=${N8N_BASE_URL} + - N8N_AUTH_TOKEN=${N8N_AUTH_TOKEN} + - REDIS_URL=redis://redis:6379 + - OPENAI_API_KEY=${OPENAI_API_KEY} + - BOT_MODE=polling + - LOG_LEVEL=info + depends_on: + redis: + condition: service_healthy + + redis: + image: redis:7-alpine + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + volumes: + - redis_data:/data + +volumes: + redis_data: diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d8c0b9e --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module github.com/paramah/gw_telegram + +go 1.22 + +require ( + github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 + github.com/google/uuid v1.6.0 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/redis/go-redis/v9 v9.5.1 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..19581b8 --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc= +github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/application/dto/incoming_message.go b/internal/application/dto/incoming_message.go new file mode 100644 index 0000000..026eb9c --- /dev/null +++ b/internal/application/dto/incoming_message.go @@ -0,0 +1,17 @@ +package dto + +import "time" + +type IncomingMessageDTO struct { + MessageID int64 + ChatID int64 + UserID int64 + Username string + FirstName string + LastName string + Text string + VoiceFileID string + IsVoice bool + Timestamp time.Time + Language string +} diff --git a/internal/application/dto/workflow_result.go b/internal/application/dto/workflow_result.go new file mode 100644 index 0000000..f854dc7 --- /dev/null +++ b/internal/application/dto/workflow_result.go @@ -0,0 +1,7 @@ +package dto + +type WorkflowResultDTO struct { + RequestID string + ReplyText string + Error error +} diff --git a/internal/application/usecase/handle_text_message.go b/internal/application/usecase/handle_text_message.go new file mode 100644 index 0000000..68414b2 --- /dev/null +++ b/internal/application/usecase/handle_text_message.go @@ -0,0 +1,115 @@ +package usecase + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/paramah/gw_telegram/internal/application/dto" + "github.com/paramah/gw_telegram/internal/domain/apperror" + "github.com/paramah/gw_telegram/internal/domain/entity" + "github.com/paramah/gw_telegram/internal/domain/port" +) + +type HandleTextMessage struct { + router port.IntentRouter + dispatcher port.WorkflowDispatcher + sessions port.SessionStore + gateway port.MessageGateway + logger *slog.Logger +} + +func NewHandleTextMessage( + router port.IntentRouter, + dispatcher port.WorkflowDispatcher, + sessions port.SessionStore, + gateway port.MessageGateway, + logger *slog.Logger, +) *HandleTextMessage { + return &HandleTextMessage{ + router: router, + dispatcher: dispatcher, + sessions: sessions, + gateway: gateway, + logger: logger, + } +} + +func (h *HandleTextMessage) Execute(ctx context.Context, in dto.IncomingMessageDTO) error { + if in.Text == "" { + return apperror.ErrMessageEmpty + } + + msg := entity.Message{ + MessageID: in.MessageID, + ChatID: in.ChatID, + UserID: in.UserID, + Username: in.Username, + Type: entity.MessageTypeText, + Text: in.Text, + Timestamp: in.Timestamp, + Metadata: map[string]any{"language": in.Language}, + } + + session, err := h.sessions.Get(ctx, in.UserID) + if err != nil { + h.logger.WarnContext(ctx, "session not found, using empty session", "user_id", in.UserID) + session = entity.Session{ + UserID: in.UserID, + ChatID: in.ChatID, + Data: make(map[string]any), + UpdatedAt: time.Now(), + } + } + + route, err := h.router.Route(ctx, msg) + if err != nil { + h.logger.ErrorContext(ctx, "routing failed", "error", err, "user_id", in.UserID) + _ = h.gateway.SendText(ctx, in.ChatID, "Sorry, I couldn't process your message. Please try again.") + return fmt.Errorf("handle text message: route: %w", err) + } + + session.History = append(session.History, msg) + if len(session.History) > 20 { + session.History = session.History[len(session.History)-20:] + } + + req := entity.WorkflowRequest{ + RequestID: uuid.New().String(), + ChatID: in.ChatID, + UserID: in.UserID, + Username: in.Username, + MessageText: in.Text, + Intent: entity.Intent{ + Name: route.IntentName, + }, + Session: session, + Timestamp: in.Timestamp, + Metadata: map[string]any{"route_target_type": string(route.Target.Type)}, + } + + _ = h.gateway.SendTyping(ctx, in.ChatID) + + resp, err := h.dispatcher.Dispatch(ctx, req) + if err != nil { + h.logger.ErrorContext(ctx, "dispatch failed", "error", err, "request_id", req.RequestID) + _ = h.gateway.SendText(ctx, in.ChatID, "Sorry, the service is temporarily unavailable. Please try again later.") + return fmt.Errorf("handle text message: dispatch: %w", err) + } + + if resp.ReplyText != "" { + if err := h.gateway.SendText(ctx, in.ChatID, resp.ReplyText); err != nil { + return fmt.Errorf("handle text message: send reply: %w", err) + } + } + + session.CurrentWorkflow = route.Target.WorkflowID + session.UpdatedAt = time.Now() + if err := h.sessions.Set(ctx, session); err != nil { + h.logger.WarnContext(ctx, "failed to persist session", "error", err, "user_id", in.UserID) + } + + return nil +} diff --git a/internal/application/usecase/handle_text_message_test.go b/internal/application/usecase/handle_text_message_test.go new file mode 100644 index 0000000..d7b6903 --- /dev/null +++ b/internal/application/usecase/handle_text_message_test.go @@ -0,0 +1,131 @@ +package usecase_test + +import ( + "context" + "errors" + "log/slog" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/paramah/gw_telegram/internal/application/dto" + "github.com/paramah/gw_telegram/internal/application/usecase" + "github.com/paramah/gw_telegram/internal/domain/apperror" + "github.com/paramah/gw_telegram/internal/domain/entity" + "github.com/paramah/gw_telegram/test/testutil" +) + +func newTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func validTextDTO() dto.IncomingMessageDTO { + return dto.IncomingMessageDTO{ + MessageID: 1, + ChatID: 100, + UserID: 200, + Username: "testuser", + Text: "Hello, I need help", + Timestamp: time.Now(), + Language: "en", + } +} + +func TestHandleTextMessage_Execute_HappyPath(t *testing.T) { + router := &testutil.FakeIntentRouter{ + RouteResult: entity.Route{ + IntentName: "general_query", + Target: entity.RouteTarget{ + Type: entity.RouteTargetN8n, + WorkflowID: "general-webhook", + }, + }, + } + dispatcher := &testutil.FakeWorkflowDispatcher{ + Response: entity.WorkflowResponse{ + ReplyText: "Here is my response", + }, + } + sessions := &testutil.FakeSessionStore{} + gateway := &testutil.FakeMessageGateway{} + + uc := usecase.NewHandleTextMessage(router, dispatcher, sessions, gateway, newTestLogger()) + err := uc.Execute(context.Background(), validTextDTO()) + + require.NoError(t, err) + assert.Equal(t, 1, dispatcher.CallCount) + assert.Equal(t, "Here is my response", gateway.LastSentText) + assert.Equal(t, int64(100), gateway.LastChatID) +} + +func TestHandleTextMessage_Execute_EmptyText(t *testing.T) { + uc := usecase.NewHandleTextMessage( + &testutil.FakeIntentRouter{}, + &testutil.FakeWorkflowDispatcher{}, + &testutil.FakeSessionStore{}, + &testutil.FakeMessageGateway{}, + newTestLogger(), + ) + + in := validTextDTO() + in.Text = "" + err := uc.Execute(context.Background(), in) + + assert.ErrorIs(t, err, apperror.ErrMessageEmpty) +} + +func TestHandleTextMessage_Execute_RouteNotFound(t *testing.T) { + router := &testutil.FakeIntentRouter{Error: apperror.ErrRouteNotFound} + dispatcher := &testutil.FakeWorkflowDispatcher{} + sessions := &testutil.FakeSessionStore{} + gateway := &testutil.FakeMessageGateway{} + + uc := usecase.NewHandleTextMessage(router, dispatcher, sessions, gateway, newTestLogger()) + err := uc.Execute(context.Background(), validTextDTO()) + + assert.Error(t, err) + assert.True(t, errors.Is(err, apperror.ErrRouteNotFound)) + assert.Equal(t, 0, dispatcher.CallCount) + assert.NotEmpty(t, gateway.LastSentText) // error message sent to user +} + +func TestHandleTextMessage_Execute_DispatchError(t *testing.T) { + router := &testutil.FakeIntentRouter{ + RouteResult: entity.Route{IntentName: "general_query"}, + } + dispatcher := &testutil.FakeWorkflowDispatcher{ + Error: errors.New("n8n unavailable"), + } + sessions := &testutil.FakeSessionStore{} + gateway := &testutil.FakeMessageGateway{} + + uc := usecase.NewHandleTextMessage(router, dispatcher, sessions, gateway, newTestLogger()) + err := uc.Execute(context.Background(), validTextDTO()) + + assert.Error(t, err) + assert.Equal(t, 1, dispatcher.CallCount) + assert.NotEmpty(t, gateway.LastSentText) // fallback error message sent to user +} + +func TestHandleTextMessage_Execute_SessionPersisted(t *testing.T) { + router := &testutil.FakeIntentRouter{ + RouteResult: entity.Route{ + IntentName: "general_query", + Target: entity.RouteTarget{WorkflowID: "wf-1"}, + }, + } + dispatcher := &testutil.FakeWorkflowDispatcher{ + Response: entity.WorkflowResponse{ReplyText: "OK"}, + } + sessions := &testutil.FakeSessionStore{} + gateway := &testutil.FakeMessageGateway{} + + uc := usecase.NewHandleTextMessage(router, dispatcher, sessions, gateway, newTestLogger()) + err := uc.Execute(context.Background(), validTextDTO()) + + require.NoError(t, err) + assert.True(t, sessions.SetCalled) + assert.Equal(t, "wf-1", sessions.LastSetSession.CurrentWorkflow) +} diff --git a/internal/application/usecase/handle_voice_message.go b/internal/application/usecase/handle_voice_message.go new file mode 100644 index 0000000..9144f34 --- /dev/null +++ b/internal/application/usecase/handle_voice_message.go @@ -0,0 +1,80 @@ +package usecase + +import ( + "context" + "fmt" + "log/slog" + + "github.com/paramah/gw_telegram/internal/application/dto" + "github.com/paramah/gw_telegram/internal/domain/port" +) + +type HandleVoiceMessage struct { + downloader port.FileDownloader + converter AudioConverter + transcriber port.SpeechTranscriber + textHandler *HandleTextMessage + gateway port.MessageGateway + logger *slog.Logger +} + +// AudioConverter converts audio between formats (OGG -> WAV etc.) +type AudioConverter interface { + Convert(ctx context.Context, input []byte, fromMime, toMime string) ([]byte, error) +} + +func NewHandleVoiceMessage( + downloader port.FileDownloader, + converter AudioConverter, + transcriber port.SpeechTranscriber, + textHandler *HandleTextMessage, + gateway port.MessageGateway, + logger *slog.Logger, +) *HandleVoiceMessage { + return &HandleVoiceMessage{ + downloader: downloader, + converter: converter, + transcriber: transcriber, + textHandler: textHandler, + gateway: gateway, + logger: logger, + } +} + +func (h *HandleVoiceMessage) Execute(ctx context.Context, in dto.IncomingMessageDTO) error { + _ = h.gateway.SendTyping(ctx, in.ChatID) + + audioBytes, mimeType, err := h.downloader.Download(ctx, in.VoiceFileID) + if err != nil { + h.logger.ErrorContext(ctx, "voice download failed", "error", err, "file_id", in.VoiceFileID) + _ = h.gateway.SendText(ctx, in.ChatID, "Sorry, I couldn't download the audio message. Please try again.") + return fmt.Errorf("handle voice: download: %w", err) + } + + // Convert OGG Opus (Telegram default) to WAV for Whisper + if mimeType == "audio/ogg" || mimeType == "" { + wavBytes, err := h.converter.Convert(ctx, audioBytes, "audio/ogg", "audio/wav") + if err != nil { + h.logger.WarnContext(ctx, "audio conversion failed, trying raw", "error", err) + // fall through with original bytes + } else { + audioBytes = wavBytes + mimeType = "audio/wav" + } + } + + transcript, err := h.transcriber.Transcribe(ctx, audioBytes, mimeType) + if err != nil { + h.logger.ErrorContext(ctx, "transcription failed", "error", err) + _ = h.gateway.SendText(ctx, in.ChatID, "Sorry, I couldn't understand the voice message. Please try sending text instead.") + return fmt.Errorf("handle voice: transcribe: %w", err) + } + + h.logger.InfoContext(ctx, "voice transcribed", "user_id", in.UserID, "length", len(transcript)) + + textIn := in + textIn.Text = transcript + textIn.IsVoice = false + + return h.textHandler.Execute(ctx, textIn) +} diff --git a/internal/application/usecase/handle_voice_message_test.go b/internal/application/usecase/handle_voice_message_test.go new file mode 100644 index 0000000..13cf1d2 --- /dev/null +++ b/internal/application/usecase/handle_voice_message_test.go @@ -0,0 +1,99 @@ +package usecase_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/paramah/gw_telegram/internal/application/dto" + "github.com/paramah/gw_telegram/internal/application/usecase" + "github.com/paramah/gw_telegram/internal/domain/entity" + "github.com/paramah/gw_telegram/test/testutil" +) + +func validVoiceDTO() dto.IncomingMessageDTO { + return dto.IncomingMessageDTO{ + MessageID: 2, + ChatID: 100, + UserID: 200, + Username: "testuser", + VoiceFileID: "file_abc123", + IsVoice: true, + } +} + +func TestHandleVoiceMessage_Execute_HappyPath(t *testing.T) { + downloader := &testutil.FakeFileDownloader{ + Data: []byte("fake-ogg-audio"), + MimeType: "audio/ogg", + } + converter := &testutil.FakeAudioConverter{Output: []byte("fake-wav-audio")} + transcriber := &testutil.FakeSpeechTranscriber{Transcript: "I need help with my order"} + router := &testutil.FakeIntentRouter{ + RouteResult: entity.Route{IntentName: "order_inquiry"}, + } + dispatcher := &testutil.FakeWorkflowDispatcher{ + Response: entity.WorkflowResponse{ReplyText: "Order status: shipped"}, + } + sessions := &testutil.FakeSessionStore{} + gateway := &testutil.FakeMessageGateway{} + + textUC := usecase.NewHandleTextMessage(router, dispatcher, sessions, gateway, newTestLogger()) + uc := usecase.NewHandleVoiceMessage(downloader, converter, transcriber, textUC, gateway, newTestLogger()) + + err := uc.Execute(context.Background(), validVoiceDTO()) + + require.NoError(t, err) + assert.Equal(t, 1, transcriber.CallCount) + assert.Equal(t, 1, dispatcher.CallCount) + assert.Equal(t, "I need help with my order", dispatcher.LastRequest.MessageText) + assert.Equal(t, "Order status: shipped", gateway.LastSentText) +} + +func TestHandleVoiceMessage_Execute_DownloadFails(t *testing.T) { + downloader := &testutil.FakeFileDownloader{Error: errors.New("download error")} + transcriber := &testutil.FakeSpeechTranscriber{} + gateway := &testutil.FakeMessageGateway{} + + textUC := usecase.NewHandleTextMessage( + &testutil.FakeIntentRouter{}, + &testutil.FakeWorkflowDispatcher{}, + &testutil.FakeSessionStore{}, + gateway, + newTestLogger(), + ) + uc := usecase.NewHandleVoiceMessage(downloader, &testutil.FakeAudioConverter{}, transcriber, textUC, gateway, newTestLogger()) + + err := uc.Execute(context.Background(), validVoiceDTO()) + + assert.Error(t, err) + assert.Equal(t, 0, transcriber.CallCount) + assert.NotEmpty(t, gateway.LastSentText) // error message sent +} + +func TestHandleVoiceMessage_Execute_TranscriptionFails(t *testing.T) { + downloader := &testutil.FakeFileDownloader{ + Data: []byte("audio"), + MimeType: "audio/ogg", + } + transcriber := &testutil.FakeSpeechTranscriber{Error: errors.New("whisper error")} + gateway := &testutil.FakeMessageGateway{} + dispatcher := &testutil.FakeWorkflowDispatcher{} + + textUC := usecase.NewHandleTextMessage( + &testutil.FakeIntentRouter{}, + dispatcher, + &testutil.FakeSessionStore{}, + gateway, + newTestLogger(), + ) + uc := usecase.NewHandleVoiceMessage(downloader, &testutil.FakeAudioConverter{}, transcriber, textUC, gateway, newTestLogger()) + + err := uc.Execute(context.Background(), validVoiceDTO()) + + assert.Error(t, err) + assert.Equal(t, 0, dispatcher.CallCount) + assert.NotEmpty(t, gateway.LastSentText) +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..1ba4151 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,69 @@ +package config + +import ( + "fmt" + "os" + "github.com/kelseyhightower/envconfig" +) + +type Config struct { + Bot BotConfig + N8n N8nConfig + Speech SpeechConfig + Redis RedisConfig + Server ServerConfig + Log LogConfig +} + +type BotConfig struct { + Token string `envconfig:"TELEGRAM_BOT_TOKEN" required:"true"` + Mode string `envconfig:"BOT_MODE" default:"polling"` + WebhookURL string `envconfig:"TELEGRAM_WEBHOOK_URL"` + Debug bool `envconfig:"TELEGRAM_DEBUG" default:"false"` +} + +type N8nConfig struct { + BaseURL string `envconfig:"N8N_BASE_URL" required:"true"` + AuthToken string `envconfig:"N8N_AUTH_TOKEN"` + TimeoutSecs int `envconfig:"N8N_TIMEOUT" default:"30"` + RetryCount int `envconfig:"N8N_RETRY_COUNT" default:"3"` +} + +type SpeechConfig struct { + Provider string `envconfig:"STT_PROVIDER" default:"openai"` + OpenAIKey string `envconfig:"OPENAI_API_KEY"` + WhisperModel string `envconfig:"WHISPER_MODEL" default:"whisper-1"` + Language string `envconfig:"WHISPER_LANGUAGE" default:""` + FFmpegPath string `envconfig:"FFMPEG_PATH" default:"ffmpeg"` +} + +type RedisConfig struct { + URL string `envconfig:"REDIS_URL" default:"redis://localhost:6379"` + TTLHours int `envconfig:"SESSION_TTL" default:"24"` +} + +type ServerConfig struct { + Port int `envconfig:"SERVER_PORT" default:"8080"` +} + +type LogConfig struct { + Level string `envconfig:"LOG_LEVEL" default:"info"` + Format string `envconfig:"LOG_FORMAT" default:"json"` +} + +func Load() (*Config, error) { + var cfg Config + if err := envconfig.Process("", &cfg); err != nil { + return nil, fmt.Errorf("load config: %w", err) + } + return &cfg, nil +} + +func MustLoad() *Config { + cfg, err := Load() + if err != nil { + fmt.Fprintf(os.Stderr, "fatal: %v\n", err) + os.Exit(1) + } + return cfg +} diff --git a/internal/domain/apperror/errors.go b/internal/domain/apperror/errors.go new file mode 100644 index 0000000..9e48ed1 --- /dev/null +++ b/internal/domain/apperror/errors.go @@ -0,0 +1,25 @@ +package apperror + +import "errors" + +type AppError struct { + Code string + Message string +} + +func (e *AppError) Error() string { + return e.Message +} + +var ( + ErrMessageEmpty = &AppError{Code: "MSG_EMPTY", Message: "message text is empty"} + ErrRouteNotFound = &AppError{Code: "ROUTE_NOT_FOUND", Message: "no route found for message"} + ErrWorkflowTimeout = &AppError{Code: "WORKFLOW_TIMEOUT", Message: "workflow call timed out"} + ErrTranscriptionFailed = &AppError{Code: "TRANSCRIPTION_FAILED", Message: "voice transcription failed"} + ErrSessionNotFound = &AppError{Code: "SESSION_NOT_FOUND", Message: "session not found"} + ErrDownloadFailed = &AppError{Code: "DOWNLOAD_FAILED", Message: "file download failed"} +) + +func Is(err, target error) bool { + return errors.Is(err, target) +} diff --git a/internal/domain/entity/intent.go b/internal/domain/entity/intent.go new file mode 100644 index 0000000..bf9b09a --- /dev/null +++ b/internal/domain/entity/intent.go @@ -0,0 +1,28 @@ +package entity + +type Intent struct { + Name string + Confidence float64 + Entities map[string]any +} + +type RouteTargetType string + +const ( + RouteTargetN8n RouteTargetType = "n8n" + RouteTargetAIAgent RouteTargetType = "ai_agent" + RouteTargetBuiltin RouteTargetType = "builtin" +) + +type RouteTarget struct { + Type RouteTargetType + WorkflowID string + Endpoint string +} + +type Route struct { + Pattern string + IntentName string + Target RouteTarget + Priority int +} diff --git a/internal/domain/entity/message.go b/internal/domain/entity/message.go new file mode 100644 index 0000000..b171957 --- /dev/null +++ b/internal/domain/entity/message.go @@ -0,0 +1,29 @@ +package entity + +import "time" + +type MessageType string + +const ( + MessageTypeText MessageType = "text" + MessageTypeVoice MessageType = "voice" +) + +type Message struct { + MessageID int64 + ChatID int64 + UserID int64 + Username string + Type MessageType + Text string + VoiceFileID string + Timestamp time.Time + Metadata map[string]any +} + +type VoiceMessage struct { + FileID string + Duration int + MimeType string + FileSize int +} diff --git a/internal/domain/entity/user.go b/internal/domain/entity/user.go new file mode 100644 index 0000000..8707a8f --- /dev/null +++ b/internal/domain/entity/user.go @@ -0,0 +1,22 @@ +package entity + +import "time" + +type User struct { + TelegramID int64 + Username string + FirstName string + LastName string + Language string + CreatedAt time.Time +} + +type Session struct { + UserID int64 + ChatID int64 + CurrentWorkflow string + History []Message + Data map[string]any + UpdatedAt time.Time + TTL int // hours +} diff --git a/internal/domain/entity/workflow.go b/internal/domain/entity/workflow.go new file mode 100644 index 0000000..581f261 --- /dev/null +++ b/internal/domain/entity/workflow.go @@ -0,0 +1,29 @@ +package entity + +import "time" + +type WorkflowRequest struct { + RequestID string + ChatID int64 + UserID int64 + Username string + MessageText string + Intent Intent + Session Session + Timestamp time.Time + Metadata map[string]any +} + +type Action struct { + Type string + Key string + Value any +} + +type WorkflowResponse struct { + RequestID string + ReplyText string + Actions []Action + NextWorkflow string + Error error +} diff --git a/internal/domain/port/file_downloader.go b/internal/domain/port/file_downloader.go new file mode 100644 index 0000000..6bdc76c --- /dev/null +++ b/internal/domain/port/file_downloader.go @@ -0,0 +1,7 @@ +package port + +import "context" + +type FileDownloader interface { + Download(ctx context.Context, fileID string) ([]byte, string, error) // bytes, mimeType, error +} diff --git a/internal/domain/port/intent_router.go b/internal/domain/port/intent_router.go new file mode 100644 index 0000000..b2f6a7d --- /dev/null +++ b/internal/domain/port/intent_router.go @@ -0,0 +1,10 @@ +package port + +import ( + "context" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type IntentRouter interface { + Route(ctx context.Context, msg entity.Message) (entity.Route, error) +} diff --git a/internal/domain/port/message_gateway.go b/internal/domain/port/message_gateway.go new file mode 100644 index 0000000..9a5abaf --- /dev/null +++ b/internal/domain/port/message_gateway.go @@ -0,0 +1,8 @@ +package port + +import "context" + +type MessageGateway interface { + SendText(ctx context.Context, chatID int64, text string) error + SendTyping(ctx context.Context, chatID int64) error +} diff --git a/internal/domain/port/session_store.go b/internal/domain/port/session_store.go new file mode 100644 index 0000000..19a92f9 --- /dev/null +++ b/internal/domain/port/session_store.go @@ -0,0 +1,12 @@ +package port + +import ( + "context" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type SessionStore interface { + Get(ctx context.Context, userID int64) (entity.Session, error) + Set(ctx context.Context, session entity.Session) error + Delete(ctx context.Context, userID int64) error +} diff --git a/internal/domain/port/speech_transcriber.go b/internal/domain/port/speech_transcriber.go new file mode 100644 index 0000000..a828b9a --- /dev/null +++ b/internal/domain/port/speech_transcriber.go @@ -0,0 +1,7 @@ +package port + +import "context" + +type SpeechTranscriber interface { + Transcribe(ctx context.Context, audioData []byte, mimeType string) (string, error) +} diff --git a/internal/domain/port/workflow_dispatcher.go b/internal/domain/port/workflow_dispatcher.go new file mode 100644 index 0000000..30e4260 --- /dev/null +++ b/internal/domain/port/workflow_dispatcher.go @@ -0,0 +1,10 @@ +package port + +import ( + "context" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type WorkflowDispatcher interface { + Dispatch(ctx context.Context, req entity.WorkflowRequest) (entity.WorkflowResponse, error) +} diff --git a/internal/infrastructure/n8n/payload.go b/internal/infrastructure/n8n/payload.go new file mode 100644 index 0000000..c70470d --- /dev/null +++ b/internal/infrastructure/n8n/payload.go @@ -0,0 +1,8 @@ +package n8n + +// payload.go contains shared payload types for n8n webhook communication. +// The primary types (n8nPayload, n8nResponse) are defined in webhook_dispatcher.go +// to keep them co-located with their usage. +// +// This file is reserved for any additional payload structures needed +// for extended n8n integration (e.g., batch requests, webhook registration). diff --git a/internal/infrastructure/n8n/webhook_dispatcher.go b/internal/infrastructure/n8n/webhook_dispatcher.go new file mode 100644 index 0000000..a4d83f4 --- /dev/null +++ b/internal/infrastructure/n8n/webhook_dispatcher.go @@ -0,0 +1,124 @@ +package n8n + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "time" + + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type WorkflowConfig struct { + ID string + WebhookURL string + AuthToken string + Timeout time.Duration +} + +type WebhookDispatcher struct { + workflows map[string]WorkflowConfig + client *http.Client + logger *slog.Logger +} + +func NewWebhookDispatcher(workflows map[string]WorkflowConfig, logger *slog.Logger) *WebhookDispatcher { + return &WebhookDispatcher{ + workflows: workflows, + client: &http.Client{Timeout: 30 * time.Second}, + logger: logger, + } +} + +type n8nPayload struct { + RequestID string `json:"request_id"` + ChatID int64 `json:"chat_id"` + UserID int64 `json:"user_id"` + Username string `json:"username"` + MessageText string `json:"message_text"` + IntentName string `json:"intent_name"` + Timestamp time.Time `json:"timestamp"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +type n8nResponse struct { + Reply string `json:"reply"` + Actions []entity.Action `json:"actions,omitempty"` + NextWorkflow string `json:"next_workflow,omitempty"` +} + +func (d *WebhookDispatcher) Dispatch(ctx context.Context, req entity.WorkflowRequest) (entity.WorkflowResponse, error) { + wf, ok := d.workflows[req.Intent.Name] + if !ok { + // Try default workflow + wf, ok = d.workflows["default"] + if !ok { + return entity.WorkflowResponse{}, fmt.Errorf("no workflow configured for intent %q", req.Intent.Name) + } + } + + payload := n8nPayload{ + RequestID: req.RequestID, + ChatID: req.ChatID, + UserID: req.UserID, + Username: req.Username, + MessageText: req.MessageText, + IntentName: req.Intent.Name, + Timestamp: req.Timestamp, + Metadata: req.Metadata, + } + + body, err := json.Marshal(payload) + if err != nil { + return entity.WorkflowResponse{}, fmt.Errorf("marshal payload: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, wf.WebhookURL, bytes.NewReader(body)) + if err != nil { + return entity.WorkflowResponse{}, fmt.Errorf("create request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + if wf.AuthToken != "" { + httpReq.Header.Set("Authorization", "Bearer "+wf.AuthToken) + } + + client := d.client + if wf.Timeout > 0 { + client = &http.Client{Timeout: wf.Timeout} + } + + resp, err := client.Do(httpReq) + if err != nil { + return entity.WorkflowResponse{}, fmt.Errorf("n8n webhook call: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return entity.WorkflowResponse{}, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return entity.WorkflowResponse{}, fmt.Errorf("n8n returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var n8nResp n8nResponse + if err := json.Unmarshal(respBody, &n8nResp); err != nil { + // If not valid JSON, treat the raw body as reply text + return entity.WorkflowResponse{ + RequestID: req.RequestID, + ReplyText: string(respBody), + }, nil + } + + return entity.WorkflowResponse{ + RequestID: req.RequestID, + ReplyText: n8nResp.Reply, + Actions: n8nResp.Actions, + NextWorkflow: n8nResp.NextWorkflow, + }, nil +} diff --git a/internal/infrastructure/router/rule_based_router.go b/internal/infrastructure/router/rule_based_router.go new file mode 100644 index 0000000..98c7777 --- /dev/null +++ b/internal/infrastructure/router/rule_based_router.go @@ -0,0 +1,44 @@ +package router + +import ( + "context" + "regexp" + "sort" + + "github.com/paramah/gw_telegram/internal/domain/apperror" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type Rule struct { + Pattern *regexp.Regexp + IntentName string + Target entity.RouteTarget + Priority int +} + +type RuleBasedRouter struct { + rules []Rule +} + +func NewRuleBasedRouter(rules []Rule) *RuleBasedRouter { + sorted := make([]Rule, len(rules)) + copy(sorted, rules) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Priority > sorted[j].Priority + }) + return &RuleBasedRouter{rules: sorted} +} + +func (r *RuleBasedRouter) Route(_ context.Context, msg entity.Message) (entity.Route, error) { + for _, rule := range r.rules { + if rule.Pattern.MatchString(msg.Text) { + return entity.Route{ + Pattern: rule.Pattern.String(), + IntentName: rule.IntentName, + Target: rule.Target, + Priority: rule.Priority, + }, nil + } + } + return entity.Route{}, apperror.ErrRouteNotFound +} diff --git a/internal/infrastructure/speech/audio_converter.go b/internal/infrastructure/speech/audio_converter.go new file mode 100644 index 0000000..50f09da --- /dev/null +++ b/internal/infrastructure/speech/audio_converter.go @@ -0,0 +1,63 @@ +package speech + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + + "github.com/google/uuid" +) + +type FFmpegConverter struct { + ffmpegPath string + tempDir string +} + +func NewFFmpegConverter(ffmpegPath, tempDir string) *FFmpegConverter { + if ffmpegPath == "" { + ffmpegPath = "ffmpeg" + } + if tempDir == "" { + tempDir = os.TempDir() + } + return &FFmpegConverter{ffmpegPath: ffmpegPath, tempDir: tempDir} +} + +func (c *FFmpegConverter) Convert(ctx context.Context, input []byte, fromMime, toMime string) ([]byte, error) { + id := uuid.New().String() + inFile := filepath.Join(c.tempDir, id+".input") + outFile := filepath.Join(c.tempDir, id+".wav") + + defer os.Remove(inFile) + defer os.Remove(outFile) + + if err := os.WriteFile(inFile, input, 0600); err != nil { + return nil, fmt.Errorf("write temp input: %w", err) + } + + cmd := exec.CommandContext(ctx, c.ffmpegPath, + "-i", inFile, + "-ar", "16000", + "-ac", "1", + "-f", "wav", + "-y", + outFile, + ) + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("ffmpeg conversion: %w: %s", err, stderr.String()) + } + + out, err := os.ReadFile(outFile) + if err != nil { + return nil, fmt.Errorf("read converted file: %w", err) + } + + return out, nil +} diff --git a/internal/infrastructure/speech/openai_whisper.go b/internal/infrastructure/speech/openai_whisper.go new file mode 100644 index 0000000..9807a1c --- /dev/null +++ b/internal/infrastructure/speech/openai_whisper.go @@ -0,0 +1,89 @@ +package speech + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "time" +) + +const whisperAPIURL = "https://api.openai.com/v1/audio/transcriptions" + +type OpenAIWhisper struct { + apiKey string + model string + language string + client *http.Client +} + +func NewOpenAIWhisper(apiKey, model, language string) *OpenAIWhisper { + return &OpenAIWhisper{ + apiKey: apiKey, + model: model, + language: language, + client: &http.Client{Timeout: 60 * time.Second}, + } +} + +type whisperResponse struct { + Text string `json:"text"` +} + +func (w *OpenAIWhisper) Transcribe(ctx context.Context, audioData []byte, mimeType string) (string, error) { + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + + fw, err := mw.CreateFormFile("file", "audio.wav") + if err != nil { + return "", fmt.Errorf("create form file: %w", err) + } + if _, err := fw.Write(audioData); err != nil { + return "", fmt.Errorf("write audio data: %w", err) + } + + if err := mw.WriteField("model", w.model); err != nil { + return "", fmt.Errorf("write model field: %w", err) + } + if err := mw.WriteField("response_format", "json"); err != nil { + return "", fmt.Errorf("write response_format: %w", err) + } + if w.language != "" { + if err := mw.WriteField("language", w.language); err != nil { + return "", fmt.Errorf("write language field: %w", err) + } + } + mw.Close() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, whisperAPIURL, &buf) + if err != nil { + return "", fmt.Errorf("create whisper request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+w.apiKey) + req.Header.Set("Content-Type", mw.FormDataContentType()) + + resp, err := w.client.Do(req) + if err != nil { + return "", fmt.Errorf("whisper API call: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("read whisper response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("whisper API error %d: %s", resp.StatusCode, string(body)) + } + + var result whisperResponse + if err := json.Unmarshal(body, &result); err != nil { + return "", fmt.Errorf("parse whisper response: %w", err) + } + + return result.Text, nil +} diff --git a/internal/infrastructure/storage/redis_session_store.go b/internal/infrastructure/storage/redis_session_store.go new file mode 100644 index 0000000..f614ac8 --- /dev/null +++ b/internal/infrastructure/storage/redis_session_store.go @@ -0,0 +1,62 @@ +package storage + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/redis/go-redis/v9" + "github.com/paramah/gw_telegram/internal/domain/apperror" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type RedisSessionStore struct { + client *redis.Client + ttl time.Duration +} + +func NewRedisSessionStore(client *redis.Client, ttlHours int) *RedisSessionStore { + return &RedisSessionStore{ + client: client, + ttl: time.Duration(ttlHours) * time.Hour, + } +} + +func sessionKey(userID int64) string { + return fmt.Sprintf("session:%d", userID) +} + +func (s *RedisSessionStore) Get(ctx context.Context, userID int64) (entity.Session, error) { + data, err := s.client.Get(ctx, sessionKey(userID)).Bytes() + if err == redis.Nil { + return entity.Session{}, apperror.ErrSessionNotFound + } + if err != nil { + return entity.Session{}, fmt.Errorf("redis get session: %w", err) + } + + var session entity.Session + if err := json.Unmarshal(data, &session); err != nil { + return entity.Session{}, fmt.Errorf("unmarshal session: %w", err) + } + return session, nil +} + +func (s *RedisSessionStore) Set(ctx context.Context, session entity.Session) error { + data, err := json.Marshal(session) + if err != nil { + return fmt.Errorf("marshal session: %w", err) + } + if err := s.client.Set(ctx, sessionKey(session.UserID), data, s.ttl).Err(); err != nil { + return fmt.Errorf("redis set session: %w", err) + } + return nil +} + +func (s *RedisSessionStore) Delete(ctx context.Context, userID int64) error { + if err := s.client.Del(ctx, sessionKey(userID)).Err(); err != nil { + return fmt.Errorf("redis del session: %w", err) + } + return nil +} diff --git a/internal/infrastructure/telegram/bot_gateway.go b/internal/infrastructure/telegram/bot_gateway.go new file mode 100644 index 0000000..9fe320b --- /dev/null +++ b/internal/infrastructure/telegram/bot_gateway.go @@ -0,0 +1,37 @@ +package telegram + +import ( + "context" + "fmt" + "log/slog" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" +) + +type BotGateway struct { + bot *tgbotapi.BotAPI + logger *slog.Logger +} + +func NewBotGateway(bot *tgbotapi.BotAPI, logger *slog.Logger) *BotGateway { + return &BotGateway{bot: bot, logger: logger} +} + +func (g *BotGateway) SendText(ctx context.Context, chatID int64, text string) error { + msg := tgbotapi.NewMessage(chatID, text) + msg.ParseMode = tgbotapi.ModeMarkdown + _, err := g.bot.Send(msg) + if err != nil { + return fmt.Errorf("telegram send text: %w", err) + } + return nil +} + +func (g *BotGateway) SendTyping(ctx context.Context, chatID int64) error { + action := tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping) + _, err := g.bot.Request(action) + if err != nil { + g.logger.WarnContext(ctx, "failed to send typing action", "error", err, "chat_id", chatID) + } + return nil +} diff --git a/internal/infrastructure/telegram/file_downloader.go b/internal/infrastructure/telegram/file_downloader.go new file mode 100644 index 0000000..72b5cc9 --- /dev/null +++ b/internal/infrastructure/telegram/file_downloader.go @@ -0,0 +1,48 @@ +package telegram + +import ( + "context" + "fmt" + "io" + "net/http" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" +) + +type TelegramFileDownloader struct { + bot *tgbotapi.BotAPI + client *http.Client +} + +func NewTelegramFileDownloader(bot *tgbotapi.BotAPI) *TelegramFileDownloader { + return &TelegramFileDownloader{ + bot: bot, + client: &http.Client{}, + } +} + +func (d *TelegramFileDownloader) Download(ctx context.Context, fileID string) ([]byte, string, error) { + file, err := d.bot.GetFile(tgbotapi.FileConfig{FileID: fileID}) + if err != nil { + return nil, "", fmt.Errorf("get file info: %w", err) + } + + url := file.Link(d.bot.Token) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, "", fmt.Errorf("create download request: %w", err) + } + + resp, err := d.client.Do(req) + if err != nil { + return nil, "", fmt.Errorf("download file: %w", err) + } + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, "", fmt.Errorf("read file body: %w", err) + } + + return data, "audio/ogg", nil +} diff --git a/internal/infrastructure/telegram/update_poller.go b/internal/infrastructure/telegram/update_poller.go new file mode 100644 index 0000000..9781a44 --- /dev/null +++ b/internal/infrastructure/telegram/update_poller.go @@ -0,0 +1,50 @@ +package telegram + +import ( + "context" + "log/slog" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" +) + +type UpdateHandler interface { + Handle(ctx context.Context, update tgbotapi.Update) +} + +type UpdatePoller struct { + bot *tgbotapi.BotAPI + handler UpdateHandler + logger *slog.Logger + timeout int +} + +func NewUpdatePoller(bot *tgbotapi.BotAPI, handler UpdateHandler, logger *slog.Logger) *UpdatePoller { + return &UpdatePoller{ + bot: bot, + handler: handler, + logger: logger, + timeout: 60, + } +} + +func (p *UpdatePoller) Start(ctx context.Context) error { + u := tgbotapi.NewUpdate(0) + u.Timeout = p.timeout + + updates := p.bot.GetUpdatesChan(u) + p.logger.InfoContext(ctx, "update poller started") + + for { + select { + case <-ctx.Done(): + p.bot.StopReceivingUpdates() + p.logger.InfoContext(ctx, "update poller stopped") + return ctx.Err() + case update, ok := <-updates: + if !ok { + return nil + } + go p.handler.Handle(ctx, update) + } + } +} diff --git a/internal/interfaces/health_handler.go b/internal/interfaces/health_handler.go new file mode 100644 index 0000000..3f178e5 --- /dev/null +++ b/internal/interfaces/health_handler.go @@ -0,0 +1,27 @@ +package interfaces + +import ( + "encoding/json" + "net/http" + "time" +) + +type HealthHandler struct{} + +func NewHealthHandler() *HealthHandler { + return &HealthHandler{} +} + +type healthResponse struct { + Status string `json:"status"` + Timestamp time.Time `json:"timestamp"` +} + +func (h *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(healthResponse{ + Status: "ok", + Timestamp: time.Now().UTC(), + }) +} diff --git a/internal/interfaces/telegram_handler.go b/internal/interfaces/telegram_handler.go new file mode 100644 index 0000000..e0ee51f --- /dev/null +++ b/internal/interfaces/telegram_handler.go @@ -0,0 +1,74 @@ +package interfaces + +import ( + "context" + "log/slog" + "time" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" + "github.com/paramah/gw_telegram/internal/application/dto" + "github.com/paramah/gw_telegram/internal/application/usecase" +) + +type TelegramHandler struct { + textUC *usecase.HandleTextMessage + voiceUC *usecase.HandleVoiceMessage + logger *slog.Logger +} + +func NewTelegramHandler( + textUC *usecase.HandleTextMessage, + voiceUC *usecase.HandleVoiceMessage, + logger *slog.Logger, +) *TelegramHandler { + return &TelegramHandler{textUC: textUC, voiceUC: voiceUC, logger: logger} +} + +func (h *TelegramHandler) Handle(ctx context.Context, update tgbotapi.Update) { + defer func() { + if r := recover(); r != nil { + h.logger.ErrorContext(ctx, "panic in update handler", "panic", r) + } + }() + + if update.Message == nil { + return + } + + msg := update.Message + + switch { + case msg.Voice != nil: + in := dto.IncomingMessageDTO{ + MessageID: int64(msg.MessageID), + ChatID: msg.Chat.ID, + UserID: msg.From.ID, + Username: msg.From.UserName, + FirstName: msg.From.FirstName, + LastName: msg.From.LastName, + VoiceFileID: msg.Voice.FileID, + IsVoice: true, + Timestamp: time.Unix(int64(msg.Date), 0), + Language: msg.From.LanguageCode, + } + if err := h.voiceUC.Execute(ctx, in); err != nil { + h.logger.ErrorContext(ctx, "voice handler error", "error", err, "user_id", msg.From.ID) + } + + case msg.Text != "": + in := dto.IncomingMessageDTO{ + MessageID: int64(msg.MessageID), + ChatID: msg.Chat.ID, + UserID: msg.From.ID, + Username: msg.From.UserName, + FirstName: msg.From.FirstName, + LastName: msg.From.LastName, + Text: msg.Text, + Timestamp: time.Unix(int64(msg.Date), 0), + Language: msg.From.LanguageCode, + } + if err := h.textUC.Execute(ctx, in); err != nil { + h.logger.ErrorContext(ctx, "text handler error", "error", err, "user_id", msg.From.ID) + } + } +} diff --git a/test/testutil/fake_file_downloader.go b/test/testutil/fake_file_downloader.go new file mode 100644 index 0000000..e51070a --- /dev/null +++ b/test/testutil/fake_file_downloader.go @@ -0,0 +1,25 @@ +package testutil + +import "context" + +type FakeFileDownloader struct { + Data []byte + MimeType string + Error error +} + +func (f *FakeFileDownloader) Download(_ context.Context, _ string) ([]byte, string, error) { + return f.Data, f.MimeType, f.Error +} + +type FakeAudioConverter struct { + Output []byte + Error error +} + +func (f *FakeAudioConverter) Convert(_ context.Context, _ []byte, _, _ string) ([]byte, error) { + if f.Output != nil { + return f.Output, f.Error + } + return []byte("converted-audio"), f.Error +} diff --git a/test/testutil/fake_intent_router.go b/test/testutil/fake_intent_router.go new file mode 100644 index 0000000..1b10ba2 --- /dev/null +++ b/test/testutil/fake_intent_router.go @@ -0,0 +1,15 @@ +package testutil + +import ( + "context" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type FakeIntentRouter struct { + RouteResult entity.Route + Error error +} + +func (f *FakeIntentRouter) Route(_ context.Context, _ entity.Message) (entity.Route, error) { + return f.RouteResult, f.Error +} diff --git a/test/testutil/fake_message_gateway.go b/test/testutil/fake_message_gateway.go new file mode 100644 index 0000000..da7c6ae --- /dev/null +++ b/test/testutil/fake_message_gateway.go @@ -0,0 +1,22 @@ +package testutil + +import "context" + +type FakeMessageGateway struct { + LastSentText string + LastChatID int64 + TypingChatIDs []int64 + SendError error + TypingError error +} + +func (f *FakeMessageGateway) SendText(_ context.Context, chatID int64, text string) error { + f.LastChatID = chatID + f.LastSentText = text + return f.SendError +} + +func (f *FakeMessageGateway) SendTyping(_ context.Context, chatID int64) error { + f.TypingChatIDs = append(f.TypingChatIDs, chatID) + return f.TypingError +} diff --git a/test/testutil/fake_session_store.go b/test/testutil/fake_session_store.go new file mode 100644 index 0000000..8c2dfe2 --- /dev/null +++ b/test/testutil/fake_session_store.go @@ -0,0 +1,43 @@ +package testutil + +import ( + "context" + "github.com/paramah/gw_telegram/internal/domain/apperror" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type FakeSessionStore struct { + Sessions map[int64]entity.Session + GetError error + SetCalled bool + LastSetSession entity.Session +} + +func (f *FakeSessionStore) Get(_ context.Context, userID int64) (entity.Session, error) { + if f.GetError != nil { + return entity.Session{}, f.GetError + } + if f.Sessions != nil { + if s, ok := f.Sessions[userID]; ok { + return s, nil + } + } + return entity.Session{}, apperror.ErrSessionNotFound +} + +func (f *FakeSessionStore) Set(_ context.Context, session entity.Session) error { + f.SetCalled = true + f.LastSetSession = session + if f.Sessions == nil { + f.Sessions = make(map[int64]entity.Session) + } + f.Sessions[session.UserID] = session + return nil +} + +func (f *FakeSessionStore) Delete(_ context.Context, userID int64) error { + if f.Sessions != nil { + delete(f.Sessions, userID) + } + return nil +} diff --git a/test/testutil/fake_speech_transcriber.go b/test/testutil/fake_speech_transcriber.go new file mode 100644 index 0000000..567393b --- /dev/null +++ b/test/testutil/fake_speech_transcriber.go @@ -0,0 +1,14 @@ +package testutil + +import "context" + +type FakeSpeechTranscriber struct { + Transcript string + Error error + CallCount int +} + +func (f *FakeSpeechTranscriber) Transcribe(_ context.Context, _ []byte, _ string) (string, error) { + f.CallCount++ + return f.Transcript, f.Error +} diff --git a/test/testutil/fake_workflow_dispatcher.go b/test/testutil/fake_workflow_dispatcher.go new file mode 100644 index 0000000..46cb5a0 --- /dev/null +++ b/test/testutil/fake_workflow_dispatcher.go @@ -0,0 +1,19 @@ +package testutil + +import ( + "context" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +type FakeWorkflowDispatcher struct { + Response entity.WorkflowResponse + Error error + CallCount int + LastRequest entity.WorkflowRequest +} + +func (f *FakeWorkflowDispatcher) Dispatch(_ context.Context, req entity.WorkflowRequest) (entity.WorkflowResponse, error) { + f.CallCount++ + f.LastRequest = req + return f.Response, f.Error +} diff --git a/test/testutil/fixtures.go b/test/testutil/fixtures.go new file mode 100644 index 0000000..b27ab12 --- /dev/null +++ b/test/testutil/fixtures.go @@ -0,0 +1,29 @@ +package testutil + +import ( + "time" + "github.com/paramah/gw_telegram/internal/domain/entity" +) + +func NewTextMessage(chatID, userID int64, text string) entity.Message { + return entity.Message{ + MessageID: 1, + ChatID: chatID, + UserID: userID, + Username: "testuser", + Type: entity.MessageTypeText, + Text: text, + Timestamp: time.Now(), + Metadata: make(map[string]any), + } +} + +func NewSession(userID, chatID int64) entity.Session { + return entity.Session{ + UserID: userID, + ChatID: chatID, + Data: make(map[string]any), + UpdatedAt: time.Now(), + TTL: 24, + } +}