Automating the AI Model Migration Challenge: Building a Bulk N8N Workflow Updater
AI vendors are releasing upgraded models by the month. What do you do when you have hundreds of workflows configured with a model that's now legacy?

In with the new: when model upgrades come out, so do headaches for those managing multiple automations built on them
The pace of AI development in 2025 has been nothing short of extraordinary.
New language models are being released monthly, each promising better performance, lower costs, or enhanced capabilities.
As someone who relies heavily on N8N for workflow automation, I recently faced a familiar yet frustrating challenge: OpenAI had just released GPT-5 Mini, and I had dozens of workflows still configured to use the older GPT-4.1 Mini.
The prospect of manually updating each workflow was daunting - surely this was a task that could itself be automated. It also has to be a common one. While conversational AI interfaces like ChatGPT track model updates automatically, a vast number of internal AI agents and assistants are manually configured, and there's no default mechanism for applying these updates from the vendor.
This post presents a quick script which uses the N8N API to iterate programmatically through your workflows and apply replacement logic you define: it identifies every instance of the LLM model to be replaced, matched by its OpenRouter identifier, and swaps it for the updated one.
This can be used for simple one-to-one replacements (i.e., "take every workflow in which I'm using GPT-4.1 Mini in a node and change the model to GPT-5 Mini"). Or you can configure more elaborate find-and-replace logic - for example, replacing all legacy GPT models from OpenAI with the latest and greatest. At its core, as sketched below, the configuration is just a list of OpenRouter IDs to find and a single ID to substitute.
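A minimal sketch of the two patterns (the model IDs here are only examples):

# One-to-one: a single legacy model mapped to its successor
models_to_replace = ['openai/gpt-4.1-mini']
new_model_id = 'openai/gpt-5-mini'

# Many-to-one: consolidate several legacy OpenAI models onto one replacement
models_to_replace = [
    'openai/gpt-3.5-turbo',
    'openai/gpt-4o-mini',
    'openai/gpt-4.1-mini',
]
new_model_id = 'openai/gpt-5-mini'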
Of course, you have to be careful in doing this.
One pitfall that would be easy to fall into is forgetting that some workflows or some models had multimodal capabilities not present in the replacement.
But for fairly clean replacements, like GPT-4.1 Mini to GPT-5 Mini, this works reliably well. A simple safeguard, sketched below, is to keep an exclusion list of workflows you know depend on capabilities the new model lacks.
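A minimal sketch of that guard, assuming you maintain the exclusion list by hand (the workflow IDs here are hypothetical); it could be dropped into the script further down, before any replacements are applied:

# Hypothetical IDs of workflows that rely on multimodal capabilities - maintained by hand
EXCLUDED_WORKFLOW_IDS = {'wf_vision_pipeline', 'wf_image_tagger'}

# Filter flagged workflows out before applying any replacements
workflows_to_update = [
    wf for wf in workflows_to_update
    if wf.get('id') not in EXCLUDED_WORKFLOW_IDS
]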
Gather Your Credentials
The only credential you're guaranteed to need for this is the N8N API key:

If you're interacting with the N8N API through tools like Windsurf IDE or MCPs, it's highly advisable to make sure that each has its own API credential (although this is a best practice in general, of course!).
Cloudflare Access Authentication Handling With Service Tokens
A huge number of N8N users are self-hosting as opposed to using the SaaS offering. And a significant number of those users rely on Cloudflare Access to protect their instances from unauthorised access and malicious intrusion.
There are two approaches to handling custom backend scripts such as this:
- Firstly, you can deploy the scripts server-side (in which case Cloudflare authentication isn't a concern, as the scripts run against local services and are already behind the Access policy and the Cloudflare WAF). This makes developing them slightly more complicated, as it requires setting up a tunnel if you want to validate them directly on the server.
- The alternative approach is simply to generate a Cloudflare service token and pass its two values as headers when making your API calls. This is the method demonstrated and used here.
Service Tokens For N8N API Authentication (Self-Hosting)
To authenticate your API calls to a Cloudflare Access protected API endpoint with a service token, you'll need to include two header values with your requests.

Save these as secrets in your environment config and then reference them in your Python script like this:

'CF-Access-Client-Id': self.cf_client_id,
'CF-Access-Client-Secret': self.cf_client_secret
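If you want to sanity-check the token before wiring it into the full script, a minimal standalone sketch looks like this (the instance URL is a placeholder):

import os
import requests

# Minimal connectivity check through Cloudflare Access (URL is a placeholder)
headers = {
    'X-N8N-API-KEY': os.environ['N8N_API_KEY'],
    'CF-Access-Client-Id': os.environ['CF_ACCESS_CLIENT_ID'],
    'CF-Access-Client-Secret': os.environ['CF_ACCESS_CLIENT_SECRET'],
}
response = requests.get('https://n8n.example.com/api/v1/workflows', headers=headers, timeout=10)
response.raise_for_status()
# If Access rejects the token, you'll typically be served a login page instead of JSON,
# so this parse failing is itself a useful signal
print(len(response.json().get('data', [])), 'workflows visible')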
Before adding the two header fields to your script and expecting that all will be well, however, there are a couple more things to check. You need to also validate that:
1 - The service token is authorised to access the application you've assigned to your N8N instance (service tokens are now bound to application access).
2 - The policy protecting your N8N instance has an Allow rule or Service Auth rule allowing service tokens to pass through the access policy. Just like conventional firewall rules, access policies are evaluated sequentially - so if this isn't working, ensure you have the ordering correct.
Final tip:
The next time you feel like giving up on Cloudflare in frustration at how safe but complex it makes everything, download Postman and become acquainted with it. It provides an easy space to test your requests individually and figure out precisely how they need to be made in your scripts.
Handling authentication in API calls to Cloudflare-protected zones becomes second nature after you've done it enough times (like everything else!). Postman makes getting through that initial trial and error process much quicker.
The Implementation
Now let's get back to the actual bulk model-updating script.
The final script handles multiple model replacements in a single run:
self.models_to_replace = [
    'openai/gpt-3.5-turbo',
    'openai/gpt-4o-mini',
    'openai/gpt-4.1-mini',
    'openai/gpt-4.1-nano'
]
self.new_model_id = 'openai/gpt-5-mini'
If you're also using OpenRouter, there's a nice clipboard icon in the models list that will copy the correct model ID to your clipboard:

Base Variables
These are the environment variables you want to populate into your environment as secrets:
N8N_API_KEY={{your-api-key}}
N8N_BASE_URL={{your-n8n-instance-url}}

# Cloudflare Access Headers
CF_ACCESS_CLIENT_ID={{your-cf-client-id}}
CF_ACCESS_CLIENT_SECRET={{your-cf-secret}}

# Model Configuration
NEW_MODEL_ID={{new-openrouter-model-id}}
Model Update Script
Here's a full script that does all the legwork.
Even if you have hundreds of workflows configured, this should apply all the changes within seconds, saving you the tedium of digging them out one by one and updating individual nodes.
#!/usr/bin/env python3
"""
N8N LLM Workflow Bulk Updater

This script connects to an N8N instance and bulk updates LLM model references
across all workflows. Specifically designed to update OpenRouter model IDs
from older versions to newer ones (e.g., GPT-4.1 Mini to GPT-5 Mini).
"""

import os
import json
import requests
from typing import List, Dict, Any
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich.progress import Progress
from rich.prompt import Confirm

# Load environment variables
load_dotenv()

console = Console()

class N8NWorkflowUpdater:
    def __init__(self):
        self.api_key = os.getenv('N8N_API_KEY')
        self.base_url = os.getenv('N8N_BASE_URL')
        self.cf_client_id = os.getenv('CF_ACCESS_CLIENT_ID')
        self.cf_client_secret = os.getenv('CF_ACCESS_CLIENT_SECRET')

        # Model configuration - multiple models to replace
        self.models_to_replace = [
            'openai/gpt-3.5-turbo',
            'openai/gpt-4o-mini',
            'openai/gpt-4.1-mini',
            'openai/gpt-4.1-nano'
        ]
        self.new_model_id = os.getenv('NEW_MODEL_ID', 'openai/gpt-5-mini')

        if not self.api_key or not self.base_url:
            raise ValueError("N8N_API_KEY and N8N_BASE_URL must be set in .env file")

        # Ensure base URL has proper format
        if not self.base_url.startswith('http'):
            self.base_url = f"https://{self.base_url}"
        if not self.base_url.endswith('/api/v1'):
            self.base_url = f"{self.base_url.rstrip('/')}/api/v1"

    def _get_headers(self) -> Dict[str, str]:
        """Get headers for API requests including Cloudflare access if configured."""
        headers = {
            'X-N8N-API-KEY': self.api_key,
            'Content-Type': 'application/json'
        }

        # Add Cloudflare headers if configured
        if self.cf_client_id and self.cf_client_secret:
            headers.update({
                'CF-Access-Client-Id': self.cf_client_id,
                'CF-Access-Client-Secret': self.cf_client_secret
            })

        return headers

    def test_connection(self) -> bool:
        """Test connection to N8N API."""
        try:
            response = requests.get(
                f"{self.base_url}/workflows",
                headers=self._get_headers(),
                timeout=10
            )
            response.raise_for_status()
            console.print("✅ Successfully connected to N8N API", style="green")
            return True
        except requests.exceptions.RequestException as e:
            console.print(f"❌ Failed to connect to N8N API: {e}", style="red")
            return False

    def get_all_workflows(self) -> List[Dict[str, Any]]:
        """Fetch all workflows from N8N."""
        try:
            response = requests.get(
                f"{self.base_url}/workflows",
                headers=self._get_headers()
            )
            response.raise_for_status()
            workflows = response.json().get('data', [])
            console.print(f"📋 Found {len(workflows)} workflows", style="blue")
            return workflows
        except requests.exceptions.RequestException as e:
            console.print(f"❌ Failed to fetch workflows: {e}", style="red")
            return []

    def workflow_contains_model(self, workflow: Dict[str, Any]) -> tuple[bool, List[str]]:
        """Check if workflow contains any of the old model IDs."""
        workflow_json = json.dumps(workflow)
        found_models = []
        for model_id in self.models_to_replace:
            if model_id in workflow_json:
                found_models.append(model_id)
        return len(found_models) > 0, found_models

    def update_workflow_model(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Update model references in a workflow."""
        # Convert to JSON string, replace all old models with new model, and convert back
        workflow_json = json.dumps(workflow)
        updated_json = workflow_json

        for old_model_id in self.models_to_replace:
            updated_json = updated_json.replace(old_model_id, self.new_model_id)

        return json.loads(updated_json)

    def save_workflow(self, workflow: Dict[str, Any]) -> bool:
        """Save updated workflow back to N8N."""
        try:
            workflow_id = workflow.get('id')
            if not workflow_id:
                console.print("❌ Workflow missing ID", style="red")
                return False

            # Create payload with required fields for N8N API
            update_payload = {
                'name': workflow.get('name'),
                'nodes': workflow.get('nodes'),
                'connections': workflow.get('connections'),
                'settings': workflow.get('settings', {})  # Required field
            }

            response = requests.put(
                f"{self.base_url}/workflows/{workflow_id}",
                headers=self._get_headers(),
                json=update_payload
            )
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            console.print(f"❌ Failed to save workflow {workflow.get('name', 'Unknown')}: {e}", style="red")
            return False

    def run_bulk_update(self, dry_run: bool = True) -> None:
        """Run the bulk update process."""
        console.print(f"\n🚀 Starting bulk update: {', '.join(self.models_to_replace)} → {self.new_model_id}")
        console.print(f"🌐 Target instance: {self.base_url.replace('/api/v1', '')}")

        if dry_run:
            console.print("🔍 Running in DRY RUN mode - no changes will be made", style="yellow")

        # Test connection
        if not self.test_connection():
            return

        # Get all workflows
        workflows = self.get_all_workflows()
        if not workflows:
            return

        # Find workflows that need updating
        workflows_to_update = []
        workflow_model_map = {}
        for workflow in workflows:
            contains_model, found_models = self.workflow_contains_model(workflow)
            if contains_model:
                workflows_to_update.append(workflow)
                workflow_model_map[workflow.get('id')] = found_models

        if not workflows_to_update:
            console.print("✅ No workflows found containing the old model IDs", style="green")
            return

        # Display summary table
        table = Table(title="Workflows to Update")
        table.add_column("Name", style="cyan")
        table.add_column("ID", style="magenta")
        table.add_column("Active", style="green")
        table.add_column("Models Found", style="yellow")

        for workflow in workflows_to_update:
            workflow_id = workflow.get('id')
            found_models = workflow_model_map.get(workflow_id, [])
            table.add_row(
                workflow.get('name', 'Unnamed'),
                str(workflow_id or 'Unknown'),
                "Yes" if workflow.get('active', False) else "No",
                ", ".join(found_models)
            )

        console.print(table)
        console.print(f"\n📊 Found {len(workflows_to_update)} workflows to update")

        if dry_run:
            console.print("\n🔍 DRY RUN COMPLETE - Use --live to apply changes", style="yellow")
            return

        # Confirm before proceeding
        if not Confirm.ask(f"\nProceed with updating {len(workflows_to_update)} workflows?"):
            console.print("❌ Update cancelled", style="red")
            return

        # Update workflows
        updated_count = 0
        failed_count = 0

        with Progress() as progress:
            task = progress.add_task("Updating workflows...", total=len(workflows_to_update))

            for workflow in workflows_to_update:
                # Update the workflow
                updated_workflow = self.update_workflow_model(workflow)

                # Save the updated workflow
                if self.save_workflow(updated_workflow):
                    updated_count += 1
                    console.print(f"✅ Updated: {workflow.get('name', 'Unknown')}", style="green")
                else:
                    failed_count += 1

                progress.update(task, advance=1)

        # Summary
        console.print("\n📊 Update Summary:")
        console.print(f"✅ Successfully updated: {updated_count}")
        console.print(f"❌ Failed to update: {failed_count}")

        if updated_count > 0:
            console.print(f"🎉 Bulk update completed! Updated {updated_count} workflows.", style="green")

def main():
    """Main entry point."""
    import argparse

    parser = argparse.ArgumentParser(description="N8N LLM Workflow Bulk Updater")
    parser.add_argument(
        '--live',
        action='store_true',
        help='Apply changes (default is dry run)'
    )
    parser.add_argument(
        '--new-model',
        help='Override new model ID from .env (default: openai/gpt-5-mini)'
    )

    args = parser.parse_args()

    try:
        updater = N8NWorkflowUpdater()

        # Override new model ID if provided
        if args.new_model:
            updater.new_model_id = args.new_model

        # Run the update
        updater.run_bulk_update(dry_run=not args.live)

    except Exception as e:
        console.print(f"❌ Error: {e}", style="red")
        return 1

    return 0

if __name__ == "__main__":
    exit(main())
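By default the script runs as a dry run and makes no changes; pass --live to apply them. Assuming you've saved it as n8n_model_updater.py (the filename is arbitrary):

# Preview which workflows would be touched (dry run is the default)
python n8n_model_updater.py

# Apply the changes, optionally overriding the replacement model
python n8n_model_updater.py --live --new-model openai/gpt-5-mini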