File size: 2,059 Bytes
eb3b209
 
c801ad7
eb3b209
 
 
c801ad7
309b902
eb3b209
309b902
9e815ad
309b902
eb3b209
309b902
9e815ad
eb3b209
c801ad7
eb3b209
c801ad7
eb3b209
60457ee
689f7f1
eb3b209
 
 
 
 
 
 
 
 
3d5e9b3
 
c801ad7
eb3b209
 
 
 
9e815ad
c801ad7
 
 
 
 
 
9e815ad
c801ad7
eb3b209
c801ad7
3d5e9b3
 
c801ad7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from smolagents.tools import Tool
import requests
from typing import List, Dict

class LinkedInJobSearchTool(Tool):
    name = "linkedin_job_search"
    description = "Searches for job postings on LinkedIn based on job title, location, and work mode (remote, hybrid, in-office)."
    
    inputs = {
        "position": {"type": "string", "description": "Job title (e.g., Data Scientist)"},
        "location": {"type": "string", "description": "City or country (e.g., Germany)"},
        "work_mode": {"type": "string", "description": "remote, hybrid, in-office"}
    }
    
    output_type = "array"

    def forward(self, position: str, location: str, work_mode: str) -> List[Dict]:
        """
        Fetches job listings from LinkedIn using SerpAPI and returns structured JSON.
        """
        SERPAPI_KEY = "YOUR-API-KEY"  # Replace with actual key
        base_url = "https://serpapi.com/search"

        params = {
            "engine": "google_jobs",
            "q": f"{position} {work_mode} jobs",
            "location": location,
            "hl": "en",
            "api_key": SERPAPI_KEY
        }

        try:
            response = requests.get(base_url, params=params)
            response.raise_for_status()

            data = response.json()
            job_results = data.get("jobs_results", [])

            # ✅ Properly format job URLs
            return [
                {
                    "Title": job["title"],
                    "Company": job.get("company_name", "N/A"),
                    "Location": job.get("location", "N/A"),
                    "Posted": job.get("detected_extensions", {}).get("posted_at", "N/A"),
                    "Link": f"https://www.linkedin.com/jobs/view/{job['job_id']}" if "job_id" in job else "N/A"
                }
                for job in job_results
            ] if job_results else [{"Error": "No jobs found. Try different keywords."}]
        
        except requests.exceptions.RequestException as e:
            return [{"Error": f"Error fetching job listings: {str(e)}"}]